zenoh_protocol/network/
mod.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14pub mod declare;
15pub mod interest;
16pub mod oam;
17pub mod push;
18pub mod request;
19pub mod response;
20
21use core::fmt;
22
23pub use declare::{
24    Declare, DeclareBody, DeclareFinal, DeclareKeyExpr, DeclareQueryable, DeclareSubscriber,
25    DeclareToken, UndeclareKeyExpr, UndeclareQueryable, UndeclareSubscriber, UndeclareToken,
26};
27pub use interest::Interest;
28pub use oam::Oam;
29pub use push::Push;
30pub use request::{AtomicRequestId, Request, RequestId};
31pub use response::{Response, ResponseFinal};
32
33use crate::core::{CongestionControl, Priority, Reliability, WireExpr};
34
/// Numeric identifiers of the network-level messages.
pub mod id {
    // WARNING: it's crucial that these IDs do NOT collide with the IDs
    //          defined in `crate::transport::id`.
    // Listed in ascending order; the values occupy 0x19..=0x1f.

    /// Identifier of the `Interest` message.
    pub const INTEREST: u8 = 0x19;
    /// Identifier of the `ResponseFinal` message.
    pub const RESPONSE_FINAL: u8 = 0x1a;
    /// Identifier of the `Response` message.
    pub const RESPONSE: u8 = 0x1b;
    /// Identifier of the `Request` message.
    pub const REQUEST: u8 = 0x1c;
    /// Identifier of the `Push` message.
    pub const PUSH: u8 = 0x1d;
    /// Identifier of the `Declare` message.
    pub const DECLARE: u8 = 0x1e;
    /// Identifier of the `Oam` message.
    pub const OAM: u8 = 0x1f;
}
46
/// Two-valued mapping selector stored as a single byte.
///
/// `Receiver` (0) is the default, `Sender` is 1.
// NOTE(review): presumably this selects which side's key-expression mapping
// applies — the semantics are not visible in this file, confirm.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub enum Mapping {
    #[default]
    Receiver = 0,
    Sender = 1,
}

impl Mapping {
    /// Const-context equivalent of `Mapping::default()`.
    pub const DEFAULT: Self = Self::Receiver;

