// zenoh_protocol/transport/mod.rs
pub mod close;
15pub mod fragment;
16pub mod frame;
17pub mod init;
18pub mod join;
19pub mod keepalive;
20pub mod oam;
21pub mod open;
22
23use core::fmt;
24
25pub use close::Close;
26pub use fragment::{Fragment, FragmentHeader};
27pub use frame::{Frame, FrameHeader};
28pub use init::{InitAck, InitSyn};
29pub use join::Join;
30pub use keepalive::KeepAlive;
31pub use oam::Oam;
32pub use open::{OpenAck, OpenSyn};
33
34use crate::network::NetworkMessage;
35
/// Integer type used for batch sizes; an alias for `u16`.
pub type BatchSize = u16;
/// Atomic counterpart of [`BatchSize`] (`AtomicU16`, i.e. the same 16-bit width).
pub type AtomicBatchSize = core::sync::atomic::AtomicU16;

/// Named [`BatchSize`] constants.
///
/// NOTE(review): presumably these are the batch sizes used for unicast and
/// multicast transports respectively — confirm against the call sites.
pub mod batch_size {
    /// The largest value a `BatchSize` can hold (`u16::MAX`).
    pub const UNICAST: super::BatchSize = super::BatchSize::MAX;
    /// 8192.
    pub const MULTICAST: super::BatchSize = 8_192;
}
50
/// One-byte identifiers, numbered consecutively from `0x00` to `0x07`.
///
/// NOTE(review): the names mirror the transport message submodules, so these
/// are presumably the message-type IDs used on the wire — confirm against the
/// codec before relying on that.
pub mod id {
    pub const OAM: u8 = 0x00;
    pub const INIT: u8 = 0x01;
    pub const OPEN: u8 = 0x02;
    pub const CLOSE: u8 = 0x03;
    pub const KEEP_ALIVE: u8 = 0x04;
    pub const FRAME: u8 = 0x05;
    pub const FRAGMENT: u8 = 0x06;
    pub const JOIN: u8 = 0x07;
}
63
64#[derive(Debug)]
65pub struct TransportMessageLowLatency {
66 pub body: TransportBodyLowLatency,
67}
68
69impl TryFrom<NetworkMessage> for TransportMessageLowLatency {
70 type Error = zenoh_result::Error;
71 fn try_from(msg: NetworkMessage) -> Result<Self, Self::Error> {
72 Ok(Self {
73 body: TransportBodyLowLatency::Network(msg),
74 })
75 }
76}
77
78#[allow(clippy::large_enum_variant)]
79#[derive(Debug)]
80pub enum TransportBodyLowLatency {
81 Close(Close),
82 KeepAlive(KeepAlive),
83 Network(NetworkMessage),
84}
85
/// Sequence-number type; an alias for `u32`.
pub type TransportSn = u32;

/// A pair of sequence numbers: one for the reliable channel and one for the
/// best-effort channel.
///
/// NOTE(review): the name suggests one such pair is kept per priority level —
/// confirm against the users of this type.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct PrioritySn {
    pub reliable: TransportSn,
    pub best_effort: TransportSn,
}

impl PrioritySn {
    /// Both sequence numbers set to `TransportSn::MIN`, i.e. zero.
    pub const DEFAULT: Self = {
        let start = TransportSn::MIN;
        Self {
            reliable: start,
            best_effort: start,
        }
    };

