zenoh_protocol/transport/
mod.rs1pub mod close;
15pub mod fragment;
16pub mod frame;
17pub mod init;
18pub mod join;
19pub mod keepalive;
20pub mod oam;
21pub mod open;
22
23use core::fmt;
24
25pub use close::Close;
26pub use fragment::{Fragment, FragmentHeader};
27pub use frame::{Frame, FrameHeader};
28pub use init::{InitAck, InitSyn};
29pub use join::Join;
30pub use keepalive::KeepAlive;
31pub use oam::Oam;
32pub use open::{OpenAck, OpenSyn};
33
34use crate::network::{NetworkMessage, NetworkMessageRef};
35
/// Unsigned 16-bit alias used for batch sizes, so the largest value is 65_535.
pub type BatchSize = u16;
/// Atomic counterpart of [`BatchSize`], backed by the 16-bit atomic from `core`.
pub type AtomicBatchSize = core::sync::atomic::AtomicU16;
43
44pub mod batch_size {
45 use super::BatchSize;
46
47 pub const UNICAST: BatchSize = BatchSize::MAX;
48 pub const MULTICAST: BatchSize = 8_192;
49}
50
/// One-byte identifiers, numbered consecutively from `0x00` to `0x07`.
///
/// NOTE(review): the names mirror the transport message kinds declared in this
/// module, so these are presumably the on-wire message IDs — confirm against
/// the codec before relying on that.
pub mod id {
    /// Identifier for OAM.
    pub const OAM: u8 = 0x00;
    /// Identifier for INIT.
    pub const INIT: u8 = 0x01;
    /// Identifier for OPEN.
    pub const OPEN: u8 = 0x02;
    /// Identifier for CLOSE.
    pub const CLOSE: u8 = 0x03;
    /// Identifier for KEEP_ALIVE.
    pub const KEEP_ALIVE: u8 = 0x04;
    /// Identifier for FRAME.
    pub const FRAME: u8 = 0x05;
    /// Identifier for FRAGMENT.
    pub const FRAGMENT: u8 = 0x06;
    /// Identifier for JOIN.
    pub const JOIN: u8 = 0x07;
}
63
64#[derive(Debug)]
65pub struct TransportMessageLowLatency {
66 pub body: TransportBodyLowLatency,
67}
68
69#[derive(Debug, Clone, Copy)]
70pub struct TransportMessageLowLatencyRef<'a> {
71 pub body: TransportBodyLowLatencyRef<'a>,
72}
73
74impl TryFrom<NetworkMessage> for TransportMessageLowLatency {
75 type Error = zenoh_result::Error;
76 fn try_from(msg: NetworkMessage) -> Result<Self, Self::Error> {
77 Ok(Self {
78 body: TransportBodyLowLatency::Network(msg),
79 })
80 }
81}
82
83#[allow(clippy::large_enum_variant)]
84#[derive(Debug)]
85pub enum TransportBodyLowLatency {
86 Close(Close),
87 KeepAlive(KeepAlive),
88 Network(NetworkMessage),
89}
90
91#[allow(clippy::large_enum_variant)]
92#[derive(Debug, Clone, Copy)]
93pub enum TransportBodyLowLatencyRef<'a> {
94 Close(Close),
95 KeepAlive(KeepAlive),
96 Network(NetworkMessageRef<'a>),
97}
98
/// Sequence-number type used by this module: an alias for `u32`.
pub type TransportSn = u32;
100
101#[derive(Debug, Copy, Clone, PartialEq, Eq)]
102pub struct PrioritySn {
103 pub reliable: TransportSn,
104 pub best_effort: TransportSn,
105}
106
107impl PrioritySn {
108 pub const DEFAULT: Self = Self {
109 reliable: TransportSn::MIN,
110 best_effort: TransportSn::MIN,
111 };
112
113 #[cfg(feature = "test")]
114 pub fn rand() -> Self {
115 use rand::Rng;
116 let mut rng = rand::thread_rng();
117
118 Self {
119 reliable: rng.gen(),
120 best_effort: rng.gen(),
121 }
122 }
123}
124
125#[derive(Debug, Clone, PartialEq, Eq)]
127pub enum TransportBody {
128 InitSyn(InitSyn),
129 InitAck(InitAck),
130 OpenSyn(OpenSyn),
131 OpenAck(OpenAck),
132 Close(Close),
133 KeepAlive(KeepAlive),
134 Frame(Frame),
135 Fragment(Fragment),
136 OAM(Oam),
137 Join(Join),
138}
139
140#[derive(Debug, Clone, PartialEq, Eq)]
141pub struct TransportMessage {
142 pub body: TransportBody,
143}
144
145impl TransportMessage {
146 #[cfg(feature = "test")]
147 pub fn rand() -> Self {
148 use rand::Rng;
149
150 let mut rng = rand::thread_rng();
151
152 let body = match rng.gen_range(0..10) {
153 0 => TransportBody::InitSyn(InitSyn::rand()),
154 1 => TransportBody::InitAck(InitAck::rand()),
155 2 => TransportBody::OpenSyn(OpenSyn::rand()),
156 3 => TransportBody::OpenAck(OpenAck::rand()),
157 4 => TransportBody::Close(Close::rand()),
158 5 => TransportBody::KeepAlive(KeepAlive::rand()),
159 6 => TransportBody::Frame(Frame::rand()),
160 7 => TransportBody::Fragment(Fragment::rand()),
161 8 => TransportBody::OAM(Oam::rand()),
162 9 => TransportBody::Join(Join::rand()),
163 _ => unreachable!(),
164 };
165
166 Self { body }
167 }
168}
169
170impl From<TransportBody> for TransportMessage {
171 fn from(body: TransportBody) -> Self {
172 Self { body }
173 }
174}
175
176impl From<InitSyn> for TransportMessage {
177 fn from(init_syn: InitSyn) -> Self {
178 TransportBody::InitSyn(init_syn).into()
179 }
180}
181
182impl From<InitAck> for TransportMessage {
183 fn from(init_ack: InitAck) -> Self {
184 TransportBody::InitAck(init_ack).into()
185 }
186}
187
188impl From<OpenSyn> for TransportMessage {
189 fn from(open_syn: OpenSyn) -> Self {
190 TransportBody::OpenSyn(open_syn).into()
191 }
192}
193
194impl From<OpenAck> for TransportMessage {
195 fn from(open_ack: OpenAck) -> Self {
196 TransportBody::OpenAck(open_ack).into()
197 }
198}
199
200impl From<Close> for TransportMessage {
201 fn from(close: Close) -> Self {
202 TransportBody::Close(close).into()
203 }
204}
205
206impl From<KeepAlive> for TransportMessage {
207 fn from(keep_alive: KeepAlive) -> Self {
208 TransportBody::KeepAlive(keep_alive).into()
209 }
210}
211
212impl From<Frame> for TransportMessage {
213 fn from(frame: Frame) -> Self {
214 TransportBody::Frame(frame).into()
215 }
216}
217
218impl From<Fragment> for TransportMessage {
219 fn from(fragment: Fragment) -> Self {
220 TransportBody::Fragment(fragment).into()
221 }
222}
223
224impl From<Join> for TransportMessage {
225 fn from(join: Join) -> Self {
226 TransportBody::Join(join).into()
227 }
228}
229
230impl fmt::Display for TransportMessage {
231 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
232 use TransportBody::*;
233 match &self.body {
234 OAM(_) => write!(f, "OAM"),
235 InitSyn(_) => write!(f, "InitSyn"),
236 InitAck(_) => write!(f, "InitAck"),
237 OpenSyn(_) => write!(f, "OpenSyn"),
238 OpenAck(_) => write!(f, "OpenAck"),
239 Close(_) => write!(f, "Close"),
240 KeepAlive(_) => write!(f, "KeepAlive"),
241 Frame(m) => {
242 write!(f, "Frame[")?;
243 let mut netmsgs = m.payload.iter().peekable();
244 while let Some(m) = netmsgs.next() {
245 m.fmt(f)?;
246 if netmsgs.peek().is_some() {
247 write!(f, ", ")?;
248 }
249 }
250 write!(f, "]")
251 }
252 Fragment(_) => write!(f, "Fragment"),
253 Join(_) => write!(f, "Join"),
254 }
255 }
256}
257
258pub mod ext {
259 use crate::{common::ZExtZ64, core::Priority};
260
261 #[repr(transparent)]
269 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
270 pub struct QoSType<const ID: u8> {
271 inner: u8,
272 }
273
274 impl<const ID: u8> QoSType<{ ID }> {
275 const P_MASK: u8 = 0b00000111;
276 pub const DEFAULT: Self = Self::new(Priority::DEFAULT);
277
278 pub const fn new(priority: Priority) -> Self {
279 Self {
280 inner: priority as u8,
281 }
282 }
283
284 pub const fn priority(&self) -> Priority {
285 unsafe { core::mem::transmute(self.inner & Self::P_MASK) }
286 }
287
288 #[cfg(feature = "test")]
289 pub fn rand() -> Self {
290 use rand::Rng;
291 let mut rng = rand::thread_rng();
292
293 let inner: u8 = rng.gen();
294 Self { inner }
295 }
296 }
297
298 impl<const ID: u8> Default for QoSType<{ ID }> {
299 fn default() -> Self {
300 Self::DEFAULT
301 }
302 }
303
304 impl<const ID: u8> From<ZExtZ64<{ ID }>> for QoSType<{ ID }> {
305 fn from(ext: ZExtZ64<{ ID }>) -> Self {
306 Self {
307 inner: ext.value as u8,
308 }
309 }
310 }
311
312 impl<const ID: u8> From<QoSType<{ ID }>> for ZExtZ64<{ ID }> {
313 fn from(ext: QoSType<{ ID }>) -> Self {
314 ZExtZ64::new(ext.inner as u64)
315 }
316 }
317
318 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
319 pub struct PatchType<const ID: u8>(u8);
320
321 impl<const ID: u8> PatchType<ID> {
322 pub const NONE: Self = Self(0);
323 pub const CURRENT: Self = Self(1);
324
325 pub fn new(int: u8) -> Self {
326 Self(int)
327 }
328
329 pub fn raw(self) -> u8 {
330 self.0
331 }
332
333 pub fn has_fragmentation_markers(&self) -> bool {
334 self.0 >= 1
335 }
336
337 #[cfg(feature = "test")]
338 pub fn rand() -> Self {
339 use rand::Rng;
340 Self(rand::thread_rng().gen())
341 }
342 }
343
344 impl<const ID: u8> From<ZExtZ64<ID>> for PatchType<ID> {
345 fn from(ext: ZExtZ64<ID>) -> Self {
346 Self(ext.value as u8)
347 }
348 }
349
350 impl<const ID: u8> From<PatchType<ID>> for ZExtZ64<ID> {
351 fn from(ext: PatchType<ID>) -> Self {
352 ZExtZ64::new(ext.0 as u64)
353 }
354 }
355}