zenoh_protocol/transport/
mod.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14pub mod close;
15pub mod fragment;
16pub mod frame;
17pub mod init;
18pub mod join;
19pub mod keepalive;
20pub mod oam;
21pub mod open;
22
23use core::fmt;
24
25pub use close::Close;
26pub use fragment::{Fragment, FragmentHeader};
27pub use frame::{Frame, FrameHeader};
28pub use init::{InitAck, InitSyn};
29pub use join::Join;
30pub use keepalive::KeepAlive;
31pub use oam::Oam;
32pub use open::{OpenAck, OpenSyn};
33
34use crate::network::{NetworkMessage, NetworkMessageRef};
35
/// Integer type used to express the size, in bytes, of a serialized message.
///
/// NOTE: on stream-oriented transports (e.g., TCP), which do not preserve the boundary of the
///       serialized messages, a 16-bit (2-byte) length prefix may be written before the
///       serialized message to carry its total length in bytes. That prefix is encoded as
///       little-endian and caps the length of a message at 65_535 bytes.
///       Whatever the transport, a message must never be longer than 65_535 bytes.
pub type BatchSize = u16;

/// Atomic counterpart of [`BatchSize`], backed by [`core::sync::atomic::AtomicU16`].
pub type AtomicBatchSize = core::sync::atomic::AtomicU16;
43
44pub mod batch_size {
45    use super::BatchSize;
46
47    pub const UNICAST: BatchSize = BatchSize::MAX;
48    pub const MULTICAST: BatchSize = 8_192;
49}
50
/// Numeric identifiers of the transport messages.
///
/// WARNING: it's crucial that these IDs do NOT collide with the IDs
///          defined in `crate::network::id`.
pub mod id {
    pub const OAM: u8 = 0x00;
    /// For unicast communications only.
    pub const INIT: u8 = 0x01;
    /// For unicast communications only.
    pub const OPEN: u8 = 0x02;
    pub const CLOSE: u8 = 0x03;
    pub const KEEP_ALIVE: u8 = 0x04;
    pub const FRAME: u8 = 0x05;
    pub const FRAGMENT: u8 = 0x06;
    /// For multicast communications only.
    pub const JOIN: u8 = 0x07;
}
63
64#[derive(Debug)]
65pub struct TransportMessageLowLatency {
66    pub body: TransportBodyLowLatency,
67}
68
69#[derive(Debug, Clone, Copy)]
70pub struct TransportMessageLowLatencyRef<'a> {
71    pub body: TransportBodyLowLatencyRef<'a>,
72}
73
74impl TryFrom<NetworkMessage> for TransportMessageLowLatency {
75    type Error = zenoh_result::Error;
76    fn try_from(msg: NetworkMessage) -> Result<Self, Self::Error> {
77        Ok(Self {
78            body: TransportBodyLowLatency::Network(msg),
79        })
80    }
81}
82
83#[allow(clippy::large_enum_variant)]
84#[derive(Debug)]
85pub enum TransportBodyLowLatency {
86    Close(Close),
87    KeepAlive(KeepAlive),
88    Network(NetworkMessage),
89}
90
91#[allow(clippy::large_enum_variant)]
92#[derive(Debug, Clone, Copy)]
93pub enum TransportBodyLowLatencyRef<'a> {
94    Close(Close),
95    KeepAlive(KeepAlive),
96    Network(NetworkMessageRef<'a>),
97}
98
/// Integer type of the sequence numbers stored in [`PrioritySn`].
pub type TransportSn = u32;

/// A pair of sequence numbers: one named `reliable` and one named `best_effort`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct PrioritySn {
    pub reliable: TransportSn,
    pub best_effort: TransportSn,
}

impl PrioritySn {
    /// Both sequence numbers set to [`TransportSn::MIN`] (i.e. zero).
    pub const DEFAULT: Self = Self {
        reliable: TransportSn::MIN,
        best_effort: TransportSn::MIN,
    };

