zenoh_protocol/network/
mod.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14pub mod declare;
15pub mod interest;
16pub mod oam;
17pub mod push;
18pub mod request;
19pub mod response;
20
21use core::fmt;
22
23pub use declare::{
24    Declare, DeclareBody, DeclareFinal, DeclareKeyExpr, DeclareQueryable, DeclareSubscriber,
25    DeclareToken, UndeclareKeyExpr, UndeclareQueryable, UndeclareSubscriber, UndeclareToken,
26};
27pub use interest::Interest;
28pub use oam::Oam;
29pub use push::Push;
30pub use request::{AtomicRequestId, Request, RequestId};
31pub use response::{Response, ResponseFinal};
32
33use crate::core::{CongestionControl, Priority, Reliability};
34
35pub mod id {
36    // WARNING: it's crucial that these IDs do NOT collide with the IDs
37    //          defined in `crate::transport::id`.
38    pub const OAM: u8 = 0x1f;
39    pub const DECLARE: u8 = 0x1e;
40    pub const PUSH: u8 = 0x1d;
41    pub const REQUEST: u8 = 0x1c;
42    pub const RESPONSE: u8 = 0x1b;
43    pub const RESPONSE_FINAL: u8 = 0x1a;
44    pub const INTEREST: u8 = 0x19;
45}
46
47#[repr(u8)]
48#[derive(Debug, Default, Clone, Copy, Hash, PartialEq, Eq)]
49pub enum Mapping {
50    #[default]
51    Receiver = 0,
52    Sender = 1,
53}
54
55impl Mapping {
56    pub const DEFAULT: Self = Self::Receiver;
57
58    #[cfg(feature = "test")]
59    pub fn rand() -> Self {
60        use rand::Rng;
61
62        let mut rng = rand::thread_rng();
63        if rng.gen_bool(0.5) {
64            Mapping::Sender
65        } else {
66            Mapping::Receiver
67        }
68    }
69}
70
71// Zenoh messages at zenoh-network level
72#[derive(Debug, Clone, PartialEq, Eq)]
73pub enum NetworkBody {
74    Push(Push),
75    Request(Request),
76    Response(Response),
77    ResponseFinal(ResponseFinal),
78    Interest(Interest),
79    Declare(Declare),
80    OAM(Oam),
81}
82
83#[derive(Debug, Clone, PartialEq, Eq)]
84pub struct NetworkMessage {
85    pub body: NetworkBody,
86    pub reliability: Reliability,
87    #[cfg(feature = "stats")]
88    pub size: Option<core::num::NonZeroUsize>,
89}
90
91impl NetworkMessage {
92    #[cfg(feature = "test")]
93    pub fn rand() -> Self {
94        use rand::Rng;
95
96        let mut rng = rand::thread_rng();
97
98        let body = match rng.gen_range(0..6) {
99            0 => NetworkBody::Push(Push::rand()),
100            1 => NetworkBody::Request(Request::rand()),
101            2 => NetworkBody::Response(Response::rand()),
102            3 => NetworkBody::ResponseFinal(ResponseFinal::rand()),
103            4 => NetworkBody::Declare(Declare::rand()),
104            5 => NetworkBody::OAM(Oam::rand()),
105            _ => unreachable!(),
106        };
107
108        body.into()
109    }
110
111    #[inline]
112    pub fn is_reliable(&self) -> bool {
113        self.reliability == Reliability::Reliable
114    }
115
116    #[inline]
117    pub fn is_express(&self) -> bool {
118        match &self.body {
119            NetworkBody::Push(msg) => msg.ext_qos.is_express(),
120            NetworkBody::Request(msg) => msg.ext_qos.is_express(),
121            NetworkBody::Response(msg) => msg.ext_qos.is_express(),
122            NetworkBody::ResponseFinal(msg) => msg.ext_qos.is_express(),
123            NetworkBody::Interest(msg) => msg.ext_qos.is_express(),
124            NetworkBody::Declare(msg) => msg.ext_qos.is_express(),
125            NetworkBody::OAM(msg) => msg.ext_qos.is_express(),
126        }
127    }
128
129    #[inline]
130    pub fn is_droppable(&self) -> bool {
131        if !self.is_reliable() {
132            return true;
133        }
134
135        let cc = match &self.body {
136            NetworkBody::Push(msg) => msg.ext_qos.get_congestion_control(),
137            NetworkBody::Request(msg) => msg.ext_qos.get_congestion_control(),
138            NetworkBody::Response(msg) => msg.ext_qos.get_congestion_control(),
139            NetworkBody::ResponseFinal(msg) => msg.ext_qos.get_congestion_control(),
140            NetworkBody::Interest(msg) => msg.ext_qos.get_congestion_control(),
141            NetworkBody::Declare(msg) => msg.ext_qos.get_congestion_control(),
142            NetworkBody::OAM(msg) => msg.ext_qos.get_congestion_control(),
143        };
144
145        cc == CongestionControl::Drop
146    }
147
148    #[inline]
149    pub fn priority(&self) -> Priority {
150        match &self.body {
151            NetworkBody::Push(msg) => msg.ext_qos.get_priority(),
152            NetworkBody::Request(msg) => msg.ext_qos.get_priority(),
153            NetworkBody::Response(msg) => msg.ext_qos.get_priority(),
154            NetworkBody::ResponseFinal(msg) => msg.ext_qos.get_priority(),
155            NetworkBody::Interest(msg) => msg.ext_qos.get_priority(),
156            NetworkBody::Declare(msg) => msg.ext_qos.get_priority(),
157            NetworkBody::OAM(msg) => msg.ext_qos.get_priority(),
158        }
159    }
160}
161
162impl fmt::Display for NetworkMessage {
163    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
164        use NetworkBody::*;
165        match &self.body {
166            OAM(_) => write!(f, "OAM"),
167            Push(_) => write!(f, "Push"),
168            Request(_) => write!(f, "Request"),
169            Response(_) => write!(f, "Response"),
170            ResponseFinal(_) => write!(f, "ResponseFinal"),
171            Interest(_) => write!(f, "Interest"),
172            Declare(_) => write!(f, "Declare"),
173        }
174    }
175}
176
177impl From<NetworkBody> for NetworkMessage {
178    #[inline]
179    fn from(body: NetworkBody) -> Self {
180        Self {
181            body,
182            reliability: Reliability::DEFAULT,
183            #[cfg(feature = "stats")]
184            size: None,
185        }
186    }
187}
188
189impl From<Declare> for NetworkMessage {
190    fn from(declare: Declare) -> Self {
191        NetworkBody::Declare(declare).into()
192    }
193}
194
195impl From<Push> for NetworkMessage {
196    fn from(push: Push) -> Self {
197        NetworkBody::Push(push).into()
198    }
199}
200
201impl From<Request> for NetworkMessage {
202    fn from(request: Request) -> Self {
203        NetworkBody::Request(request).into()
204    }
205}
206
207impl From<Response> for NetworkMessage {
208    fn from(response: Response) -> Self {
209        NetworkBody::Response(response).into()
210    }
211}
212
213impl From<ResponseFinal> for NetworkMessage {
214    fn from(final_response: ResponseFinal) -> Self {
215        NetworkBody::ResponseFinal(final_response).into()
216    }
217}
218
219// Extensions
220pub mod ext {
221    use core::fmt;
222
223    use crate::{
224        common::{imsg, ZExtZ64},
225        core::{CongestionControl, EntityId, Priority, ZenohIdProto},
226    };
227
228    /// ```text
229    ///  7 6 5 4 3 2 1 0
230    /// +-+-+-+-+-+-+-+-+
231    /// |Z|0_1|    ID   |
232    /// +-+-+-+---------+
233    /// %0|rsv|E|D|prio %
234    /// +---------------+
235    ///
236    /// - prio: Priority class
237    /// - D:    Don't drop. Don't drop the message for congestion control.
238    /// - E:    Express. Don't batch this message.
239    /// - rsv:  Reserved
240    /// ```
241    #[repr(transparent)]
242    #[derive(Clone, Copy, PartialEq, Eq)]
243    pub struct QoSType<const ID: u8> {
244        inner: u8,
245    }
246
247    impl<const ID: u8> QoSType<{ ID }> {
248        const P_MASK: u8 = 0b00000111;
249        const D_FLAG: u8 = 0b00001000;
250        const E_FLAG: u8 = 0b00010000;
251
252        pub const DEFAULT: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false);
253
254        pub const DECLARE: Self =
255            Self::new(Priority::Control, CongestionControl::DEFAULT_DECLARE, false);
256        pub const PUSH: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT_PUSH, false);
257        pub const REQUEST: Self =
258            Self::new(Priority::DEFAULT, CongestionControl::DEFAULT_REQUEST, false);
259        pub const RESPONSE: Self = Self::new(
260            Priority::DEFAULT,
261            CongestionControl::DEFAULT_RESPONSE,
262            false,
263        );
264        pub const RESPONSE_FINAL: Self = Self::new(
265            Priority::DEFAULT,
266            CongestionControl::DEFAULT_RESPONSE,
267            false,
268        );
269        pub const OAM: Self = Self::new(Priority::Control, CongestionControl::DEFAULT_OAM, false);
270
271        pub const fn new(
272            priority: Priority,
273            congestion_control: CongestionControl,
274            is_express: bool,
275        ) -> Self {
276            let mut inner = priority as u8;
277            if let CongestionControl::Block = congestion_control {
278                inner |= Self::D_FLAG;
279            }
280            if is_express {
281                inner |= Self::E_FLAG;
282            }
283            Self { inner }
284        }
285
286        pub fn set_priority(&mut self, priority: Priority) {
287            self.inner = imsg::set_bitfield(self.inner, priority as u8, Self::P_MASK);
288        }
289
290        pub const fn get_priority(&self) -> Priority {
291            unsafe { core::mem::transmute(self.inner & Self::P_MASK) }
292        }
293
294        pub fn set_congestion_control(&mut self, cctrl: CongestionControl) {
295            match cctrl {
296                CongestionControl::Block => self.inner = imsg::set_flag(self.inner, Self::D_FLAG),
297                CongestionControl::Drop => self.inner = imsg::unset_flag(self.inner, Self::D_FLAG),
298            }
299        }
300
301        pub const fn get_congestion_control(&self) -> CongestionControl {
302            match imsg::has_flag(self.inner, Self::D_FLAG) {
303                true => CongestionControl::Block,
304                false => CongestionControl::Drop,
305            }
306        }
307
308        pub fn set_is_express(&mut self, is_express: bool) {
309            match is_express {
310                true => self.inner = imsg::set_flag(self.inner, Self::E_FLAG),
311                false => self.inner = imsg::unset_flag(self.inner, Self::E_FLAG),
312            }
313        }
314
315        pub const fn is_express(&self) -> bool {
316            imsg::has_flag(self.inner, Self::E_FLAG)
317        }
318
319        #[cfg(feature = "test")]
320        pub fn rand() -> Self {
321            use rand::Rng;
322            let mut rng = rand::thread_rng();
323
324            let inner: u8 = rng.gen();
325            Self { inner }
326        }
327    }
328
329    impl<const ID: u8> Default for QoSType<{ ID }> {
330        fn default() -> Self {
331            Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false)
332        }
333    }
334
335    impl<const ID: u8> From<ZExtZ64<{ ID }>> for QoSType<{ ID }> {
336        fn from(ext: ZExtZ64<{ ID }>) -> Self {
337            Self {
338                inner: ext.value as u8,
339            }
340        }
341    }
342
343    impl<const ID: u8> From<QoSType<{ ID }>> for ZExtZ64<{ ID }> {
344        fn from(ext: QoSType<{ ID }>) -> Self {
345            ZExtZ64::new(ext.inner as u64)
346        }
347    }
348
349    impl<const ID: u8> fmt::Debug for QoSType<{ ID }> {
350        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
351            f.debug_struct("QoS")
352                .field("priority", &self.get_priority())
353                .field("congestion", &self.get_congestion_control())
354                .field("express", &self.is_express())
355                .finish()
356        }
357    }
358
359    /// ```text
360    ///  7 6 5 4 3 2 1 0
361    /// +-+-+-+-+-+-+-+-+
362    /// |Z|1_0|    ID   |
363    /// +-+-+-+---------+
364    /// ~ ts: <u8;z16>  ~
365    /// +---------------+
366    /// ```
367    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
368    pub struct TimestampType<const ID: u8> {
369        pub timestamp: uhlc::Timestamp,
370    }
371
372    impl<const ID: u8> TimestampType<{ ID }> {
373        #[cfg(feature = "test")]
374        pub fn rand() -> Self {
375            use rand::Rng;
376            let mut rng = rand::thread_rng();
377
378            let time = uhlc::NTP64(rng.gen());
379            let id = uhlc::ID::try_from(ZenohIdProto::rand().to_le_bytes()).unwrap();
380            let timestamp = uhlc::Timestamp::new(time, id);
381            Self { timestamp }
382        }
383    }
384
385    /// ```text
386    ///  7 6 5 4 3 2 1 0
387    /// +-+-+-+-+-+-+-+-+
388    /// |Z|0_1|    ID   |
389    /// +-+-+-+---------+
390    /// %    node_id    %
391    /// +---------------+
392    /// ```
393    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
394    pub struct NodeIdType<const ID: u8> {
395        pub node_id: u16,
396    }
397
398    impl<const ID: u8> NodeIdType<{ ID }> {
399        // node_id == 0 means the message has been generated by the node itself
400        pub const DEFAULT: Self = Self { node_id: 0 };
401
402        #[cfg(feature = "test")]
403        pub fn rand() -> Self {
404            use rand::Rng;
405            let mut rng = rand::thread_rng();
406            let node_id = rng.gen();
407            Self { node_id }
408        }
409    }
410
411    impl<const ID: u8> Default for NodeIdType<{ ID }> {
412        fn default() -> Self {
413            Self::DEFAULT
414        }
415    }
416
417    impl<const ID: u8> From<ZExtZ64<{ ID }>> for NodeIdType<{ ID }> {
418        fn from(ext: ZExtZ64<{ ID }>) -> Self {
419            Self {
420                node_id: ext.value as u16,
421            }
422        }
423    }
424
425    impl<const ID: u8> From<NodeIdType<{ ID }>> for ZExtZ64<{ ID }> {
426        fn from(ext: NodeIdType<{ ID }>) -> Self {
427            ZExtZ64::new(ext.node_id as u64)
428        }
429    }
430
431    /// ```text
432    ///  7 6 5 4 3 2 1 0
433    /// +-+-+-+-+-+-+-+-+
434    /// |zid_len|X|X|X|X|
435    /// +-------+-+-+---+
436    /// ~      zid      ~
437    /// +---------------+
438    /// %      eid      %
439    /// +---------------+
440    /// ```
441    #[derive(Debug, Clone, PartialEq, Eq)]
442    pub struct EntityGlobalIdType<const ID: u8> {
443        pub zid: ZenohIdProto,
444        pub eid: EntityId,
445    }
446
447    impl<const ID: u8> EntityGlobalIdType<{ ID }> {
448        #[cfg(feature = "test")]
449        pub fn rand() -> Self {
450            use rand::Rng;
451            let mut rng = rand::thread_rng();
452
453            let zid = ZenohIdProto::rand();
454            let eid: EntityId = rng.gen();
455            Self { zid, eid }
456        }
457    }
458}