zenoh_protocol/transport/
mod.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14pub mod close;
15pub mod fragment;
16pub mod frame;
17pub mod init;
18pub mod join;
19pub mod keepalive;
20pub mod oam;
21pub mod open;
22
23use core::fmt;
24
25pub use close::Close;
26pub use fragment::{Fragment, FragmentHeader};
27pub use frame::{Frame, FrameHeader};
28pub use init::{InitAck, InitSyn};
29pub use join::Join;
30pub use keepalive::KeepAlive;
31pub use oam::Oam;
32pub use open::{OpenAck, OpenSyn};
33
34use crate::network::NetworkMessage;
35
/// Type used to express the length, in bytes, of a serialized message.
///
/// NOTE: 16 bits (2 bytes) may be prepended to the serialized message to indicate its total
///       length in bytes, which caps the maximum length of a message at 65_535 bytes.
///       This prefix is necessary on stream-oriented transports (e.g., TCP) that do not
///       preserve the boundaries of the serialized messages. The length is encoded as
///       little-endian. In any case, the length of a message must not exceed 65_535 bytes.
pub type BatchSize = u16;
/// Atomic integer of the same width as [`BatchSize`].
pub type AtomicBatchSize = core::sync::atomic::AtomicU16;
43
/// Predefined [`BatchSize`] values.
pub mod batch_size {
    use super::BatchSize;

    /// The largest value a [`BatchSize`] can hold.
    // NOTE(review): presumably the batch size applied to unicast transports — confirm at use sites.
    pub const UNICAST: BatchSize = BatchSize::MAX;
    /// A fixed value of 8_192.
    // NOTE(review): presumably the batch size applied to multicast transports — confirm at use sites.
    pub const MULTICAST: BatchSize = 8_192;
}
50
/// Numeric identifiers, one `u8` constant per transport message kind.
// NOTE(review): presumably these are the message IDs used by the codec — confirm against it.
pub mod id {
    // WARNING: it's crucial that these IDs do NOT collide with the IDs
    //          defined in `crate::network::id`.
    pub const OAM: u8 = 0x00;
    pub const INIT: u8 = 0x01; // For unicast communications only
    pub const OPEN: u8 = 0x02; // For unicast communications only
    pub const CLOSE: u8 = 0x03;
    pub const KEEP_ALIVE: u8 = 0x04;
    pub const FRAME: u8 = 0x05;
    pub const FRAGMENT: u8 = 0x06;
    pub const JOIN: u8 = 0x07; // For multicast communications only
}
63
/// Message wrapper whose only content is a [`TransportBodyLowLatency`] body.
// NOTE(review): the name suggests this is the message type of a low-latency transport — confirm.
#[derive(Debug)]
pub struct TransportMessageLowLatency {
    /// The body carried by this message.
    pub body: TransportBodyLowLatency,
}
68
69impl TryFrom<NetworkMessage> for TransportMessageLowLatency {
70    type Error = zenoh_result::Error;
71    fn try_from(msg: NetworkMessage) -> Result<Self, Self::Error> {
72        Ok(Self {
73            body: TransportBodyLowLatency::Network(msg),
74        })
75    }
76}
77
/// Bodies a [`TransportMessageLowLatency`] can carry.
// NOTE(review): the lint below is silenced without an explanation; presumably one variant
// (likely `Network`) is much larger than the others and boxing it was deliberately avoided.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum TransportBodyLowLatency {
    /// A [`Close`] message.
    Close(Close),
    /// A [`KeepAlive`] message.
    KeepAlive(KeepAlive),
    /// A [`NetworkMessage`].
    Network(NetworkMessage),
}
85
/// Integer type of a transport sequence number.
pub type TransportSn = u32;

/// A pair of sequence numbers: one named `reliable`, one named `best_effort`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct PrioritySn {
    pub reliable: TransportSn,
    pub best_effort: TransportSn,
}

impl PrioritySn {
    /// Both sequence numbers set to the smallest [`TransportSn`] value.
    pub const DEFAULT: Self = {
        let start = TransportSn::MIN;
        Self {
            reliable: start,
            best_effort: start,
        }
    };

