Skip to main content

zenoh_protocol/core/
mod.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15use alloc::{
16    boxed::Box,
17    format,
18    string::{String, ToString},
19};
20use core::{
21    convert::{From, TryFrom, TryInto},
22    fmt::{self, Debug, Display},
23    hash::Hash,
24    ops::{Deref, RangeInclusive},
25    str::FromStr,
26};
27
28use serde::{Deserialize, Serialize};
29pub use uhlc::{Timestamp, NTP64};
30use zenoh_keyexpr::OwnedKeyExpr;
31use zenoh_result::{bail, zerror};
32
33/// The unique Id of the [`HLC`](uhlc::HLC) that generated the concerned [`Timestamp`].
34pub type TimestampId = uhlc::ID;
35
36/// Constants and helpers for zenoh `whatami` flags.
37pub mod whatami;
38pub use whatami::*;
39pub use zenoh_keyexpr::key_expr;
40
41pub mod wire_expr;
42pub use wire_expr::*;
43
44mod cowstr;
45pub use cowstr::CowStr;
46pub mod encoding;
47pub use encoding::{Encoding, EncodingId};
48
49pub mod locator;
50pub use locator::*;
51
52pub mod endpoint;
53pub use endpoint::*;
54
55pub mod resolution;
56pub use resolution::*;
57
58pub mod parameters;
59pub use parameters::Parameters;
60
61pub mod region;
62pub use region::*;
63
64/// The global unique id of a zenoh peer.
65#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
66#[repr(transparent)]
67pub struct ZenohIdProto(uhlc::ID);
68
69impl ZenohIdProto {
70    pub const MAX_SIZE: usize = 16;
71
72    #[inline]
73    pub fn size(&self) -> usize {
74        self.0.size()
75    }
76
77    #[inline]
78    pub fn to_le_bytes(&self) -> [u8; uhlc::ID::MAX_SIZE] {
79        self.0.to_le_bytes()
80    }
81
82    #[doc(hidden)]
83    pub fn rand() -> ZenohIdProto {
84        ZenohIdProto(uhlc::ID::rand())
85    }
86
87    pub fn into_keyexpr(self) -> OwnedKeyExpr {
88        self.into()
89    }
90
91    pub fn short(self) -> ShortZenohIdProto {
92        ShortZenohIdProto(self)
93    }
94}
95
96pub struct ShortZenohIdProto(ZenohIdProto);
97
98impl Display for ShortZenohIdProto {
99    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
100        type Repr = u32;
101        const L: usize = core::mem::size_of::<Repr>();
102        let bytes = self.0.to_le_bytes();
103        let start = self.0.size().saturating_sub(L);
104        write!(
105            f,
106            "{:x}",
107            Repr::from_le_bytes(bytes[start..start + L].try_into().unwrap())
108        )
109    }
110}
111
112impl Debug for ShortZenohIdProto {
113    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
114        write!(f, "{self}")
115    }
116}
117
118impl Default for ZenohIdProto {
119    fn default() -> Self {
120        Self::rand()
121    }
122}
123
124// Mimics uhlc::SizeError,
125#[derive(Debug, Clone, Copy)]
126pub struct SizeError(usize);
127
128#[cfg(feature = "std")]
129impl std::error::Error for SizeError {}
130#[cfg(not(feature = "std"))]
131impl zenoh_result::IError for SizeError {}
132
133impl From<uhlc::SizeError> for SizeError {
134    fn from(val: uhlc::SizeError) -> Self {
135        Self(val.0)
136    }
137}
138
139// Taken from uhlc::SizeError
140impl fmt::Display for SizeError {
141    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
142        write!(
143            f,
144            "Maximum ID size ({} bytes) exceeded: {}",
145            uhlc::ID::MAX_SIZE,
146            self.0
147        )
148    }
149}
150
151macro_rules! derive_tryfrom {
152    ($T: ty) => {
153        impl TryFrom<$T> for ZenohIdProto {
154            type Error = zenoh_result::Error;
155            fn try_from(val: $T) -> Result<Self, Self::Error> {
156                match val.try_into() {
157                    Ok(ok) => Ok(Self(ok)),
158                    Err(err) => Err(Box::<SizeError>::new(err.into())),
159                }
160            }
161        }
162    };
163}
164derive_tryfrom!([u8; 1]);
165derive_tryfrom!(&[u8; 1]);
166derive_tryfrom!([u8; 2]);
167derive_tryfrom!(&[u8; 2]);
168derive_tryfrom!([u8; 3]);
169derive_tryfrom!(&[u8; 3]);
170derive_tryfrom!([u8; 4]);
171derive_tryfrom!(&[u8; 4]);
172derive_tryfrom!([u8; 5]);
173derive_tryfrom!(&[u8; 5]);
174derive_tryfrom!([u8; 6]);
175derive_tryfrom!(&[u8; 6]);
176derive_tryfrom!([u8; 7]);
177derive_tryfrom!(&[u8; 7]);
178derive_tryfrom!([u8; 8]);
179derive_tryfrom!(&[u8; 8]);
180derive_tryfrom!([u8; 9]);
181derive_tryfrom!(&[u8; 9]);
182derive_tryfrom!([u8; 10]);
183derive_tryfrom!(&[u8; 10]);
184derive_tryfrom!([u8; 11]);
185derive_tryfrom!(&[u8; 11]);
186derive_tryfrom!([u8; 12]);
187derive_tryfrom!(&[u8; 12]);
188derive_tryfrom!([u8; 13]);
189derive_tryfrom!(&[u8; 13]);
190derive_tryfrom!([u8; 14]);
191derive_tryfrom!(&[u8; 14]);
192derive_tryfrom!([u8; 15]);
193derive_tryfrom!(&[u8; 15]);
194derive_tryfrom!([u8; 16]);
195derive_tryfrom!(&[u8; 16]);
196derive_tryfrom!(&[u8]);
197
198impl FromStr for ZenohIdProto {
199    type Err = zenoh_result::Error;
200
201    fn from_str(s: &str) -> Result<Self, Self::Err> {
202        if s.contains(|c: char| c.is_ascii_uppercase()) {
203            bail!(
204                "Invalid id: {} - uppercase hexadecimal is not accepted, use lowercase",
205                s
206            );
207        }
208        let u: uhlc::ID = s
209            .parse()
210            .map_err(|e: uhlc::ParseIDError| zerror!("Invalid id: {} - {}", s, e.cause))?;
211        Ok(ZenohIdProto(u))
212    }
213}
214
215impl fmt::Debug for ZenohIdProto {
216    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
217        write!(f, "{}", self.0)
218    }
219}
220
221impl fmt::Display for ZenohIdProto {
222    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223        fmt::Debug::fmt(self, f)
224    }
225}
226
227// A PeerID can be converted into a Timestamp's ID
228impl From<&ZenohIdProto> for uhlc::ID {
229    fn from(zid: &ZenohIdProto) -> Self {
230        zid.0
231    }
232}
233
234impl From<ZenohIdProto> for uhlc::ID {
235    fn from(zid: ZenohIdProto) -> Self {
236        zid.0
237    }
238}
239
240impl From<ZenohIdProto> for OwnedKeyExpr {
241    fn from(zid: ZenohIdProto) -> Self {
242        // SAFETY: zid.to_string() returns an stringified hexadecimal
243        // representation of the zid. Therefore, building a OwnedKeyExpr
244        // by calling from_string_unchecked() is safe because it is
245        // guaranteed that no wildcards nor reserved chars will be present.
246        unsafe { OwnedKeyExpr::from_string_unchecked(zid.to_string()) }
247    }
248}
249
250impl From<&ZenohIdProto> for OwnedKeyExpr {
251    fn from(zid: &ZenohIdProto) -> Self {
252        (*zid).into()
253    }
254}
255
256impl serde::Serialize for ZenohIdProto {
257    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
258    where
259        S: serde::Serializer,
260    {
261        serializer.serialize_str(self.to_string().as_str())
262    }
263}
264
265impl<'de> serde::Deserialize<'de> for ZenohIdProto {
266    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
267    where
268        D: serde::Deserializer<'de>,
269    {
270        struct ZenohIdVisitor;
271
272        impl<'de> serde::de::Visitor<'de> for ZenohIdVisitor {
273            type Value = ZenohIdProto;
274
275            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
276                formatter.write_str(&format!(
277                    "An hex string of 1-{} bytes",
278                    ZenohIdProto::MAX_SIZE
279                ))
280            }
281
282            fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
283            where
284                E: serde::de::Error,
285            {
286                v.parse().map_err(serde::de::Error::custom)
287            }
288
289            fn visit_borrowed_str<E>(self, v: &'de str) -> Result<Self::Value, E>
290            where
291                E: serde::de::Error,
292            {
293                self.visit_str(v)
294            }
295
296            fn visit_string<E>(self, v: String) -> Result<Self::Value, E>
297            where
298                E: serde::de::Error,
299            {
300                self.visit_str(&v)
301            }
302        }
303
304        deserializer.deserialize_str(ZenohIdVisitor)
305    }
306}
307
/// The unique id of a zenoh entity inside its parent `Session`.
pub type EntityId = u32;
310
311/// The global unique id of a zenoh entity.
312#[derive(Debug, Default, Copy, Clone, Eq, Hash, PartialEq)]
313pub struct EntityGlobalIdProto {
314    pub zid: ZenohIdProto,
315    pub eid: EntityId,
316}
317
318impl EntityGlobalIdProto {
319    #[cfg(feature = "test")]
320    #[doc(hidden)]
321    pub fn rand() -> Self {
322        use rand::Rng;
323        Self {
324            zid: ZenohIdProto::rand(),
325            eid: rand::thread_rng().gen(),
326        }
327    }
328}
329
330#[repr(u8)]
331#[derive(Debug, Default, Copy, Clone, Eq, Hash, PartialEq, PartialOrd, Ord, Serialize)]
332pub enum Priority {
333    Control = 0,
334    RealTime = 1,
335    InteractiveHigh = 2,
336    InteractiveLow = 3,
337    DataHigh = 4,
338    #[default]
339    Data = 5,
340    DataLow = 6,
341    Background = 7,
342}
343
344impl Display for Priority {
345    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
346        f.write_str(match self {
347            Priority::Control => "control",
348            Priority::RealTime => "real-time",
349            Priority::InteractiveHigh => "interactive-high",
350            Priority::InteractiveLow => "interactive-low",
351            Priority::Data => "data",
352            Priority::DataHigh => "data-high",
353            Priority::DataLow => "data-low",
354            Priority::Background => "background",
355        })
356    }
357}
358
359#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize)]
360/// A [`Priority`] range bounded inclusively below and above.
361pub struct PriorityRange(RangeInclusive<Priority>);
362
363impl Deref for PriorityRange {
364    type Target = RangeInclusive<Priority>;
365
366    fn deref(&self) -> &Self::Target {
367        &self.0
368    }
369}
370
371impl PriorityRange {
372    pub fn new(range: RangeInclusive<Priority>) -> Self {
373        Self(range)
374    }
375
376    /// Returns `true` if `self` is a superset of `other`.
377    pub fn includes(&self, other: &PriorityRange) -> bool {
378        self.start() <= other.start() && other.end() <= self.end()
379    }
380
381    pub fn len(&self) -> usize {
382        *self.end() as usize - *self.start() as usize + 1
383    }
384
385    pub fn is_empty(&self) -> bool {
386        false
387    }
388
389    #[cfg(feature = "test")]
390    #[doc(hidden)]
391    pub fn rand() -> Self {
392        use rand::Rng;
393        let mut rng = rand::thread_rng();
394        let start = rng.gen_range(Priority::MAX as u8..Priority::MIN as u8);
395        let end = rng.gen_range((start + 1)..=Priority::MIN as u8);
396
397        Self(Priority::try_from(start).unwrap()..=Priority::try_from(end).unwrap())
398    }
399}
400
401impl Display for PriorityRange {
402    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403        write!(f, "{}-{}", *self.start() as u8, *self.end() as u8)
404    }
405}
406
/// Error describing why a priority range could not be built from a string.
#[derive(Debug, PartialEq)]
pub enum InvalidPriorityRange {
    /// The input does not have the expected shape; `found` is the offending input.
    InvalidSyntax { found: String },
    /// One of the bounds is not acceptable; `message` explains why.
    InvalidBound { message: String },
}

