zenoh_protocol/transport/
mod.rs1pub mod close;
15pub mod fragment;
16pub mod frame;
17pub mod init;
18pub mod join;
19pub mod keepalive;
20pub mod oam;
21pub mod open;
22
23use core::fmt;
24
25pub use close::Close;
26pub use fragment::{Fragment, FragmentHeader};
27pub use frame::{Frame, FrameHeader};
28pub use init::{InitAck, InitSyn};
29pub use join::Join;
30pub use keepalive::KeepAlive;
31pub use oam::Oam;
32pub use open::{OpenAck, OpenSyn};
33
34use crate::network::{NetworkMessage, NetworkMessageRef};
35
36pub type BatchSize = u16;
42pub type AtomicBatchSize = core::sync::atomic::AtomicU16;
43
44pub mod batch_size {
45 use super::BatchSize;
46
47 pub const UNICAST: BatchSize = BatchSize::MAX;
48 pub const MULTICAST: BatchSize = 8_192;
49}
50
51pub mod id {
52 pub const OAM: u8 = 0x00;
55 pub const INIT: u8 = 0x01; pub const OPEN: u8 = 0x02; pub const CLOSE: u8 = 0x03;
58 pub const KEEP_ALIVE: u8 = 0x04;
59 pub const FRAME: u8 = 0x05;
60 pub const FRAGMENT: u8 = 0x06;
61 pub const JOIN: u8 = 0x07; }
63
64#[derive(Debug)]
65pub struct TransportMessageLowLatency {
66 pub body: TransportBodyLowLatency,
67}
68
69#[derive(Debug, Clone, Copy)]
70pub struct TransportMessageLowLatencyRef<'a> {
71 pub body: TransportBodyLowLatencyRef<'a>,
72}
73
74impl TryFrom<NetworkMessage> for TransportMessageLowLatency {
75 type Error = zenoh_result::Error;
76 fn try_from(msg: NetworkMessage) -> Result<Self, Self::Error> {
77 Ok(Self {
78 body: TransportBodyLowLatency::Network(msg),
79 })
80 }
81}
82
83#[allow(clippy::large_enum_variant)]
84#[derive(Debug)]
85pub enum TransportBodyLowLatency {
86 Close(Close),
87 KeepAlive(KeepAlive),
88 Network(NetworkMessage),
89}
90
91#[allow(clippy::large_enum_variant)]
92#[derive(Debug, Clone, Copy)]
93pub enum TransportBodyLowLatencyRef<'a> {
94 Close(Close),
95 KeepAlive(KeepAlive),
96 Network(NetworkMessageRef<'a>),
97}
98
99pub type TransportSn = u32;
100
101#[derive(Debug, Copy, Clone, PartialEq, Eq)]
102pub struct PrioritySn {
103 pub reliable: TransportSn,
104 pub best_effort: TransportSn,
105}
106
107impl PrioritySn {
108 pub const DEFAULT: Self = Self {
109 reliable: TransportSn::MIN,
110 best_effort: TransportSn::MIN,
111 };
112
113 #[cfg(feature = "test")]
114 #[doc(hidden)]
115 pub fn rand() -> Self {
116 use rand::Rng;
117 let mut rng = rand::thread_rng();
118
119 Self {
120 reliable: rng.gen(),
121 best_effort: rng.gen(),
122 }
123 }
124}
125
126#[derive(Debug, Clone, PartialEq, Eq)]
128pub enum TransportBody {
129 InitSyn(InitSyn),
130 InitAck(InitAck),
131 OpenSyn(OpenSyn),
132 OpenAck(OpenAck),
133 Close(Close),
134 KeepAlive(KeepAlive),
135 Frame(Frame),
136 Fragment(Fragment),
137 OAM(Oam),
138 Join(Join),
139}
140
141#[derive(Debug, Clone, PartialEq, Eq)]
142pub struct TransportMessage {
143 pub body: TransportBody,
144}
145
146impl TransportMessage {
147 #[cfg(feature = "test")]
148 #[doc(hidden)]
149 pub fn rand() -> Self {
150 use rand::Rng;
151
152 let mut rng = rand::thread_rng();
153
154 let body = match rng.gen_range(0..10) {
155 0 => TransportBody::InitSyn(InitSyn::rand()),
156 1 => TransportBody::InitAck(InitAck::rand()),
157 2 => TransportBody::OpenSyn(OpenSyn::rand()),
158 3 => TransportBody::OpenAck(OpenAck::rand()),
159 4 => TransportBody::Close(Close::rand()),
160 5 => TransportBody::KeepAlive(KeepAlive::rand()),
161 6 => TransportBody::Frame(Frame::rand()),
162 7 => TransportBody::Fragment(Fragment::rand()),
163 8 => TransportBody::OAM(Oam::rand()),
164 9 => TransportBody::Join(Join::rand()),
165 _ => unreachable!(),
166 };
167
168 Self { body }
169 }
170}
171
172impl From<TransportBody> for TransportMessage {
173 fn from(body: TransportBody) -> Self {
174 Self { body }
175 }
176}
177
178impl From<InitSyn> for TransportMessage {
179 fn from(init_syn: InitSyn) -> Self {
180 TransportBody::InitSyn(init_syn).into()
181 }
182}
183
184impl From<InitAck> for TransportMessage {
185 fn from(init_ack: InitAck) -> Self {
186 TransportBody::InitAck(init_ack).into()
187 }
188}
189
190impl From<OpenSyn> for TransportMessage {
191 fn from(open_syn: OpenSyn) -> Self {
192 TransportBody::OpenSyn(open_syn).into()
193 }
194}
195
196impl From<OpenAck> for TransportMessage {
197 fn from(open_ack: OpenAck) -> Self {
198 TransportBody::OpenAck(open_ack).into()
199 }
200}
201
202impl From<Close> for TransportMessage {
203 fn from(close: Close) -> Self {
204 TransportBody::Close(close).into()
205 }
206}
207
208impl From<KeepAlive> for TransportMessage {
209 fn from(keep_alive: KeepAlive) -> Self {
210 TransportBody::KeepAlive(keep_alive).into()
211 }
212}
213
214impl From<Frame> for TransportMessage {
215 fn from(frame: Frame) -> Self {
216 TransportBody::Frame(frame).into()
217 }
218}
219
220impl From<Fragment> for TransportMessage {
221 fn from(fragment: Fragment) -> Self {
222 TransportBody::Fragment(fragment).into()
223 }
224}
225
226impl From<Join> for TransportMessage {
227 fn from(join: Join) -> Self {
228 TransportBody::Join(join).into()
229 }
230}
231
232impl fmt::Display for TransportMessage {
233 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
234 use TransportBody::*;
235 match &self.body {
236 OAM(_) => write!(f, "OAM"),
237 InitSyn(_) => write!(f, "InitSyn"),
238 InitAck(_) => write!(f, "InitAck"),
239 OpenSyn(_) => write!(f, "OpenSyn"),
240 OpenAck(_) => write!(f, "OpenAck"),
241 Close(_) => write!(f, "Close"),
242 KeepAlive(_) => write!(f, "KeepAlive"),
243 Frame(m) => {
244 write!(f, "Frame[")?;
245 let mut netmsgs = m.payload.iter().peekable();
246 while let Some(m) = netmsgs.next() {
247 m.fmt(f)?;
248 if netmsgs.peek().is_some() {
249 write!(f, ", ")?;
250 }
251 }
252 write!(f, "]")
253 }
254 Fragment(_) => write!(f, "Fragment"),
255 Join(_) => write!(f, "Join"),
256 }
257 }
258}
259
260pub mod ext {
261 use crate::{common::ZExtZ64, core::Priority};
262
263 #[repr(transparent)]
271 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
272 pub struct QoSType<const ID: u8> {
273 inner: u8,
274 }
275
276 impl<const ID: u8> QoSType<{ ID }> {
277 const P_MASK: u8 = 0b00000111;
278 pub const DEFAULT: Self = Self::new(Priority::DEFAULT);
279
280 pub const fn new(priority: Priority) -> Self {
281 Self {
282 inner: priority as u8,
283 }
284 }
285
286 pub const fn priority(&self) -> Priority {
287 unsafe { core::mem::transmute(self.inner & Self::P_MASK) }
288 }
289
290 #[cfg(feature = "test")]
291 #[doc(hidden)]
292 pub fn rand() -> Self {
293 use rand::Rng;
294 let mut rng = rand::thread_rng();
295
296 let inner: u8 = rng.gen();
297 Self { inner }
298 }
299 }
300
301 impl<const ID: u8> Default for QoSType<{ ID }> {
302 fn default() -> Self {
303 Self::DEFAULT
304 }
305 }
306
307 impl<const ID: u8> From<ZExtZ64<{ ID }>> for QoSType<{ ID }> {
308 fn from(ext: ZExtZ64<{ ID }>) -> Self {
309 Self {
310 inner: ext.value as u8,
311 }
312 }
313 }
314
315 impl<const ID: u8> From<QoSType<{ ID }>> for ZExtZ64<{ ID }> {
316 fn from(ext: QoSType<{ ID }>) -> Self {
317 ZExtZ64::new(ext.inner as u64)
318 }
319 }
320
321 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
322 pub struct PatchType<const ID: u8>(u8);
323
324 impl<const ID: u8> PatchType<ID> {
325 pub const NONE: Self = Self(0);
326 pub const CURRENT: Self = Self(1);
327
328 pub fn new(int: u8) -> Self {
329 Self(int)
330 }
331
332 pub fn raw(self) -> u8 {
333 self.0
334 }
335
336 pub fn has_fragmentation_markers(&self) -> bool {
337 self.0 >= 1
338 }
339
340 #[cfg(feature = "test")]
341 #[doc(hidden)]
342 pub fn rand() -> Self {
343 use rand::Rng;
344 Self(rand::thread_rng().gen())
345 }
346 }
347
348 impl<const ID: u8> From<ZExtZ64<ID>> for PatchType<ID> {
349 fn from(ext: ZExtZ64<ID>) -> Self {
350 Self(ext.value as u8)
351 }
352 }
353
354 impl<const ID: u8> From<PatchType<ID>> for ZExtZ64<ID> {
355 fn from(ext: PatchType<ID>) -> Self {
356 ZExtZ64::new(ext.0 as u64)
357 }
358 }
359}