// zenoh_protocol/transport/mod.rs
pub mod close;
15pub mod fragment;
16pub mod frame;
17pub mod init;
18pub mod join;
19pub mod keepalive;
20pub mod oam;
21pub mod open;
22
23use core::fmt;
24
25pub use close::Close;
26pub use fragment::{Fragment, FragmentHeader};
27pub use frame::{Frame, FrameHeader};
28pub use init::{InitAck, InitSyn};
29pub use join::Join;
30pub use keepalive::KeepAlive;
31pub use oam::Oam;
32pub use open::{OpenAck, OpenSyn};
33
34use crate::network::{NetworkMessage, NetworkMessageRef};
35
/// Integer type in which batch sizes are expressed (unsigned, 16 bits).
pub type BatchSize = u16;
/// Atomic integer with the same 16-bit width as `BatchSize`.
pub type AtomicBatchSize = core::sync::atomic::AtomicU16;
43
44pub mod batch_size {
45 use super::BatchSize;
46
47 pub const UNICAST: BatchSize = BatchSize::MAX;
48 pub const MULTICAST: BatchSize = 8_192;
49}
50
/// One-byte numeric identifiers, named after the transport messages declared in this module.
///
/// The values are consecutive, from `0x00` to `0x07`.
pub mod id {
    pub const OAM: u8 = 0x00;
    pub const INIT: u8 = 0x01;
    pub const OPEN: u8 = 0x02;
    pub const CLOSE: u8 = 0x03;
    pub const KEEP_ALIVE: u8 = 0x04;
    pub const FRAME: u8 = 0x05;
    pub const FRAGMENT: u8 = 0x06;
    pub const JOIN: u8 = 0x07;
}
63
64#[derive(Debug)]
65pub struct TransportMessageLowLatency {
66 pub body: TransportBodyLowLatency,
67}
68
69#[derive(Debug, Clone, Copy)]
70pub struct TransportMessageLowLatencyRef<'a> {
71 pub body: TransportBodyLowLatencyRef<'a>,
72}
73
74impl TryFrom<NetworkMessage> for TransportMessageLowLatency {
75 type Error = zenoh_result::Error;
76 fn try_from(msg: NetworkMessage) -> Result<Self, Self::Error> {
77 Ok(Self {
78 body: TransportBodyLowLatency::Network(msg),
79 })
80 }
81}
82
83#[allow(clippy::large_enum_variant)]
84#[derive(Debug)]
85pub enum TransportBodyLowLatency {
86 Close(Close),
87 KeepAlive(KeepAlive),
88 Network(NetworkMessage),
89}
90
91#[allow(clippy::large_enum_variant)]
92#[derive(Debug, Clone, Copy)]
93pub enum TransportBodyLowLatencyRef<'a> {
94 Close(Close),
95 KeepAlive(KeepAlive),
96 Network(NetworkMessageRef<'a>),
97}
98
/// Integer type of transport sequence numbers (unsigned, 32 bits).
pub type TransportSn = u32;
100
101#[derive(Debug, Copy, Clone, PartialEq, Eq)]
102pub struct PrioritySn {
103 pub reliable: TransportSn,
104 pub best_effort: TransportSn,
105}
106
107impl PrioritySn {
108 pub const DEFAULT: Self = Self {
109 reliable: TransportSn::MIN,
110 best_effort: TransportSn::MIN,
111 };
112
113 #[cfg(feature = "test")]
114 pub fn rand() -> Self {
115 use rand::Rng;
116 let mut rng = rand::thread_rng();
117
118 Self {
119 reliable: rng.gen(),
120 best_effort: rng.gen(),
121 }
122 }
123}
124
125#[derive(Debug, Clone, PartialEq, Eq)]
127pub enum TransportBody {
128 InitSyn(InitSyn),
129 InitAck(InitAck),
130 OpenSyn(OpenSyn),
131 OpenAck(OpenAck),
132 Close(Close),
133 KeepAlive(KeepAlive),
134 Frame(Frame),
135 Fragment(Fragment),
136 OAM(Oam),
137 Join(Join),
138}
139
140#[derive(Debug, Clone, PartialEq, Eq)]
141pub struct TransportMessage {
142 pub body: TransportBody,
143 #[cfg(feature = "stats")]
144 pub size: Option<core::num::NonZeroUsize>,
145}
146
147impl TransportMessage {
148 #[cfg(feature = "test")]
149 pub fn rand() -> Self {
150 use rand::Rng;
151
152 let mut rng = rand::thread_rng();
153
154 let body = match rng.gen_range(0..10) {
155 0 => TransportBody::InitSyn(InitSyn::rand()),
156 1 => TransportBody::InitAck(InitAck::rand()),
157 2 => TransportBody::OpenSyn(OpenSyn::rand()),
158 3 => TransportBody::OpenAck(OpenAck::rand()),
159 4 => TransportBody::Close(Close::rand()),
160 5 => TransportBody::KeepAlive(KeepAlive::rand()),
161 6 => TransportBody::Frame(Frame::rand()),
162 7 => TransportBody::Fragment(Fragment::rand()),
163 8 => TransportBody::OAM(Oam::rand()),
164 9 => TransportBody::Join(Join::rand()),
165 _ => unreachable!(),
166 };
167
168 Self {
169 body,
170 #[cfg(feature = "stats")]
171 size: None,
172 }
173 }
174}
175
176impl From<TransportBody> for TransportMessage {
177 fn from(body: TransportBody) -> Self {
178 Self {
179 body,
180 #[cfg(feature = "stats")]
181 size: None,
182 }
183 }
184}
185
186impl From<InitSyn> for TransportMessage {
187 fn from(init_syn: InitSyn) -> Self {
188 TransportBody::InitSyn(init_syn).into()
189 }
190}
191
192impl From<InitAck> for TransportMessage {
193 fn from(init_ack: InitAck) -> Self {
194 TransportBody::InitAck(init_ack).into()
195 }
196}
197
198impl From<OpenSyn> for TransportMessage {
199 fn from(open_syn: OpenSyn) -> Self {
200 TransportBody::OpenSyn(open_syn).into()
201 }
202}
203
204impl From<OpenAck> for TransportMessage {
205 fn from(open_ack: OpenAck) -> Self {
206 TransportBody::OpenAck(open_ack).into()
207 }
208}
209
210impl From<Close> for TransportMessage {
211 fn from(close: Close) -> Self {
212 TransportBody::Close(close).into()
213 }
214}
215
216impl From<KeepAlive> for TransportMessage {
217 fn from(keep_alive: KeepAlive) -> Self {
218 TransportBody::KeepAlive(keep_alive).into()
219 }
220}
221
222impl From<Frame> for TransportMessage {
223 fn from(frame: Frame) -> Self {
224 TransportBody::Frame(frame).into()
225 }
226}
227
228impl From<Fragment> for TransportMessage {
229 fn from(fragment: Fragment) -> Self {
230 TransportBody::Fragment(fragment).into()
231 }
232}
233
234impl From<Join> for TransportMessage {
235 fn from(join: Join) -> Self {
236 TransportBody::Join(join).into()
237 }
238}
239
240impl fmt::Display for TransportMessage {
241 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
242 use TransportBody::*;
243 match &self.body {
244 OAM(_) => write!(f, "OAM"),
245 InitSyn(_) => write!(f, "InitSyn"),
246 InitAck(_) => write!(f, "InitAck"),
247 OpenSyn(_) => write!(f, "OpenSyn"),
248 OpenAck(_) => write!(f, "OpenAck"),
249 Close(_) => write!(f, "Close"),
250 KeepAlive(_) => write!(f, "KeepAlive"),
251 Frame(m) => {
252 write!(f, "Frame[")?;
253 let mut netmsgs = m.payload.iter().peekable();
254 while let Some(m) = netmsgs.next() {
255 m.fmt(f)?;
256 if netmsgs.peek().is_some() {
257 write!(f, ", ")?;
258 }
259 }
260 write!(f, "]")
261 }
262 Fragment(_) => write!(f, "Fragment"),
263 Join(_) => write!(f, "Join"),
264 }
265 }
266}
267
268pub mod ext {
269 use crate::{common::ZExtZ64, core::Priority};
270
271 #[repr(transparent)]
279 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
280 pub struct QoSType<const ID: u8> {
281 inner: u8,
282 }
283
284 impl<const ID: u8> QoSType<{ ID }> {
285 const P_MASK: u8 = 0b00000111;
286 pub const DEFAULT: Self = Self::new(Priority::DEFAULT);
287
288 pub const fn new(priority: Priority) -> Self {
289 Self {
290 inner: priority as u8,
291 }
292 }
293
294 pub const fn priority(&self) -> Priority {
295 unsafe { core::mem::transmute(self.inner & Self::P_MASK) }
296 }
297
298 #[cfg(feature = "test")]
299 pub fn rand() -> Self {
300 use rand::Rng;
301 let mut rng = rand::thread_rng();
302
303 let inner: u8 = rng.gen();
304 Self { inner }
305 }
306 }
307
308 impl<const ID: u8> Default for QoSType<{ ID }> {
309 fn default() -> Self {
310 Self::DEFAULT
311 }
312 }
313
314 impl<const ID: u8> From<ZExtZ64<{ ID }>> for QoSType<{ ID }> {
315 fn from(ext: ZExtZ64<{ ID }>) -> Self {
316 Self {
317 inner: ext.value as u8,
318 }
319 }
320 }
321
322 impl<const ID: u8> From<QoSType<{ ID }>> for ZExtZ64<{ ID }> {
323 fn from(ext: QoSType<{ ID }>) -> Self {
324 ZExtZ64::new(ext.inner as u64)
325 }
326 }
327
328 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
329 pub struct PatchType<const ID: u8>(u8);
330
331 impl<const ID: u8> PatchType<ID> {
332 pub const NONE: Self = Self(0);
333 pub const CURRENT: Self = Self(1);
334
335 pub fn new(int: u8) -> Self {
336 Self(int)
337 }
338
339 pub fn raw(self) -> u8 {
340 self.0
341 }
342
343 pub fn has_fragmentation_markers(&self) -> bool {
344 self.0 >= 1
345 }
346
347 #[cfg(feature = "test")]
348 pub fn rand() -> Self {
349 use rand::Rng;
350 Self(rand::thread_rng().gen())
351 }
352 }
353
354 impl<const ID: u8> From<ZExtZ64<ID>> for PatchType<ID> {
355 fn from(ext: ZExtZ64<ID>) -> Self {
356 Self(ext.value as u8)
357 }
358 }
359
360 impl<const ID: u8> From<PatchType<ID>> for ZExtZ64<ID> {
361 fn from(ext: PatchType<ID>) -> Self {
362 ZExtZ64::new(ext.0 as u64)
363 }
364 }
365}