zenoh_protocol/transport/
fragment.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use zenoh_buffers::ZSlice;
15
16use crate::core::Reliability;
17pub use crate::transport::TransportSn;
18
19pub mod flag {
20    pub const R: u8 = 1 << 5; // 0x20 Reliable      if R==1 then the frame is reliable
21    pub const M: u8 = 1 << 6; // 0x40 More          if M==1 then another fragment will follow
22    pub const Z: u8 = 1 << 7; // 0x80 Extensions    if Z==1 then an extension will follow
23}
24
25/// # Fragment message
26///
27/// The [`Fragment`] message is used to transmit on the wire large [`crate::network::NetworkMessage`]
28/// that require fragmentation because they are larger than the maximum batch size
29/// (i.e. 2^16-1) and/or the link MTU.
30///
31/// The [`Fragment`] message flow is the following:
32///
33/// ```text
34///     A                   B
35///     |  FRAGMENT(MORE)   |
36///     |------------------>|
37///     |  FRAGMENT(MORE)   |
38///     |------------------>|
39///     |  FRAGMENT(MORE)   |
40///     |------------------>|
41///     |  FRAGMENT         |
42///     |------------------>|
43///     |                   |
44/// ```
45///
46/// The [`Fragment`] message structure is defined as follows:
47///
48/// ```text
49/// Flags:
50/// - R: Reliable       If R==1 it concerns the reliable channel, else the best-effort channel
51/// - M: More           If M==1 then other fragments will follow
52/// - Z: Extensions     If Z==1 then zenoh extensions will follow.
53///
54///  7 6 5 4 3 2 1 0
55/// +-+-+-+-+-+-+-+-+
56/// |Z|M|R| FRAGMENT|
57/// +-+-+-+---------+
58/// %    seq num    %
59/// +---------------+
60/// ~   [FragExts]  ~ if Flag(Z)==1
61/// +---------------+
62/// ~      [u8]     ~
63/// +---------------+
64/// ```
65/// NOTE: 16 bits (2 bytes) may be prepended to the serialized message indicating the total length
66///       in bytes of the message, resulting in the maximum length of a message being 65535 bytes.
67///       This is necessary in those stream-oriented transports (e.g., TCP) that do not preserve
68///       the boundary of the serialized messages. The length is encoded as little-endian.
69///       In any case, the length of a message must not exceed 65535 bytes.
70///
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub struct Fragment {
73    pub reliability: Reliability,
74    pub more: bool,
75    pub sn: TransportSn,
76    pub payload: ZSlice,
77    pub ext_qos: ext::QoSType,
78    pub ext_first: Option<ext::First>,
79    pub ext_drop: Option<ext::Drop>,
80}
81
82// Extensions
83pub mod ext {
84    use crate::{
85        common::{ZExtUnit, ZExtZ64},
86        zextunit, zextz64,
87    };
88
89    pub type QoS = zextz64!(0x1, true);
90    pub type QoSType = crate::transport::ext::QoSType<{ QoS::ID }>;
91
92    /// # Start extension
93    /// Mark the first fragment of a fragmented message
94    pub type First = zextunit!(0x2, false);
95    /// # Stop extension
96    /// Indicate that the remaining fragments has been dropped
97    pub type Drop = zextunit!(0x3, false);
98}
99
100impl Fragment {
101    #[cfg(feature = "test")]
102    pub fn rand() -> Self {
103        use rand::Rng;
104
105        let mut rng = rand::thread_rng();
106
107        let reliability = Reliability::rand();
108        let more = rng.gen_bool(0.5);
109        let sn: TransportSn = rng.gen();
110        let payload = ZSlice::rand(rng.gen_range(8..128));
111        let ext_qos = ext::QoSType::rand();
112        let ext_first = rng.gen_bool(0.5).then(ext::First::rand);
113        let ext_drop = rng.gen_bool(0.5).then(ext::Drop::rand);
114
115        Fragment {
116            reliability,
117            sn,
118            more,
119            payload,
120            ext_qos,
121            ext_first,
122            ext_drop,
123        }
124    }
125}
126
127// FragmentHeader
128#[derive(Debug, Copy, Clone, PartialEq, Eq)]
129pub struct FragmentHeader {
130    pub reliability: Reliability,
131    pub more: bool,
132    pub sn: TransportSn,
133    pub ext_qos: ext::QoSType,
134    pub ext_first: Option<ext::First>,
135    pub ext_drop: Option<ext::Drop>,
136}
137
138impl FragmentHeader {
139    #[cfg(feature = "test")]
140    pub fn rand() -> Self {
141        use rand::Rng;
142
143        let mut rng = rand::thread_rng();
144
145        let reliability = Reliability::rand();
146        let more = rng.gen_bool(0.5);
147        let sn: TransportSn = rng.gen();
148        let ext_qos = ext::QoSType::rand();
149        let ext_first = rng.gen_bool(0.5).then(ext::First::rand);
150        let ext_drop = rng.gen_bool(0.5).then(ext::Drop::rand);
151
152        FragmentHeader {
153            reliability,
154            more,
155            sn,
156            ext_qos,
157            ext_first,
158            ext_drop,
159        }
160    }
161}