Skip to main content

zenoh_config/
qos.rs

1use std::fmt;
2
3//
4// Copyright (c) 2024 ZettaScale Technology
5//
6// This program and the accompanying materials are made available under the
7// terms of the Eclipse Public License 2.0 which is available at
8// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
9// which is available at https://www.apache.org/licenses/LICENSE-2.0.
10//
11// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
12//
13// Contributors:
14//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
15//
16use serde::{Deserialize, Serialize};
17use zenoh_keyexpr::keyexpr_tree::{IKeyExprTreeMut, KeBoxTree};
18use zenoh_protocol::core::{key_expr::OwnedKeyExpr, CongestionControl, Priority, Reliability};
19
20#[derive(Debug, Deserialize, Default, Serialize, Clone)]
21pub struct PublisherQoSConfList(pub(crate) Vec<PublisherQoSConf>);
22
23impl From<PublisherQoSConfList> for KeBoxTree<PublisherQoSConfig> {
24    fn from(value: PublisherQoSConfList) -> KeBoxTree<PublisherQoSConfig> {
25        let mut tree = KeBoxTree::new();
26        for conf in value.0 {
27            for key_expr in conf.key_exprs {
28                // NOTE: we don't check key_expr unicity
29                tree.insert(&key_expr, conf.config.clone());
30            }
31        }
32        tree
33    }
34}
35
36#[derive(Debug, Deserialize, Serialize, Clone)]
37pub(crate) struct PublisherQoSConf {
38    pub key_exprs: Vec<OwnedKeyExpr>,
39    pub config: PublisherQoSConfig,
40}
41
42#[derive(Debug, Default, Deserialize, Serialize, Clone)]
43pub struct PublisherQoSConfig {
44    pub congestion_control: Option<CongestionControlConf>,
45    pub priority: Option<PriorityConf>,
46    pub express: Option<bool>,
47    #[cfg(feature = "unstable")]
48    pub reliability: Option<ReliabilityConf>,
49    #[cfg(feature = "unstable")]
50    pub allowed_destination: Option<PublisherLocalityConf>,
51}
52
53#[derive(Debug, Deserialize, Serialize, Clone, Copy, PartialEq, Eq)]
54#[serde(rename_all = "snake_case")]
55pub enum CongestionControlConf {
56    Drop,
57    Block,
58    #[cfg(feature = "unstable")]
59    BlockFirst,
60}
61
62impl From<CongestionControlConf> for CongestionControl {
63    fn from(value: CongestionControlConf) -> Self {
64        match value {
65            CongestionControlConf::Drop => Self::Drop,
66            CongestionControlConf::Block => Self::Block,
67            #[cfg(feature = "unstable")]
68            CongestionControlConf::BlockFirst => Self::BlockFirst,
69        }
70    }
71}
72
73impl From<CongestionControl> for CongestionControlConf {
74    fn from(value: CongestionControl) -> Self {
75        match value {
76            CongestionControl::Drop => Self::Drop,
77            CongestionControl::Block => Self::Block,
78            #[cfg(feature = "unstable")]
79            CongestionControl::BlockFirst => Self::BlockFirst,
80        }
81    }
82}
83
84#[derive(Debug, Deserialize, Serialize, Clone, Copy, PartialEq, Eq)]
85#[serde(rename_all = "snake_case")]
86pub enum PriorityConf {
87    RealTime = 1,
88    InteractiveHigh = 2,
89    InteractiveLow = 3,
90    DataHigh = 4,
91    Data = 5,
92    DataLow = 6,
93    Background = 7,
94}
95
96impl From<PriorityConf> for Priority {
97    fn from(value: PriorityConf) -> Self {
98        match value {
99            PriorityConf::RealTime => Self::RealTime,
100            PriorityConf::InteractiveHigh => Self::InteractiveHigh,
101            PriorityConf::InteractiveLow => Self::InteractiveLow,
102            PriorityConf::DataHigh => Self::DataHigh,
103            PriorityConf::Data => Self::Data,
104            PriorityConf::DataLow => Self::DataLow,
105            PriorityConf::Background => Self::Background,
106        }
107    }
108}
109
110#[derive(Debug, Clone, Copy, PartialEq, Eq)]
111pub enum PriorityUpdateConf {
112    Priority(PriorityConf),
113    Increment(i8),
114}
115
116impl serde::Serialize for PriorityUpdateConf {
117    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
118    where
119        S: serde::Serializer,
120    {
121        match self {
122            PriorityUpdateConf::Priority(value) => value.serialize(serializer),
123            PriorityUpdateConf::Increment(value) => value.serialize(serializer),
124        }
125    }
126}
127
128impl<'a> serde::Deserialize<'a> for PriorityUpdateConf {
129    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
130    where
131        D: serde::Deserializer<'a>,
132    {
133        struct PriorityOrIncrement<U>(std::marker::PhantomData<fn() -> U>);
134
135        impl serde::de::Visitor<'_> for PriorityOrIncrement<PriorityUpdateConf> {
136            type Value = PriorityUpdateConf;
137
138            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
139                formatter.write_str("priority string or increment integer")
140            }
141
142            fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
143            where
144                E: serde::de::Error,
145            {
146                PriorityConf::deserialize(serde::de::value::StrDeserializer::new(v))
147                    .map(PriorityUpdateConf::Priority)
148            }
149
150            fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
151            where
152                E: serde::de::Error,
153            {
154                if v > 7 {
155                    Err(serde::de::Error::custom(
156                        "invalid priority increment (> +7)",
157                    ))
158                } else if v < -7 {
159                    Err(serde::de::Error::custom(
160                        "invalid priority increment (< -7)",
161                    ))
162                } else {
163                    Ok(PriorityUpdateConf::Increment(v as i8))
164                }
165            }
166        }
167        deserializer.deserialize_any(PriorityOrIncrement(std::marker::PhantomData))
168    }
169}
170
171#[derive(Debug, Deserialize, Serialize, Clone, Copy, PartialEq, Eq)]
172#[serde(rename_all = "snake_case")]
173pub enum ReliabilityConf {
174    BestEffort,
175    Reliable,
176}
177
178impl From<ReliabilityConf> for Reliability {
179    fn from(value: ReliabilityConf) -> Self {
180        match value {
181            ReliabilityConf::BestEffort => Self::BestEffort,
182            ReliabilityConf::Reliable => Self::Reliable,
183        }
184    }
185}
186
187impl From<Reliability> for ReliabilityConf {
188    fn from(value: Reliability) -> Self {
189        match value {
190            Reliability::BestEffort => Self::BestEffort,
191            Reliability::Reliable => Self::Reliable,
192        }
193    }
194}
195
196#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
197#[serde(rename_all = "snake_case")]
198pub enum PublisherLocalityConf {
199    SessionLocal,
200    Remote,
201    Any,
202}
203
204#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
205#[serde(rename_all = "snake_case")]
206pub enum QosOverwriteMessage {
207    Put,
208    Delete,
209    Query,
210}
211
212#[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq, Eq)]
213pub struct QosFilter {
214    pub congestion_control: Option<CongestionControlConf>,
215    pub priority: Option<PriorityConf>,
216    pub express: Option<bool>,
217    pub reliability: Option<ReliabilityConf>,
218}
219
220#[derive(Debug, Default, Deserialize, Serialize, Clone)]
221pub struct QosOverwrites {
222    pub congestion_control: Option<CongestionControlConf>,
223    pub priority: Option<PriorityUpdateConf>,
224    pub express: Option<bool>,
225    // TODO: Add support for reliability overwrite (it is not possible right now, since reliability is not a part of RoutingContext, nor NetworkMessage)
226    // #[cfg(feature = "unstable")]
227    // pub reliability: Option<ReliabilityConf>,
228}