zenoh_protocol_core/
lib.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14pub mod key_expr;
15
16use key_expr::OwnedKeyExpr;
17use std::convert::{From, TryFrom, TryInto};
18use std::fmt;
19use std::hash::{Hash, Hasher};
20use std::num::NonZeroU64;
21use std::str::FromStr;
22use std::sync::atomic::AtomicU64;
23pub use uhlc::{Timestamp, NTP64};
24use uuid::Uuid;
25use zenoh_core::{bail, zerror};
26
27/// The unique Id of the [`HLC`](uhlc::HLC) that generated the concerned [`Timestamp`].
28pub type TimestampId = uhlc::ID;
29
/// A zenoh integer: an unsigned 64-bit value.
pub type ZInt = u64;
/// Signed 64-bit counterpart of [`ZInt`].
pub type ZiInt = i64;
/// Atomic cell of the same width as [`ZInt`].
pub type AtomicZInt = AtomicU64;
/// A [`ZInt`] that can never be zero.
pub type NonZeroZInt = NonZeroU64;
/// Byte budget associated with a [`ZInt`].
// NOTE(review): presumably the worst-case size of a variable-length encoded
// 64-bit value (ceil(64 / 7) == 10) -- confirm against the codec.
pub const ZINT_MAX_BYTES: usize = 10;
36
37// WhatAmI values
38pub type WhatAmI = whatami::WhatAmI;
39
40/// Constants and helpers for zenoh `whatami` flags.
41pub mod whatami;
42
43/// A numerical Id mapped to a key expression.
44pub type ExprId = ZInt;
45
46pub const EMPTY_EXPR_ID: ExprId = 0;
47
48pub mod wire_expr;
49pub use crate::wire_expr::WireExpr;
50
51mod encoding;
52pub use encoding::{Encoding, KnownEncoding};
53
54pub mod locators;
55pub use locators::Locator;
56pub mod endpoints;
57pub use endpoints::EndPoint;
58
59#[derive(Debug, Clone, PartialEq, Eq)]
60pub struct Property {
61    pub key: ZInt,
62    pub value: Vec<u8>,
63}
64
65/// The kind of a `Sample`.
66#[repr(u8)]
67#[derive(Debug, Copy, Clone, PartialEq, Eq)]
68pub enum SampleKind {
69    /// if the `Sample` was issued by a `put` operation.
70    Put = 0,
71    /// if the `Sample` was issued by a `delete` operation.
72    Delete = 1,
73}
74
75impl Default for SampleKind {
76    fn default() -> Self {
77        SampleKind::Put
78    }
79}
80
81impl fmt::Display for SampleKind {
82    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
83        match self {
84            SampleKind::Put => write!(f, "PUT"),
85            SampleKind::Delete => write!(f, "DELETE"),
86        }
87    }
88}
89
90impl TryFrom<ZInt> for SampleKind {
91    type Error = ZInt;
92    fn try_from(kind: ZInt) -> Result<Self, ZInt> {
93        match kind {
94            0 => Ok(SampleKind::Put),
95            1 => Ok(SampleKind::Delete),
96            _ => Err(kind),
97        }
98    }
99}
100
101/// The global unique id of a zenoh peer.
102#[derive(Clone, Copy, Eq)]
103pub struct ZenohId(uhlc::ID);
104
105impl ZenohId {
106    pub const MAX_SIZE: usize = 16;
107
108    #[inline]
109    pub fn size(&self) -> usize {
110        self.0.size()
111    }
112
113    #[inline]
114    pub fn as_slice(&self) -> &[u8] {
115        self.0.as_slice()
116    }
117
118    pub fn rand() -> ZenohId {
119        ZenohId::from(Uuid::new_v4())
120    }
121
122    pub fn into_keyexpr(self) -> OwnedKeyExpr {
123        self.into()
124    }
125}
126
127impl Default for ZenohId {
128    fn default() -> Self {
129        Self::rand()
130    }
131}
132
133impl From<uuid::Uuid> for ZenohId {
134    #[inline]
135    fn from(uuid: uuid::Uuid) -> Self {
136        ZenohId(uuid.into())
137    }
138}
139
140macro_rules! derive_tryfrom {
141    ($T: ty) => {
142        impl TryFrom<$T> for ZenohId {
143            type Error = zenoh_core::Error;
144            fn try_from(val: $T) -> Result<Self, Self::Error> {
145                Ok(Self(val.try_into()?))
146            }
147        }
148    };
149}
150derive_tryfrom!([u8; 1]);
151derive_tryfrom!(&[u8; 1]);
152derive_tryfrom!([u8; 2]);
153derive_tryfrom!(&[u8; 2]);
154derive_tryfrom!([u8; 3]);
155derive_tryfrom!(&[u8; 3]);
156derive_tryfrom!([u8; 4]);
157derive_tryfrom!(&[u8; 4]);
158derive_tryfrom!([u8; 5]);
159derive_tryfrom!(&[u8; 5]);
160derive_tryfrom!([u8; 6]);
161derive_tryfrom!(&[u8; 6]);
162derive_tryfrom!([u8; 7]);
163derive_tryfrom!(&[u8; 7]);
164derive_tryfrom!([u8; 8]);
165derive_tryfrom!(&[u8; 8]);
166derive_tryfrom!([u8; 9]);
167derive_tryfrom!(&[u8; 9]);
168derive_tryfrom!([u8; 10]);
169derive_tryfrom!(&[u8; 10]);
170derive_tryfrom!([u8; 11]);
171derive_tryfrom!(&[u8; 11]);
172derive_tryfrom!([u8; 12]);
173derive_tryfrom!(&[u8; 12]);
174derive_tryfrom!([u8; 13]);
175derive_tryfrom!(&[u8; 13]);
176derive_tryfrom!([u8; 14]);
177derive_tryfrom!(&[u8; 14]);
178derive_tryfrom!([u8; 15]);
179derive_tryfrom!(&[u8; 15]);
180derive_tryfrom!([u8; 16]);
181derive_tryfrom!(&[u8; 16]);
182derive_tryfrom!(&[u8]);
183
184impl FromStr for ZenohId {
185    type Err = zenoh_core::Error;
186
187    fn from_str(s: &str) -> Result<Self, Self::Err> {
188        // filter-out '-' characters (in case s has UUID format)
189        let s = s.replace('-', "");
190        let vec = hex::decode(&s).map_err(|e| zerror!("Invalid id: {} - {}", s, e))?;
191        vec.as_slice().try_into()
192    }
193}
194
195impl PartialEq for ZenohId {
196    #[inline]
197    fn eq(&self, other: &Self) -> bool {
198        self.0.eq(&other.0)
199    }
200}
201
202impl Hash for ZenohId {
203    fn hash<H: Hasher>(&self, state: &mut H) {
204        self.as_slice().hash(state);
205    }
206}
207
208impl fmt::Debug for ZenohId {
209    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
210        write!(f, "{}", hex::encode_upper(self.as_slice()))
211    }
212}
213
214impl fmt::Display for ZenohId {
215    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
216        fmt::Debug::fmt(self, f)
217    }
218}
219
220// A PeerID can be converted into a Timestamp's ID
221impl From<&ZenohId> for uhlc::ID {
222    fn from(zid: &ZenohId) -> Self {
223        zid.0
224    }
225}
226
227impl From<ZenohId> for OwnedKeyExpr {
228    fn from(zid: ZenohId) -> Self {
229        // Safety: zid.to_string() returns an stringified hexadecimal
230        // representation of the zid. Therefore, building a OwnedKeyExpr
231        // by calling from_string_unchecked() is safe because it is
232        // guaranteed that no wildcards nor reserved chars will be present.
233        unsafe { OwnedKeyExpr::from_string_unchecked(zid.to_string()) }
234    }
235}
236
237impl From<&ZenohId> for OwnedKeyExpr {
238    fn from(zid: &ZenohId) -> Self {
239        (*zid).into()
240    }
241}
242
243impl serde::Serialize for ZenohId {
244    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
245    where
246        S: serde::Serializer,
247    {
248        serializer.serialize_str(self.to_string().as_str())
249    }
250}
251
252impl<'de> serde::Deserialize<'de> for ZenohId {
253    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
254    where
255        D: serde::Deserializer<'de>,
256    {
257        struct ZenohIdVisitor;
258
259        impl<'de> serde::de::Visitor<'de> for ZenohIdVisitor {
260            type Value = ZenohId;
261
262            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
263                formatter.write_str(&format!("An hex string of 1-{} bytes", ZenohId::MAX_SIZE))
264            }
265
266            fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
267            where
268                E: serde::de::Error,
269            {
270                v.parse().map_err(serde::de::Error::custom)
271            }
272
273            fn visit_borrowed_str<E>(self, v: &'de str) -> Result<Self::Value, E>
274            where
275                E: serde::de::Error,
276            {
277                self.visit_str(v)
278            }
279
280            fn visit_string<E>(self, v: String) -> Result<Self::Value, E>
281            where
282                E: serde::de::Error,
283            {
284                self.visit_str(&v)
285            }
286        }
287
288        deserializer.deserialize_str(ZenohIdVisitor)
289    }
290}
291
/// Priority levels, encoded on a `u8`.
///
/// A lower numerical value is a higher priority: [`Priority::MAX`] is
/// `Control` (0) and [`Priority::MIN`] is `Background` (7).
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum Priority {
    Control = 0,
    RealTime = 1,
    InteractiveHigh = 2,
    InteractiveLow = 3,
    DataHigh = 4,
    Data = 5,
    DataLow = 6,
    Background = 7,
}