    /// Returns a pair of randomly generated sequence numbers.
    ///
    /// Only compiled when the `test` feature is enabled.
    #[cfg(feature = "test")]
    #[doc(hidden)]
    pub fn rand() -> Self {
        use rand::Rng;
        let mut rng = rand::thread_rng();

        Self {
            reliable: rng.gen(),
            best_effort: rng.gen(),
        }
    }
}

/// `Default` mirrors [`PrioritySn::DEFAULT`], so generic code relying on `Default`
/// observes the same value as code using the constant.
impl Default for PrioritySn {
    fn default() -> Self {
        Self::DEFAULT
    }
}
125
126// Zenoh messages at zenoh-transport level
127#[derive(Debug, Clone, PartialEq, Eq)]
128pub enum TransportBody {
129    InitSyn(InitSyn),
130    InitAck(InitAck),
131    OpenSyn(OpenSyn),
132    OpenAck(OpenAck),
133    Close(Close),
134    KeepAlive(KeepAlive),
135    Frame(Frame),
136    Fragment(Fragment),
137    OAM(Oam),
138    Join(Join),
139}
140
141#[derive(Debug, Clone, PartialEq, Eq)]
142pub struct TransportMessage {
143    pub body: TransportBody,
144}
145
146impl TransportMessage {
147    #[cfg(feature = "test")]
148    #[doc(hidden)]
149    pub fn rand() -> Self {
150        use rand::Rng;
151
152        let mut rng = rand::thread_rng();
153
154        let body = match rng.gen_range(0..10) {
155            0 => TransportBody::InitSyn(InitSyn::rand()),
156            1 => TransportBody::InitAck(InitAck::rand()),
157            2 => TransportBody::OpenSyn(OpenSyn::rand()),
158            3 => TransportBody::OpenAck(OpenAck::rand()),
159            4 => TransportBody::Close(Close::rand()),
160            5 => TransportBody::KeepAlive(KeepAlive::rand()),
161            6 => TransportBody::Frame(Frame::rand()),
162            7 => TransportBody::Fragment(Fragment::rand()),
163            8 => TransportBody::OAM(Oam::rand()),
164            9 => TransportBody::Join(Join::rand()),
165            _ => unreachable!(),
166        };
167
168        Self { body }
169    }
170}
171
172impl From<TransportBody> for TransportMessage {
173    fn from(body: TransportBody) -> Self {
174        Self { body }
175    }
176}
177
178impl From<InitSyn> for TransportMessage {
179    fn from(init_syn: InitSyn) -> Self {
180        TransportBody::InitSyn(init_syn).into()
181    }
182}
183
184impl From<InitAck> for TransportMessage {
185    fn from(init_ack: InitAck) -> Self {
186        TransportBody::InitAck(init_ack).into()
187    }
188}
189
190impl From<OpenSyn> for TransportMessage {
191    fn from(open_syn: OpenSyn) -> Self {
192        TransportBody::OpenSyn(open_syn).into()
193    }
194}
195
196impl From<OpenAck> for TransportMessage {
197    fn from(open_ack: OpenAck) -> Self {
198        TransportBody::OpenAck(open_ack).into()
199    }
200}
201
202impl From<Close> for TransportMessage {
203    fn from(close: Close) -> Self {
204        TransportBody::Close(close).into()
205    }
206}
207
208impl From<KeepAlive> for TransportMessage {
209    fn from(keep_alive: KeepAlive) -> Self {
210        TransportBody::KeepAlive(keep_alive).into()
211    }
212}
213
214impl From<Frame> for TransportMessage {
215    fn from(frame: Frame) -> Self {
216        TransportBody::Frame(frame).into()
217    }
218}
219
220impl From<Fragment> for TransportMessage {
221    fn from(fragment: Fragment) -> Self {
222        TransportBody::Fragment(fragment).into()
223    }
224}
225
226impl From<Join> for TransportMessage {
227    fn from(join: Join) -> Self {
228        TransportBody::Join(join).into()
229    }
230}
231
232impl fmt::Display for TransportMessage {
233    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
234        use TransportBody::*;
235        match &self.body {
236            OAM(_) => write!(f, "OAM"),
237            InitSyn(_) => write!(f, "InitSyn"),
238            InitAck(_) => write!(f, "InitAck"),
239            OpenSyn(_) => write!(f, "OpenSyn"),
240            OpenAck(_) => write!(f, "OpenAck"),
241            Close(_) => write!(f, "Close"),
242            KeepAlive(_) => write!(f, "KeepAlive"),
243            Frame(m) => {
244                write!(f, "Frame[")?;
245                let mut netmsgs = m.payload.iter().peekable();
246                while let Some(m) = netmsgs.next() {
247                    m.fmt(f)?;
248                    if netmsgs.peek().is_some() {
249                        write!(f, ", ")?;
250                    }
251                }
252                write!(f, "]")
253            }
254            Fragment(_) => write!(f, "Fragment"),
255            Join(_) => write!(f, "Join"),
256        }
257    }
258}
259
260pub mod ext {
261    use crate::{common::ZExtZ64, core::Priority};
262
263    /// ```text
264    ///  7 6 5 4 3 2 1 0
265    /// +-+-+-+-+-+-+-+-+
266    /// %0|  rsv  |prio %
267    /// +---------------+
268    /// - prio: Priority class
269    /// ```
270    #[repr(transparent)]
271    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
272    pub struct QoSType<const ID: u8> {
273        inner: u8,
274    }
275
276    impl<const ID: u8> QoSType<{ ID }> {
277        const P_MASK: u8 = 0b00000111;
278        pub const DEFAULT: Self = Self::new(Priority::DEFAULT);
279
280        pub const fn new(priority: Priority) -> Self {
281            Self {
282                inner: priority as u8,
283            }
284        }
285
286        pub const fn priority(&self) -> Priority {
287            unsafe { core::mem::transmute(self.inner & Self::P_MASK) }
288        }
289
290        #[cfg(feature = "test")]
291        #[doc(hidden)]
292        pub fn rand() -> Self {
293            use rand::Rng;
294            let mut rng = rand::thread_rng();
295
296            let inner: u8 = rng.gen();
297            Self { inner }
298        }
299    }
300
301    impl<const ID: u8> Default for QoSType<{ ID }> {
302        fn default() -> Self {
303            Self::DEFAULT
304        }
305    }
306
307    impl<const ID: u8> From<ZExtZ64<{ ID }>> for QoSType<{ ID }> {
308        fn from(ext: ZExtZ64<{ ID }>) -> Self {
309            Self {
310                inner: ext.value as u8,
311            }
312        }
313    }
314
315    impl<const ID: u8> From<QoSType<{ ID }>> for ZExtZ64<{ ID }> {
316        fn from(ext: QoSType<{ ID }>) -> Self {
317            ZExtZ64::new(ext.inner as u64)
318        }
319    }
320
321    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
322    pub struct PatchType<const ID: u8>(u8);
323
324    impl<const ID: u8> PatchType<ID> {
325        pub const NONE: Self = Self(0);
326        pub const CURRENT: Self = Self(1);
327
328        pub fn new(int: u8) -> Self {
329            Self(int)
330        }
331
332        pub fn raw(self) -> u8 {
333            self.0
334        }
335
336        pub fn has_fragmentation_markers(&self) -> bool {
337            self.0 >= 1
338        }
339
340        #[cfg(feature = "test")]
341        #[doc(hidden)]
342        pub fn rand() -> Self {
343            use rand::Rng;
344            Self(rand::thread_rng().gen())
345        }
346    }
347
348    impl<const ID: u8> From<ZExtZ64<ID>> for PatchType<ID> {
349        fn from(ext: ZExtZ64<ID>) -> Self {
350            Self(ext.value as u8)
351        }
352    }
353
354    impl<const ID: u8> From<PatchType<ID>> for ZExtZ64<ID> {
355        fn from(ext: PatchType<ID>) -> Self {
356            ZExtZ64::new(ext.0 as u64)
357        }
358    }
359}