zenoh_protocol/transport/
mod.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14pub mod close;
15pub mod fragment;
16pub mod frame;
17pub mod init;
18pub mod join;
19pub mod keepalive;
20pub mod oam;
21pub mod open;
22
23use core::fmt;
24
25pub use close::Close;
26pub use fragment::{Fragment, FragmentHeader};
27pub use frame::{Frame, FrameHeader};
28pub use init::{InitAck, InitSyn};
29pub use join::Join;
30pub use keepalive::KeepAlive;
31pub use oam::Oam;
32pub use open::{OpenAck, OpenSyn};
33
34use crate::network::{NetworkMessage, NetworkMessageRef};
35
36/// NOTE: 16 bits (2 bytes) may be prepended to the serialized message indicating the total length
37///       in bytes of the message, resulting in the maximum length of a message being 65_535 bytes.
38///       This is necessary in those stream-oriented transports (e.g., TCP) that do not preserve
39///       the boundary of the serialized messages. The length is encoded as little-endian.
40///       In any case, the length of a message must not exceed 65_535 bytes.
41pub type BatchSize = u16;
42pub type AtomicBatchSize = core::sync::atomic::AtomicU16;
43
44pub mod batch_size {
45    use super::BatchSize;
46
47    pub const UNICAST: BatchSize = BatchSize::MAX;
48    pub const MULTICAST: BatchSize = 8_192;
49}
50
51pub mod id {
52    // WARNING: it's crucial that these IDs do NOT collide with the IDs
53    //          defined in `crate::network::id`.
54    pub const OAM: u8 = 0x00;
55    pub const INIT: u8 = 0x01; // For unicast communications only
56    pub const OPEN: u8 = 0x02; // For unicast communications only
57    pub const CLOSE: u8 = 0x03;
58    pub const KEEP_ALIVE: u8 = 0x04;
59    pub const FRAME: u8 = 0x05;
60    pub const FRAGMENT: u8 = 0x06;
61    pub const JOIN: u8 = 0x07; // For multicast communications only
62}
63
64#[derive(Debug)]
65pub struct TransportMessageLowLatency {
66    pub body: TransportBodyLowLatency,
67}
68
69#[derive(Debug, Clone, Copy)]
70pub struct TransportMessageLowLatencyRef<'a> {
71    pub body: TransportBodyLowLatencyRef<'a>,
72}
73
74impl TryFrom<NetworkMessage> for TransportMessageLowLatency {
75    type Error = zenoh_result::Error;
76    fn try_from(msg: NetworkMessage) -> Result<Self, Self::Error> {
77        Ok(Self {
78            body: TransportBodyLowLatency::Network(msg),
79        })
80    }
81}
82
83#[allow(clippy::large_enum_variant)]
84#[derive(Debug)]
85pub enum TransportBodyLowLatency {
86    Close(Close),
87    KeepAlive(KeepAlive),
88    Network(NetworkMessage),
89}
90
91#[allow(clippy::large_enum_variant)]
92#[derive(Debug, Clone, Copy)]
93pub enum TransportBodyLowLatencyRef<'a> {
94    Close(Close),
95    KeepAlive(KeepAlive),
96    Network(NetworkMessageRef<'a>),
97}
98
99pub type TransportSn = u32;
100
101#[derive(Debug, Copy, Clone, PartialEq, Eq)]
102pub struct PrioritySn {
103    pub reliable: TransportSn,
104    pub best_effort: TransportSn,
105}
106
107impl PrioritySn {
108    pub const DEFAULT: Self = Self {
109        reliable: TransportSn::MIN,
110        best_effort: TransportSn::MIN,
111    };
112
113    #[cfg(feature = "test")]
114    pub fn rand() -> Self {
115        use rand::Rng;
116        let mut rng = rand::thread_rng();
117
118        Self {
119            reliable: rng.gen(),
120            best_effort: rng.gen(),
121        }
122    }
123}
124
125// Zenoh messages at zenoh-transport level
126#[derive(Debug, Clone, PartialEq, Eq)]
127pub enum TransportBody {
128    InitSyn(InitSyn),
129    InitAck(InitAck),
130    OpenSyn(OpenSyn),
131    OpenAck(OpenAck),
132    Close(Close),
133    KeepAlive(KeepAlive),
134    Frame(Frame),
135    Fragment(Fragment),
136    OAM(Oam),
137    Join(Join),
138}
139
140#[derive(Debug, Clone, PartialEq, Eq)]
141pub struct TransportMessage {
142    pub body: TransportBody,
143}
144
145impl TransportMessage {
146    #[cfg(feature = "test")]
147    pub fn rand() -> Self {
148        use rand::Rng;
149
150        let mut rng = rand::thread_rng();
151
152        let body = match rng.gen_range(0..10) {
153            0 => TransportBody::InitSyn(InitSyn::rand()),
154            1 => TransportBody::InitAck(InitAck::rand()),
155            2 => TransportBody::OpenSyn(OpenSyn::rand()),
156            3 => TransportBody::OpenAck(OpenAck::rand()),
157            4 => TransportBody::Close(Close::rand()),
158            5 => TransportBody::KeepAlive(KeepAlive::rand()),
159            6 => TransportBody::Frame(Frame::rand()),
160            7 => TransportBody::Fragment(Fragment::rand()),
161            8 => TransportBody::OAM(Oam::rand()),
162            9 => TransportBody::Join(Join::rand()),
163            _ => unreachable!(),
164        };
165
166        Self { body }
167    }
168}
169
170impl From<TransportBody> for TransportMessage {
171    fn from(body: TransportBody) -> Self {
172        Self { body }
173    }
174}
175
176impl From<InitSyn> for TransportMessage {
177    fn from(init_syn: InitSyn) -> Self {
178        TransportBody::InitSyn(init_syn).into()
179    }
180}
181
182impl From<InitAck> for TransportMessage {
183    fn from(init_ack: InitAck) -> Self {
184        TransportBody::InitAck(init_ack).into()
185    }
186}
187
188impl From<OpenSyn> for TransportMessage {
189    fn from(open_syn: OpenSyn) -> Self {
190        TransportBody::OpenSyn(open_syn).into()
191    }
192}
193
194impl From<OpenAck> for TransportMessage {
195    fn from(open_ack: OpenAck) -> Self {
196        TransportBody::OpenAck(open_ack).into()
197    }
198}
199
200impl From<Close> for TransportMessage {
201    fn from(close: Close) -> Self {
202        TransportBody::Close(close).into()
203    }
204}
205
206impl From<KeepAlive> for TransportMessage {
207    fn from(keep_alive: KeepAlive) -> Self {
208        TransportBody::KeepAlive(keep_alive).into()
209    }
210}
211
212impl From<Frame> for TransportMessage {
213    fn from(frame: Frame) -> Self {
214        TransportBody::Frame(frame).into()
215    }
216}
217
218impl From<Fragment> for TransportMessage {
219    fn from(fragment: Fragment) -> Self {
220        TransportBody::Fragment(fragment).into()
221    }
222}
223
224impl From<Join> for TransportMessage {
225    fn from(join: Join) -> Self {
226        TransportBody::Join(join).into()
227    }
228}
229
230impl fmt::Display for TransportMessage {
231    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
232        use TransportBody::*;
233        match &self.body {
234            OAM(_) => write!(f, "OAM"),
235            InitSyn(_) => write!(f, "InitSyn"),
236            InitAck(_) => write!(f, "InitAck"),
237            OpenSyn(_) => write!(f, "OpenSyn"),
238            OpenAck(_) => write!(f, "OpenAck"),
239            Close(_) => write!(f, "Close"),
240            KeepAlive(_) => write!(f, "KeepAlive"),
241            Frame(m) => {
242                write!(f, "Frame[")?;
243                let mut netmsgs = m.payload.iter().peekable();
244                while let Some(m) = netmsgs.next() {
245                    m.fmt(f)?;
246                    if netmsgs.peek().is_some() {
247                        write!(f, ", ")?;
248                    }
249                }
250                write!(f, "]")
251            }
252            Fragment(_) => write!(f, "Fragment"),
253            Join(_) => write!(f, "Join"),
254        }
255    }
256}
257
258pub mod ext {
259    use crate::{common::ZExtZ64, core::Priority};
260
261    /// ```text
262    ///  7 6 5 4 3 2 1 0
263    /// +-+-+-+-+-+-+-+-+
264    /// %0|  rsv  |prio %
265    /// +---------------+
266    /// - prio: Priority class
267    /// ```
268    #[repr(transparent)]
269    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
270    pub struct QoSType<const ID: u8> {
271        inner: u8,
272    }
273
274    impl<const ID: u8> QoSType<{ ID }> {
275        const P_MASK: u8 = 0b00000111;
276        pub const DEFAULT: Self = Self::new(Priority::DEFAULT);
277
278        pub const fn new(priority: Priority) -> Self {
279            Self {
280                inner: priority as u8,
281            }
282        }
283
284        pub const fn priority(&self) -> Priority {
285            unsafe { core::mem::transmute(self.inner & Self::P_MASK) }
286        }
287
288        #[cfg(feature = "test")]
289        pub fn rand() -> Self {
290            use rand::Rng;
291            let mut rng = rand::thread_rng();
292
293            let inner: u8 = rng.gen();
294            Self { inner }
295        }
296    }
297
298    impl<const ID: u8> Default for QoSType<{ ID }> {
299        fn default() -> Self {
300            Self::DEFAULT
301        }
302    }
303
304    impl<const ID: u8> From<ZExtZ64<{ ID }>> for QoSType<{ ID }> {
305        fn from(ext: ZExtZ64<{ ID }>) -> Self {
306            Self {
307                inner: ext.value as u8,
308            }
309        }
310    }
311
312    impl<const ID: u8> From<QoSType<{ ID }>> for ZExtZ64<{ ID }> {
313        fn from(ext: QoSType<{ ID }>) -> Self {
314            ZExtZ64::new(ext.inner as u64)
315        }
316    }
317
318    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
319    pub struct PatchType<const ID: u8>(u8);
320
321    impl<const ID: u8> PatchType<ID> {
322        pub const NONE: Self = Self(0);
323        pub const CURRENT: Self = Self(1);
324
325        pub fn new(int: u8) -> Self {
326            Self(int)
327        }
328
329        pub fn raw(self) -> u8 {
330            self.0
331        }
332
333        pub fn has_fragmentation_markers(&self) -> bool {
334            self.0 >= 1
335        }
336
337        #[cfg(feature = "test")]
338        pub fn rand() -> Self {
339            use rand::Rng;
340            Self(rand::thread_rng().gen())
341        }
342    }
343
344    impl<const ID: u8> From<ZExtZ64<ID>> for PatchType<ID> {
345        fn from(ext: ZExtZ64<ID>) -> Self {
346            Self(ext.value as u8)
347        }
348    }
349
350    impl<const ID: u8> From<PatchType<ID>> for ZExtZ64<ID> {
351        fn from(ext: PatchType<ID>) -> Self {
352            ZExtZ64::new(ext.0 as u64)
353        }
354    }
355}