    /// Builds a value with two independently drawn random sequence numbers.
    #[cfg(feature = "test")]
    pub fn rand() -> Self {
        use rand::Rng;

        let mut rng = rand::thread_rng();
        // Drawn in field order: `reliable` first, then `best_effort`.
        let reliable: TransportSn = rng.gen();
        let best_effort: TransportSn = rng.gen();

        Self {
            reliable,
            best_effort,
        }
    }
}
111
/// Zenoh messages at zenoh-transport level.
///
/// One variant per message type re-exported by this module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TransportBody {
    InitSyn(InitSyn),
    InitAck(InitAck),
    OpenSyn(OpenSyn),
    OpenAck(OpenAck),
    Close(Close),
    KeepAlive(KeepAlive),
    Frame(Frame),
    Fragment(Fragment),
    OAM(Oam),
    Join(Join),
}
126
/// A transport-level message: a [`TransportBody`] plus, when the `stats` feature is enabled,
/// an optional size.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportMessage {
    /// The message body.
    pub body: TransportBody,
    /// Optional non-zero size; the constructors in this module always initialize it to `None`.
    // NOTE(review): presumably the serialized size in bytes, filled in elsewhere — confirm.
    #[cfg(feature = "stats")]
    pub size: Option<core::num::NonZeroUsize>,
}
133
134impl TransportMessage {
135    #[cfg(feature = "test")]
136    pub fn rand() -> Self {
137        use rand::Rng;
138
139        let mut rng = rand::thread_rng();
140
141        let body = match rng.gen_range(0..10) {
142            0 => TransportBody::InitSyn(InitSyn::rand()),
143            1 => TransportBody::InitAck(InitAck::rand()),
144            2 => TransportBody::OpenSyn(OpenSyn::rand()),
145            3 => TransportBody::OpenAck(OpenAck::rand()),
146            4 => TransportBody::Close(Close::rand()),
147            5 => TransportBody::KeepAlive(KeepAlive::rand()),
148            6 => TransportBody::Frame(Frame::rand()),
149            7 => TransportBody::Fragment(Fragment::rand()),
150            8 => TransportBody::OAM(Oam::rand()),
151            9 => TransportBody::Join(Join::rand()),
152            _ => unreachable!(),
153        };
154
155        Self {
156            body,
157            #[cfg(feature = "stats")]
158            size: None,
159        }
160    }
161}
162
163impl From<TransportBody> for TransportMessage {
164    fn from(body: TransportBody) -> Self {
165        Self {
166            body,
167            #[cfg(feature = "stats")]
168            size: None,
169        }
170    }
171}
172
173impl From<InitSyn> for TransportMessage {
174    fn from(init_syn: InitSyn) -> Self {
175        TransportBody::InitSyn(init_syn).into()
176    }
177}
178
179impl From<InitAck> for TransportMessage {
180    fn from(init_ack: InitAck) -> Self {
181        TransportBody::InitAck(init_ack).into()
182    }
183}
184
185impl From<OpenSyn> for TransportMessage {
186    fn from(open_syn: OpenSyn) -> Self {
187        TransportBody::OpenSyn(open_syn).into()
188    }
189}
190
191impl From<OpenAck> for TransportMessage {
192    fn from(open_ack: OpenAck) -> Self {
193        TransportBody::OpenAck(open_ack).into()
194    }
195}
196
197impl From<Close> for TransportMessage {
198    fn from(close: Close) -> Self {
199        TransportBody::Close(close).into()
200    }
201}
202
203impl From<KeepAlive> for TransportMessage {
204    fn from(keep_alive: KeepAlive) -> Self {
205        TransportBody::KeepAlive(keep_alive).into()
206    }
207}
208
209impl From<Frame> for TransportMessage {
210    fn from(frame: Frame) -> Self {
211        TransportBody::Frame(frame).into()
212    }
213}
214
215impl From<Fragment> for TransportMessage {
216    fn from(fragment: Fragment) -> Self {
217        TransportBody::Fragment(fragment).into()
218    }
219}
220
221impl From<Join> for TransportMessage {
222    fn from(join: Join) -> Self {
223        TransportBody::Join(join).into()
224    }
225}
226
227impl fmt::Display for TransportMessage {
228    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
229        use TransportBody::*;
230        match &self.body {
231            OAM(_) => write!(f, "OAM"),
232            InitSyn(_) => write!(f, "InitSyn"),
233            InitAck(_) => write!(f, "InitAck"),
234            OpenSyn(_) => write!(f, "OpenSyn"),
235            OpenAck(_) => write!(f, "OpenAck"),
236            Close(_) => write!(f, "Close"),
237            KeepAlive(_) => write!(f, "KeepAlive"),
238            Frame(m) => {
239                write!(f, "Frame[")?;
240                let mut netmsgs = m.payload.iter().peekable();
241                while let Some(m) = netmsgs.next() {
242                    m.fmt(f)?;
243                    if netmsgs.peek().is_some() {
244                        write!(f, ", ")?;
245                    }
246                }
247                write!(f, "]")
248            }
249            Fragment(_) => write!(f, "Fragment"),
250            Join(_) => write!(f, "Join"),
251        }
252    }
253}
254
255pub mod ext {
256    use crate::{common::ZExtZ64, core::Priority};
257
    /// One-byte QoS value whose three least-significant bits hold the priority.
    ///
    /// ```text
    ///  7 6 5 4 3 2 1 0
    /// +-+-+-+-+-+-+-+-+
    /// %0|  rsv  |prio %
    /// +---------------+
    /// - prio: Priority class
    /// ```
    #[repr(transparent)]
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct QoSType<const ID: u8> {
        // Raw byte, laid out as in the diagram above.
        inner: u8,
    }
270
271    impl<const ID: u8> QoSType<{ ID }> {
272        const P_MASK: u8 = 0b00000111;
273        pub const DEFAULT: Self = Self::new(Priority::DEFAULT);
274
275        pub const fn new(priority: Priority) -> Self {
276            Self {
277                inner: priority as u8,
278            }
279        }
280
281        pub const fn priority(&self) -> Priority {
282            unsafe { core::mem::transmute(self.inner & Self::P_MASK) }
283        }
284
285        #[cfg(feature = "test")]
286        pub fn rand() -> Self {
287            use rand::Rng;
288            let mut rng = rand::thread_rng();
289
290            let inner: u8 = rng.gen();
291            Self { inner }
292        }
293    }
294
295    impl<const ID: u8> Default for QoSType<{ ID }> {
296        fn default() -> Self {
297            Self::DEFAULT
298        }
299    }
300
301    impl<const ID: u8> From<ZExtZ64<{ ID }>> for QoSType<{ ID }> {
302        fn from(ext: ZExtZ64<{ ID }>) -> Self {
303            Self {
304                inner: ext.value as u8,
305            }
306        }
307    }
308
309    impl<const ID: u8> From<QoSType<{ ID }>> for ZExtZ64<{ ID }> {
310        fn from(ext: QoSType<{ ID }>) -> Self {
311            ZExtZ64::new(ext.inner as u64)
312        }
313    }
314
315    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
316    pub struct PatchType<const ID: u8>(u8);
317
318    impl<const ID: u8> PatchType<ID> {
319        pub const NONE: Self = Self(0);
320        pub const CURRENT: Self = Self(1);
321
322        pub fn new(int: u8) -> Self {
323            Self(int)
324        }
325
326        pub fn raw(self) -> u8 {
327            self.0
328        }
329
330        pub fn has_fragmentation_markers(&self) -> bool {
331            self.0 >= 1
332        }
333
334        #[cfg(feature = "test")]
335        pub fn rand() -> Self {
336            use rand::Rng;
337            Self(rand::thread_rng().gen())
338        }
339    }
340
341    impl<const ID: u8> From<ZExtZ64<ID>> for PatchType<ID> {
342        fn from(ext: ZExtZ64<ID>) -> Self {
343            Self(ext.value as u8)
344        }
345    }
346
347    impl<const ID: u8> From<PatchType<ID>> for ZExtZ64<ID> {
348        fn from(ext: PatchType<ID>) -> Self {
349            ZExtZ64::new(ext.0 as u64)
350        }
351    }
352}