impl Priority {
    /// The lowest Priority
    pub const MIN: Self = Self::Background;
    /// The highest Priority
    pub const MAX: Self = Self::Control;
    /// The number of available priorities
    pub const NUM: usize = (Self::MIN as usize) - (Self::MAX as usize) + 1;
}

/// The default priority is [`Priority::Data`].
impl Default for Priority {
    fn default() -> Priority {
        Self::Data
    }
}
319
320impl TryFrom<u8> for Priority {
321    type Error = zenoh_core::Error;
322
323    fn try_from(conduit: u8) -> Result<Self, Self::Error> {
324        match conduit {
325            0 => Ok(Priority::Control),
326            1 => Ok(Priority::RealTime),
327            2 => Ok(Priority::InteractiveHigh),
328            3 => Ok(Priority::InteractiveLow),
329            4 => Ok(Priority::DataHigh),
330            5 => Ok(Priority::Data),
331            6 => Ok(Priority::DataLow),
332            7 => Ok(Priority::Background),
333            unknown => bail!(
334                "{} is not a valid priority value. Admitted values are: [{}-{}].",
335                unknown,
336                Self::MAX as u8,
337                Self::MIN as u8
338            ),
339        }
340    }
341}
342
/// Reliability level, encoded on a `u8` (`BestEffort` = 0, `Reliable` = 1).
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Reliability {
    BestEffort,
    Reliable,
}

