zenoh_protocol/network/
mod.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14pub mod declare;
15pub mod interest;
16pub mod oam;
17pub mod push;
18pub mod request;
19pub mod response;
20
21use core::fmt;
22
23pub use declare::{
24    Declare, DeclareBody, DeclareFinal, DeclareKeyExpr, DeclareQueryable, DeclareSubscriber,
25    DeclareToken, UndeclareKeyExpr, UndeclareQueryable, UndeclareSubscriber, UndeclareToken,
26};
27pub use interest::Interest;
28pub use oam::Oam;
29pub use push::Push;
30pub use request::{AtomicRequestId, Request, RequestId};
31pub use response::{Response, ResponseFinal};
32
33use crate::core::{CongestionControl, Priority, Reliability, WireExpr};
34
/// Numeric identifiers of the network-level messages, one constant per
/// message kind.
pub mod id {
    // WARNING: it's crucial that these IDs do NOT collide with the IDs
    //          defined in `crate::transport::id`.
    //
    // Listed in ascending numeric order.
    pub const INTEREST: u8 = 0x19;
    pub const RESPONSE_FINAL: u8 = 0x1a;
    pub const RESPONSE: u8 = 0x1b;
    pub const REQUEST: u8 = 0x1c;
    pub const PUSH: u8 = 0x1d;
    pub const DECLARE: u8 = 0x1e;
    pub const OAM: u8 = 0x1f;
}
46
/// Two-valued mapping selector with explicit `u8` discriminants:
/// `Receiver` is `0` (and the default), `Sender` is `1`.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub enum Mapping {
    #[default]
    Receiver = 0,
    Sender = 1,
}

impl Mapping {
    /// Constant equivalent of [`Default::default`], usable in `const` contexts.
    pub const DEFAULT: Self = Self::Receiver;