    /// Builds a value with two independently drawn random sequence numbers.
    /// Only compiled with the `test` feature.
    #[cfg(feature = "test")]
    pub fn rand() -> Self {
        use rand::Rng;

        let mut rng = rand::thread_rng();
        // Drawn in field order, `reliable` first.
        let reliable: TransportSn = rng.gen();
        let best_effort: TransportSn = rng.gen();

        Self {
            reliable,
            best_effort,
        }
    }
}
111
112#[derive(Debug, Clone, PartialEq, Eq)]
114pub enum TransportBody {
115 InitSyn(InitSyn),
116 InitAck(InitAck),
117 OpenSyn(OpenSyn),
118 OpenAck(OpenAck),
119 Close(Close),
120 KeepAlive(KeepAlive),
121 Frame(Frame),
122 Fragment(Fragment),
123 OAM(Oam),
124 Join(Join),
125}
126
127#[derive(Debug, Clone, PartialEq, Eq)]
128pub struct TransportMessage {
129 pub body: TransportBody,
130 #[cfg(feature = "stats")]
131 pub size: Option<core::num::NonZeroUsize>,
132}
133
134impl TransportMessage {
135 #[cfg(feature = "test")]
136 pub fn rand() -> Self {
137 use rand::Rng;
138
139 let mut rng = rand::thread_rng();
140
141 let body = match rng.gen_range(0..10) {
142 0 => TransportBody::InitSyn(InitSyn::rand()),
143 1 => TransportBody::InitAck(InitAck::rand()),
144 2 => TransportBody::OpenSyn(OpenSyn::rand()),
145 3 => TransportBody::OpenAck(OpenAck::rand()),
146 4 => TransportBody::Close(Close::rand()),
147 5 => TransportBody::KeepAlive(KeepAlive::rand()),
148 6 => TransportBody::Frame(Frame::rand()),
149 7 => TransportBody::Fragment(Fragment::rand()),
150 8 => TransportBody::OAM(Oam::rand()),
151 9 => TransportBody::Join(Join::rand()),
152 _ => unreachable!(),
153 };
154
155 Self {
156 body,
157 #[cfg(feature = "stats")]
158 size: None,
159 }
160 }
161}
162
163impl From<TransportBody> for TransportMessage {
164 fn from(body: TransportBody) -> Self {
165 Self {
166 body,
167 #[cfg(feature = "stats")]
168 size: None,
169 }
170 }
171}
172
173impl From<InitSyn> for TransportMessage {
174 fn from(init_syn: InitSyn) -> Self {
175 TransportBody::InitSyn(init_syn).into()
176 }
177}
178
179impl From<InitAck> for TransportMessage {
180 fn from(init_ack: InitAck) -> Self {
181 TransportBody::InitAck(init_ack).into()
182 }
183}
184
185impl From<OpenSyn> for TransportMessage {
186 fn from(open_syn: OpenSyn) -> Self {
187 TransportBody::OpenSyn(open_syn).into()
188 }
189}
190
191impl From<OpenAck> for TransportMessage {
192 fn from(open_ack: OpenAck) -> Self {
193 TransportBody::OpenAck(open_ack).into()
194 }
195}
196
197impl From<Close> for TransportMessage {
198 fn from(close: Close) -> Self {
199 TransportBody::Close(close).into()
200 }
201}
202
203impl From<KeepAlive> for TransportMessage {
204 fn from(keep_alive: KeepAlive) -> Self {
205 TransportBody::KeepAlive(keep_alive).into()
206 }
207}
208
209impl From<Frame> for TransportMessage {
210 fn from(frame: Frame) -> Self {
211 TransportBody::Frame(frame).into()
212 }
213}
214
215impl From<Fragment> for TransportMessage {
216 fn from(fragment: Fragment) -> Self {
217 TransportBody::Fragment(fragment).into()
218 }
219}
220
221impl From<Join> for TransportMessage {
222 fn from(join: Join) -> Self {
223 TransportBody::Join(join).into()
224 }
225}
226
227impl fmt::Display for TransportMessage {
228 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
229 use TransportBody::*;
230 match &self.body {
231 OAM(_) => write!(f, "OAM"),
232 InitSyn(_) => write!(f, "InitSyn"),
233 InitAck(_) => write!(f, "InitAck"),
234 OpenSyn(_) => write!(f, "OpenSyn"),
235 OpenAck(_) => write!(f, "OpenAck"),
236 Close(_) => write!(f, "Close"),
237 KeepAlive(_) => write!(f, "KeepAlive"),
238 Frame(m) => {
239 write!(f, "Frame[")?;
240 let mut netmsgs = m.payload.iter().peekable();
241 while let Some(m) = netmsgs.next() {
242 m.fmt(f)?;
243 if netmsgs.peek().is_some() {
244 write!(f, ", ")?;
245 }
246 }
247 write!(f, "]")
248 }
249 Fragment(_) => write!(f, "Fragment"),
250 Join(_) => write!(f, "Join"),
251 }
252 }
253}
254
255pub mod ext {
256 use crate::{common::ZExtZ64, core::Priority};
257
258 #[repr(transparent)]
266 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
267 pub struct QoSType<const ID: u8> {
268 inner: u8,
269 }
270
271 impl<const ID: u8> QoSType<{ ID }> {
272 const P_MASK: u8 = 0b00000111;
273 pub const DEFAULT: Self = Self::new(Priority::DEFAULT);
274
275 pub const fn new(priority: Priority) -> Self {
276 Self {
277 inner: priority as u8,
278 }
279 }
280
281 pub const fn priority(&self) -> Priority {
282 unsafe { core::mem::transmute(self.inner & Self::P_MASK) }
283 }
284
285 #[cfg(feature = "test")]
286 pub fn rand() -> Self {
287 use rand::Rng;
288 let mut rng = rand::thread_rng();
289
290 let inner: u8 = rng.gen();
291 Self { inner }
292 }
293 }
294
295 impl<const ID: u8> Default for QoSType<{ ID }> {
296 fn default() -> Self {
297 Self::DEFAULT
298 }
299 }
300
301 impl<const ID: u8> From<ZExtZ64<{ ID }>> for QoSType<{ ID }> {
302 fn from(ext: ZExtZ64<{ ID }>) -> Self {
303 Self {
304 inner: ext.value as u8,
305 }
306 }
307 }
308
309 impl<const ID: u8> From<QoSType<{ ID }>> for ZExtZ64<{ ID }> {
310 fn from(ext: QoSType<{ ID }>) -> Self {
311 ZExtZ64::new(ext.inner as u64)
312 }
313 }
314
315 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
316 pub struct PatchType<const ID: u8>(u8);
317
318 impl<const ID: u8> PatchType<ID> {
319 pub const NONE: Self = Self(0);
320 pub const CURRENT: Self = Self(1);
321
322 pub fn new(int: u8) -> Self {
323 Self(int)
324 }
325
326 pub fn raw(self) -> u8 {
327 self.0
328 }
329
330 pub fn has_fragmentation_markers(&self) -> bool {
331 self.0 >= 1
332 }
333
334 #[cfg(feature = "test")]
335 pub fn rand() -> Self {
336 use rand::Rng;
337 Self(rand::thread_rng().gen())
338 }
339 }
340
341 impl<const ID: u8> From<ZExtZ64<ID>> for PatchType<ID> {
342 fn from(ext: ZExtZ64<ID>) -> Self {
343 Self(ext.value as u8)
344 }
345 }
346
347 impl<const ID: u8> From<PatchType<ID>> for ZExtZ64<ID> {
348 fn from(ext: PatchType<ID>) -> Self {
349 ZExtZ64::new(ext.0 as u64)
350 }
351 }
352}