zenoh_protocol/transport/fragment.rs
1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use zenoh_buffers::ZSlice;
15
16use crate::core::Reliability;
17pub use crate::transport::TransportSn;
18
/// Header flag bits of the `Fragment` message (the three most significant bits
/// of the header byte).
pub mod flag {
    /// Reliable (bit 5): if set, the fragment concerns the reliable channel.
    pub const R: u8 = 0x20;
    /// More (bit 6): if set, another fragment will follow.
    pub const M: u8 = 0x40;
    /// Extensions (bit 7): if set, an extension will follow.
    pub const Z: u8 = 0x80;
}
24
25/// # Fragment message
26///
27/// The [`Fragment`] message is used to transmit on the wire large [`crate::network::NetworkMessage`]
28/// that require fragmentation because they are larger than the maximum batch size
29/// (i.e. 2^16-1) and/or the link MTU.
30///
31/// The [`Fragment`] message flow is the following:
32///
33/// ```text
///     A                   B
///     |  FRAGMENT(MORE)   |
///     |------------------>|
///     |  FRAGMENT(MORE)   |
///     |------------------>|
///     |  FRAGMENT(MORE)   |
///     |------------------>|
///     |  FRAGMENT         |
///     |------------------>|
///     |                   |
44/// ```
45///
46/// The [`Fragment`] message structure is defined as follows:
47///
48/// ```text
49/// Flags:
/// - R: Reliable       If R==1 it concerns the reliable channel, else the best-effort channel
/// - M: More           If M==1 then other fragments will follow
/// - Z: Extensions     If Z==1 then zenoh extensions will follow.
///
///  7 6 5 4 3 2 1 0
/// +-+-+-+-+-+-+-+-+
/// |Z|M|R| FRAGMENT|
/// +-+-+-+---------+
/// %    seq num    %
/// +---------------+
/// ~   [FragExts]  ~ if Flag(Z)==1
/// +---------------+
/// ~      [u8]     ~
/// +---------------+
64/// ```
65/// NOTE: 16 bits (2 bytes) may be prepended to the serialized message indicating the total length
66/// in bytes of the message, resulting in the maximum length of a message being 65535 bytes.
67/// This is necessary in those stream-oriented transports (e.g., TCP) that do not preserve
68/// the boundary of the serialized messages. The length is encoded as little-endian.
69/// In any case, the length of a message must not exceed 65535 bytes.
70///
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub struct Fragment {
73 pub reliability: Reliability,
74 pub more: bool,
75 pub sn: TransportSn,
76 pub payload: ZSlice,
77 pub ext_qos: ext::QoSType,
78 pub ext_first: Option<ext::First>,
79 pub ext_drop: Option<ext::Drop>,
80}
81
82// Extensions
83pub mod ext {
84 use crate::{
85 common::{ZExtUnit, ZExtZ64},
86 zextunit, zextz64,
87 };
88
89 pub type QoS = zextz64!(0x1, true);
90 pub type QoSType = crate::transport::ext::QoSType<{ QoS::ID }>;
91
92 /// # Start extension
93 /// Mark the first fragment of a fragmented message
94 pub type First = zextunit!(0x2, false);
95 /// # Stop extension
96 /// Indicate that the remaining fragments has been dropped
97 pub type Drop = zextunit!(0x3, false);
98}
99
100impl Fragment {
101 #[cfg(feature = "test")]
102 pub fn rand() -> Self {
103 use rand::Rng;
104
105 let mut rng = rand::thread_rng();
106
107 let reliability = Reliability::rand();
108 let more = rng.gen_bool(0.5);
109 let sn: TransportSn = rng.gen();
110 let payload = ZSlice::rand(rng.gen_range(8..128));
111 let ext_qos = ext::QoSType::rand();
112 let ext_first = rng.gen_bool(0.5).then(ext::First::rand);
113 let ext_drop = rng.gen_bool(0.5).then(ext::Drop::rand);
114
115 Fragment {
116 reliability,
117 sn,
118 more,
119 payload,
120 ext_qos,
121 ext_first,
122 ext_drop,
123 }
124 }
125}
126
127// FragmentHeader
128#[derive(Debug, Copy, Clone, PartialEq, Eq)]
129pub struct FragmentHeader {
130 pub reliability: Reliability,
131 pub more: bool,
132 pub sn: TransportSn,
133 pub ext_qos: ext::QoSType,
134 pub ext_first: Option<ext::First>,
135 pub ext_drop: Option<ext::Drop>,
136}
137
138impl FragmentHeader {
139 #[cfg(feature = "test")]
140 pub fn rand() -> Self {
141 use rand::Rng;
142
143 let mut rng = rand::thread_rng();
144
145 let reliability = Reliability::rand();
146 let more = rng.gen_bool(0.5);
147 let sn: TransportSn = rng.gen();
148 let ext_qos = ext::QoSType::rand();
149 let ext_first = rng.gen_bool(0.5).then(ext::First::rand);
150 let ext_drop = rng.gen_bool(0.5).then(ext::Drop::rand);
151
152 FragmentHeader {
153 reliability,
154 more,
155 sn,
156 ext_qos,
157 ext_first,
158 ext_drop,
159 }
160 }
161}