    /// Picks `Sender` or `Receiver` with equal probability (test helper).
    #[cfg(feature = "test")]
    pub fn rand() -> Self {
        use rand::Rng;

        if rand::thread_rng().gen_bool(0.5) {
            Self::Sender
        } else {
            Self::Receiver
        }
    }
}
70
/// Zenoh messages at zenoh-network level.
///
/// Owned body of a [`NetworkMessage`]: one variant per network-level message
/// kind, each wrapping the corresponding message struct.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NetworkBody {
    Push(Push),
    Request(Request),
    Response(Response),
    ResponseFinal(ResponseFinal),
    Interest(Interest),
    Declare(Declare),
    OAM(Oam),
}
82
/// Shared-reference counterpart of [`NetworkBody`].
///
/// Holds only `&'a` references, which is why it can be `Copy`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum NetworkBodyRef<'a> {
    Push(&'a Push),
    Request(&'a Request),
    Response(&'a Response),
    ResponseFinal(&'a ResponseFinal),
    Interest(&'a Interest),
    Declare(&'a Declare),
    OAM(&'a Oam),
}
93
/// Exclusive-reference counterpart of [`NetworkBody`].
///
/// Holds `&'a mut` references, so unlike [`NetworkBodyRef`] it is neither
/// `Clone` nor `Copy`.
#[derive(Debug, PartialEq, Eq)]
pub enum NetworkBodyMut<'a> {
    Push(&'a mut Push),
    Request(&'a mut Request),
    Response(&'a mut Response),
    ResponseFinal(&'a mut ResponseFinal),
    Interest(&'a mut Interest),
    Declare(&'a mut Declare),
    OAM(&'a mut Oam),
}
104
/// Owned network-level message: a body plus its reliability.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NetworkMessage {
    /// The message payload, one of the [`NetworkBody`] variants.
    pub body: NetworkBody,
    /// Reliability associated with this message.
    pub reliability: Reliability,
    /// Only present with the `stats` feature; `None` when built via `From<NetworkBody>`.
    // NOTE(review): presumably the encoded size of the message — confirm against the codec.
    #[cfg(feature = "stats")]
    pub size: Option<core::num::NonZeroUsize>,
}
112
/// Borrowed view of a network message: same fields as [`NetworkMessage`],
/// with the body held by shared reference. Cheap to copy.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct NetworkMessageRef<'a> {
    /// Shared-reference view of the message body.
    pub body: NetworkBodyRef<'a>,
    /// Reliability associated with this message.
    pub reliability: Reliability,
    /// Only present with the `stats` feature.
    #[cfg(feature = "stats")]
    pub size: Option<core::num::NonZeroUsize>,
}
120
/// Mutably borrowed view of a network message: same fields as
/// [`NetworkMessage`], with the body held by exclusive reference.
#[derive(Debug, PartialEq, Eq)]
pub struct NetworkMessageMut<'a> {
    /// Exclusive-reference view of the message body.
    pub body: NetworkBodyMut<'a>,
    /// Reliability associated with this message (stored by value, not borrowed).
    pub reliability: Reliability,
    /// Only present with the `stats` feature (stored by value, not borrowed).
    #[cfg(feature = "stats")]
    pub size: Option<core::num::NonZeroUsize>,
}
128
129pub trait NetworkMessageExt {
130    #[doc(hidden)]
131    fn body(&self) -> NetworkBodyRef;
132    #[doc(hidden)]
133    fn reliability(&self) -> Reliability;
134    #[cfg(feature = "stats")]
135    fn size(&self) -> Option<core::num::NonZeroUsize>;
136
137    #[inline]
138    fn is_reliable(&self) -> bool {
139        self.reliability() == Reliability::Reliable
140    }
141
142    #[inline]
143    fn is_express(&self) -> bool {
144        match self.body() {
145            NetworkBodyRef::Push(msg) => msg.ext_qos.is_express(),
146            NetworkBodyRef::Request(msg) => msg.ext_qos.is_express(),
147            NetworkBodyRef::Response(msg) => msg.ext_qos.is_express(),
148            NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.is_express(),
149            NetworkBodyRef::Interest(msg) => msg.ext_qos.is_express(),
150            NetworkBodyRef::Declare(msg) => msg.ext_qos.is_express(),
151            NetworkBodyRef::OAM(msg) => msg.ext_qos.is_express(),
152        }
153    }
154
155    #[inline]
156    fn is_droppable(&self) -> bool {
157        if !self.is_reliable() {
158            return true;
159        }
160
161        let cc = match self.body() {
162            NetworkBodyRef::Push(msg) => msg.ext_qos.get_congestion_control(),
163            NetworkBodyRef::Request(msg) => msg.ext_qos.get_congestion_control(),
164            NetworkBodyRef::Response(msg) => msg.ext_qos.get_congestion_control(),
165            NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.get_congestion_control(),
166            NetworkBodyRef::Interest(msg) => msg.ext_qos.get_congestion_control(),
167            NetworkBodyRef::Declare(msg) => msg.ext_qos.get_congestion_control(),
168            NetworkBodyRef::OAM(msg) => msg.ext_qos.get_congestion_control(),
169        };
170
171        cc == CongestionControl::Drop
172    }
173
174    #[inline]
175    fn priority(&self) -> Priority {
176        match self.body() {
177            NetworkBodyRef::Push(msg) => msg.ext_qos.get_priority(),
178            NetworkBodyRef::Request(msg) => msg.ext_qos.get_priority(),
179            NetworkBodyRef::Response(msg) => msg.ext_qos.get_priority(),
180            NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.get_priority(),
181            NetworkBodyRef::Interest(msg) => msg.ext_qos.get_priority(),
182            NetworkBodyRef::Declare(msg) => msg.ext_qos.get_priority(),
183            NetworkBodyRef::OAM(msg) => msg.ext_qos.get_priority(),
184        }
185    }
186
187    #[inline]
188    fn wire_expr(&self) -> Option<&WireExpr> {
189        match &self.body() {
190            NetworkBodyRef::Push(m) => Some(&m.wire_expr),
191            NetworkBodyRef::Request(m) => Some(&m.wire_expr),
192            NetworkBodyRef::Response(m) => Some(&m.wire_expr),
193            NetworkBodyRef::ResponseFinal(_) => None,
194            NetworkBodyRef::Interest(m) => m.wire_expr.as_ref(),
195            NetworkBodyRef::Declare(m) => match &m.body {
196                DeclareBody::DeclareKeyExpr(m) => Some(&m.wire_expr),
197                DeclareBody::UndeclareKeyExpr(_) => None,
198                DeclareBody::DeclareSubscriber(m) => Some(&m.wire_expr),
199                DeclareBody::UndeclareSubscriber(m) => Some(&m.ext_wire_expr.wire_expr),
200                DeclareBody::DeclareQueryable(m) => Some(&m.wire_expr),
201                DeclareBody::UndeclareQueryable(m) => Some(&m.ext_wire_expr.wire_expr),
202                DeclareBody::DeclareToken(m) => Some(&m.wire_expr),
203                DeclareBody::UndeclareToken(m) => Some(&m.ext_wire_expr.wire_expr),
204                DeclareBody::DeclareFinal(_) => None,
205            },
206            NetworkBodyRef::OAM(_) => None,
207        }
208    }
209
210    #[inline]
211    fn as_ref(&self) -> NetworkMessageRef {
212        NetworkMessageRef {
213            body: self.body(),
214            reliability: self.reliability(),
215            #[cfg(feature = "stats")]
216            size: self.size(),
217        }
218    }
219}
220
221impl NetworkMessageExt for NetworkMessage {
222    fn body(&self) -> NetworkBodyRef {
223        match &self.body {
224            NetworkBody::Push(body) => NetworkBodyRef::Push(body),
225            NetworkBody::Request(body) => NetworkBodyRef::Request(body),
226            NetworkBody::Response(body) => NetworkBodyRef::Response(body),
227            NetworkBody::ResponseFinal(body) => NetworkBodyRef::ResponseFinal(body),
228            NetworkBody::Interest(body) => NetworkBodyRef::Interest(body),
229            NetworkBody::Declare(body) => NetworkBodyRef::Declare(body),
230            NetworkBody::OAM(body) => NetworkBodyRef::OAM(body),
231        }
232    }
233
234    fn reliability(&self) -> Reliability {
235        self.reliability
236    }
237
238    #[cfg(feature = "stats")]
239    fn size(&self) -> Option<core::num::NonZeroUsize> {
240        self.size
241    }
242}
243
// Accessors for the borrowed view: every field is `Copy`, so each getter
// simply returns the stored value.
impl NetworkMessageExt for NetworkMessageRef<'_> {
    // The body view is already a `NetworkBodyRef`; hand out a copy of it.
    fn body(&self) -> NetworkBodyRef {
        self.body
    }

    fn reliability(&self) -> Reliability {
        self.reliability
    }

    #[cfg(feature = "stats")]
    fn size(&self) -> Option<core::num::NonZeroUsize> {
        self.size
    }
}
258
259impl NetworkMessageExt for NetworkMessageMut<'_> {
260    fn body(&self) -> NetworkBodyRef {
261        match &self.body {
262            NetworkBodyMut::Push(body) => NetworkBodyRef::Push(body),
263            NetworkBodyMut::Request(body) => NetworkBodyRef::Request(body),
264            NetworkBodyMut::Response(body) => NetworkBodyRef::Response(body),
265            NetworkBodyMut::ResponseFinal(body) => NetworkBodyRef::ResponseFinal(body),
266            NetworkBodyMut::Interest(body) => NetworkBodyRef::Interest(body),
267            NetworkBodyMut::Declare(body) => NetworkBodyRef::Declare(body),
268            NetworkBodyMut::OAM(body) => NetworkBodyRef::OAM(body),
269        }
270    }
271
272    fn reliability(&self) -> Reliability {
273        self.reliability
274    }
275
276    #[cfg(feature = "stats")]
277    fn size(&self) -> Option<core::num::NonZeroUsize> {
278        self.size
279    }
280}
281
282impl NetworkMessage {
283    #[cfg(feature = "test")]
284    pub fn rand() -> Self {
285        use rand::Rng;
286
287        let mut rng = rand::thread_rng();
288
289        let body = match rng.gen_range(0..6) {
290            0 => NetworkBody::Push(Push::rand()),
291            1 => NetworkBody::Request(Request::rand()),
292            2 => NetworkBody::Response(Response::rand()),
293            3 => NetworkBody::ResponseFinal(ResponseFinal::rand()),
294            4 => NetworkBody::Declare(Declare::rand()),
295            5 => NetworkBody::OAM(Oam::rand()),
296            _ => unreachable!(),
297        };
298
299        body.into()
300    }
301
302    #[inline]
303    pub fn as_mut(&mut self) -> NetworkMessageMut {
304        let body = match &mut self.body {
305            NetworkBody::Push(body) => NetworkBodyMut::Push(body),
306            NetworkBody::Request(body) => NetworkBodyMut::Request(body),
307            NetworkBody::Response(body) => NetworkBodyMut::Response(body),
308            NetworkBody::ResponseFinal(body) => NetworkBodyMut::ResponseFinal(body),
309            NetworkBody::Interest(body) => NetworkBodyMut::Interest(body),
310            NetworkBody::Declare(body) => NetworkBodyMut::Declare(body),
311            NetworkBody::OAM(body) => NetworkBodyMut::OAM(body),
312        };
313        NetworkMessageMut {
314            body,
315            reliability: self.reliability,
316            #[cfg(feature = "stats")]
317            size: self.size,
318        }
319    }
320}
321
322impl NetworkMessageMut<'_> {
323    #[inline]
324    pub fn as_mut(&mut self) -> NetworkMessageMut {
325        let body = match &mut self.body {
326            NetworkBodyMut::Push(body) => NetworkBodyMut::Push(body),
327            NetworkBodyMut::Request(body) => NetworkBodyMut::Request(body),
328            NetworkBodyMut::Response(body) => NetworkBodyMut::Response(body),
329            NetworkBodyMut::ResponseFinal(body) => NetworkBodyMut::ResponseFinal(body),
330            NetworkBodyMut::Interest(body) => NetworkBodyMut::Interest(body),
331            NetworkBodyMut::Declare(body) => NetworkBodyMut::Declare(body),
332            NetworkBodyMut::OAM(body) => NetworkBodyMut::OAM(body),
333        };
334        NetworkMessageMut {
335            body,
336            reliability: self.reliability,
337            #[cfg(feature = "stats")]
338            size: self.size,
339        }
340    }
341}
342
343impl fmt::Display for NetworkMessageRef<'_> {
344    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
345        match &self.body {
346            NetworkBodyRef::OAM(_) => write!(f, "OAM"),
347            NetworkBodyRef::Push(_) => write!(f, "Push"),
348            NetworkBodyRef::Request(_) => write!(f, "Request"),
349            NetworkBodyRef::Response(_) => write!(f, "Response"),
350            NetworkBodyRef::ResponseFinal(_) => write!(f, "ResponseFinal"),
351            NetworkBodyRef::Interest(_) => write!(f, "Interest"),
352            NetworkBodyRef::Declare(_) => write!(f, "Declare"),
353        }
354    }
355}
356
357impl fmt::Display for NetworkMessage {
358    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
359        self.as_ref().fmt(f)
360    }
361}
362
363impl fmt::Display for NetworkMessageMut<'_> {
364    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
365        self.as_ref().fmt(f)
366    }
367}
368
369impl From<NetworkBody> for NetworkMessage {
370    #[inline]
371    fn from(body: NetworkBody) -> Self {
372        Self {
373            body,
374            reliability: Reliability::DEFAULT,
375            #[cfg(feature = "stats")]
376            size: None,
377        }
378    }
379}
380
381impl From<Declare> for NetworkMessage {
382    fn from(declare: Declare) -> Self {
383        NetworkBody::Declare(declare).into()
384    }
385}
386
387impl From<Push> for NetworkMessage {
388    fn from(push: Push) -> Self {
389        NetworkBody::Push(push).into()
390    }
391}
392
393impl From<Request> for NetworkMessage {
394    fn from(request: Request) -> Self {
395        NetworkBody::Request(request).into()
396    }
397}
398
399impl From<Response> for NetworkMessage {
400    fn from(response: Response) -> Self {
401        NetworkBody::Response(response).into()
402    }
403}
404
405impl From<ResponseFinal> for NetworkMessage {
406    fn from(final_response: ResponseFinal) -> Self {
407        NetworkBody::ResponseFinal(final_response).into()
408    }
409}
410
411// Extensions
412pub mod ext {
413    use core::fmt;
414
415    use crate::{
416        common::{imsg, ZExtZ64},
417        core::{CongestionControl, EntityId, Priority, ZenohIdProto},
418    };
419
420    /// ```text
421    ///  7 6 5 4 3 2 1 0
422    /// +-+-+-+-+-+-+-+-+
423    /// |Z|0_1|    ID   |
424    /// +-+-+-+---------+
425    /// %0|rsv|E|D|prio %
426    /// +---------------+
427    ///
428    /// - prio: Priority class
429    /// - D:    Don't drop. Don't drop the message for congestion control.
430    /// - E:    Express. Don't batch this message.
431    /// - rsv:  Reserved
432    /// ```
433    #[repr(transparent)]
434    #[derive(Clone, Copy, PartialEq, Eq)]
435    pub struct QoSType<const ID: u8> {
436        inner: u8,
437    }
438
439    impl<const ID: u8> QoSType<{ ID }> {
440        const P_MASK: u8 = 0b00000111;
441        const D_FLAG: u8 = 0b00001000;
442        const E_FLAG: u8 = 0b00010000;
443
444        pub const DEFAULT: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false);
445
446        pub const DECLARE: Self =
447            Self::new(Priority::Control, CongestionControl::DEFAULT_DECLARE, false);
448        pub const PUSH: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT_PUSH, false);
449        pub const REQUEST: Self =
450            Self::new(Priority::DEFAULT, CongestionControl::DEFAULT_REQUEST, false);
451        pub const RESPONSE: Self = Self::new(
452            Priority::DEFAULT,
453            CongestionControl::DEFAULT_RESPONSE,
454            false,
455        );
456        pub const RESPONSE_FINAL: Self = Self::new(
457            Priority::DEFAULT,
458            CongestionControl::DEFAULT_RESPONSE,
459            false,
460        );
461        pub const OAM: Self = Self::new(Priority::Control, CongestionControl::DEFAULT_OAM, false);
462
463        pub const fn new(
464            priority: Priority,
465            congestion_control: CongestionControl,
466            is_express: bool,
467        ) -> Self {
468            let mut inner = priority as u8;
469            if let CongestionControl::Block = congestion_control {
470                inner |= Self::D_FLAG;
471            }
472            if is_express {
473                inner |= Self::E_FLAG;
474            }
475            Self { inner }
476        }
477
478        pub fn set_priority(&mut self, priority: Priority) {
479            self.inner = imsg::set_bitfield(self.inner, priority as u8, Self::P_MASK);
480        }
481
482        pub const fn get_priority(&self) -> Priority {
483            unsafe { core::mem::transmute(self.inner & Self::P_MASK) }
484        }
485
486        pub fn set_congestion_control(&mut self, cctrl: CongestionControl) {
487            match cctrl {
488                CongestionControl::Block => self.inner = imsg::set_flag(self.inner, Self::D_FLAG),
489                CongestionControl::Drop => self.inner = imsg::unset_flag(self.inner, Self::D_FLAG),
490            }
491        }
492
493        pub const fn get_congestion_control(&self) -> CongestionControl {
494            match imsg::has_flag(self.inner, Self::D_FLAG) {
495                true => CongestionControl::Block,
496                false => CongestionControl::Drop,
497            }
498        }
499
500        pub fn set_is_express(&mut self, is_express: bool) {
501            match is_express {
502                true => self.inner = imsg::set_flag(self.inner, Self::E_FLAG),
503                false => self.inner = imsg::unset_flag(self.inner, Self::E_FLAG),
504            }
505        }
506
507        pub const fn is_express(&self) -> bool {
508            imsg::has_flag(self.inner, Self::E_FLAG)
509        }
510
511        #[cfg(feature = "test")]
512        pub fn rand() -> Self {
513            use rand::Rng;
514            let mut rng = rand::thread_rng();
515
516            let inner: u8 = rng.gen();
517            Self { inner }
518        }
519    }
520
521    impl<const ID: u8> Default for QoSType<{ ID }> {
522        fn default() -> Self {
523            Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false)
524        }
525    }
526
527    impl<const ID: u8> From<ZExtZ64<{ ID }>> for QoSType<{ ID }> {
528        fn from(ext: ZExtZ64<{ ID }>) -> Self {
529            Self {
530                inner: ext.value as u8,
531            }
532        }
533    }
534
535    impl<const ID: u8> From<QoSType<{ ID }>> for ZExtZ64<{ ID }> {
536        fn from(ext: QoSType<{ ID }>) -> Self {
537            ZExtZ64::new(ext.inner as u64)
538        }
539    }
540
541    impl<const ID: u8> fmt::Debug for QoSType<{ ID }> {
542        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
543            f.debug_struct("QoS")
544                .field("priority", &self.get_priority())
545                .field("congestion", &self.get_congestion_control())
546                .field("express", &self.is_express())
547                .finish()
548        }
549    }
550
551    /// ```text
552    ///  7 6 5 4 3 2 1 0
553    /// +-+-+-+-+-+-+-+-+
554    /// |Z|1_0|    ID   |
555    /// +-+-+-+---------+
556    /// ~ ts: <u8;z16>  ~
557    /// +---------------+
558    /// ```
559    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
560    pub struct TimestampType<const ID: u8> {
561        pub timestamp: uhlc::Timestamp,
562    }
563
564    impl<const ID: u8> TimestampType<{ ID }> {
565        #[cfg(feature = "test")]
566        pub fn rand() -> Self {
567            use rand::Rng;
568            let mut rng = rand::thread_rng();
569
570            let time = uhlc::NTP64(rng.gen());
571            let id = uhlc::ID::try_from(ZenohIdProto::rand().to_le_bytes()).unwrap();
572            let timestamp = uhlc::Timestamp::new(time, id);
573            Self { timestamp }
574        }
575    }
576
577    /// ```text
578    ///  7 6 5 4 3 2 1 0
579    /// +-+-+-+-+-+-+-+-+
580    /// |Z|0_1|    ID   |
581    /// +-+-+-+---------+
582    /// %    node_id    %
583    /// +---------------+
584    /// ```
585    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
586    pub struct NodeIdType<const ID: u8> {
587        pub node_id: u16,
588    }
589
590    impl<const ID: u8> NodeIdType<{ ID }> {
591        // node_id == 0 means the message has been generated by the node itself
592        pub const DEFAULT: Self = Self { node_id: 0 };
593
594        #[cfg(feature = "test")]
595        pub fn rand() -> Self {
596            use rand::Rng;
597            let mut rng = rand::thread_rng();
598            let node_id = rng.gen();
599            Self { node_id }
600        }
601    }
602
    // `Default` mirrors the `DEFAULT` associated constant (node_id == 0).
    impl<const ID: u8> Default for NodeIdType<{ ID }> {
        fn default() -> Self {
            Self::DEFAULT
        }
    }
