vortex_protocol/
lib.rs

1/*
2 *     Copyright 2025 The Dragonfly Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17use crate::error::{Error, Result};
18use bytes::{BufMut, Bytes, BytesMut};
19use rand::prelude::*;
20
21pub mod error;
22pub mod tlv;
23
/// HEADER_SIZE is the size in bytes of the Vortex packet header: the packet identifier (one
/// `u8`), the tag (one `u8`), and the value length (one `u32`).
pub const HEADER_SIZE: usize = std::mem::size_of::<u8>() // packet identifier
    + std::mem::size_of::<u8>() // tag
    + std::mem::size_of::<u32>(); // length
27
/// MAX_VALUE_SIZE is the maximum size of the value field (4 GiB - 1 byte).
///
/// The header stores the value length as a `u32`, so `u32::MAX` is the largest length that can
/// be encoded without truncation. Using `u32::MAX` also keeps the constant representable on
/// targets where `usize` is 32 bits wide.
const MAX_VALUE_SIZE: usize = u32::MAX as usize;
30
/// Header represents the Vortex packet header.
///
/// When serialized, the header is written as one byte for the packet identifier, one byte for
/// the tag, and four big-endian bytes for the length of the value field.
#[derive(Debug, Clone)]
pub struct Header {
    /// Packet identifier; the constructors fill it with a randomly generated value.
    id: u8,
    /// Tag describing the type of the value that follows the header.
    tag: tlv::Tag,
    /// Length of the value field in bytes.
    length: u32,
}
38
39/// Header implements the Header functions.
40impl Header {
41    /// new creates a new Vortex packet header.
42    pub fn new(tag: tlv::Tag, value_length: u32) -> Self {
43        let mut rng = thread_rng();
44        Self {
45            id: rng.gen(),
46            tag,
47            length: value_length,
48        }
49    }
50
51    /// new_download_piece creates a new Vortex packet header for download piece request.
52    pub fn new_download_piece() -> Self {
53        let mut rng = thread_rng();
54        Self {
55            id: rng.gen(),
56            tag: tlv::Tag::DownloadPiece,
57            length: (tlv::download_piece::TASK_ID_SIZE + tlv::download_piece::PIECE_NUMBER_SIZE)
58                as u32,
59        }
60    }
61
62    /// new_download_cache_piece creates a new Vortex packet header for download cache piece request.
63    pub fn new_download_cache_piece() -> Self {
64        let mut rng = thread_rng();
65        Self {
66            id: rng.gen(),
67            tag: tlv::Tag::DownloadCachePiece,
68            length: (tlv::download_cache_piece::TASK_ID_SIZE
69                + tlv::download_cache_piece::PIECE_NUMBER_SIZE) as u32,
70        }
71    }
72
73    /// new_download_persistent_piece creates a new Vortex packet header for download persistent piece request.
74    pub fn new_download_persistent_piece() -> Self {
75        let mut rng = thread_rng();
76        Self {
77            id: rng.gen(),
78            tag: tlv::Tag::DownloadPersistentPiece,
79            length: (tlv::download_persistent_piece::TASK_ID_SIZE
80                + tlv::download_persistent_piece::PIECE_NUMBER_SIZE) as u32,
81        }
82    }
83
84    /// new_download_persistent_cache_piece creates a new Vortex packet header for download persistent cache piece request.
85    pub fn new_download_persistent_cache_piece() -> Self {
86        let mut rng = thread_rng();
87        Self {
88            id: rng.gen(),
89            tag: tlv::Tag::DownloadPersistentCachePiece,
90            length: (tlv::download_persistent_cache_piece::TASK_ID_SIZE
91                + tlv::download_persistent_cache_piece::PIECE_NUMBER_SIZE)
92                as u32,
93        }
94    }
95
96    /// new_close creates a new Vortex packet header for close message.
97    pub fn new_close() -> Self {
98        let mut rng = thread_rng();
99        Self {
100            id: rng.gen(),
101            tag: tlv::Tag::Close,
102            length: 0,
103        }
104    }
105
106    /// new_piece_content creates a new Vortex packet header for piece content.
107    pub fn new_piece_content(value_length: u32) -> Self {
108        let mut rng = thread_rng();
109        Self {
110            id: rng.gen(),
111            tag: tlv::Tag::PieceContent,
112            length: value_length,
113        }
114    }
115
116    /// new_cache_piece_content creates a new Vortex packet header for cache piece content.
117    pub fn new_cache_piece_content(value_length: u32) -> Self {
118        let mut rng = thread_rng();
119        Self {
120            id: rng.gen(),
121            tag: tlv::Tag::CachePieceContent,
122            length: value_length,
123        }
124    }
125
126    /// new_persistent_piece_content creates a new Vortex packet header for persistent piece content.
127    pub fn new_persistent_piece_content(value_length: u32) -> Self {
128        let mut rng = thread_rng();
129        Self {
130            id: rng.gen(),
131            tag: tlv::Tag::PersistentPieceContent,
132            length: value_length,
133        }
134    }
135
136    /// new_persistent_cache_piece_content creates a new Vortex packet header for persistent cache piece content.
137    pub fn new_persistent_cache_piece_content(value_length: u32) -> Self {
138        let mut rng = thread_rng();
139        Self {
140            id: rng.gen(),
141            tag: tlv::Tag::PersistentCachePieceContent,
142            length: value_length,
143        }
144    }
145
146    /// new_error creates a new Vortex packet header for error.
147    pub fn new_error(value_length: u32) -> Self {
148        let mut rng = thread_rng();
149        Self {
150            id: rng.gen(),
151            tag: tlv::Tag::Error,
152            length: value_length,
153        }
154    }
155
156    /// id returns the packet identifier.
157    pub fn id(&self) -> u8 {
158        self.id
159    }
160
161    /// tag returns the tag.
162    pub fn tag(&self) -> tlv::Tag {
163        self.tag
164    }
165
166    /// length returns the length of the value field.
167    pub fn length(&self) -> u32 {
168        self.length
169    }
170}
171
172/// Implement TryFrom<Bytes> for Header.
173impl TryFrom<Bytes> for Header {
174    type Error = Error;
175
176    /// try_from converts a Bytes into a Header.
177    fn try_from(bytes: Bytes) -> Result<Self> {
178        if bytes.len() != HEADER_SIZE {
179            return Err(Error::InvalidPacket(format!(
180                "expected min {HEADER_SIZE} bytes, got {}",
181                bytes.len()
182            )));
183        }
184
185        let id = bytes
186            .first()
187            .ok_or(Error::InvalidPacket(
188                "insufficient bytes for id".to_string(),
189            ))?
190            .to_owned();
191
192        let tag = bytes
193            .get(1)
194            .ok_or(Error::InvalidPacket(
195                "insufficient bytes for tag".to_string(),
196            ))?
197            .to_owned()
198            .into();
199
200        let length = u32::from_be_bytes(
201            bytes
202                .get(2..HEADER_SIZE)
203                .ok_or(Error::InvalidPacket(
204                    "insufficient bytes for length".to_string(),
205                ))?
206                .try_into()?,
207        );
208        Ok(Header { id, tag, length })
209    }
210}
211
212/// Implement From<Header> for Bytes.
213impl From<Header> for Bytes {
214    /// from converts a Header into Bytes.
215    fn from(header: Header) -> Self {
216        let mut bytes = BytesMut::with_capacity(HEADER_SIZE);
217        bytes.put_u8(header.id);
218        bytes.put_u8(header.tag.into());
219        bytes.put_u32(header.length);
220        bytes.freeze()
221    }
222}
223
/// Vortex Protocol
///
/// Vortex is a peer-to-peer (P2P) file transfer protocol using TLV (Tag-Length-Value) format for
/// efficient and flexible data transmission. Designed for reliable and scalable file sharing.
///
/// Packet Format:
///     - Packet Identifier (1 byte): Uniquely identifies each packet
///     - Tag (1 byte): Specifies data type in value field
///     - Length (4 bytes): Indicates Value field length, up to 4 GiB
///     - Value (variable): Actual data content, maximum 4 GiB
///
/// Protocol Format:
///
/// ```text
/// ---------------------------------------------------------------------------------------------------
/// |                             |                    |                    |                         |
/// | Packet Identifier (1 byte)  |    Tag (1 byte)    |  Length (4 bytes)  |   Value (up to 4 GiB)   |
/// |                             |                    |                    |                         |
/// ---------------------------------------------------------------------------------------------------
/// ```
///
/// For more information, please refer to the [Vortex Protocol](https://github.com/dragonflyoss/vortex/blob/main/docs/README.md).
#[derive(Debug, Clone)]
pub enum Vortex {
    /// Packet whose value is a `tlv::download_piece::DownloadPiece`.
    DownloadPiece(Header, tlv::download_piece::DownloadPiece),
    /// Packet whose value is a `tlv::piece_content::PieceContent`.
    PieceContent(Header, tlv::piece_content::PieceContent),
    /// Packet whose value is a `tlv::download_cache_piece::DownloadCachePiece`.
    DownloadCachePiece(Header, tlv::download_cache_piece::DownloadCachePiece),
    /// Packet whose value is a `tlv::cache_piece_content::CachePieceContent`.
    CachePieceContent(Header, tlv::cache_piece_content::CachePieceContent),
    /// Packet whose value is a `tlv::download_persistent_piece::DownloadPersistentPiece`.
    DownloadPersistentPiece(
        Header,
        tlv::download_persistent_piece::DownloadPersistentPiece,
    ),
    /// Packet whose value is a `tlv::persistent_piece_content::PersistentPieceContent`.
    PersistentPieceContent(
        Header,
        tlv::persistent_piece_content::PersistentPieceContent,
    ),
    /// Packet whose value is a
    /// `tlv::download_persistent_cache_piece::DownloadPersistentCachePiece`.
    DownloadPersistentCachePiece(
        Header,
        tlv::download_persistent_cache_piece::DownloadPersistentCachePiece,
    ),
    /// Packet whose value is a
    /// `tlv::persistent_cache_piece_content::PersistentCachePieceContent`.
    PersistentCachePieceContent(
        Header,
        tlv::persistent_cache_piece_content::PersistentCachePieceContent,
    ),
    /// Packet with a reserved tag; only the header is kept.
    Reserved(Header),
    /// Close packet; only the header is kept.
    Close(Header),
    /// Packet whose value is a `tlv::error::Error`.
    Error(Header, tlv::error::Error),
}
272
273/// Vortex implements the Vortex functions.
274impl Vortex {
275    /// Creates a new Vortex packet.
276    pub fn new(tag: tlv::Tag, value: Bytes) -> Result<Self> {
277        (tag, Header::new(tag, value.len() as u32), value).try_into()
278    }
279
280    /// id returns the packet identifier of the Vortex packet.
281    #[inline]
282    pub fn id(&self) -> u8 {
283        match self {
284            Vortex::DownloadPiece(header, _) => header.id,
285            Vortex::PieceContent(header, _) => header.id,
286            Vortex::DownloadCachePiece(header, _) => header.id,
287            Vortex::CachePieceContent(header, _) => header.id,
288            Vortex::DownloadPersistentPiece(header, _) => header.id,
289            Vortex::PersistentPieceContent(header, _) => header.id,
290            Vortex::DownloadPersistentCachePiece(header, _) => header.id,
291            Vortex::PersistentCachePieceContent(header, _) => header.id,
292            Vortex::Reserved(header) => header.id,
293            Vortex::Close(header) => header.id,
294            Vortex::Error(header, _) => header.id,
295        }
296    }
297
298    /// tag returns the tag of the Vortex packet.
299    #[inline]
300    pub fn tag(&self) -> tlv::Tag {
301        match self {
302            Vortex::DownloadPiece(header, _) => header.tag,
303            Vortex::PieceContent(header, _) => header.tag,
304            Vortex::DownloadCachePiece(header, _) => header.tag,
305            Vortex::CachePieceContent(header, _) => header.tag,
306            Vortex::DownloadPersistentPiece(header, _) => header.tag,
307            Vortex::PersistentPieceContent(header, _) => header.tag,
308            Vortex::DownloadPersistentCachePiece(header, _) => header.tag,
309            Vortex::PersistentCachePieceContent(header, _) => header.tag,
310            Vortex::Reserved(header) => header.tag,
311            Vortex::Close(header) => header.tag,
312            Vortex::Error(header, _) => header.tag,
313        }
314    }
315
316    /// length returns the length of the value field.
317    #[inline]
318    pub fn length(&self) -> usize {
319        match self {
320            Vortex::DownloadPiece(header, _) => header.length as usize,
321            Vortex::PieceContent(header, _) => header.length as usize,
322            Vortex::DownloadCachePiece(header, _) => header.length as usize,
323            Vortex::CachePieceContent(header, _) => header.length as usize,
324            Vortex::DownloadPersistentPiece(header, _) => header.length as usize,
325            Vortex::PersistentPieceContent(header, _) => header.length as usize,
326            Vortex::DownloadPersistentCachePiece(header, _) => header.length as usize,
327            Vortex::PersistentCachePieceContent(header, _) => header.length as usize,
328            Vortex::Reserved(header) => header.length as usize,
329            Vortex::Close(header) => header.length as usize,
330            Vortex::Error(header, _) => header.length as usize,
331        }
332    }
333
334    /// header returns a reference to the packet header.
335    #[inline]
336    pub fn header(&self) -> &Header {
337        match self {
338            Vortex::DownloadPiece(header, _) => header,
339            Vortex::PieceContent(header, _) => header,
340            Vortex::DownloadCachePiece(header, _) => header,
341            Vortex::CachePieceContent(header, _) => header,
342            Vortex::DownloadPersistentPiece(header, _) => header,
343            Vortex::PersistentPieceContent(header, _) => header,
344            Vortex::DownloadPersistentCachePiece(header, _) => header,
345            Vortex::PersistentCachePieceContent(header, _) => header,
346            Vortex::Reserved(header) => header,
347            Vortex::Close(header) => header,
348            Vortex::Error(header, _) => header,
349        }
350    }
351}
352
353/// Implement TryFrom<Bytes> for Vortex.
354impl TryFrom<Bytes> for Vortex {
355    type Error = Error;
356
357    /// try_from converts a Bytes into a Vortex packet.
358    fn try_from(bytes: Bytes) -> Result<Self> {
359        let mut bytes = BytesMut::from(bytes);
360        let header = bytes.split_to(HEADER_SIZE);
361        let value = bytes.freeze();
362        let header: Header = header.freeze().try_into()?;
363
364        // Check if the value length matches the specified length.
365        if value.len() != header.length as usize {
366            return Err(Error::InvalidLength(format!(
367                "value len {} != declared length {}",
368                value.len(),
369                header.length
370            )));
371        }
372
373        (header.tag, header, value).try_into()
374    }
375}
376
377/// Implement From<PieceContent> for Bytes.
378impl From<Vortex> for Bytes {
379    /// from converts a Vortex packet to Bytes.
380    fn from(packet: Vortex) -> Self {
381        let (header, value) = match packet {
382            Vortex::DownloadPiece(header, download_piece) => (header, download_piece.into()),
383            Vortex::DownloadCachePiece(header, download_cache_piece) => {
384                (header, download_cache_piece.into())
385            }
386            Vortex::DownloadPersistentPiece(header, download_persistent_piece) => {
387                (header, download_persistent_piece.into())
388            }
389            Vortex::DownloadPersistentCachePiece(header, download_persistent_cache_piece) => {
390                (header, download_persistent_cache_piece.into())
391            }
392            Vortex::Reserved(header) => (header, Bytes::new()),
393            Vortex::Close(header) => (header, Bytes::new()),
394            Vortex::Error(header, err) => (header, err.into()),
395            _ => panic!("unsupported packet type for conversion to Bytes"),
396        };
397
398        let mut bytes = BytesMut::with_capacity(HEADER_SIZE + value.len());
399        bytes.put_u8(header.id);
400        bytes.put_u8(header.tag.into());
401        bytes.put_u32(value.len() as u32);
402        bytes.extend_from_slice(&value);
403        bytes.freeze()
404    }
405}
406
407/// Implement TryFrom<(tlv::Tag, Header, Bytes)> for Vortex.
408impl TryFrom<(tlv::Tag, Header, Bytes)> for Vortex {
409    type Error = Error;
410
411    /// try_from converts a tuple of Tag, Header, and Bytes into a Vortex packet.
412    fn try_from((tag, header, value): (tlv::Tag, Header, Bytes)) -> Result<Self> {
413        if value.len() > MAX_VALUE_SIZE {
414            return Err(Error::InvalidLength(format!(
415                "value length {} exceeds maximum allowed size of {} bytes",
416                value.len(),
417                MAX_VALUE_SIZE
418            )));
419        }
420
421        match tag {
422            tlv::Tag::DownloadPiece => {
423                let download_piece = tlv::download_piece::DownloadPiece::try_from(value)?;
424                Ok(Vortex::DownloadPiece(header, download_piece))
425            }
426            tlv::Tag::DownloadCachePiece => {
427                let download_cache_piece =
428                    tlv::download_cache_piece::DownloadCachePiece::try_from(value)?;
429                Ok(Vortex::DownloadCachePiece(header, download_cache_piece))
430            }
431            tlv::Tag::DownloadPersistentPiece => {
432                let download_persistent_piece =
433                    tlv::download_persistent_piece::DownloadPersistentPiece::try_from(value)?;
434                Ok(Vortex::DownloadPersistentPiece(
435                    header,
436                    download_persistent_piece,
437                ))
438            }
439            tlv::Tag::DownloadPersistentCachePiece => {
440                let download_persistent_cache_piece =
441                    tlv::download_persistent_cache_piece::DownloadPersistentCachePiece::try_from(
442                        value,
443                    )?;
444                Ok(Vortex::DownloadPersistentCachePiece(
445                    header,
446                    download_persistent_cache_piece,
447                ))
448            }
449            tlv::Tag::Reserved(_) => Ok(Vortex::Reserved(header)),
450            tlv::Tag::Close => Ok(Vortex::Close(header)),
451            tlv::Tag::Error => {
452                let err = tlv::error::Error::try_from(value)?;
453                Ok(Vortex::Error(header, err))
454            }
455            _ => panic!("unsupported tag for Vortex packet"),
456        }
457    }
458}
459
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tlv::Tag;
    use bytes::Bytes;

    /// Builds the 68-byte value used by the download request tests: 64 bytes of `a` followed by
    /// the big-endian u32 `42`.
    fn download_request_value() -> Bytes {
        let mut value = BytesMut::with_capacity(68);
        value.extend_from_slice("a".repeat(64).as_bytes());
        value.put_u32(42);
        value.freeze()
    }

    /// Asserts that a download request packet with `tag` serializes to header plus value.
    fn assert_download_request_round_trip_len(tag: Tag) {
        let value = download_request_value();
        let packet = Vortex::new(tag, value.clone()).unwrap();
        let bytes: Bytes = packet.into();

        assert_eq!(bytes.len(), HEADER_SIZE + value.len());
    }

    #[test]
    fn test_header_new() {
        let tag = Tag::DownloadPiece;
        let value_length = 1024;
        let header = Header::new(tag, value_length);

        // The id is a random u8, so every value is valid; only check that the accessors agree
        // with the fields.
        assert_eq!(header.id(), header.id);
        assert_eq!(header.tag(), tag);
        assert_eq!(header.length(), value_length);
    }

    #[test]
    fn test_header_try_from_bytes_success() {
        let mut bytes = BytesMut::with_capacity(HEADER_SIZE);
        bytes.put_u8(42);
        bytes.put_u8(Tag::DownloadPiece.into());
        bytes.put_u32(1024);
        let bytes = bytes.freeze();
        let header = Header::try_from(bytes).unwrap();

        assert_eq!(header.id, 42);
        assert_eq!(header.tag, Tag::DownloadPiece);
        assert_eq!(header.length, 1024);
    }

    #[test]
    fn test_header_try_from_bytes_invalid_size() {
        let bytes = Bytes::from(vec![1, 2, 3]);
        let result = Header::try_from(bytes);
        assert!(matches!(result, Err(Error::InvalidPacket(_))));
    }

    #[test]
    fn test_header_to_bytes() {
        let tag = Tag::Close;
        let header = Header {
            id: 123,
            tag,
            length: 2048,
        };
        let bytes: Bytes = header.into();

        assert_eq!(bytes.len(), HEADER_SIZE);
        assert_eq!(bytes[0], 123);
        assert_eq!(bytes[1], tag.into());
        assert_eq!(
            u32::from_be_bytes(bytes[2..HEADER_SIZE].try_into().unwrap()),
            2048
        );
    }

    #[test]
    fn test_new_download_piece() {
        let tag = Tag::DownloadPiece;
        let value = download_request_value();
        let packet = Vortex::new(tag, value.clone()).unwrap();

        assert_eq!(packet.id(), packet.header().id());
        assert_eq!(packet.tag(), tag);
        assert_eq!(packet.length(), value.len());
    }

    #[test]
    fn test_close() {
        let tag = Tag::Close;
        let value = Bytes::new();
        let packet = Vortex::new(tag, value.clone()).unwrap();

        assert_eq!(packet.tag(), tag);
        assert_eq!(packet.length(), value.len());
    }

    #[test]
    fn test_error_handling() {
        let value = vec![0; MAX_VALUE_SIZE + 1];
        let result = Vortex::new(Tag::PieceContent, value.into());

        assert!(matches!(result, Err(Error::InvalidLength(_))));
    }

    #[test]
    fn test_vortex_try_from_bytes_success() {
        let tag = Tag::Close;
        let header = Header::new(tag, 0);
        let header_bytes: Bytes = header.clone().into();
        let value = Bytes::new();

        let mut packet_bytes = BytesMut::new();
        packet_bytes.extend_from_slice(&header_bytes);
        packet_bytes.extend_from_slice(&value);
        let packet = Vortex::try_from(packet_bytes.freeze()).unwrap();

        assert_eq!(packet.tag(), tag);
        assert_eq!(packet.length(), 0);
    }

    #[test]
    fn test_vortex_try_from_bytes_length_mismatch() {
        let tag = Tag::Close;
        let header = Header {
            id: 1,
            tag,
            length: 5,
        };
        let header_bytes: Bytes = header.into();
        let value = Bytes::from("test");

        let mut packet_bytes = BytesMut::new();
        packet_bytes.extend_from_slice(&header_bytes);
        packet_bytes.extend_from_slice(&value);
        let result = Vortex::try_from(packet_bytes.freeze());

        assert!(matches!(result, Err(Error::InvalidLength(_))));
    }

    #[test]
    fn test_vortex_to_bytes_download_piece() {
        assert_download_request_round_trip_len(Tag::DownloadPiece);
    }

    #[test]
    fn test_vortex_to_bytes_download_cache_piece() {
        assert_download_request_round_trip_len(Tag::DownloadCachePiece);
    }

    #[test]
    fn test_vortex_to_bytes_download_persistent_piece() {
        assert_download_request_round_trip_len(Tag::DownloadPersistentPiece);
    }

    #[test]
    fn test_vortex_to_bytes_download_persistent_cache_piece() {
        assert_download_request_round_trip_len(Tag::DownloadPersistentCachePiece);
    }

    #[test]
    fn test_vortex_to_bytes_reserved() {
        let tag = Tag::Reserved(50);
        let packet = Vortex::new(tag, Bytes::new()).unwrap();
        let bytes: Bytes = packet.into();

        assert_eq!(bytes.len(), HEADER_SIZE);
    }

    #[test]
    fn test_vortex_to_bytes_close() {
        let tag = Tag::Close;
        let packet = Vortex::new(tag, Bytes::new()).unwrap();
        let bytes: Bytes = packet.into();

        assert_eq!(bytes.len(), HEADER_SIZE);
    }

    #[test]
    fn test_vortex_to_bytes_error() {
        let tag = Tag::Error;
        let value = Bytes::from("error details");
        let packet = Vortex::new(tag, value.clone()).unwrap();
        let bytes: Bytes = packet.into();

        assert_eq!(bytes.len(), HEADER_SIZE + value.len());
    }

    #[test]
    fn test_max_value_size_boundary() {
        let tag = Tag::Reserved(50);
        let value = vec![0; MAX_VALUE_SIZE];
        let result = Vortex::new(tag, value.into());

        assert!(result.is_ok());
    }
}
665        assert!(result.is_ok());
666    }
667}