impl Display for InvalidPriorityRange {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::InvalidSyntax { found } => write!(
                f,
                "invalid priority range string, expected an range of the form `start-end` but found {}",
                found
            ),
            Self::InvalidBound { message } => {
                write!(f, "invalid priority range bound: {}", message)
            }
        }
    }
}

#[cfg(feature = "std")]
impl std::error::Error for InvalidPriorityRange {}
424
425impl FromStr for PriorityRange {
426    type Err = InvalidPriorityRange;
427
428    fn from_str(s: &str) -> Result<Self, Self::Err> {
429        const SEPARATOR: &str = "-";
430        let mut metadata = s.split(SEPARATOR);
431
432        let start = metadata
433            .next()
434            .ok_or_else(|| InvalidPriorityRange::InvalidSyntax {
435                found: s.to_string(),
436            })?
437            .parse::<u8>()
438            .map(Priority::try_from)
439            .map_err(|err| InvalidPriorityRange::InvalidBound {
440                message: err.to_string(),
441            })?
442            .map_err(|err| InvalidPriorityRange::InvalidBound {
443                message: err.to_string(),
444            })?;
445
446        match metadata.next() {
447            Some(slice) => {
448                let end = slice
449                    .parse::<u8>()
450                    .map(Priority::try_from)
451                    .map_err(|err| InvalidPriorityRange::InvalidBound {
452                        message: err.to_string(),
453                    })?
454                    .map_err(|err| InvalidPriorityRange::InvalidBound {
455                        message: err.to_string(),
456                    })?;
457
458                if metadata.next().is_some() {
459                    return Err(InvalidPriorityRange::InvalidSyntax {
460                        found: s.to_string(),
461                    });
462                };
463
464                Ok(PriorityRange::new(start..=end))
465            }
466            None => Ok(PriorityRange::new(start..=start)),
467        }
468    }
469}
470
471impl Priority {
472    /// Default
473    pub const DEFAULT: Self = Self::Data;
474    /// The lowest Priority
475    pub const MIN: Self = Self::Background;
476    /// The highest Priority
477    pub const MAX: Self = Self::Control;
478    /// The number of available priorities
479    pub const NUM: usize = 1 + Self::MIN as usize - Self::MAX as usize;
480}
481
482impl TryFrom<u8> for Priority {
483    type Error = zenoh_result::Error;
484
485    fn try_from(v: u8) -> Result<Self, Self::Error> {
486        match v {
487            0 => Ok(Priority::Control),
488            1 => Ok(Priority::RealTime),
489            2 => Ok(Priority::InteractiveHigh),
490            3 => Ok(Priority::InteractiveLow),
491            4 => Ok(Priority::DataHigh),
492            5 => Ok(Priority::Data),
493            6 => Ok(Priority::DataLow),
494            7 => Ok(Priority::Background),
495            unknown => bail!(
496                "{} is not a valid priority value. Admitted values are: [{}-{}].",
497                unknown,
498                Self::MAX as u8,
499                Self::MIN as u8
500            ),
501        }
502    }
503}
504
505/// Reliability guarantees for message delivery.
506#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
507#[repr(u8)]
508pub enum Reliability {
509    /// Messages may be lost.
510    BestEffort = 0,
511    /// Messages are guaranteed to be delivered.
512    #[default]
513    Reliable = 1,
514}
515
516impl Reliability {
517    pub const DEFAULT: Self = Self::Reliable;
518
519    #[cfg(feature = "test")]
520    #[doc(hidden)]
521    pub fn rand() -> Self {
522        use rand::Rng;
523
524        let mut rng = rand::thread_rng();
525
526        if rng.gen_bool(0.5) {
527            Reliability::Reliable
528        } else {
529            Reliability::BestEffort
530        }
531    }
532}
533
534impl From<bool> for Reliability {
535    fn from(value: bool) -> Self {
536        if value {
537            Reliability::Reliable
538        } else {
539            Reliability::BestEffort
540        }
541    }
542}
543
544impl From<Reliability> for bool {
545    fn from(value: Reliability) -> Self {
546        match value {
547            Reliability::BestEffort => false,
548            Reliability::Reliable => true,
549        }
550    }
551}
552
553impl Display for Reliability {
554    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
555        write!(f, "{}", *self as u8)
556    }
557}
558
559#[derive(Debug)]
560pub struct InvalidReliability {
561    found: String,
562}
563
564impl Display for InvalidReliability {
565    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
566        write!(
567            f,
568            "invalid Reliability string, expected `{}` or `{}` but found {}",
569            Reliability::Reliable as u8,
570            Reliability::BestEffort as u8,
571            self.found
572        )
573    }
574}
575
576#[cfg(feature = "std")]
577impl std::error::Error for InvalidReliability {}
578
579impl FromStr for Reliability {
580    type Err = InvalidReliability;
581
582    fn from_str(s: &str) -> Result<Self, Self::Err> {
583        let Ok(desc) = s.parse::<u8>() else {
584            return Err(InvalidReliability {
585                found: s.to_string(),
586            });
587        };
588
589        if desc == Reliability::BestEffort as u8 {
590            Ok(Reliability::BestEffort)
591        } else if desc == Reliability::Reliable as u8 {
592            Ok(Reliability::Reliable)
593        } else {
594            Err(InvalidReliability {
595                found: s.to_string(),
596            })
597        }
598    }
599}
600
601#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)]
602pub struct Channel {
603    pub priority: Priority,
604    pub reliability: Reliability,
605}
606
607impl Channel {
608    pub const DEFAULT: Self = Self {
609        priority: Priority::DEFAULT,
610        reliability: Reliability::DEFAULT,
611    };
612}
613
614/// Congestion control strategy.
615#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Deserialize)]
616#[repr(u8)]
617pub enum CongestionControl {
618    #[default]
619    /// When transmitting a message in a node with a full queue, the node may drop the message.
620    Drop = 0,
621    /// When transmitting a message in a node with a full queue, the node will wait for queue to
622    /// progress.
623    Block = 1,
624    #[cfg(feature = "unstable")]
625    /// When transmitting a message in a node with a full queue, the node will wait for queue to
626    /// progress, but only for the first message sent with this strategy; other messages will be
627    /// dropped.
628    BlockFirst = 2,
629}
630
631impl CongestionControl {
632    pub const DEFAULT: Self = Self::Drop;
633
634    #[cfg(feature = "internal")]
635    pub const DEFAULT_PUSH: Self = Self::Drop;
636    #[cfg(not(feature = "internal"))]
637    pub(crate) const DEFAULT_PUSH: Self = Self::Drop;
638
639    #[cfg(feature = "internal")]
640    pub const DEFAULT_REQUEST: Self = Self::Block;
641    #[cfg(not(feature = "internal"))]
642    pub(crate) const DEFAULT_REQUEST: Self = Self::Block;
643
644    #[cfg(feature = "internal")]
645    pub const DEFAULT_DECLARE: Self = Self::Block;
646    #[cfg(not(feature = "internal"))]
647    pub(crate) const DEFAULT_DECLARE: Self = Self::Block;
648
649    #[cfg(feature = "internal")]
650    pub const DEFAULT_INTEREST: Self = Self::Block;
651    #[cfg(not(feature = "internal"))]
652    pub(crate) const DEFAULT_INTEREST: Self = Self::Block;
653
654    #[cfg(feature = "internal")]
655    pub const DEFAULT_OAM: Self = Self::Block;
656    #[cfg(not(feature = "internal"))]
657    pub(crate) const DEFAULT_OAM: Self = Self::Block;
658}
659
#[cfg(test)]
mod tests {
    use core::str::FromStr;