608
609    impl<const ID: u8> From<ZExtZ64<{ ID }>> for NodeIdType<{ ID }> {
610        fn from(ext: ZExtZ64<{ ID }>) -> Self {
611            Self {
612                node_id: ext.value as u16,
613            }
614        }
615    }
616
617    impl<const ID: u8> From<NodeIdType<{ ID }>> for ZExtZ64<{ ID }> {
618        fn from(ext: NodeIdType<{ ID }>) -> Self {
619            ZExtZ64::new(ext.node_id as u64)
620        }
621    }
622
623    /// ```text
624    ///  7 6 5 4 3 2 1 0
625    /// +-+-+-+-+-+-+-+-+
626    /// |zid_len|X|X|X|X|
627    /// +-------+-+-+---+
628    /// ~      zid      ~
629    /// +---------------+
630    /// %      eid      %
631    /// +---------------+
632    /// ```
633    #[derive(Debug, Clone, PartialEq, Eq)]
634    pub struct EntityGlobalIdType<const ID: u8> {
635        pub zid: ZenohIdProto,
636        pub eid: EntityId,
637    }
638
639    impl<const ID: u8> EntityGlobalIdType<{ ID }> {
640        #[cfg(feature = "test")]
641        pub fn rand() -> Self {
642            use rand::Rng;
643            let mut rng = rand::thread_rng();
644
645            let zid = ZenohIdProto::rand();
646            let eid: EntityId = rng.gen();
647            Self { zid, eid }
648        }
649    }
650}