1use std::fmt;
2
3use serde::{Deserialize, Serialize};
17use zenoh_keyexpr::keyexpr_tree::{IKeyExprTreeMut, KeBoxTree};
18use zenoh_protocol::core::{key_expr::OwnedKeyExpr, CongestionControl, Priority, Reliability};
19
20#[derive(Debug, Deserialize, Default, Serialize, Clone)]
21pub struct PublisherQoSConfList(pub(crate) Vec<PublisherQoSConf>);
22
23impl From<PublisherQoSConfList> for KeBoxTree<PublisherQoSConfig> {
24 fn from(value: PublisherQoSConfList) -> KeBoxTree<PublisherQoSConfig> {
25 let mut tree = KeBoxTree::new();
26 for conf in value.0 {
27 for key_expr in conf.key_exprs {
28 tree.insert(&key_expr, conf.config.clone());
30 }
31 }
32 tree
33 }
34}
35
36#[derive(Debug, Deserialize, Serialize, Clone)]
37pub(crate) struct PublisherQoSConf {
38 pub key_exprs: Vec<OwnedKeyExpr>,
39 pub config: PublisherQoSConfig,
40}
41
42#[derive(Debug, Default, Deserialize, Serialize, Clone)]
43pub struct PublisherQoSConfig {
44 pub congestion_control: Option<CongestionControlConf>,
45 pub priority: Option<PriorityConf>,
46 pub express: Option<bool>,
47 #[cfg(feature = "unstable")]
48 pub reliability: Option<ReliabilityConf>,
49 #[cfg(feature = "unstable")]
50 pub allowed_destination: Option<PublisherLocalityConf>,
51}
52
53#[derive(Debug, Deserialize, Serialize, Clone, Copy, PartialEq, Eq)]
54#[serde(rename_all = "snake_case")]
55pub enum CongestionControlConf {
56 Drop,
57 Block,
58 #[cfg(feature = "unstable")]
59 BlockFirst,
60}
61
62impl From<CongestionControlConf> for CongestionControl {
63 fn from(value: CongestionControlConf) -> Self {
64 match value {
65 CongestionControlConf::Drop => Self::Drop,
66 CongestionControlConf::Block => Self::Block,
67 #[cfg(feature = "unstable")]
68 CongestionControlConf::BlockFirst => Self::BlockFirst,
69 }
70 }
71}
72
73impl From<CongestionControl> for CongestionControlConf {
74 fn from(value: CongestionControl) -> Self {
75 match value {
76 CongestionControl::Drop => Self::Drop,
77 CongestionControl::Block => Self::Block,
78 #[cfg(feature = "unstable")]
79 CongestionControl::BlockFirst => Self::BlockFirst,
80 }
81 }
82}
83
84#[derive(Debug, Deserialize, Serialize, Clone, Copy, PartialEq, Eq)]
85#[serde(rename_all = "snake_case")]
86pub enum PriorityConf {
87 RealTime = 1,
88 InteractiveHigh = 2,
89 InteractiveLow = 3,
90 DataHigh = 4,
91 Data = 5,
92 DataLow = 6,
93 Background = 7,
94}
95
96impl From<PriorityConf> for Priority {
97 fn from(value: PriorityConf) -> Self {
98 match value {
99 PriorityConf::RealTime => Self::RealTime,
100 PriorityConf::InteractiveHigh => Self::InteractiveHigh,
101 PriorityConf::InteractiveLow => Self::InteractiveLow,
102 PriorityConf::DataHigh => Self::DataHigh,
103 PriorityConf::Data => Self::Data,
104 PriorityConf::DataLow => Self::DataLow,
105 PriorityConf::Background => Self::Background,
106 }
107 }
108}
109
110#[derive(Debug, Clone, Copy, PartialEq, Eq)]
111pub enum PriorityUpdateConf {
112 Priority(PriorityConf),
113 Increment(i8),
114}
115
116impl serde::Serialize for PriorityUpdateConf {
117 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
118 where
119 S: serde::Serializer,
120 {
121 match self {
122 PriorityUpdateConf::Priority(value) => value.serialize(serializer),
123 PriorityUpdateConf::Increment(value) => value.serialize(serializer),
124 }
125 }
126}
127
128impl<'a> serde::Deserialize<'a> for PriorityUpdateConf {
129 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
130 where
131 D: serde::Deserializer<'a>,
132 {
133 struct PriorityOrIncrement<U>(std::marker::PhantomData<fn() -> U>);
134
135 impl serde::de::Visitor<'_> for PriorityOrIncrement<PriorityUpdateConf> {
136 type Value = PriorityUpdateConf;
137
138 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
139 formatter.write_str("priority string or increment integer")
140 }
141
142 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
143 where
144 E: serde::de::Error,
145 {
146 PriorityConf::deserialize(serde::de::value::StrDeserializer::new(v))
147 .map(PriorityUpdateConf::Priority)
148 }
149
150 fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
151 where
152 E: serde::de::Error,
153 {
154 if v > 7 {
155 Err(serde::de::Error::custom(
156 "invalid priority increment (> +7)",
157 ))
158 } else if v < -7 {
159 Err(serde::de::Error::custom(
160 "invalid priority increment (< -7)",
161 ))
162 } else {
163 Ok(PriorityUpdateConf::Increment(v as i8))
164 }
165 }
166 }
167 deserializer.deserialize_any(PriorityOrIncrement(std::marker::PhantomData))
168 }
169}
170
171#[derive(Debug, Deserialize, Serialize, Clone, Copy, PartialEq, Eq)]
172#[serde(rename_all = "snake_case")]
173pub enum ReliabilityConf {
174 BestEffort,
175 Reliable,
176}
177
178impl From<ReliabilityConf> for Reliability {
179 fn from(value: ReliabilityConf) -> Self {
180 match value {
181 ReliabilityConf::BestEffort => Self::BestEffort,
182 ReliabilityConf::Reliable => Self::Reliable,
183 }
184 }
185}
186
187impl From<Reliability> for ReliabilityConf {
188 fn from(value: Reliability) -> Self {
189 match value {
190 Reliability::BestEffort => Self::BestEffort,
191 Reliability::Reliable => Self::Reliable,
192 }
193 }
194}
195
196#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
197#[serde(rename_all = "snake_case")]
198pub enum PublisherLocalityConf {
199 SessionLocal,
200 Remote,
201 Any,
202}
203
204#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
205#[serde(rename_all = "snake_case")]
206pub enum QosOverwriteMessage {
207 Put,
208 Delete,
209 Query,
210}
211
212#[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq, Eq)]
213pub struct QosFilter {
214 pub congestion_control: Option<CongestionControlConf>,
215 pub priority: Option<PriorityConf>,
216 pub express: Option<bool>,
217 pub reliability: Option<ReliabilityConf>,
218}
219
220#[derive(Debug, Default, Deserialize, Serialize, Clone)]
221pub struct QosOverwrites {
222 pub congestion_control: Option<CongestionControlConf>,
223 pub priority: Option<PriorityUpdateConf>,
224 pub express: Option<bool>,
225 }