1use std::{
15 collections::VecDeque,
16 future::{IntoFuture, Ready},
17 ops::{Bound, RangeBounds},
18 sync::{Arc, RwLock},
19};
20
21use zenoh::{
22 internal::{bail, traits::QoSBuilderTrait},
23 key_expr::{
24 format::{ke, kedefine},
25 keyexpr, KeyExpr,
26 },
27 liveliness::LivelinessToken,
28 qos::{CongestionControl, Priority},
29 query::{Queryable, ZenohParameters},
30 sample::{Locality, Sample, SampleBuilder},
31 Resolvable, Result as ZResult, Session, Wait, KE_ADV_PREFIX, KE_STARSTAR,
32};
33
34use crate::utils::WrappingSn;
35
36pub(crate) static KE_UHLC: &keyexpr = ke!("uhlc");
37#[zenoh_macros::unstable]
38kedefine!(
39 pub(crate) ke_liveliness: "${remaining:**}/@adv/${entity:*}/${zid:*}/${eid:*}/${meta:**}",
40);
41
42#[zenoh_macros::unstable]
43#[derive(Clone, Debug)]
45pub struct RepliesConfig {
46 priority: Priority,
47 congestion_control: CongestionControl,
48 is_express: bool,
49}
50
51#[zenoh_macros::unstable]
52impl Default for RepliesConfig {
53 fn default() -> Self {
54 Self {
55 priority: Priority::Data,
56 congestion_control: CongestionControl::Block,
57 is_express: false,
58 }
59 }
60}
61
62#[zenoh_macros::internal_trait]
63#[zenoh_macros::unstable]
64impl QoSBuilderTrait for RepliesConfig {
65 #[allow(unused_mut)]
66 #[zenoh_macros::unstable]
67 fn congestion_control(mut self, congestion_control: CongestionControl) -> Self {
69 self.congestion_control = congestion_control;
70 self
71 }
72
73 #[allow(unused_mut)]
74 #[zenoh_macros::unstable]
75 fn priority(mut self, priority: Priority) -> Self {
77 self.priority = priority;
78 self
79 }
80
81 #[allow(unused_mut)]
82 #[zenoh_macros::unstable]
83 fn express(mut self, is_express: bool) -> Self {
88 self.is_express = is_express;
89 self
90 }
91}
92
93#[derive(Debug, Clone)]
94#[zenoh_macros::unstable]
96pub struct CacheConfig {
97 max_samples: usize,
98 replies_config: RepliesConfig,
99}
100
101#[zenoh_macros::unstable]
102impl Default for CacheConfig {
103 fn default() -> Self {
104 Self {
105 max_samples: 1,
106 replies_config: RepliesConfig::default(),
107 }
108 }
109}
110
111#[zenoh_macros::unstable]
112impl CacheConfig {
113 #[zenoh_macros::unstable]
115 pub fn max_samples(mut self, depth: usize) -> Self {
116 self.max_samples = depth;
117 self
118 }
119
120 #[zenoh_macros::unstable]
122 pub fn replies_config(mut self, qos: RepliesConfig) -> Self {
123 self.replies_config = qos;
124 self
125 }
126}
127
128#[zenoh_macros::unstable]
130pub struct AdvancedCacheBuilder<'a, 'b, 'c> {
131 session: &'a Session,
132 pub_key_expr: ZResult<KeyExpr<'b>>,
133 queryable_suffix: Option<ZResult<KeyExpr<'c>>>,
134 queryable_origin: Locality,
135 history: CacheConfig,
136 liveliness: bool,
137}
138
139#[zenoh_macros::unstable]
140impl<'a, 'b, 'c> AdvancedCacheBuilder<'a, 'b, 'c> {
141 #[zenoh_macros::unstable]
142 pub(crate) fn new(
143 session: &'a Session,
144 pub_key_expr: ZResult<KeyExpr<'b>>,
145 ) -> AdvancedCacheBuilder<'a, 'b, 'c> {
146 AdvancedCacheBuilder {
147 session,
148 pub_key_expr,
149 queryable_suffix: Some(Ok((KE_ADV_PREFIX / KE_STARSTAR).into())),
150 queryable_origin: Locality::default(),
151 history: CacheConfig::default(),
152 liveliness: false,
153 }
154 }
155
156 #[zenoh_macros::unstable]
158 pub fn queryable_suffix<TryIntoKeyExpr>(mut self, queryable_suffix: TryIntoKeyExpr) -> Self
159 where
160 TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
161 <TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<zenoh::Error>,
162 {
163 self.queryable_suffix = Some(queryable_suffix.try_into().map_err(Into::into));
164 self
165 }
166
167 #[zenoh_macros::unstable]
169 pub fn history(mut self, history: CacheConfig) -> Self {
170 self.history = history;
171 self
172 }
173}
174
175#[zenoh_macros::unstable]
176impl Resolvable for AdvancedCacheBuilder<'_, '_, '_> {
177 type To = ZResult<AdvancedCache>;
178}
179
180#[zenoh_macros::unstable]
181impl Wait for AdvancedCacheBuilder<'_, '_, '_> {
182 fn wait(self) -> <Self as Resolvable>::To {
183 AdvancedCache::new(self)
184 }
185}
186
187#[zenoh_macros::unstable]
188impl IntoFuture for AdvancedCacheBuilder<'_, '_, '_> {
189 type Output = <Self as Resolvable>::To;
190 type IntoFuture = Ready<<Self as Resolvable>::To>;
191
192 #[zenoh_macros::unstable]
193 fn into_future(self) -> Self::IntoFuture {
194 std::future::ready(self.wait())
195 }
196}
197
198#[zenoh_macros::unstable]
199fn decode_sn_range(range: &str) -> (Bound<WrappingSn>, Bound<WrappingSn>) {
200 let mut split = range.split("..");
201 let start = split
202 .next()
203 .and_then(|s| s.parse::<WrappingSn>().ok().map(Bound::Included))
204 .unwrap_or(Bound::Unbounded);
205 let end = split
206 .next()
207 .map(|s| {
208 s.parse::<WrappingSn>()
209 .ok()
210 .map(Bound::Included)
211 .unwrap_or(Bound::Unbounded)
212 })
213 .unwrap_or(start);
214 (start, end)
215}
216
217#[zenoh_macros::unstable]
219pub struct AdvancedCache {
220 cache: Arc<RwLock<VecDeque<Sample>>>,
221 max_samples: usize,
222 _queryable: Queryable<()>,
223 _token: Option<LivelinessToken>,
224}
225
226#[zenoh_macros::unstable]
227impl AdvancedCache {
228 #[zenoh_macros::unstable]
229 fn new(conf: AdvancedCacheBuilder<'_, '_, '_>) -> ZResult<AdvancedCache> {
230 let key_expr = conf.pub_key_expr?.into_owned();
231 let queryable_key_expr = match conf.queryable_suffix {
233 None => key_expr.clone(),
234 Some(Ok(ke)) => &key_expr / &ke,
235 Some(Err(e)) => bail!("Invalid key expression for queryable_suffix: {}", e),
236 };
237 tracing::debug!(
238 "Create AdvancedCache{{key_expr: {}, max_samples: {:?}}}",
239 &key_expr,
240 conf.history,
241 );
242 let cache = Arc::new(RwLock::new(VecDeque::<Sample>::new()));
243
244 let queryable = conf
246 .session
247 .declare_queryable(&queryable_key_expr)
248 .allowed_origin(conf.queryable_origin)
249 .callback({
250 let cache = cache.clone();
251 move |query| {
252 tracing::trace!("AdvancedCache{{}} Handle query {}", query.selector());
253 let range = query
254 .parameters()
255 .get("_sn")
256 .map(decode_sn_range)
257 .unwrap_or((Bound::Unbounded, Bound::Unbounded));
258 let max = query
259 .parameters()
260 .get("_max")
261 .and_then(|s| s.parse::<u32>().ok());
262 if let Ok(queue) = cache.read() {
263 if let Some(max) = max {
264 let mut samples = VecDeque::new();
265 for sample in queue.iter() {
266 if range == (Bound::Unbounded, Bound::Unbounded)
267 || sample
268 .source_info()
269 .is_some_and(|si| range.contains(&si.source_sn()))
270 {
271 if let (Some(Ok(time_range)), Some(timestamp)) =
272 (query.parameters().time_range(), sample.timestamp())
273 {
274 if !time_range
275 .contains(timestamp.get_time().to_system_time())
276 {
277 continue;
278 }
279 }
280 samples.push_front(sample);
281 samples.truncate(max as usize);
282 }
283 }
284 for sample in samples.drain(..).rev() {
285 if let Err(e) = query
286 .reply_sample(
287 SampleBuilder::from(sample.clone())
288 .congestion_control(
289 conf.history.replies_config.congestion_control,
290 )
291 .priority(conf.history.replies_config.priority)
292 .express(conf.history.replies_config.is_express)
293 .into(),
294 )
295 .wait()
296 {
297 tracing::warn!("AdvancedCache{{}} Error replying to query: {}", e);
298 } else {
299 tracing::trace!(
300 "AdvancedCache{{}} Replied to query {} with Sample{{info:{:?}, ts:{:?}}}",
301 query.selector(),
302 sample.source_info(),
303 sample.timestamp()
304 );
305 }
306 }
307 } else {
308 for sample in queue.iter() {
309 if range == (Bound::Unbounded, Bound::Unbounded)
310 || sample
311 .source_info()
312 .is_some_and(|si| range.contains(&si.source_sn()))
313 {
314 if let (Some(Ok(time_range)), Some(timestamp)) =
315 (query.parameters().time_range(), sample.timestamp())
316 {
317 if !time_range
318 .contains(timestamp.get_time().to_system_time())
319 {
320 continue;
321 }
322 }
323 if let Err(e) = query
324 .reply_sample(
325 SampleBuilder::from(sample.clone())
326 .congestion_control(
327 conf.history.replies_config.congestion_control,
328 )
329 .priority(conf.history.replies_config.priority)
330 .express(conf.history.replies_config.is_express)
331 .into(),
332 )
333 .wait()
334 {
335 tracing::warn!("AdvancedCache{{}} Error replying to query: {}", e);
336 } else {
337 tracing::trace!(
338 "AdvancedCache{{}} Replied to query {} with Sample{{info:{:?}, ts:{:?}}}",
339 query.selector(),
340 sample.source_info(),
341 sample.timestamp()
342 );
343 }
344 }
345 }
346 }
347 } else {
348 tracing::error!("AdvancedCache{{}} Unable to take AdvancedPublisher cache read lock");
349 }
350 }
351 })
352 .wait()?;
353
354 let token = if conf.liveliness {
355 Some(
356 conf.session
357 .liveliness()
358 .declare_token(queryable_key_expr)
359 .wait()?,
360 )
361 } else {
362 None
363 };
364
365 Ok(AdvancedCache {
366 cache,
367 max_samples: conf.history.max_samples,
368 _queryable: queryable,
369 _token: token,
370 })
371 }
372
373 #[zenoh_macros::unstable]
374 pub(crate) fn cache_sample(&self, sample: Sample) {
375 if let Ok(mut queue) = self.cache.write() {
376 if queue.len() >= self.max_samples {
377 queue.pop_front();
378 }
379 queue.push_back(sample);
380 } else {
381 tracing::error!("AdvancedCache{{}} Unable to take AdvancedPublisher cache write lock");
382 }
383 }
384}