    /// Returns `Sender` or `Receiver` with equal probability.
    #[cfg(feature = "test")]
    pub fn rand() -> Self {
        use rand::Rng;

        let pick_sender = rand::thread_rng().gen_bool(0.5);
        if pick_sender {
            Self::Sender
        } else {
            Self::Receiver
        }
    }
}
70
71// Zenoh messages at zenoh-network level
72#[derive(Debug, Clone, PartialEq, Eq)]
73pub enum NetworkBody {
74    Push(Push),
75    Request(Request),
76    Response(Response),
77    ResponseFinal(ResponseFinal),
78    Interest(Interest),
79    Declare(Declare),
80    OAM(Oam),
81}
82
83#[derive(Debug, Copy, Clone, PartialEq, Eq)]
84pub enum NetworkBodyRef<'a> {
85    Push(&'a Push),
86    Request(&'a Request),
87    Response(&'a Response),
88    ResponseFinal(&'a ResponseFinal),
89    Interest(&'a Interest),
90    Declare(&'a Declare),
91    OAM(&'a Oam),
92}
93
94#[derive(Debug, PartialEq, Eq)]
95pub enum NetworkBodyMut<'a> {
96    Push(&'a mut Push),
97    Request(&'a mut Request),
98    Response(&'a mut Response),
99    ResponseFinal(&'a mut ResponseFinal),
100    Interest(&'a mut Interest),
101    Declare(&'a mut Declare),
102    OAM(&'a mut Oam),
103}
104
105#[derive(Debug, Clone, PartialEq, Eq)]
106pub struct NetworkMessage {
107    pub body: NetworkBody,
108    pub reliability: Reliability,
109}
110
111#[derive(Debug, Copy, Clone, PartialEq, Eq)]
112pub struct NetworkMessageRef<'a> {
113    pub body: NetworkBodyRef<'a>,
114    pub reliability: Reliability,
115}
116
117#[derive(Debug, PartialEq, Eq)]
118pub struct NetworkMessageMut<'a> {
119    pub body: NetworkBodyMut<'a>,
120    pub reliability: Reliability,
121}
122
123pub trait NetworkMessageExt {
124    #[doc(hidden)]
125    fn body(&self) -> NetworkBodyRef<'_>;
126
127    #[doc(hidden)]
128    fn reliability(&self) -> Reliability;
129
130    #[inline]
131    fn is_reliable(&self) -> bool {
132        self.reliability() == Reliability::Reliable
133    }
134
135    #[inline]
136    fn is_express(&self) -> bool {
137        match self.body() {
138            NetworkBodyRef::Push(msg) => msg.ext_qos.is_express(),
139            NetworkBodyRef::Request(msg) => msg.ext_qos.is_express(),
140            NetworkBodyRef::Response(msg) => msg.ext_qos.is_express(),
141            NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.is_express(),
142            NetworkBodyRef::Interest(msg) => msg.ext_qos.is_express(),
143            NetworkBodyRef::Declare(msg) => msg.ext_qos.is_express(),
144            NetworkBodyRef::OAM(msg) => msg.ext_qos.is_express(),
145        }
146    }
147
148    #[inline]
149    fn congestion_control(&self) -> CongestionControl {
150        match self.body() {
151            NetworkBodyRef::Push(msg) => msg.ext_qos.get_congestion_control(),
152            NetworkBodyRef::Request(msg) => msg.ext_qos.get_congestion_control(),
153            NetworkBodyRef::Response(msg) => msg.ext_qos.get_congestion_control(),
154            NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.get_congestion_control(),
155            NetworkBodyRef::Interest(msg) => msg.ext_qos.get_congestion_control(),
156            NetworkBodyRef::Declare(msg) => msg.ext_qos.get_congestion_control(),
157            NetworkBodyRef::OAM(msg) => msg.ext_qos.get_congestion_control(),
158        }
159    }
160
161    #[inline]
162    fn is_droppable(&self) -> bool {
163        !self.is_reliable() || self.congestion_control() == CongestionControl::Drop
164    }
165
166    #[inline]
167    fn priority(&self) -> Priority {
168        match self.body() {
169            NetworkBodyRef::Push(msg) => msg.ext_qos.get_priority(),
170            NetworkBodyRef::Request(msg) => msg.ext_qos.get_priority(),
171            NetworkBodyRef::Response(msg) => msg.ext_qos.get_priority(),
172            NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.get_priority(),
173            NetworkBodyRef::Interest(msg) => msg.ext_qos.get_priority(),
174            NetworkBodyRef::Declare(msg) => msg.ext_qos.get_priority(),
175            NetworkBodyRef::OAM(msg) => msg.ext_qos.get_priority(),
176        }
177    }
178
179    #[inline]
180    fn wire_expr(&self) -> Option<&WireExpr<'_>> {
181        match &self.body() {
182            NetworkBodyRef::Push(m) => Some(&m.wire_expr),
183            NetworkBodyRef::Request(m) => Some(&m.wire_expr),
184            NetworkBodyRef::Response(m) => Some(&m.wire_expr),
185            NetworkBodyRef::ResponseFinal(_) => None,
186            NetworkBodyRef::Interest(m) => m.wire_expr.as_ref(),
187            NetworkBodyRef::Declare(m) => match &m.body {
188                DeclareBody::DeclareKeyExpr(m) => Some(&m.wire_expr),
189                DeclareBody::UndeclareKeyExpr(_) => None,
190                DeclareBody::DeclareSubscriber(m) => Some(&m.wire_expr),
191                DeclareBody::UndeclareSubscriber(m) => Some(&m.ext_wire_expr.wire_expr),
192                DeclareBody::DeclareQueryable(m) => Some(&m.wire_expr),
193                DeclareBody::UndeclareQueryable(m) => Some(&m.ext_wire_expr.wire_expr),
194                DeclareBody::DeclareToken(m) => Some(&m.wire_expr),
195                DeclareBody::UndeclareToken(m) => Some(&m.ext_wire_expr.wire_expr),
196                DeclareBody::DeclareFinal(_) => None,
197            },
198            NetworkBodyRef::OAM(_) => None,
199        }
200    }
201
202    #[inline]
203    fn as_ref(&self) -> NetworkMessageRef<'_> {
204        NetworkMessageRef {
205            body: self.body(),
206            reliability: self.reliability(),
207        }
208    }
209
210    #[inline]
211    fn to_owned(&self) -> NetworkMessage {
212        NetworkMessage {
213            body: match self.body() {
214                NetworkBodyRef::Push(msg) => NetworkBody::Push(msg.clone()),
215                NetworkBodyRef::Request(msg) => NetworkBody::Request(msg.clone()),
216                NetworkBodyRef::Response(msg) => NetworkBody::Response(msg.clone()),
217                NetworkBodyRef::ResponseFinal(msg) => NetworkBody::ResponseFinal(msg.clone()),
218                NetworkBodyRef::Interest(msg) => NetworkBody::Interest(msg.clone()),
219                NetworkBodyRef::Declare(msg) => NetworkBody::Declare(msg.clone()),
220                NetworkBodyRef::OAM(msg) => NetworkBody::OAM(msg.clone()),
221            },
222            reliability: self.reliability(),
223        }
224    }
225}
226
227impl NetworkMessageExt for NetworkMessage {
228    fn body(&self) -> NetworkBodyRef<'_> {
229        match &self.body {
230            NetworkBody::Push(body) => NetworkBodyRef::Push(body),
231            NetworkBody::Request(body) => NetworkBodyRef::Request(body),
232            NetworkBody::Response(body) => NetworkBodyRef::Response(body),
233            NetworkBody::ResponseFinal(body) => NetworkBodyRef::ResponseFinal(body),
234            NetworkBody::Interest(body) => NetworkBodyRef::Interest(body),
235            NetworkBody::Declare(body) => NetworkBodyRef::Declare(body),
236            NetworkBody::OAM(body) => NetworkBodyRef::OAM(body),
237        }
238    }
239
240    fn reliability(&self) -> Reliability {
241        self.reliability
242    }
243}
244
245impl NetworkMessageExt for NetworkMessageRef<'_> {
246    fn body(&self) -> NetworkBodyRef<'_> {
247        self.body
248    }
249
250    fn reliability(&self) -> Reliability {
251        self.reliability
252    }
253}
254
255impl NetworkMessageExt for NetworkMessageMut<'_> {
256    fn body(&self) -> NetworkBodyRef<'_> {
257        match &self.body {
258            NetworkBodyMut::Push(body) => NetworkBodyRef::Push(body),
259            NetworkBodyMut::Request(body) => NetworkBodyRef::Request(body),
260            NetworkBodyMut::Response(body) => NetworkBodyRef::Response(body),
261            NetworkBodyMut::ResponseFinal(body) => NetworkBodyRef::ResponseFinal(body),
262            NetworkBodyMut::Interest(body) => NetworkBodyRef::Interest(body),
263            NetworkBodyMut::Declare(body) => NetworkBodyRef::Declare(body),
264            NetworkBodyMut::OAM(body) => NetworkBodyRef::OAM(body),
265        }
266    }
267
268    fn reliability(&self) -> Reliability {
269        self.reliability
270    }
271}
272
273impl NetworkMessage {
274    #[cfg(feature = "test")]
275    pub fn rand() -> Self {
276        use rand::Rng;
277
278        let mut rng = rand::thread_rng();
279
280        let body = match rng.gen_range(0..6) {
281            0 => NetworkBody::Push(Push::rand()),
282            1 => NetworkBody::Request(Request::rand()),
283            2 => NetworkBody::Response(Response::rand()),
284            3 => NetworkBody::ResponseFinal(ResponseFinal::rand()),
285            4 => NetworkBody::Declare(Declare::rand()),
286            5 => NetworkBody::OAM(Oam::rand()),
287            _ => unreachable!(),
288        };
289
290        body.into()
291    }
292
293    #[inline]
294    pub fn as_mut(&mut self) -> NetworkMessageMut<'_> {
295        let body = match &mut self.body {
296            NetworkBody::Push(body) => NetworkBodyMut::Push(body),
297            NetworkBody::Request(body) => NetworkBodyMut::Request(body),
298            NetworkBody::Response(body) => NetworkBodyMut::Response(body),
299            NetworkBody::ResponseFinal(body) => NetworkBodyMut::ResponseFinal(body),
300            NetworkBody::Interest(body) => NetworkBodyMut::Interest(body),
301            NetworkBody::Declare(body) => NetworkBodyMut::Declare(body),
302            NetworkBody::OAM(body) => NetworkBodyMut::OAM(body),
303        };
304        NetworkMessageMut {
305            body,
306            reliability: self.reliability,
307        }
308    }
309}
310
311impl NetworkMessageMut<'_> {
312    #[inline]
313    pub fn as_mut(&mut self) -> NetworkMessageMut<'_> {
314        let body = match &mut self.body {
315            NetworkBodyMut::Push(body) => NetworkBodyMut::Push(body),
316            NetworkBodyMut::Request(body) => NetworkBodyMut::Request(body),
317            NetworkBodyMut::Response(body) => NetworkBodyMut::Response(body),
318            NetworkBodyMut::ResponseFinal(body) => NetworkBodyMut::ResponseFinal(body),
319            NetworkBodyMut::Interest(body) => NetworkBodyMut::Interest(body),
320            NetworkBodyMut::Declare(body) => NetworkBodyMut::Declare(body),
321            NetworkBodyMut::OAM(body) => NetworkBodyMut::OAM(body),
322        };
323        NetworkMessageMut {
324            body,
325            reliability: self.reliability,
326        }
327    }
328}
329
330impl fmt::Display for NetworkMessageRef<'_> {
331    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
332        match &self.body {
333            NetworkBodyRef::OAM(_) => write!(f, "OAM"),
334            NetworkBodyRef::Push(_) => write!(f, "Push"),
335            NetworkBodyRef::Request(_) => write!(f, "Request"),
336            NetworkBodyRef::Response(_) => write!(f, "Response"),
337            NetworkBodyRef::ResponseFinal(_) => write!(f, "ResponseFinal"),
338            NetworkBodyRef::Interest(_) => write!(f, "Interest"),
339            NetworkBodyRef::Declare(_) => write!(f, "Declare"),
340        }
341    }
342}
343
344impl fmt::Display for NetworkMessage {
345    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
346        self.as_ref().fmt(f)
347    }
348}
349
350impl fmt::Display for NetworkMessageMut<'_> {
351    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
352        self.as_ref().fmt(f)
353    }
354}
355
356impl From<NetworkBody> for NetworkMessage {
357    #[inline]
358    fn from(body: NetworkBody) -> Self {
359        Self {
360            body,
361            reliability: Reliability::DEFAULT,
362        }
363    }
364}
365
/// Test-only shortcut: wraps a `Push` into a message through
/// `From<NetworkBody>`.
#[cfg(feature = "test")]
impl From<Push> for NetworkMessage {
    fn from(push: Push) -> Self {
        let body = NetworkBody::Push(push);
        Self::from(body)
    }
}
372
373// Extensions
374pub mod ext {
375    use core::fmt;
376
377    use crate::{
378        common::{imsg, ZExtZ64},
379        core::{CongestionControl, EntityId, Priority, ZenohIdProto},
380    };
381
382    /// ```text
383    ///  7 6 5 4 3 2 1 0
384    /// +-+-+-+-+-+-+-+-+
385    /// |Z|0_1|    ID   |
386    /// +-+-+-+---------+
387    /// %0|r|F|E|D|prio %
388    /// +---------------+
389    ///
390    /// - prio: Priority class
391    /// - D:    Don't drop. Don't drop the message for congestion control.
392    /// - E:    Express. Don't batch this message.
393    /// - F:    Don't drop the first message for congestion control.
394    /// - r:  Reserved
395    /// ```
396    #[repr(transparent)]
397    #[derive(Clone, Copy, PartialEq, Eq)]
398    pub struct QoSType<const ID: u8> {
399        inner: u8,
400    }
401
402    impl<const ID: u8> QoSType<{ ID }> {
403        const P_MASK: u8 = 0b00000111;
404        const D_FLAG: u8 = 0b00001000;
405        const E_FLAG: u8 = 0b00010000;
406        const F_FLAG: u8 = 0b00100000;
407
408        pub const DEFAULT: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false);
409
410        pub const DECLARE: Self =
411            Self::new(Priority::Control, CongestionControl::DEFAULT_DECLARE, false);
412        pub const PUSH: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT_PUSH, false);
413        pub const REQUEST: Self =
414            Self::new(Priority::DEFAULT, CongestionControl::DEFAULT_REQUEST, false);
415        pub const RESPONSE: Self = Self::new(
416            Priority::DEFAULT,
417            CongestionControl::DEFAULT_RESPONSE,
418            false,
419        );
420        pub const RESPONSE_FINAL: Self = Self::new(
421            Priority::DEFAULT,
422            CongestionControl::DEFAULT_RESPONSE,
423            false,
424        );
425        pub const OAM: Self = Self::new(Priority::Control, CongestionControl::DEFAULT_OAM, false);
426
427        pub const fn new(
428            priority: Priority,
429            congestion_control: CongestionControl,
430            is_express: bool,
431        ) -> Self {
432            let mut inner = priority as u8;
433            match congestion_control {
434                CongestionControl::Block => inner |= Self::D_FLAG,
435                #[cfg(feature = "unstable")]
436                CongestionControl::BlockFirst => inner |= Self::F_FLAG,
437                _ => {}
438            }
439            if is_express {
440                inner |= Self::E_FLAG;
441            }
442            Self { inner }
443        }
444
445        pub fn set_priority(&mut self, priority: Priority) {
446            self.inner = imsg::set_bitfield(self.inner, priority as u8, Self::P_MASK);
447        }
448
449        pub const fn get_priority(&self) -> Priority {
450            unsafe { core::mem::transmute(self.inner & Self::P_MASK) }
451        }
452
453        pub fn set_congestion_control(&mut self, cctrl: CongestionControl) {
454            match cctrl {
455                CongestionControl::Block => {
456                    self.inner = imsg::set_flag(self.inner, Self::D_FLAG);
457                    self.inner = imsg::unset_flag(self.inner, Self::F_FLAG);
458                }
459                CongestionControl::Drop => {
460                    self.inner = imsg::unset_flag(self.inner, Self::D_FLAG);
461                    self.inner = imsg::unset_flag(self.inner, Self::F_FLAG);
462                }
463                #[cfg(feature = "unstable")]
464                CongestionControl::BlockFirst => {
465                    self.inner = imsg::unset_flag(self.inner, Self::D_FLAG);
466                    self.inner = imsg::set_flag(self.inner, Self::F_FLAG);
467                }
468            }
469        }
470
471        pub const fn get_congestion_control(&self) -> CongestionControl {
472            match (
473                imsg::has_flag(self.inner, Self::D_FLAG),
474                imsg::has_flag(self.inner, Self::F_FLAG),
475            ) {
476                (false, false) => CongestionControl::Drop,
477                #[cfg(feature = "unstable")]
478                (false, true) => CongestionControl::BlockFirst,
479                #[cfg(not(feature = "unstable"))]
480                (false, true) => CongestionControl::Drop,
481                (true, _) => CongestionControl::Block,
482            }
483        }
484
485        pub fn set_is_express(&mut self, is_express: bool) {
486            match is_express {
487                true => self.inner = imsg::set_flag(self.inner, Self::E_FLAG),
488                false => self.inner = imsg::unset_flag(self.inner, Self::E_FLAG),
489            }
490        }
491
492        pub const fn is_express(&self) -> bool {
493            imsg::has_flag(self.inner, Self::E_FLAG)
494        }
495
496        #[cfg(feature = "test")]
497        pub fn rand() -> Self {
498            use rand::Rng;
499            let mut rng = rand::thread_rng();
500
501            let inner: u8 = rng.gen();
502            Self { inner }
503        }
504    }
505
506    impl<const ID: u8> Default for QoSType<{ ID }> {
507        fn default() -> Self {
508            Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false)
509        }
510    }
511
512    impl<const ID: u8> From<ZExtZ64<{ ID }>> for QoSType<{ ID }> {
513        fn from(ext: ZExtZ64<{ ID }>) -> Self {
514            Self {
515                inner: ext.value as u8,
516            }
517        }
518    }
519
520    impl<const ID: u8> From<QoSType<{ ID }>> for ZExtZ64<{ ID }> {
521        fn from(ext: QoSType<{ ID }>) -> Self {
522            ZExtZ64::new(ext.inner as u64)
523        }
524    }
525
526    impl<const ID: u8> fmt::Debug for QoSType<{ ID }> {
527        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
528            f.debug_struct("QoS")
529                .field("priority", &self.get_priority())
530                .field("congestion", &self.get_congestion_control())
531                .field("express", &self.is_express())
532                .finish()
533        }
534    }
535
536    /// ```text
537    ///  7 6 5 4 3 2 1 0
538    /// +-+-+-+-+-+-+-+-+
539    /// |Z|1_0|    ID   |
540    /// +-+-+-+---------+
541    /// ~ ts: <u8;z16>  ~
542    /// +---------------+
543    /// ```
544    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
545    pub struct TimestampType<const ID: u8> {
546        pub timestamp: uhlc::Timestamp,
547    }
548
549    impl<const ID: u8> TimestampType<{ ID }> {
550        #[cfg(feature = "test")]
551        pub fn rand() -> Self {
552            use rand::Rng;
553            let mut rng = rand::thread_rng();
554
555            let time = uhlc::NTP64(rng.gen());
556            let id = uhlc::ID::try_from(ZenohIdProto::rand().to_le_bytes()).unwrap();
557            let timestamp = uhlc::Timestamp::new(time, id);
558            Self { timestamp }
559        }
560    }
561
562    /// ```text
563    ///  7 6 5 4 3 2 1 0
564    /// +-+-+-+-+-+-+-+-+
565    /// |Z|0_1|    ID   |
566    /// +-+-+-+---------+
567    /// %    node_id    %
568    /// +---------------+
569    /// ```
570    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
571    pub struct NodeIdType<const ID: u8> {
572        pub node_id: u16,
573    }
574
575    impl<const ID: u8> NodeIdType<{ ID }> {
576        // node_id == 0 means the message has been generated by the node itself
577        pub const DEFAULT: Self = Self { node_id: 0 };
578
579        #[cfg(feature = "test")]
580        pub fn rand() -> Self {
581            use rand::Rng;
582            let mut rng = rand::thread_rng();
583            let node_id = rng.gen();
584            Self { node_id }
585        }
586    }
587
588    impl<const ID: u8> Default for NodeIdType<{ ID }> {
589        fn default() -> Self {
590            Self::DEFAULT
591        }
592    }
593
594    impl<const ID: u8> From<ZExtZ64<{ ID }>> for NodeIdType<{ ID }> {
595        fn from(ext: ZExtZ64<{ ID }>) -> Self {
596            Self {
597                node_id: ext.value as u16,
598            }
599        }
600    }
601
602    impl<const ID: u8> From<NodeIdType<{ ID }>> for ZExtZ64<{ ID }> {
603        fn from(ext: NodeIdType<{ ID }>) -> Self {
604            ZExtZ64::new(ext.node_id as u64)
605        }
606    }
607
608    /// ```text
609    ///  7 6 5 4 3 2 1 0
610    /// +-+-+-+-+-+-+-+-+
611    /// |zid_len|X|X|X|X|
612    /// +-------+-+-+---+
613    /// ~      zid      ~
614    /// +---------------+
615    /// %      eid      %
616    /// +---------------+
617    /// ```
618    #[derive(Debug, Clone, PartialEq, Eq)]
619    pub struct EntityGlobalIdType<const ID: u8> {
620        pub zid: ZenohIdProto,
621        pub eid: EntityId,
622    }
623
624    impl<const ID: u8> EntityGlobalIdType<{ ID }> {
625        #[cfg(feature = "test")]
626        pub fn rand() -> Self {
627            use rand::Rng;
628            let mut rng = rand::thread_rng();
629
630            let zid = ZenohIdProto::rand();
631            let eid: EntityId = rng.gen();
632            Self { zid, eid }
633        }
634    }
635}