zenoh_protocol/transport/fragment.rs
//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use zenoh_buffers::ZSlice;

use crate::core::Reliability;
pub use crate::transport::TransportSn;

/// Flag bits carried in the header byte of a [`Fragment`] message.
///
/// Each constant is a single-bit mask; the three masks occupy the top three
/// bits of the byte and do not overlap.
pub mod flag {
    /// Reliable: if R==1 then the frame is reliable.
    pub const R: u8 = 1 << 5; // 0x20
    /// More: if M==1 then another fragment will follow.
    pub const M: u8 = 1 << 6; // 0x40
    /// Extensions: if Z==1 then an extension will follow.
    pub const Z: u8 = 1 << 7; // 0x80
}

25/// # Fragment message
26///
27/// The [`Fragment`] message is used to transmit on the wire large [`NetworkMessage`](crate::network::NetworkMessage)
28/// that require fragmentation because they are larger than the maximum batch size
29/// (i.e. 2^16-1) and/or the link MTU.
30///
31/// The [`Fragment`] message flow is the following:
32///
33/// ```text
34/// A B
35/// | FRAGMENT(MORE) |
36/// |------------------>|
37/// | FRAGMENT(MORE) |
38/// |------------------>|
39/// | FRAGMENT(MORE) |
40/// |------------------>|
41/// | FRAGMENT |
42/// |------------------>|
43/// | |
44/// ```
45///
46/// The [`Fragment`] message structure is defined as follows:
47///
48/// ```text
49/// Flags:
50/// - R: Reliable If R==1 it concerns the reliable channel, else the best-effort channel
51/// - M: More If M==1 then other fragments will follow
52/// - Z: Extensions If Z==1 then zenoh extensions will follow.
53///
54/// 7 6 5 4 3 2 1 0
55/// +-+-+-+-+-+-+-+-+
56/// |Z|M|R| FRAGMENT|
57/// +-+-+-+---------+
58/// % seq num %
59/// +---------------+
60/// ~ [FragExts] ~ if Flag(Z)==1
61/// +---------------+
62/// ~ [u8] ~
63/// +---------------+
64/// ```
65/// NOTE: 16 bits (2 bytes) may be prepended to the serialized message indicating the total length
66/// in bytes of the message, resulting in the maximum length of a message being 65535 bytes.
67/// This is necessary in those stream-oriented transports (e.g., TCP) that do not preserve
68/// the boundary of the serialized messages. The length is encoded as little-endian.
69/// In any case, the length of a message must not exceed 65535 bytes.
70///
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub struct Fragment {
73 pub reliability: Reliability,
74 pub more: bool,
75 pub sn: TransportSn,
76 pub payload: ZSlice,
77 pub ext_qos: ext::QoSType,
78 pub ext_first: Option<ext::First>,
79 pub ext_drop: Option<ext::Drop>,
80}
81
82// Extensions
83pub mod ext {
84 use crate::{
85 common::{ZExtUnit, ZExtZ64},
86 zextunit, zextz64,
87 };
88
89 pub type QoS = zextz64!(0x1, true);
90 pub type QoSType = crate::transport::ext::QoSType<{ QoS::ID }>;
91
92 /// # Start extension
93 /// Mark the first fragment of a fragmented message
94 pub type First = zextunit!(0x2, false);
95 /// # Stop extension
96 /// Indicate that the remaining fragments has been dropped
97 pub type Drop = zextunit!(0x3, false);
98}
99
100impl Fragment {
101 #[cfg(feature = "test")]
102 #[doc(hidden)]
103 pub fn rand() -> Self {
104 use rand::Rng;
105
106 let mut rng = rand::thread_rng();
107
108 let reliability = Reliability::rand();
109 let more = rng.gen_bool(0.5);
110 let sn: TransportSn = rng.gen();
111 let payload = ZSlice::rand(rng.gen_range(8..128));
112 let ext_qos = ext::QoSType::rand();
113 let ext_first = rng.gen_bool(0.5).then(ext::First::rand);
114 let ext_drop = rng.gen_bool(0.5).then(ext::Drop::rand);
115
116 Fragment {
117 reliability,
118 sn,
119 more,
120 payload,
121 ext_qos,
122 ext_first,
123 ext_drop,
124 }
125 }
126}
127
128// FragmentHeader
129#[derive(Debug, Copy, Clone, PartialEq, Eq)]
130pub struct FragmentHeader {
131 pub reliability: Reliability,
132 pub more: bool,
133 pub sn: TransportSn,
134 pub ext_qos: ext::QoSType,
135 pub ext_first: Option<ext::First>,
136 pub ext_drop: Option<ext::Drop>,
137}
138
139impl FragmentHeader {
140 #[cfg(feature = "test")]
141 #[doc(hidden)]
142 pub fn rand() -> Self {
143 use rand::Rng;
144
145 let mut rng = rand::thread_rng();
146
147 let reliability = Reliability::rand();
148 let more = rng.gen_bool(0.5);
149 let sn: TransportSn = rng.gen();
150 let ext_qos = ext::QoSType::rand();
151 let ext_first = rng.gen_bool(0.5).then(ext::First::rand);
152 let ext_drop = rng.gen_bool(0.5).then(ext::Drop::rand);
153
154 FragmentHeader {
155 reliability,
156 more,
157 sn,
158 ext_qos,
159 ext_first,
160 ext_drop,
161 }
162 }
163}