/// The default reliability is [`Reliability::BestEffort`].
impl Default for Reliability {
    fn default() -> Reliability {
        Self::BestEffort
    }
}
355
356#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)]
357pub struct Channel {
358    pub priority: Priority,
359    pub reliability: Reliability,
360}
361
362#[derive(Debug, Clone, PartialEq, Eq)]
363pub enum ConduitSnList {
364    Plain(ConduitSn),
365    QoS(Box<[ConduitSn; Priority::NUM]>),
366}
367
368impl fmt::Display for ConduitSnList {
369    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
370        write!(f, "[ ")?;
371        match self {
372            ConduitSnList::Plain(sn) => {
373                write!(
374                    f,
375                    "{:?} {{ reliable: {}, best effort: {} }}",
376                    Priority::default(),
377                    sn.reliable,
378                    sn.best_effort
379                )?;
380            }
381            ConduitSnList::QoS(ref sns) => {
382                for (prio, sn) in sns.iter().enumerate() {
383                    let p: Priority = (prio as u8).try_into().unwrap();
384                    write!(
385                        f,
386                        "{:?} {{ reliable: {}, best effort: {} }}",
387                        p, sn.reliable, sn.best_effort
388                    )?;
389                    if p != Priority::Background {
390                        write!(f, ", ")?;
391                    }
392                }
393            }
394        }
395        write!(f, " ]")
396    }
397}
398
399/// The kind of reliability.
400#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)]
401pub struct ConduitSn {
402    pub reliable: ZInt,
403    pub best_effort: ZInt,
404}
405
/// The kind of congestion control, encoded on a `u8` (`Block` = 0, `Drop` = 1).
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CongestionControl {
    Block,
    Drop,
}

/// The default congestion control is [`CongestionControl::Drop`].
impl Default for CongestionControl {
    fn default() -> CongestionControl {
        Self::Drop
    }
}
419
/// The subscription mode, encoded on a `u8` (`Push` = 0, `Pull` = 1).
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SubMode {
    Push,
    Pull,
}

/// The default subscription mode is [`SubMode::Push`].
impl Default for SubMode {
    #[inline]
    fn default() -> Self {
        Self::Push
    }
}
434
435#[derive(Debug, Clone, PartialEq, Eq, Default)]
436pub struct SubInfo {
437    pub reliability: Reliability,
438    pub mode: SubMode,
439}
440
441#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
442pub struct QueryableInfo {
443    pub complete: ZInt, // Default 0: incomplete
444    pub distance: ZInt, // Default 0: no distance
445}
446
/// The kind of consolidation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConsolidationMode {
    /// No consolidation applied: multiple samples may be received for the same key-timestamp.
    None,
    /// Monotonic consolidation forwards samples immediately, unless a sample with an equal or
    /// more recent timestamp has already been sent for the same key.
    ///
    /// This optimizes latency while potentially reducing bandwidth.
    ///
    /// Note that this does not cause re-ordering: it only drops the samples for which a more
    /// recent timestamp has already been observed with the same key.
    Monotonic,
    /// Holds back samples so that only the set of samples with the highest timestamp for their
    /// key is sent.
    Latest,
}
463
/// The `zenoh::queryable::Queryable`s that should be target of a `zenoh::Session::get()`.
///
/// The `Complete` variant only exists when the `complete_n` feature is enabled.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum QueryTarget {
    BestMatching,
    All,
    AllComplete,
    #[cfg(feature = "complete_n")]
    Complete(ZInt),
}

/// The default target is [`QueryTarget::BestMatching`].
impl Default for QueryTarget {
    fn default() -> Self {
        Self::BestMatching
    }
}
479
/// Splits `s` around the first occurrence of `c`.
///
/// Returns `(before, after)`, with the delimiter itself removed. When `c`
/// does not occur in `s`, returns `(s, "")`.
///
/// The delimiter may be any `char`, including multi-byte ones: the right-hand
/// side starts `c.len_utf8()` bytes after the match, so the slice always
/// falls on a UTF-8 character boundary.
pub(crate) fn split_once(s: &str, c: char) -> (&str, &str) {
    match s.find(c) {
        Some(index) => (&s[..index], &s[index + c.len_utf8()..]),
        None => (s, ""),
    }
}