1use std::{
15 collections::VecDeque,
16 future::{IntoFuture, Ready},
17 ops::{Bound, RangeBounds},
18 sync::{Arc, RwLock},
19};
20
21use zenoh::{
22 internal::{bail, traits::QoSBuilderTrait},
23 key_expr::{
24 format::{ke, kedefine},
25 keyexpr, KeyExpr,
26 },
27 liveliness::LivelinessToken,
28 qos::{CongestionControl, Priority},
29 query::{Queryable, ZenohParameters},
30 sample::{Locality, Sample, SampleBuilder},
31 Resolvable, Result as ZResult, Session, Wait, KE_ADV_PREFIX, KE_STARSTAR,
32};
33
34use crate::utils::WrappingSn;
35
// Key expression literal `uhlc`, built at compile time by the `ke!` macro.
// NOTE(review): not referenced in this part of the file; presumably used as a
// marker chunk in advertisement key expressions — confirm against callers.
pub(crate) static KE_UHLC: &keyexpr = ke!("uhlc");
// Key expression format with five named fields: `remaining` (any number of chunks),
// the literal `@adv` chunk, then `entity`, `zid` and `eid` (one chunk each),
// followed by `meta` (any number of chunks).
#[zenoh_macros::unstable]
kedefine!(
    pub(crate) ke_liveliness: "${remaining:**}/@adv/${entity:*}/${zid:*}/${eid:*}/${meta:**}",
);
41
#[zenoh_macros::unstable]
#[derive(Clone, Debug)]
/// QoS settings applied to the replies sent by an [`AdvancedCache`] when it answers a query.
pub struct RepliesConfig {
    // Priority set on each reply sample.
    priority: Priority,
    // Congestion control strategy set on each reply sample.
    congestion_control: CongestionControl,
    // Value forwarded to the reply builder's `express` setting.
    is_express: bool,
}
50
51#[zenoh_macros::unstable]
52impl Default for RepliesConfig {
53 fn default() -> Self {
54 Self {
55 priority: Priority::Data,
56 congestion_control: CongestionControl::Block,
57 is_express: false,
58 }
59 }
60}
61
62#[zenoh_macros::internal_trait]
63#[zenoh_macros::unstable]
64impl QoSBuilderTrait for RepliesConfig {
65 #[allow(unused_mut)]
66 #[zenoh_macros::unstable]
67 fn congestion_control(mut self, congestion_control: CongestionControl) -> Self {
68 self.congestion_control = congestion_control;
69 self
70 }
71
72 #[allow(unused_mut)]
73 #[zenoh_macros::unstable]
74 fn priority(mut self, priority: Priority) -> Self {
75 self.priority = priority;
76 self
77 }
78
79 #[allow(unused_mut)]
80 #[zenoh_macros::unstable]
81 fn express(mut self, is_express: bool) -> Self {
82 self.is_express = is_express;
83 self
84 }
85}
86
#[derive(Debug, Clone)]
#[zenoh_macros::unstable]
/// Configuration of an [`AdvancedCache`]: how many samples it retains and the QoS of its replies.
pub struct CacheConfig {
    // Upper bound on the number of samples kept in the cache.
    max_samples: usize,
    // QoS applied to the samples sent in reply to queries.
    replies_config: RepliesConfig,
}
94
95#[zenoh_macros::unstable]
96impl Default for CacheConfig {
97 fn default() -> Self {
98 Self {
99 max_samples: 1,
100 replies_config: RepliesConfig::default(),
101 }
102 }
103}
104
105#[zenoh_macros::unstable]
106impl CacheConfig {
107 #[zenoh_macros::unstable]
109 pub fn max_samples(mut self, depth: usize) -> Self {
110 self.max_samples = depth;
111 self
112 }
113
114 #[zenoh_macros::unstable]
116 pub fn replies_config(mut self, qos: RepliesConfig) -> Self {
117 self.replies_config = qos;
118 self
119 }
120}
121
#[zenoh_macros::unstable]
/// Builder for an [`AdvancedCache`]; resolve it with `wait()` or `.await`.
pub struct AdvancedCacheBuilder<'a, 'b, 'c> {
    // Session used to declare the queryable (and the liveliness token, if enabled).
    session: &'a Session,
    // Key expression of the cached publication; an `Err` is reported when the builder is resolved.
    pub_key_expr: ZResult<KeyExpr<'b>>,
    // Optional suffix joined to `pub_key_expr` to form the queryable key expression.
    queryable_suffix: Option<ZResult<KeyExpr<'c>>>,
    // Locality restriction passed to the queryable's `allowed_origin`.
    queryable_origin: Locality,
    // Cache depth and reply QoS.
    history: CacheConfig,
    // When true, a liveliness token is declared on the queryable key expression.
    liveliness: bool,
}
132
133#[zenoh_macros::unstable]
134impl<'a, 'b, 'c> AdvancedCacheBuilder<'a, 'b, 'c> {
135 #[zenoh_macros::unstable]
136 pub(crate) fn new(
137 session: &'a Session,
138 pub_key_expr: ZResult<KeyExpr<'b>>,
139 ) -> AdvancedCacheBuilder<'a, 'b, 'c> {
140 AdvancedCacheBuilder {
141 session,
142 pub_key_expr,
143 queryable_suffix: Some(Ok((KE_ADV_PREFIX / KE_STARSTAR).into())),
144 queryable_origin: Locality::default(),
145 history: CacheConfig::default(),
146 liveliness: false,
147 }
148 }
149
150 #[zenoh_macros::unstable]
152 pub fn queryable_suffix<TryIntoKeyExpr>(mut self, queryable_suffix: TryIntoKeyExpr) -> Self
153 where
154 TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
155 <TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<zenoh::Error>,
156 {
157 self.queryable_suffix = Some(queryable_suffix.try_into().map_err(Into::into));
158 self
159 }
160
161 #[zenoh_macros::unstable]
163 pub fn history(mut self, history: CacheConfig) -> Self {
164 self.history = history;
165 self
166 }
167}
168
// Resolving the builder yields the constructed cache, or the error that prevented
// its construction.
#[zenoh_macros::unstable]
impl Resolvable for AdvancedCacheBuilder<'_, '_, '_> {
    type To = ZResult<AdvancedCache>;
}
173
174#[zenoh_macros::unstable]
175impl Wait for AdvancedCacheBuilder<'_, '_, '_> {
176 fn wait(self) -> <Self as Resolvable>::To {
177 AdvancedCache::new(self)
178 }
179}
180
181#[zenoh_macros::unstable]
182impl IntoFuture for AdvancedCacheBuilder<'_, '_, '_> {
183 type Output = <Self as Resolvable>::To;
184 type IntoFuture = Ready<<Self as Resolvable>::To>;
185
186 #[zenoh_macros::unstable]
187 fn into_future(self) -> Self::IntoFuture {
188 std::future::ready(self.wait())
189 }
190}
191
192#[zenoh_macros::unstable]
193fn decode_sn_range(range: &str) -> (Bound<WrappingSn>, Bound<WrappingSn>) {
194 let mut split = range.split("..");
195 let start = split
196 .next()
197 .and_then(|s| s.parse::<WrappingSn>().ok().map(Bound::Included))
198 .unwrap_or(Bound::Unbounded);
199 let end = split
200 .next()
201 .map(|s| {
202 s.parse::<WrappingSn>()
203 .ok()
204 .map(Bound::Included)
205 .unwrap_or(Bound::Unbounded)
206 })
207 .unwrap_or(start);
208 (start, end)
209}
210
#[zenoh_macros::unstable]
/// Bounded cache of published samples that answers queries through a queryable.
pub struct AdvancedCache {
    // Cached samples, oldest at the front; shared with the queryable callback.
    cache: Arc<RwLock<VecDeque<Sample>>>,
    // Queue length at which `cache_sample` evicts the oldest sample before inserting.
    max_samples: usize,
    // Queryable serving the cached samples; stored so it lives as long as the cache.
    _queryable: Queryable<()>,
    // Liveliness token, present only when the builder requested one.
    _token: Option<LivelinessToken>,
}
219
220#[zenoh_macros::unstable]
221impl AdvancedCache {
222 #[zenoh_macros::unstable]
223 fn new(conf: AdvancedCacheBuilder<'_, '_, '_>) -> ZResult<AdvancedCache> {
224 let key_expr = conf.pub_key_expr?.into_owned();
225 let queryable_key_expr = match conf.queryable_suffix {
227 None => key_expr.clone(),
228 Some(Ok(ke)) => &key_expr / &ke,
229 Some(Err(e)) => bail!("Invalid key expression for queryable_suffix: {}", e),
230 };
231 tracing::debug!(
232 "Create AdvancedCache{{key_expr: {}, max_samples: {:?}}}",
233 &key_expr,
234 conf.history,
235 );
236 let cache = Arc::new(RwLock::new(VecDeque::<Sample>::new()));
237
238 let queryable = conf
240 .session
241 .declare_queryable(&queryable_key_expr)
242 .allowed_origin(conf.queryable_origin)
243 .callback({
244 let cache = cache.clone();
245 move |query| {
246 tracing::trace!("AdvancedCache{{}} Handle query {}", query.selector());
247 let range = query
248 .parameters()
249 .get("_sn")
250 .map(decode_sn_range)
251 .unwrap_or((Bound::Unbounded, Bound::Unbounded));
252 let max = query
253 .parameters()
254 .get("_max")
255 .and_then(|s| s.parse::<u32>().ok());
256 if let Ok(queue) = cache.read() {
257 if let Some(max) = max {
258 let mut samples = VecDeque::new();
259 for sample in queue.iter() {
260 if range == (Bound::Unbounded, Bound::Unbounded)
261 || sample
262 .source_info()
263 .is_some_and(|si| range.contains(&si.source_sn()))
264 {
265 if let (Some(Ok(time_range)), Some(timestamp)) =
266 (query.parameters().time_range(), sample.timestamp())
267 {
268 if !time_range
269 .contains(timestamp.get_time().to_system_time())
270 {
271 continue;
272 }
273 }
274 samples.push_front(sample);
275 samples.truncate(max as usize);
276 }
277 }
278 for sample in samples.drain(..).rev() {
279 if let Err(e) = query
280 .reply_sample(
281 SampleBuilder::from(sample.clone())
282 .congestion_control(
283 conf.history.replies_config.congestion_control,
284 )
285 .priority(conf.history.replies_config.priority)
286 .express(conf.history.replies_config.is_express)
287 .into(),
288 )
289 .wait()
290 {
291 tracing::warn!("AdvancedCache{{}} Error replying to query: {}", e);
292 } else {
293 tracing::trace!(
294 "AdvancedCache{{}} Replied to query {} with Sample{{info:{:?}, ts:{:?}}}",
295 query.selector(),
296 sample.source_info(),
297 sample.timestamp()
298 );
299 }
300 }
301 } else {
302 for sample in queue.iter() {
303 if range == (Bound::Unbounded, Bound::Unbounded)
304 || sample
305 .source_info()
306 .is_some_and(|si| range.contains(&si.source_sn()))
307 {
308 if let (Some(Ok(time_range)), Some(timestamp)) =
309 (query.parameters().time_range(), sample.timestamp())
310 {
311 if !time_range
312 .contains(timestamp.get_time().to_system_time())
313 {
314 continue;
315 }
316 }
317 if let Err(e) = query
318 .reply_sample(
319 SampleBuilder::from(sample.clone())
320 .congestion_control(
321 conf.history.replies_config.congestion_control,
322 )
323 .priority(conf.history.replies_config.priority)
324 .express(conf.history.replies_config.is_express)
325 .into(),
326 )
327 .wait()
328 {
329 tracing::warn!("AdvancedCache{{}} Error replying to query: {}", e);
330 } else {
331 tracing::trace!(
332 "AdvancedCache{{}} Replied to query {} with Sample{{info:{:?}, ts:{:?}}}",
333 query.selector(),
334 sample.source_info(),
335 sample.timestamp()
336 );
337 }
338 }
339 }
340 }
341 } else {
342 tracing::error!("AdvancedCache{{}} Unable to take AdvancedPublisher cache read lock");
343 }
344 }
345 })
346 .wait()?;
347
348 let token = if conf.liveliness {
349 Some(
350 conf.session
351 .liveliness()
352 .declare_token(queryable_key_expr)
353 .wait()?,
354 )
355 } else {
356 None
357 };
358
359 Ok(AdvancedCache {
360 cache,
361 max_samples: conf.history.max_samples,
362 _queryable: queryable,
363 _token: token,
364 })
365 }
366
367 #[zenoh_macros::unstable]
368 pub(crate) fn cache_sample(&self, sample: Sample) {
369 if let Ok(mut queue) = self.cache.write() {
370 if queue.len() >= self.max_samples {
371 queue.pop_front();
372 }
373 queue.push_back(sample);
374 } else {
375 tracing::error!("AdvancedCache{{}} Unable to take AdvancedPublisher cache write lock");
376 }
377 }
378}