zenoh_protocol/network/
mod.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14pub mod declare;
15pub mod interest;
16pub mod oam;
17pub mod push;
18pub mod request;
19pub mod response;
20
21use core::fmt;
22
23pub use declare::{
24    Declare, DeclareBody, DeclareFinal, DeclareKeyExpr, DeclareQueryable, DeclareSubscriber,
25    DeclareToken, UndeclareKeyExpr, UndeclareQueryable, UndeclareSubscriber, UndeclareToken,
26};
27pub use interest::Interest;
28pub use oam::Oam;
29pub use push::Push;
30pub use request::{AtomicRequestId, Request, RequestId};
31pub use response::{Response, ResponseFinal};
32
33use crate::core::{CongestionControl, Priority, Reliability, WireExpr};
34
35pub mod id {
36    // WARNING: it's crucial that these IDs do NOT collide with the IDs
37    //          defined in `crate::transport::id`.
38    pub const OAM: u8 = 0x1f;
39    pub const DECLARE: u8 = 0x1e;
40    pub const PUSH: u8 = 0x1d;
41    pub const REQUEST: u8 = 0x1c;
42    pub const RESPONSE: u8 = 0x1b;
43    pub const RESPONSE_FINAL: u8 = 0x1a;
44    pub const INTEREST: u8 = 0x19;
45}
46
47#[repr(u8)]
48#[derive(Debug, Default, Clone, Copy, Hash, PartialEq, Eq)]
49pub enum Mapping {
50    #[default]
51    Receiver = 0,
52    Sender = 1,
53}
54
55impl Mapping {
56    pub const DEFAULT: Self = Self::Receiver;
57
58    #[cfg(feature = "test")]
59    pub fn rand() -> Self {
60        use rand::Rng;
61
62        let mut rng = rand::thread_rng();
63        if rng.gen_bool(0.5) {
64            Mapping::Sender
65        } else {
66            Mapping::Receiver
67        }
68    }
69}
70
71// Zenoh messages at zenoh-network level
72#[derive(Debug, Clone, PartialEq, Eq)]
73pub enum NetworkBody {
74    Push(Push),
75    Request(Request),
76    Response(Response),
77    ResponseFinal(ResponseFinal),
78    Interest(Interest),
79    Declare(Declare),
80    OAM(Oam),
81}
82
83#[derive(Debug, Copy, Clone, PartialEq, Eq)]
84pub enum NetworkBodyRef<'a> {
85    Push(&'a Push),
86    Request(&'a Request),
87    Response(&'a Response),
88    ResponseFinal(&'a ResponseFinal),
89    Interest(&'a Interest),
90    Declare(&'a Declare),
91    OAM(&'a Oam),
92}
93
94#[derive(Debug, PartialEq, Eq)]
95pub enum NetworkBodyMut<'a> {
96    Push(&'a mut Push),
97    Request(&'a mut Request),
98    Response(&'a mut Response),
99    ResponseFinal(&'a mut ResponseFinal),
100    Interest(&'a mut Interest),
101    Declare(&'a mut Declare),
102    OAM(&'a mut Oam),
103}
104
105#[derive(Debug, Clone, PartialEq, Eq)]
106pub struct NetworkMessage {
107    pub body: NetworkBody,
108    pub reliability: Reliability,
109}
110
111#[derive(Debug, Copy, Clone, PartialEq, Eq)]
112pub struct NetworkMessageRef<'a> {
113    pub body: NetworkBodyRef<'a>,
114    pub reliability: Reliability,
115}
116
117#[derive(Debug, PartialEq, Eq)]
118pub struct NetworkMessageMut<'a> {
119    pub body: NetworkBodyMut<'a>,
120    pub reliability: Reliability,
121}
122
123pub trait NetworkMessageExt {
124    #[doc(hidden)]
125    fn body(&self) -> NetworkBodyRef;
126    #[doc(hidden)]
127    fn reliability(&self) -> Reliability;
128
129    #[inline]
130    fn is_reliable(&self) -> bool {
131        self.reliability() == Reliability::Reliable
132    }
133
134    #[inline]
135    fn is_express(&self) -> bool {
136        match self.body() {
137            NetworkBodyRef::Push(msg) => msg.ext_qos.is_express(),
138            NetworkBodyRef::Request(msg) => msg.ext_qos.is_express(),
139            NetworkBodyRef::Response(msg) => msg.ext_qos.is_express(),
140            NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.is_express(),
141            NetworkBodyRef::Interest(msg) => msg.ext_qos.is_express(),
142            NetworkBodyRef::Declare(msg) => msg.ext_qos.is_express(),
143            NetworkBodyRef::OAM(msg) => msg.ext_qos.is_express(),
144        }
145    }
146
147    #[inline]
148    fn is_droppable(&self) -> bool {
149        if !self.is_reliable() {
150            return true;
151        }
152
153        let cc = match self.body() {
154            NetworkBodyRef::Push(msg) => msg.ext_qos.get_congestion_control(),
155            NetworkBodyRef::Request(msg) => msg.ext_qos.get_congestion_control(),
156            NetworkBodyRef::Response(msg) => msg.ext_qos.get_congestion_control(),
157            NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.get_congestion_control(),
158            NetworkBodyRef::Interest(msg) => msg.ext_qos.get_congestion_control(),
159            NetworkBodyRef::Declare(msg) => msg.ext_qos.get_congestion_control(),
160            NetworkBodyRef::OAM(msg) => msg.ext_qos.get_congestion_control(),
161        };
162
163        cc == CongestionControl::Drop
164    }
165
166    #[inline]
167    fn priority(&self) -> Priority {
168        match self.body() {
169            NetworkBodyRef::Push(msg) => msg.ext_qos.get_priority(),
170            NetworkBodyRef::Request(msg) => msg.ext_qos.get_priority(),
171            NetworkBodyRef::Response(msg) => msg.ext_qos.get_priority(),
172            NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.get_priority(),
173            NetworkBodyRef::Interest(msg) => msg.ext_qos.get_priority(),
174            NetworkBodyRef::Declare(msg) => msg.ext_qos.get_priority(),
175            NetworkBodyRef::OAM(msg) => msg.ext_qos.get_priority(),
176        }
177    }
178
179    #[inline]
180    fn wire_expr(&self) -> Option<&WireExpr> {
181        match &self.body() {
182            NetworkBodyRef::Push(m) => Some(&m.wire_expr),
183            NetworkBodyRef::Request(m) => Some(&m.wire_expr),
184            NetworkBodyRef::Response(m) => Some(&m.wire_expr),
185            NetworkBodyRef::ResponseFinal(_) => None,
186            NetworkBodyRef::Interest(m) => m.wire_expr.as_ref(),
187            NetworkBodyRef::Declare(m) => match &m.body {
188                DeclareBody::DeclareKeyExpr(m) => Some(&m.wire_expr),
189                DeclareBody::UndeclareKeyExpr(_) => None,
190                DeclareBody::DeclareSubscriber(m) => Some(&m.wire_expr),
191                DeclareBody::UndeclareSubscriber(m) => Some(&m.ext_wire_expr.wire_expr),
192                DeclareBody::DeclareQueryable(m) => Some(&m.wire_expr),
193                DeclareBody::UndeclareQueryable(m) => Some(&m.ext_wire_expr.wire_expr),
194                DeclareBody::DeclareToken(m) => Some(&m.wire_expr),
195                DeclareBody::UndeclareToken(m) => Some(&m.ext_wire_expr.wire_expr),
196                DeclareBody::DeclareFinal(_) => None,
197            },
198            NetworkBodyRef::OAM(_) => None,
199        }
200    }
201
202    #[inline]
203    fn as_ref(&self) -> NetworkMessageRef {
204        NetworkMessageRef {
205            body: self.body(),
206            reliability: self.reliability(),
207        }
208    }
209}
210
211impl NetworkMessageExt for NetworkMessage {
212    fn body(&self) -> NetworkBodyRef {
213        match &self.body {
214            NetworkBody::Push(body) => NetworkBodyRef::Push(body),
215            NetworkBody::Request(body) => NetworkBodyRef::Request(body),
216            NetworkBody::Response(body) => NetworkBodyRef::Response(body),
217            NetworkBody::ResponseFinal(body) => NetworkBodyRef::ResponseFinal(body),
218            NetworkBody::Interest(body) => NetworkBodyRef::Interest(body),
219            NetworkBody::Declare(body) => NetworkBodyRef::Declare(body),
220            NetworkBody::OAM(body) => NetworkBodyRef::OAM(body),
221        }
222    }
223
224    fn reliability(&self) -> Reliability {
225        self.reliability
226    }
227}
228
229impl NetworkMessageExt for NetworkMessageRef<'_> {
230    fn body(&self) -> NetworkBodyRef {
231        self.body
232    }
233
234    fn reliability(&self) -> Reliability {
235        self.reliability
236    }
237}
238
239impl NetworkMessageExt for NetworkMessageMut<'_> {
240    fn body(&self) -> NetworkBodyRef {
241        match &self.body {
242            NetworkBodyMut::Push(body) => NetworkBodyRef::Push(body),
243            NetworkBodyMut::Request(body) => NetworkBodyRef::Request(body),
244            NetworkBodyMut::Response(body) => NetworkBodyRef::Response(body),
245            NetworkBodyMut::ResponseFinal(body) => NetworkBodyRef::ResponseFinal(body),
246            NetworkBodyMut::Interest(body) => NetworkBodyRef::Interest(body),
247            NetworkBodyMut::Declare(body) => NetworkBodyRef::Declare(body),
248            NetworkBodyMut::OAM(body) => NetworkBodyRef::OAM(body),
249        }
250    }
251
252    fn reliability(&self) -> Reliability {
253        self.reliability
254    }
255}
256
257impl NetworkMessage {
258    #[cfg(feature = "test")]
259    pub fn rand() -> Self {
260        use rand::Rng;
261
262        let mut rng = rand::thread_rng();
263
264        let body = match rng.gen_range(0..6) {
265            0 => NetworkBody::Push(Push::rand()),
266            1 => NetworkBody::Request(Request::rand()),
267            2 => NetworkBody::Response(Response::rand()),
268            3 => NetworkBody::ResponseFinal(ResponseFinal::rand()),
269            4 => NetworkBody::Declare(Declare::rand()),
270            5 => NetworkBody::OAM(Oam::rand()),
271            _ => unreachable!(),
272        };
273
274        body.into()
275    }
276
277    #[inline]
278    pub fn as_mut(&mut self) -> NetworkMessageMut {
279        let body = match &mut self.body {
280            NetworkBody::Push(body) => NetworkBodyMut::Push(body),
281            NetworkBody::Request(body) => NetworkBodyMut::Request(body),
282            NetworkBody::Response(body) => NetworkBodyMut::Response(body),
283            NetworkBody::ResponseFinal(body) => NetworkBodyMut::ResponseFinal(body),
284            NetworkBody::Interest(body) => NetworkBodyMut::Interest(body),
285            NetworkBody::Declare(body) => NetworkBodyMut::Declare(body),
286            NetworkBody::OAM(body) => NetworkBodyMut::OAM(body),
287        };
288        NetworkMessageMut {
289            body,
290            reliability: self.reliability,
291        }
292    }
293}
294
295impl NetworkMessageMut<'_> {
296    #[inline]
297    pub fn as_mut(&mut self) -> NetworkMessageMut {
298        let body = match &mut self.body {
299            NetworkBodyMut::Push(body) => NetworkBodyMut::Push(body),
300            NetworkBodyMut::Request(body) => NetworkBodyMut::Request(body),
301            NetworkBodyMut::Response(body) => NetworkBodyMut::Response(body),
302            NetworkBodyMut::ResponseFinal(body) => NetworkBodyMut::ResponseFinal(body),
303            NetworkBodyMut::Interest(body) => NetworkBodyMut::Interest(body),
304            NetworkBodyMut::Declare(body) => NetworkBodyMut::Declare(body),
305            NetworkBodyMut::OAM(body) => NetworkBodyMut::OAM(body),
306        };
307        NetworkMessageMut {
308            body,
309            reliability: self.reliability,
310        }
311    }
312}
313
314impl fmt::Display for NetworkMessageRef<'_> {
315    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
316        match &self.body {
317            NetworkBodyRef::OAM(_) => write!(f, "OAM"),
318            NetworkBodyRef::Push(_) => write!(f, "Push"),
319            NetworkBodyRef::Request(_) => write!(f, "Request"),
320            NetworkBodyRef::Response(_) => write!(f, "Response"),
321            NetworkBodyRef::ResponseFinal(_) => write!(f, "ResponseFinal"),
322            NetworkBodyRef::Interest(_) => write!(f, "Interest"),
323            NetworkBodyRef::Declare(_) => write!(f, "Declare"),
324        }
325    }
326}
327
328impl fmt::Display for NetworkMessage {
329    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
330        self.as_ref().fmt(f)
331    }
332}
333
334impl fmt::Display for NetworkMessageMut<'_> {
335    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
336        self.as_ref().fmt(f)
337    }
338}
339
340impl From<NetworkBody> for NetworkMessage {
341    #[inline]
342    fn from(body: NetworkBody) -> Self {
343        Self {
344            body,
345            reliability: Reliability::DEFAULT,
346        }
347    }
348}
349
350impl From<Declare> for NetworkMessage {
351    fn from(declare: Declare) -> Self {
352        NetworkBody::Declare(declare).into()
353    }
354}
355
356impl From<Push> for NetworkMessage {
357    fn from(push: Push) -> Self {
358        NetworkBody::Push(push).into()
359    }
360}
361
362impl From<Request> for NetworkMessage {
363    fn from(request: Request) -> Self {
364        NetworkBody::Request(request).into()
365    }
366}
367
368impl From<Response> for NetworkMessage {
369    fn from(response: Response) -> Self {
370        NetworkBody::Response(response).into()
371    }
372}
373
374impl From<ResponseFinal> for NetworkMessage {
375    fn from(final_response: ResponseFinal) -> Self {
376        NetworkBody::ResponseFinal(final_response).into()
377    }
378}
379
380// Extensions
381pub mod ext {
382    use core::fmt;
383
384    use crate::{
385        common::{imsg, ZExtZ64},
386        core::{CongestionControl, EntityId, Priority, ZenohIdProto},
387    };
388
389    /// ```text
390    ///  7 6 5 4 3 2 1 0
391    /// +-+-+-+-+-+-+-+-+
392    /// |Z|0_1|    ID   |
393    /// +-+-+-+---------+
394    /// %0|rsv|E|D|prio %
395    /// +---------------+
396    ///
397    /// - prio: Priority class
398    /// - D:    Don't drop. Don't drop the message for congestion control.
399    /// - E:    Express. Don't batch this message.
400    /// - rsv:  Reserved
401    /// ```
402    #[repr(transparent)]
403    #[derive(Clone, Copy, PartialEq, Eq)]
404    pub struct QoSType<const ID: u8> {
405        inner: u8,
406    }
407
408    impl<const ID: u8> QoSType<{ ID }> {
409        const P_MASK: u8 = 0b00000111;
410        const D_FLAG: u8 = 0b00001000;
411        const E_FLAG: u8 = 0b00010000;
412
413        pub const DEFAULT: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false);
414
415        pub const DECLARE: Self =
416            Self::new(Priority::Control, CongestionControl::DEFAULT_DECLARE, false);
417        pub const PUSH: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT_PUSH, false);
418        pub const REQUEST: Self =
419            Self::new(Priority::DEFAULT, CongestionControl::DEFAULT_REQUEST, false);
420        pub const RESPONSE: Self = Self::new(
421            Priority::DEFAULT,
422            CongestionControl::DEFAULT_RESPONSE,
423            false,
424        );
425        pub const RESPONSE_FINAL: Self = Self::new(
426            Priority::DEFAULT,
427            CongestionControl::DEFAULT_RESPONSE,
428            false,
429        );
430        pub const OAM: Self = Self::new(Priority::Control, CongestionControl::DEFAULT_OAM, false);
431
432        pub const fn new(
433            priority: Priority,
434            congestion_control: CongestionControl,
435            is_express: bool,
436        ) -> Self {
437            let mut inner = priority as u8;
438            if let CongestionControl::Block = congestion_control {
439                inner |= Self::D_FLAG;
440            }
441            if is_express {
442                inner |= Self::E_FLAG;
443            }
444            Self { inner }
445        }
446
447        pub fn set_priority(&mut self, priority: Priority) {
448            self.inner = imsg::set_bitfield(self.inner, priority as u8, Self::P_MASK);
449        }
450
451        pub const fn get_priority(&self) -> Priority {
452            unsafe { core::mem::transmute(self.inner & Self::P_MASK) }
453        }
454
455        pub fn set_congestion_control(&mut self, cctrl: CongestionControl) {
456            match cctrl {
457                CongestionControl::Block => self.inner = imsg::set_flag(self.inner, Self::D_FLAG),
458                CongestionControl::Drop => self.inner = imsg::unset_flag(self.inner, Self::D_FLAG),
459            }
460        }
461
462        pub const fn get_congestion_control(&self) -> CongestionControl {
463            match imsg::has_flag(self.inner, Self::D_FLAG) {
464                true => CongestionControl::Block,
465                false => CongestionControl::Drop,
466            }
467        }
468
469        pub fn set_is_express(&mut self, is_express: bool) {
470            match is_express {
471                true => self.inner = imsg::set_flag(self.inner, Self::E_FLAG),
472                false => self.inner = imsg::unset_flag(self.inner, Self::E_FLAG),
473            }
474        }
475
476        pub const fn is_express(&self) -> bool {
477            imsg::has_flag(self.inner, Self::E_FLAG)
478        }
479
480        #[cfg(feature = "test")]
481        pub fn rand() -> Self {
482            use rand::Rng;
483            let mut rng = rand::thread_rng();
484
485            let inner: u8 = rng.gen();
486            Self { inner }
487        }
488    }
489
490    impl<const ID: u8> Default for QoSType<{ ID }> {
491        fn default() -> Self {
492            Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false)
493        }
494    }
495
496    impl<const ID: u8> From<ZExtZ64<{ ID }>> for QoSType<{ ID }> {
497        fn from(ext: ZExtZ64<{ ID }>) -> Self {
498            Self {
499                inner: ext.value as u8,
500            }
501        }
502    }
503
504    impl<const ID: u8> From<QoSType<{ ID }>> for ZExtZ64<{ ID }> {
505        fn from(ext: QoSType<{ ID }>) -> Self {
506            ZExtZ64::new(ext.inner as u64)
507        }
508    }
509
510    impl<const ID: u8> fmt::Debug for QoSType<{ ID }> {
511        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
512            f.debug_struct("QoS")
513                .field("priority", &self.get_priority())
514                .field("congestion", &self.get_congestion_control())
515                .field("express", &self.is_express())
516                .finish()
517        }
518    }
519
520    /// ```text
521    ///  7 6 5 4 3 2 1 0
522    /// +-+-+-+-+-+-+-+-+
523    /// |Z|1_0|    ID   |
524    /// +-+-+-+---------+
525    /// ~ ts: <u8;z16>  ~
526    /// +---------------+
527    /// ```
528    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
529    pub struct TimestampType<const ID: u8> {
530        pub timestamp: uhlc::Timestamp,
531    }
532
533    impl<const ID: u8> TimestampType<{ ID }> {
534        #[cfg(feature = "test")]
535        pub fn rand() -> Self {
536            use rand::Rng;
537            let mut rng = rand::thread_rng();
538
539            let time = uhlc::NTP64(rng.gen());
540            let id = uhlc::ID::try_from(ZenohIdProto::rand().to_le_bytes()).unwrap();
541            let timestamp = uhlc::Timestamp::new(time, id);
542            Self { timestamp }
543        }
544    }
545
546    /// ```text
547    ///  7 6 5 4 3 2 1 0
548    /// +-+-+-+-+-+-+-+-+
549    /// |Z|0_1|    ID   |
550    /// +-+-+-+---------+
551    /// %    node_id    %
552    /// +---------------+
553    /// ```
554    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
555    pub struct NodeIdType<const ID: u8> {
556        pub node_id: u16,
557    }
558
559    impl<const ID: u8> NodeIdType<{ ID }> {
560        // node_id == 0 means the message has been generated by the node itself
561        pub const DEFAULT: Self = Self { node_id: 0 };
562
563        #[cfg(feature = "test")]
564        pub fn rand() -> Self {
565            use rand::Rng;
566            let mut rng = rand::thread_rng();
567            let node_id = rng.gen();
568            Self { node_id }
569        }
570    }
571
572    impl<const ID: u8> Default for NodeIdType<{ ID }> {
573        fn default() -> Self {
574            Self::DEFAULT
575        }
576    }
577
578    impl<const ID: u8> From<ZExtZ64<{ ID }>> for NodeIdType<{ ID }> {
579        fn from(ext: ZExtZ64<{ ID }>) -> Self {
580            Self {
581                node_id: ext.value as u16,
582            }
583        }
584    }
585
586    impl<const ID: u8> From<NodeIdType<{ ID }>> for ZExtZ64<{ ID }> {
587        fn from(ext: NodeIdType<{ ID }>) -> Self {
588            ZExtZ64::new(ext.node_id as u64)
589        }
590    }
591
592    /// ```text
593    ///  7 6 5 4 3 2 1 0
594    /// +-+-+-+-+-+-+-+-+
595    /// |zid_len|X|X|X|X|
596    /// +-------+-+-+---+
597    /// ~      zid      ~
598    /// +---------------+
599    /// %      eid      %
600    /// +---------------+
601    /// ```
602    #[derive(Debug, Clone, PartialEq, Eq)]
603    pub struct EntityGlobalIdType<const ID: u8> {
604        pub zid: ZenohIdProto,
605        pub eid: EntityId,
606    }
607
608    impl<const ID: u8> EntityGlobalIdType<{ ID }> {
609        #[cfg(feature = "test")]
610        pub fn rand() -> Self {
611            use rand::Rng;
612            let mut rng = rand::thread_rng();
613
614            let zid = ZenohIdProto::rand();
615            let eid: EntityId = rng.gen();
616            Self { zid, eid }
617        }
618    }
619}