vortex_protocol/
lib.rs

1/*
2 *     Copyright 2025 The Dragonfly Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17use crate::error::{Error, Result};
18use bytes::{BufMut, Bytes, BytesMut};
19use rand::prelude::*;
20
21pub mod error;
22pub mod tlv;
23
24/// HEADER_SIZE is the size of the Vortex packet header including the packet identifier, tag, and
25/// length.
26pub const HEADER_SIZE: usize = 6;
27
28/// MAX_VALUE_SIZE is the maximum size of the value field (4 GiB).
29const MAX_VALUE_SIZE: usize = 4 * 1024 * 1024 * 1024;
30
31/// Header represents the Vortex packet header.
32#[derive(Debug, Clone)]
33pub struct Header {
34    id: u8,
35    tag: tlv::Tag,
36    length: u32,
37}
38
39/// Header implements the Header functions.
40impl Header {
41    /// new creates a new Vortex packet header.
42    pub fn new(tag: tlv::Tag, value_length: u32) -> Self {
43        let mut rng = thread_rng();
44        Self {
45            id: rng.gen(),
46            tag,
47            length: value_length,
48        }
49    }
50
51    /// new_download_piece creates a new Vortex packet header for download piece request.
52    pub fn new_download_piece() -> Self {
53        let mut rng = thread_rng();
54        Self {
55            id: rng.gen(),
56            tag: tlv::Tag::DownloadPiece,
57            length: (tlv::download_piece::TASK_ID_SIZE + tlv::download_piece::PIECE_NUMBER_SIZE)
58                as u32,
59        }
60    }
61
62    /// new_download_cache_piece creates a new Vortex packet header for download cache piece request.
63    pub fn new_download_cache_piece() -> Self {
64        let mut rng = thread_rng();
65        Self {
66            id: rng.gen(),
67            tag: tlv::Tag::DownloadCachePiece,
68            length: (tlv::download_cache_piece::TASK_ID_SIZE
69                + tlv::download_cache_piece::PIECE_NUMBER_SIZE) as u32,
70        }
71    }
72
73    /// new_download_persistent_cache_piece creates a new Vortex packet header for download persistent cache piece request.
74    pub fn new_download_persistent_cache_piece() -> Self {
75        let mut rng = thread_rng();
76        Self {
77            id: rng.gen(),
78            tag: tlv::Tag::DownloadPersistentCachePiece,
79            length: (tlv::download_persistent_cache_piece::TASK_ID_SIZE
80                + tlv::download_persistent_cache_piece::PIECE_NUMBER_SIZE)
81                as u32,
82        }
83    }
84
85    /// new_close creates a new Vortex packet header for close message.
86    pub fn new_close() -> Self {
87        let mut rng = thread_rng();
88        Self {
89            id: rng.gen(),
90            tag: tlv::Tag::Close,
91            length: 0,
92        }
93    }
94
95    /// new_piece_content creates a new Vortex packet header for piece content.
96    pub fn new_piece_content(value_length: u32) -> Self {
97        let mut rng = thread_rng();
98        Self {
99            id: rng.gen(),
100            tag: tlv::Tag::PieceContent,
101            length: value_length,
102        }
103    }
104
105    /// new_cache_piece_content creates a new Vortex packet header for cache piece content.
106    pub fn new_cache_piece_content(value_length: u32) -> Self {
107        let mut rng = thread_rng();
108        Self {
109            id: rng.gen(),
110            tag: tlv::Tag::CachePieceContent,
111            length: value_length,
112        }
113    }
114
115    /// new_persistent_cache_piece_content creates a new Vortex packet header for persistent cache piece content.
116    pub fn new_persistent_cache_piece_content(value_length: u32) -> Self {
117        let mut rng = thread_rng();
118        Self {
119            id: rng.gen(),
120            tag: tlv::Tag::PersistentCachePieceContent,
121            length: value_length,
122        }
123    }
124
125    /// new_error creates a new Vortex packet header for error.
126    pub fn new_error(value_length: u32) -> Self {
127        let mut rng = thread_rng();
128        Self {
129            id: rng.gen(),
130            tag: tlv::Tag::Error,
131            length: value_length,
132        }
133    }
134
135    /// id returns the packet identifier.
136    pub fn id(&self) -> u8 {
137        self.id
138    }
139
140    /// tag returns the tag.
141    pub fn tag(&self) -> tlv::Tag {
142        self.tag
143    }
144
145    /// length returns the length of the value field.
146    pub fn length(&self) -> u32 {
147        self.length
148    }
149}
150
151/// Implement TryFrom<Bytes> for Header.
152impl TryFrom<Bytes> for Header {
153    type Error = Error;
154
155    /// try_from converts a Bytes into a Header.
156    fn try_from(bytes: Bytes) -> Result<Self> {
157        if bytes.len() != HEADER_SIZE {
158            return Err(Error::InvalidPacket(format!(
159                "expected min {HEADER_SIZE} bytes, got {}",
160                bytes.len()
161            )));
162        }
163
164        let id = bytes
165            .first()
166            .ok_or(Error::InvalidPacket(
167                "insufficient bytes for id".to_string(),
168            ))?
169            .to_owned();
170
171        let tag = bytes
172            .get(1)
173            .ok_or(Error::InvalidPacket(
174                "insufficient bytes for tag".to_string(),
175            ))?
176            .to_owned()
177            .into();
178
179        let length = u32::from_be_bytes(
180            bytes
181                .get(2..HEADER_SIZE)
182                .ok_or(Error::InvalidPacket(
183                    "insufficient bytes for length".to_string(),
184                ))?
185                .try_into()?,
186        );
187        Ok(Header { id, tag, length })
188    }
189}
190
191/// Implement From<Header> for Bytes.
192impl From<Header> for Bytes {
193    /// from converts a Header into Bytes.
194    fn from(header: Header) -> Self {
195        let mut bytes = BytesMut::with_capacity(HEADER_SIZE);
196        bytes.put_u8(header.id);
197        bytes.put_u8(header.tag.into());
198        bytes.put_u32(header.length);
199        bytes.freeze()
200    }
201}
202
203/// Vortex Protocol
204///
205/// Vortex is a peer-to-peer (P2P) file transfer protocol using TLV (Tag-Length-Value) format for
206/// efficient and flexible data transmission. Designed for reliable and scalable file sharing.
207///
208/// Packet Format:
209///     - Packet Identifier (1 bytes): Uniquely identifies each packet
210///     - Tag (1 bytes): Specifies data type in value field
211///     - Length (8 bytes): Indicates Value field length, up to 4 GiB
212///     - Value (variable): Actual data content, maximum 1 GiB
213///
214/// Protocol Format:
215///
216/// ```text
217/// ---------------------------------------------------------------------------------------------------
218/// |                             |                    |                    |                         |
219/// | Packet Identifier (1 bytes) |    Tag (1 bytes)   |  Length (8 bytes)  |   Value (up to 4 GiB)   |
220/// |                             |                    |                    |                         |
221/// ---------------------------------------------------------------------------------------------------
222/// ```
223///
224/// For more information, please refer to the [Vortex Protocol](https://github.com/dragonflyoss/vortex/blob/main/docs/README.md).
225#[derive(Debug, Clone)]
226pub enum Vortex {
227    DownloadPiece(Header, tlv::download_piece::DownloadPiece),
228    PieceContent(Header, tlv::piece_content::PieceContent),
229    DownloadCachePiece(Header, tlv::download_cache_piece::DownloadCachePiece),
230    CachePieceContent(Header, tlv::cache_piece_content::CachePieceContent),
231    DownloadPersistentCachePiece(
232        Header,
233        tlv::download_persistent_cache_piece::DownloadPersistentCachePiece,
234    ),
235    PersistentCachePieceContent(
236        Header,
237        tlv::persistent_cache_piece_content::PersistentCachePieceContent,
238    ),
239    Reserved(Header),
240    Close(Header),
241    Error(Header, tlv::error::Error),
242}
243
244/// Vortex implements the Vortex functions.
245impl Vortex {
246    /// Creates a new Vortex packet.
247    pub fn new(tag: tlv::Tag, value: Bytes) -> Result<Self> {
248        (tag, Header::new(tag, value.len() as u32), value).try_into()
249    }
250
251    /// id returns the packet identifier of the Vortex packet.
252    #[inline]
253    pub fn id(&self) -> u8 {
254        match self {
255            Vortex::DownloadPiece(header, _) => header.id,
256            Vortex::PieceContent(header, _) => header.id,
257            Vortex::DownloadCachePiece(header, _) => header.id,
258            Vortex::CachePieceContent(header, _) => header.id,
259            Vortex::DownloadPersistentCachePiece(header, _) => header.id,
260            Vortex::PersistentCachePieceContent(header, _) => header.id,
261            Vortex::Reserved(header) => header.id,
262            Vortex::Close(header) => header.id,
263            Vortex::Error(header, _) => header.id,
264        }
265    }
266
267    /// tag returns the tag of the Vortex packet.
268    #[inline]
269    pub fn tag(&self) -> tlv::Tag {
270        match self {
271            Vortex::DownloadPiece(header, _) => header.tag,
272            Vortex::PieceContent(header, _) => header.tag,
273            Vortex::DownloadCachePiece(header, _) => header.tag,
274            Vortex::CachePieceContent(header, _) => header.tag,
275            Vortex::DownloadPersistentCachePiece(header, _) => header.tag,
276            Vortex::PersistentCachePieceContent(header, _) => header.tag,
277            Vortex::Reserved(header) => header.tag,
278            Vortex::Close(header) => header.tag,
279            Vortex::Error(header, _) => header.tag,
280        }
281    }
282
283    /// length returns the length of the value field.
284    #[inline]
285    pub fn length(&self) -> usize {
286        match self {
287            Vortex::DownloadPiece(header, _) => header.length as usize,
288            Vortex::PieceContent(header, _) => header.length as usize,
289            Vortex::DownloadCachePiece(header, _) => header.length as usize,
290            Vortex::CachePieceContent(header, _) => header.length as usize,
291            Vortex::DownloadPersistentCachePiece(header, _) => header.length as usize,
292            Vortex::PersistentCachePieceContent(header, _) => header.length as usize,
293            Vortex::Reserved(header) => header.length as usize,
294            Vortex::Close(header) => header.length as usize,
295            Vortex::Error(header, _) => header.length as usize,
296        }
297    }
298
299    /// header returns a reference to the packet header.
300    #[inline]
301    pub fn header(&self) -> &Header {
302        match self {
303            Vortex::DownloadPiece(header, _) => header,
304            Vortex::PieceContent(header, _) => header,
305            Vortex::DownloadCachePiece(header, _) => header,
306            Vortex::CachePieceContent(header, _) => header,
307            Vortex::DownloadPersistentCachePiece(header, _) => header,
308            Vortex::PersistentCachePieceContent(header, _) => header,
309            Vortex::Reserved(header) => header,
310            Vortex::Close(header) => header,
311            Vortex::Error(header, _) => header,
312        }
313    }
314}
315
316/// Implement TryFrom<Bytes> for Vortex.
317impl TryFrom<Bytes> for Vortex {
318    type Error = Error;
319
320    /// try_from converts a Bytes into a Vortex packet.
321    fn try_from(bytes: Bytes) -> Result<Self> {
322        let mut bytes = BytesMut::from(bytes);
323        let header = bytes.split_to(HEADER_SIZE);
324        let value = bytes.freeze();
325        let header: Header = header.freeze().try_into()?;
326
327        // Check if the value length matches the specified length.
328        if value.len() != header.length as usize {
329            return Err(Error::InvalidLength(format!(
330                "value len {} != declared length {}",
331                value.len(),
332                header.length
333            )));
334        }
335
336        (header.tag, header, value).try_into()
337    }
338}
339
340/// Implement From<PieceContent> for Bytes.
341impl From<Vortex> for Bytes {
342    /// from converts a Vortex packet to Bytes.
343    fn from(packet: Vortex) -> Self {
344        let (header, value) = match packet {
345            Vortex::DownloadPiece(header, download_piece) => (header, download_piece.into()),
346            Vortex::DownloadCachePiece(header, download_cache_piece) => {
347                (header, download_cache_piece.into())
348            }
349            Vortex::DownloadPersistentCachePiece(header, download_persistent_cache_piece) => {
350                (header, download_persistent_cache_piece.into())
351            }
352            Vortex::Reserved(header) => (header, Bytes::new()),
353            Vortex::Close(header) => (header, Bytes::new()),
354            Vortex::Error(header, err) => (header, err.into()),
355            _ => panic!("unsupported packet type for conversion to Bytes"),
356        };
357
358        let mut bytes = BytesMut::with_capacity(HEADER_SIZE + value.len());
359        bytes.put_u8(header.id);
360        bytes.put_u8(header.tag.into());
361        bytes.put_u32(value.len() as u32);
362        bytes.extend_from_slice(&value);
363        bytes.freeze()
364    }
365}
366
367/// Implement TryFrom<(tlv::Tag, Header, Bytes)> for Vortex.
368impl TryFrom<(tlv::Tag, Header, Bytes)> for Vortex {
369    type Error = Error;
370
371    /// try_from converts a tuple of Tag, Header, and Bytes into a Vortex packet.
372    fn try_from((tag, header, value): (tlv::Tag, Header, Bytes)) -> Result<Self> {
373        if value.len() > MAX_VALUE_SIZE {
374            return Err(Error::InvalidLength(format!(
375                "value length {} exceeds maximum allowed size of {} bytes",
376                value.len(),
377                MAX_VALUE_SIZE
378            )));
379        }
380
381        match tag {
382            tlv::Tag::DownloadPiece => {
383                let download_piece = tlv::download_piece::DownloadPiece::try_from(value)?;
384                Ok(Vortex::DownloadPiece(header, download_piece))
385            }
386            tlv::Tag::DownloadCachePiece => {
387                let download_cache_piece =
388                    tlv::download_cache_piece::DownloadCachePiece::try_from(value)?;
389                Ok(Vortex::DownloadCachePiece(header, download_cache_piece))
390            }
391            tlv::Tag::DownloadPersistentCachePiece => {
392                let download_persistent_cache_piece =
393                    tlv::download_persistent_cache_piece::DownloadPersistentCachePiece::try_from(
394                        value,
395                    )?;
396                Ok(Vortex::DownloadPersistentCachePiece(
397                    header,
398                    download_persistent_cache_piece,
399                ))
400            }
401            tlv::Tag::Reserved(_) => Ok(Vortex::Reserved(header)),
402            tlv::Tag::Close => Ok(Vortex::Close(header)),
403            tlv::Tag::Error => {
404                let err = tlv::error::Error::try_from(value)?;
405                Ok(Vortex::Error(header, err))
406            }
407            _ => panic!("unsupported tag for Vortex packet"),
408        }
409    }
410}
411
412#[cfg(test)]
413mod tests {
414    use super::*;
415    use crate::tlv::Tag;
416    use bytes::Bytes;
417
418    #[test]
419    fn test_header_new() {
420        let tag = Tag::DownloadPiece;
421        let value_length = 1024;
422        let header = Header::new(tag, value_length);
423
424        assert_eq!(header.tag, tag);
425        assert_eq!(header.length, value_length);
426        assert!(header.id <= 254);
427    }
428
429    #[test]
430    fn test_header_try_from_bytes_success() {
431        let mut bytes = BytesMut::with_capacity(HEADER_SIZE);
432        bytes.put_u8(42);
433        bytes.put_u8(Tag::DownloadPiece.into());
434        bytes.put_u32(1024);
435        let bytes = bytes.freeze();
436        let header = Header::try_from(bytes).unwrap();
437
438        assert_eq!(header.id, 42);
439        assert_eq!(header.tag, Tag::DownloadPiece);
440        assert_eq!(header.length, 1024);
441    }
442
443    #[test]
444    fn test_header_try_from_bytes_invalid_size() {
445        let bytes = Bytes::from(vec![1, 2, 3]);
446        let result = Header::try_from(bytes);
447        assert!(matches!(result, Err(Error::InvalidPacket(_))));
448    }
449
450    #[test]
451    fn test_header_to_bytes() {
452        let tag = Tag::Close;
453        let header = Header {
454            id: 123,
455            tag,
456            length: 2048,
457        };
458        let bytes: Bytes = header.into();
459
460        assert_eq!(bytes.len(), HEADER_SIZE);
461        assert_eq!(bytes[0], 123);
462        assert_eq!(bytes[1], tag.into());
463        assert_eq!(
464            u32::from_be_bytes(bytes[2..HEADER_SIZE].try_into().unwrap()),
465            2048
466        );
467    }
468
469    #[test]
470    fn test_new_download_piece() {
471        let tag = Tag::DownloadPiece;
472        let mut value = BytesMut::with_capacity(68);
473        value.extend_from_slice("a".repeat(64).as_bytes());
474        value.put_u32(42);
475        let packet = Vortex::new(tag, value.clone().freeze()).unwrap();
476
477        assert_eq!(packet.id(), packet.id());
478        assert_eq!(packet.tag(), tag);
479        assert_eq!(packet.length(), value.len());
480    }
481
482    #[test]
483    fn test_close() {
484        let tag = Tag::Close;
485        let value = Bytes::new();
486        let packet = Vortex::new(tag, value.clone()).unwrap();
487
488        assert_eq!(packet.tag(), tag);
489        assert_eq!(packet.length(), value.len());
490    }
491
492    #[test]
493    fn test_error_handling() {
494        let value = vec![0; MAX_VALUE_SIZE + 1];
495        let result = Vortex::new(Tag::PieceContent, value.into());
496
497        assert!(matches!(result, Err(Error::InvalidLength(_))));
498    }
499
500    #[test]
501    fn test_vortex_try_from_bytes_success() {
502        let tag = Tag::Close;
503        let header = Header::new(tag, 0);
504        let header_bytes: Bytes = header.clone().into();
505        let value = Bytes::new();
506
507        let mut packet_bytes = BytesMut::new();
508        packet_bytes.extend_from_slice(&header_bytes);
509        packet_bytes.extend_from_slice(&value);
510        let packet = Vortex::try_from(packet_bytes.freeze()).unwrap();
511
512        assert_eq!(packet.tag(), tag);
513        assert_eq!(packet.length(), 0);
514    }
515
516    #[test]
517    fn test_vortex_try_from_bytes_length_mismatch() {
518        let tag = Tag::Close;
519        let header = Header {
520            id: 1,
521            tag,
522            length: 5,
523        };
524        let header_bytes: Bytes = header.into();
525        let value = Bytes::from("test");
526
527        let mut packet_bytes = BytesMut::new();
528        packet_bytes.extend_from_slice(&header_bytes);
529        packet_bytes.extend_from_slice(&value);
530        let result = Vortex::try_from(packet_bytes.freeze());
531
532        assert!(matches!(result, Err(Error::InvalidLength(_))));
533    }
534
535    #[test]
536    fn test_vortex_to_bytes_download_piece() {
537        let tag = Tag::DownloadPiece;
538        let mut value = BytesMut::with_capacity(68);
539        value.extend_from_slice("a".repeat(64).as_bytes());
540        value.put_u32(42);
541        let packet = Vortex::new(tag, value.clone().freeze()).unwrap();
542        let bytes: Bytes = packet.into();
543
544        assert_eq!(bytes.len(), HEADER_SIZE + value.len());
545    }
546
547    #[test]
548    fn test_vortex_to_bytes_download_cache_piece() {
549        let tag = Tag::DownloadCachePiece;
550        let mut value = BytesMut::with_capacity(68);
551        value.extend_from_slice("a".repeat(64).as_bytes());
552        value.put_u32(42);
553        let packet = Vortex::new(tag, value.clone().freeze()).unwrap();
554        let bytes: Bytes = packet.into();
555
556        assert_eq!(bytes.len(), HEADER_SIZE + value.len());
557    }
558
559    #[test]
560    fn test_vortex_to_bytes_download_persistent_cache_piece() {
561        let tag = Tag::DownloadPersistentCachePiece;
562        let mut value = BytesMut::with_capacity(68);
563        value.extend_from_slice("a".repeat(64).as_bytes());
564        value.put_u32(42);
565        let packet = Vortex::new(tag, value.clone().freeze()).unwrap();
566        let bytes: Bytes = packet.into();
567
568        assert_eq!(bytes.len(), HEADER_SIZE + value.len());
569    }
570
571    #[test]
572    fn test_vortex_to_bytes_reserved() {
573        let tag = Tag::Reserved(50);
574        let packet = Vortex::new(tag, Bytes::new()).unwrap();
575        let bytes: Bytes = packet.into();
576
577        assert_eq!(bytes.len(), HEADER_SIZE);
578    }
579
580    #[test]
581    fn test_vortex_to_bytes_close() {
582        let tag = Tag::Close;
583        let packet = Vortex::new(tag, Bytes::new()).unwrap();
584        let bytes: Bytes = packet.into();
585
586        assert_eq!(bytes.len(), HEADER_SIZE);
587    }
588
589    #[test]
590    fn test_vortex_to_bytes_error() {
591        let tag = Tag::Error;
592        let value = Bytes::from("error details");
593        let packet = Vortex::new(tag, value.clone()).unwrap();
594        let bytes: Bytes = packet.into();
595
596        assert_eq!(bytes.len(), HEADER_SIZE + value.len());
597    }
598
599    #[test]
600    fn test_max_value_size_boundary() {
601        let tag = Tag::Reserved(50);
602        let value = vec![0; MAX_VALUE_SIZE];
603        let result = Vortex::new(tag, value.into());
604
605        assert!(result.is_ok());
606    }
607}