zenoh_protocol/transport/
mod.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14pub mod close;
15pub mod fragment;
16pub mod frame;
17pub mod init;
18pub mod join;
19pub mod keepalive;
20pub mod oam;
21pub mod open;
22
23use core::fmt;
24
25pub use close::Close;
26pub use fragment::{Fragment, FragmentHeader};
27pub use frame::{Frame, FrameHeader};
28pub use init::{InitAck, InitSyn};
29pub use join::Join;
30pub use keepalive::KeepAlive;
31pub use oam::Oam;
32pub use open::{OpenAck, OpenSyn};
33
34use crate::network::{NetworkMessage, NetworkMessageRef};
35
36/// NOTE: 16 bits (2 bytes) may be prepended to the serialized message indicating the total length
37///       in bytes of the message, resulting in the maximum length of a message being 65_535 bytes.
38///       This is necessary in those stream-oriented transports (e.g., TCP) that do not preserve
39///       the boundary of the serialized messages. The length is encoded as little-endian.
40///       In any case, the length of a message must not exceed 65_535 bytes.
41pub type BatchSize = u16;
42pub type AtomicBatchSize = core::sync::atomic::AtomicU16;
43
44pub mod batch_size {
45    use super::BatchSize;
46
47    pub const UNICAST: BatchSize = BatchSize::MAX;
48    pub const MULTICAST: BatchSize = 8_192;
49}
50
51pub mod id {
52    // WARNING: it's crucial that these IDs do NOT collide with the IDs
53    //          defined in `crate::network::id`.
54    pub const OAM: u8 = 0x00;
55    pub const INIT: u8 = 0x01; // For unicast communications only
56    pub const OPEN: u8 = 0x02; // For unicast communications only
57    pub const CLOSE: u8 = 0x03;
58    pub const KEEP_ALIVE: u8 = 0x04;
59    pub const FRAME: u8 = 0x05;
60    pub const FRAGMENT: u8 = 0x06;
61    pub const JOIN: u8 = 0x07; // For multicast communications only
62}
63
64#[derive(Debug)]
65pub struct TransportMessageLowLatency {
66    pub body: TransportBodyLowLatency,
67}
68
69#[derive(Debug, Clone, Copy)]
70pub struct TransportMessageLowLatencyRef<'a> {
71    pub body: TransportBodyLowLatencyRef<'a>,
72}
73
74impl TryFrom<NetworkMessage> for TransportMessageLowLatency {
75    type Error = zenoh_result::Error;
76    fn try_from(msg: NetworkMessage) -> Result<Self, Self::Error> {
77        Ok(Self {
78            body: TransportBodyLowLatency::Network(msg),
79        })
80    }
81}
82
83#[allow(clippy::large_enum_variant)]
84#[derive(Debug)]
85pub enum TransportBodyLowLatency {
86    Close(Close),
87    KeepAlive(KeepAlive),
88    Network(NetworkMessage),
89}
90
91#[allow(clippy::large_enum_variant)]
92#[derive(Debug, Clone, Copy)]
93pub enum TransportBodyLowLatencyRef<'a> {
94    Close(Close),
95    KeepAlive(KeepAlive),
96    Network(NetworkMessageRef<'a>),
97}
98
99pub type TransportSn = u32;
100
101#[derive(Debug, Copy, Clone, PartialEq, Eq)]
102pub struct PrioritySn {
103    pub reliable: TransportSn,
104    pub best_effort: TransportSn,
105}
106
107impl PrioritySn {
108    pub const DEFAULT: Self = Self {
109        reliable: TransportSn::MIN,
110        best_effort: TransportSn::MIN,
111    };
112
113    #[cfg(feature = "test")]
114    pub fn rand() -> Self {
115        use rand::Rng;
116        let mut rng = rand::thread_rng();
117
118        Self {
119            reliable: rng.gen(),
120            best_effort: rng.gen(),
121        }
122    }
123}
124
125// Zenoh messages at zenoh-transport level
126#[derive(Debug, Clone, PartialEq, Eq)]
127pub enum TransportBody {
128    InitSyn(InitSyn),
129    InitAck(InitAck),
130    OpenSyn(OpenSyn),
131    OpenAck(OpenAck),
132    Close(Close),
133    KeepAlive(KeepAlive),
134    Frame(Frame),
135    Fragment(Fragment),
136    OAM(Oam),
137    Join(Join),
138}
139
140#[derive(Debug, Clone, PartialEq, Eq)]
141pub struct TransportMessage {
142    pub body: TransportBody,
143    #[cfg(feature = "stats")]
144    pub size: Option<core::num::NonZeroUsize>,
145}
146
147impl TransportMessage {
148    #[cfg(feature = "test")]
149    pub fn rand() -> Self {
150        use rand::Rng;
151
152        let mut rng = rand::thread_rng();
153
154        let body = match rng.gen_range(0..10) {
155            0 => TransportBody::InitSyn(InitSyn::rand()),
156            1 => TransportBody::InitAck(InitAck::rand()),
157            2 => TransportBody::OpenSyn(OpenSyn::rand()),
158            3 => TransportBody::OpenAck(OpenAck::rand()),
159            4 => TransportBody::Close(Close::rand()),
160            5 => TransportBody::KeepAlive(KeepAlive::rand()),
161            6 => TransportBody::Frame(Frame::rand()),
162            7 => TransportBody::Fragment(Fragment::rand()),
163            8 => TransportBody::OAM(Oam::rand()),
164            9 => TransportBody::Join(Join::rand()),
165            _ => unreachable!(),
166        };
167
168        Self {
169            body,
170            #[cfg(feature = "stats")]
171            size: None,
172        }
173    }
174}
175
176impl From<TransportBody> for TransportMessage {
177    fn from(body: TransportBody) -> Self {
178        Self {
179            body,
180            #[cfg(feature = "stats")]
181            size: None,
182        }
183    }
184}
185
186impl From<InitSyn> for TransportMessage {
187    fn from(init_syn: InitSyn) -> Self {
188        TransportBody::InitSyn(init_syn).into()
189    }
190}
191
192impl From<InitAck> for TransportMessage {
193    fn from(init_ack: InitAck) -> Self {
194        TransportBody::InitAck(init_ack).into()
195    }
196}
197
198impl From<OpenSyn> for TransportMessage {
199    fn from(open_syn: OpenSyn) -> Self {
200        TransportBody::OpenSyn(open_syn).into()
201    }
202}
203
204impl From<OpenAck> for TransportMessage {
205    fn from(open_ack: OpenAck) -> Self {
206        TransportBody::OpenAck(open_ack).into()
207    }
208}
209
210impl From<Close> for TransportMessage {
211    fn from(close: Close) -> Self {
212        TransportBody::Close(close).into()
213    }
214}
215
216impl From<KeepAlive> for TransportMessage {
217    fn from(keep_alive: KeepAlive) -> Self {
218        TransportBody::KeepAlive(keep_alive).into()
219    }
220}
221
222impl From<Frame> for TransportMessage {
223    fn from(frame: Frame) -> Self {
224        TransportBody::Frame(frame).into()
225    }
226}
227
228impl From<Fragment> for TransportMessage {
229    fn from(fragment: Fragment) -> Self {
230        TransportBody::Fragment(fragment).into()
231    }
232}
233
234impl From<Join> for TransportMessage {
235    fn from(join: Join) -> Self {
236        TransportBody::Join(join).into()
237    }
238}
239
240impl fmt::Display for TransportMessage {
241    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
242        use TransportBody::*;
243        match &self.body {
244            OAM(_) => write!(f, "OAM"),
245            InitSyn(_) => write!(f, "InitSyn"),
246            InitAck(_) => write!(f, "InitAck"),
247            OpenSyn(_) => write!(f, "OpenSyn"),
248            OpenAck(_) => write!(f, "OpenAck"),
249            Close(_) => write!(f, "Close"),
250            KeepAlive(_) => write!(f, "KeepAlive"),
251            Frame(m) => {
252                write!(f, "Frame[")?;
253                let mut netmsgs = m.payload.iter().peekable();
254                while let Some(m) = netmsgs.next() {
255                    m.fmt(f)?;
256                    if netmsgs.peek().is_some() {
257                        write!(f, ", ")?;
258                    }
259                }
260                write!(f, "]")
261            }
262            Fragment(_) => write!(f, "Fragment"),
263            Join(_) => write!(f, "Join"),
264        }
265    }
266}
267
268pub mod ext {
269    use crate::{common::ZExtZ64, core::Priority};
270
271    /// ```text
272    ///  7 6 5 4 3 2 1 0
273    /// +-+-+-+-+-+-+-+-+
274    /// %0|  rsv  |prio %
275    /// +---------------+
276    /// - prio: Priority class
277    /// ```
278    #[repr(transparent)]
279    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
280    pub struct QoSType<const ID: u8> {
281        inner: u8,
282    }
283
284    impl<const ID: u8> QoSType<{ ID }> {
285        const P_MASK: u8 = 0b00000111;
286        pub const DEFAULT: Self = Self::new(Priority::DEFAULT);
287
288        pub const fn new(priority: Priority) -> Self {
289            Self {
290                inner: priority as u8,
291            }
292        }
293
294        pub const fn priority(&self) -> Priority {
295            unsafe { core::mem::transmute(self.inner & Self::P_MASK) }
296        }
297
298        #[cfg(feature = "test")]
299        pub fn rand() -> Self {
300            use rand::Rng;
301            let mut rng = rand::thread_rng();
302
303            let inner: u8 = rng.gen();
304            Self { inner }
305        }
306    }
307
308    impl<const ID: u8> Default for QoSType<{ ID }> {
309        fn default() -> Self {
310            Self::DEFAULT
311        }
312    }
313
314    impl<const ID: u8> From<ZExtZ64<{ ID }>> for QoSType<{ ID }> {
315        fn from(ext: ZExtZ64<{ ID }>) -> Self {
316            Self {
317                inner: ext.value as u8,
318            }
319        }
320    }
321
322    impl<const ID: u8> From<QoSType<{ ID }>> for ZExtZ64<{ ID }> {
323        fn from(ext: QoSType<{ ID }>) -> Self {
324            ZExtZ64::new(ext.inner as u64)
325        }
326    }
327
328    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
329    pub struct PatchType<const ID: u8>(u8);
330
331    impl<const ID: u8> PatchType<ID> {
332        pub const NONE: Self = Self(0);
333        pub const CURRENT: Self = Self(1);
334
335        pub fn new(int: u8) -> Self {
336            Self(int)
337        }
338
339        pub fn raw(self) -> u8 {
340            self.0
341        }
342
343        pub fn has_fragmentation_markers(&self) -> bool {
344            self.0 >= 1
345        }
346
347        #[cfg(feature = "test")]
348        pub fn rand() -> Self {
349            use rand::Rng;
350            Self(rand::thread_rng().gen())
351        }
352    }
353
354    impl<const ID: u8> From<ZExtZ64<ID>> for PatchType<ID> {
355        fn from(ext: ZExtZ64<ID>) -> Self {
356            Self(ext.value as u8)
357        }
358    }
359
360    impl<const ID: u8> From<PatchType<ID>> for ZExtZ64<ID> {
361        fn from(ext: PatchType<ID>) -> Self {
362            ZExtZ64::new(ext.0 as u64)
363        }
364    }
365}