zenoh_protocol/transport/
fragment.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use zenoh_buffers::ZSlice;
15
16use crate::core::Reliability;
17pub use crate::transport::TransportSn;
18
/// Flag bits carried in the three most significant bits of the header byte.
pub mod flag {
    /// Reliable (bit 5, `0x20`): if set, the frame is reliable.
    pub const R: u8 = 0b0010_0000;
    /// More (bit 6, `0x40`): if set, another fragment will follow.
    pub const M: u8 = 0b0100_0000;
    /// Extensions (bit 7, `0x80`): if set, an extension will follow.
    pub const Z: u8 = 0b1000_0000;
}
24
25/// # Fragment message
26///
27/// The [`Fragment`] message is used to transmit on the wire large [`NetworkMessage`](crate::network::NetworkMessage)
28/// that require fragmentation because they are larger than the maximum batch size
29/// (i.e. 2^16-1) and/or the link MTU.
30///
31/// The [`Fragment`] message flow is the following:
32///
33/// ```text
34///     A                   B
35///     |  FRAGMENT(MORE)   |
36///     |------------------>|
37///     |  FRAGMENT(MORE)   |
38///     |------------------>|
39///     |  FRAGMENT(MORE)   |
40///     |------------------>|
41///     |  FRAGMENT         |
42///     |------------------>|
43///     |                   |
44/// ```
45///
46/// The [`Fragment`] message structure is defined as follows:
47///
48/// ```text
49/// Flags:
50/// - R: Reliable       If R==1 it concerns the reliable channel, else the best-effort channel
51/// - M: More           If M==1 then other fragments will follow
52/// - Z: Extensions     If Z==1 then zenoh extensions will follow.
53///
54///  7 6 5 4 3 2 1 0
55/// +-+-+-+-+-+-+-+-+
56/// |Z|M|R| FRAGMENT|
57/// +-+-+-+---------+
58/// %    seq num    %
59/// +---------------+
60/// ~   [FragExts]  ~ if Flag(Z)==1
61/// +---------------+
62/// ~      [u8]     ~
63/// +---------------+
64/// ```
65/// NOTE: 16 bits (2 bytes) may be prepended to the serialized message indicating the total length
66///       in bytes of the message, resulting in the maximum length of a message being 65535 bytes.
67///       This is necessary in those stream-oriented transports (e.g., TCP) that do not preserve
68///       the boundary of the serialized messages. The length is encoded as little-endian.
69///       In any case, the length of a message must not exceed 65535 bytes.
70///
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub struct Fragment {
73    pub reliability: Reliability,
74    pub more: bool,
75    pub sn: TransportSn,
76    pub payload: ZSlice,
77    pub ext_qos: ext::QoSType,
78    pub ext_first: Option<ext::First>,
79    pub ext_drop: Option<ext::Drop>,
80}
81
82// Extensions
83pub mod ext {
84    use crate::{
85        common::{ZExtUnit, ZExtZ64},
86        zextunit, zextz64,
87    };
88
89    pub type QoS = zextz64!(0x1, true);
90    pub type QoSType = crate::transport::ext::QoSType<{ QoS::ID }>;
91
92    /// # Start extension
93    /// Mark the first fragment of a fragmented message
94    pub type First = zextunit!(0x2, false);
95    /// # Stop extension
96    /// Indicate that the remaining fragments has been dropped
97    pub type Drop = zextunit!(0x3, false);
98}
99
100impl Fragment {
101    #[cfg(feature = "test")]
102    #[doc(hidden)]
103    pub fn rand() -> Self {
104        use rand::Rng;
105
106        let mut rng = rand::thread_rng();
107
108        let reliability = Reliability::rand();
109        let more = rng.gen_bool(0.5);
110        let sn: TransportSn = rng.gen();
111        let payload = ZSlice::rand(rng.gen_range(8..128));
112        let ext_qos = ext::QoSType::rand();
113        let ext_first = rng.gen_bool(0.5).then(ext::First::rand);
114        let ext_drop = rng.gen_bool(0.5).then(ext::Drop::rand);
115
116        Fragment {
117            reliability,
118            sn,
119            more,
120            payload,
121            ext_qos,
122            ext_first,
123            ext_drop,
124        }
125    }
126}
127
128// FragmentHeader
129#[derive(Debug, Copy, Clone, PartialEq, Eq)]
130pub struct FragmentHeader {
131    pub reliability: Reliability,
132    pub more: bool,
133    pub sn: TransportSn,
134    pub ext_qos: ext::QoSType,
135    pub ext_first: Option<ext::First>,
136    pub ext_drop: Option<ext::Drop>,
137}
138
139impl FragmentHeader {
140    #[cfg(feature = "test")]
141    #[doc(hidden)]
142    pub fn rand() -> Self {
143        use rand::Rng;
144
145        let mut rng = rand::thread_rng();
146
147        let reliability = Reliability::rand();
148        let more = rng.gen_bool(0.5);
149        let sn: TransportSn = rng.gen();
150        let ext_qos = ext::QoSType::rand();
151        let ext_first = rng.gen_bool(0.5).then(ext::First::rand);
152        let ext_drop = rng.gen_bool(0.5).then(ext::Drop::rand);
153
154        FragmentHeader {
155            reliability,
156            more,
157            sn,
158            ext_qos,
159            ext_first,
160            ext_drop,
161        }
162    }
163}