    use crate::core::{Priority, PriorityRange, RegionName, ZenohIdProto};

    /// A `start-end` pair and a lone bound both parse; a missing bound is rejected.
    #[test]
    fn test_priority_range() {
        let pair = PriorityRange::from_str("2-3");
        let expected_pair = PriorityRange::new(Priority::InteractiveHigh..=Priority::InteractiveLow);
        assert_eq!(pair, Ok(expected_pair));

        let single = PriorityRange::from_str("7");
        let expected_single = PriorityRange::new(Priority::Background..=Priority::Background);
        assert_eq!(single, Ok(expected_single));

        for malformed in ["1-", "-5"] {
            assert!(PriorityRange::from_str(malformed).is_err());
        }
    }

    #[test]
    fn test_region_name_ok() {
        let parsed = RegionName::from_str("1234567812345678");
        assert!(parsed.is_ok());
    }

    /// A 33-character name and the empty name are both rejected.
    #[test]
    fn test_region_name_err() {
        let too_long = "Z".repeat(33);
        assert!(RegionName::from_str(&too_long).is_err());
        assert!(RegionName::from_str("").is_err());
    }

    /// The short form keeps the leading eight hex digits, or the whole id when it is shorter.
    #[test]
    fn test_short_zid() {
        let long_id = "a1b2c3d4e5f6".parse::<ZenohIdProto>().unwrap();
        assert_eq!(&format!("{}", long_id.short()), "a1b2c3d4");

        let short_id = "a1b2".parse::<ZenohIdProto>().unwrap();
        assert_eq!(&format!("{}", short_id.short()), "a1b2");
    }
}