1use std::{
15 collections::VecDeque,
16 future::{IntoFuture, Ready},
17 ops::{Bound, RangeBounds},
18 sync::{Arc, RwLock},
19};
20
21use zenoh::{
22 internal::{bail, traits::QoSBuilderTrait},
23 key_expr::{
24 format::{ke, kedefine},
25 keyexpr, KeyExpr,
26 },
27 liveliness::LivelinessToken,
28 qos::{CongestionControl, Priority},
29 query::{Queryable, ZenohParameters},
30 sample::{Locality, Sample, SampleBuilder},
31 Resolvable, Result as ZResult, Session, Wait, KE_ADV_PREFIX, KE_STARSTAR,
32};
33
/// Key expression literal `uhlc`, built at compile time with `ke!`.
// NOTE(review): not referenced anywhere in the visible part of this file — presumably used by
// sibling modules; confirm before removing.
pub(crate) static KE_UHLC: &keyexpr = ke!("uhlc");
#[zenoh_macros::unstable]
// Key expression format `ke_liveliness` with five named chunks around a literal `@adv` chunk:
// `remaining` (`**`), then `@adv`, then `entity`, `zid` and `eid` (one chunk each, `*`), and
// finally `meta` (`**`).
// NOTE(review): the meaning of each chunk is inferred from its name only — confirm against the
// code that formats/parses these key expressions.
kedefine!(
    pub(crate) ke_liveliness: "${remaining:**}/@adv/${entity:*}/${zid:*}/${eid:*}/${meta:**}",
);
39
#[zenoh_macros::unstable]
#[derive(Clone, Debug)]
/// QoS settings applied to the samples an [`AdvancedCache`] sends back when it answers a query.
///
/// The defaults are `Priority::Data`, `CongestionControl::Block` and non-express.
pub struct RepliesConfig {
    /// Priority set on every reply sample.
    priority: Priority,
    /// Congestion control set on every reply sample.
    congestion_control: CongestionControl,
    /// Value passed to `express(..)` on every reply sample.
    is_express: bool,
}
48
49#[zenoh_macros::unstable]
50impl Default for RepliesConfig {
51 fn default() -> Self {
52 Self {
53 priority: Priority::Data,
54 congestion_control: CongestionControl::Block,
55 is_express: false,
56 }
57 }
58}
59
#[zenoh_macros::internal_trait]
#[zenoh_macros::unstable]
// Builder-style QoS setters: each method takes the config by value, overwrites exactly one
// field and hands the updated config back.
impl QoSBuilderTrait for RepliesConfig {
    // NOTE(review): `self` is mutated in the body, so `allow(unused_mut)` looks unnecessary for
    // the code as written; presumably it silences a warning in code generated by the attribute
    // macros — confirm before removing. The same applies to the two methods below.
    #[allow(unused_mut)]
    #[zenoh_macros::unstable]
    /// Replaces the congestion control used for replies.
    fn congestion_control(mut self, congestion_control: CongestionControl) -> Self {
        self.congestion_control = congestion_control;
        self
    }

    #[allow(unused_mut)]
    #[zenoh_macros::unstable]
    /// Replaces the priority used for replies.
    fn priority(mut self, priority: Priority) -> Self {
        self.priority = priority;
        self
    }

    #[allow(unused_mut)]
    #[zenoh_macros::unstable]
    /// Replaces the express flag used for replies.
    fn express(mut self, is_express: bool) -> Self {
        self.is_express = is_express;
        self
    }
}
84
#[derive(Debug, Clone)]
#[zenoh_macros::unstable]
/// Configuration of an [`AdvancedCache`]: how many samples it keeps and which QoS its replies use.
pub struct CacheConfig {
    /// Capacity threshold of the cache; once reached, the oldest sample is dropped before a new
    /// one is stored. Defaults to `1`.
    max_samples: usize,
    /// QoS applied to the samples sent in reply to queries.
    replies_config: RepliesConfig,
}
92
93#[zenoh_macros::unstable]
94impl Default for CacheConfig {
95 fn default() -> Self {
96 Self {
97 max_samples: 1,
98 replies_config: RepliesConfig::default(),
99 }
100 }
101}
102
103#[zenoh_macros::unstable]
104impl CacheConfig {
105 #[zenoh_macros::unstable]
107 pub fn max_samples(mut self, depth: usize) -> Self {
108 self.max_samples = depth;
109 self
110 }
111
112 #[zenoh_macros::unstable]
114 pub fn replies_config(mut self, qos: RepliesConfig) -> Self {
115 self.replies_config = qos;
116 self
117 }
118}
119
#[zenoh_macros::unstable]
/// Builder for an [`AdvancedCache`]; resolving it (`wait` or `.await`) creates the cache.
pub struct AdvancedCacheBuilder<'a, 'b, 'c> {
    /// Session used to declare the queryable and, when enabled, the liveliness token.
    session: &'a Session,
    /// Base key expression of the cache. Kept as a `ZResult` so that an invalid key expression
    /// is reported when the builder is resolved rather than when it is created.
    pub_key_expr: ZResult<KeyExpr<'b>>,
    /// Suffix joined to `pub_key_expr` to form the queryable key expression; `None` means the
    /// queryable uses `pub_key_expr` as is. Defaults to `KE_ADV_PREFIX / KE_STARSTAR`.
    queryable_suffix: Option<ZResult<KeyExpr<'c>>>,
    /// Locality passed to the queryable's `allowed_origin`.
    // NOTE(review): no setter for this field is visible in this part of the file.
    queryable_origin: Locality,
    /// Cache depth and reply QoS.
    history: CacheConfig,
    /// When `true`, a liveliness token is declared on the queryable key expression.
    // NOTE(review): no setter for this field is visible in this part of the file.
    liveliness: bool,
}
130
131#[zenoh_macros::unstable]
132impl<'a, 'b, 'c> AdvancedCacheBuilder<'a, 'b, 'c> {
133 #[zenoh_macros::unstable]
134 pub(crate) fn new(
135 session: &'a Session,
136 pub_key_expr: ZResult<KeyExpr<'b>>,
137 ) -> AdvancedCacheBuilder<'a, 'b, 'c> {
138 AdvancedCacheBuilder {
139 session,
140 pub_key_expr,
141 queryable_suffix: Some(Ok((KE_ADV_PREFIX / KE_STARSTAR).into())),
142 queryable_origin: Locality::default(),
143 history: CacheConfig::default(),
144 liveliness: false,
145 }
146 }
147
148 #[zenoh_macros::unstable]
150 pub fn queryable_suffix<TryIntoKeyExpr>(mut self, queryable_suffix: TryIntoKeyExpr) -> Self
151 where
152 TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
153 <TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<zenoh::Error>,
154 {
155 self.queryable_suffix = Some(queryable_suffix.try_into().map_err(Into::into));
156 self
157 }
158
159 #[zenoh_macros::unstable]
161 pub fn history(mut self, history: CacheConfig) -> Self {
162 self.history = history;
163 self
164 }
165}
166
#[zenoh_macros::unstable]
impl Resolvable for AdvancedCacheBuilder<'_, '_, '_> {
    /// Resolving the builder yields the created [`AdvancedCache`] or the error that prevented
    /// its creation.
    type To = ZResult<AdvancedCache>;
}
171
172#[zenoh_macros::unstable]
173impl Wait for AdvancedCacheBuilder<'_, '_, '_> {
174 fn wait(self) -> <Self as Resolvable>::To {
175 AdvancedCache::new(self)
176 }
177}
178
179#[zenoh_macros::unstable]
180impl IntoFuture for AdvancedCacheBuilder<'_, '_, '_> {
181 type Output = <Self as Resolvable>::To;
182 type IntoFuture = Ready<<Self as Resolvable>::To>;
183
184 #[zenoh_macros::unstable]
185 fn into_future(self) -> Self::IntoFuture {
186 std::future::ready(self.wait())
187 }
188}
189
190#[zenoh_macros::unstable]
/// Decodes a sequence-number range written as `start..end` into a pair of bounds.
///
/// * Each side that parses as a `u32` becomes an inclusive bound; a side that is empty or does
///   not parse becomes `Bound::Unbounded`.
/// * When there is no `..` separator, the single value is used for both bounds.
/// * Anything after a second `..` is ignored.
fn decode_range(range: &str) -> (Bound<u32>, Bound<u32>) {
    /// Maps one side of the range to a bound, treating unparsable text as "no bound".
    fn to_bound(text: &str) -> Bound<u32> {
        match text.parse::<u32>() {
            Ok(sn) => Bound::Included(sn),
            Err(_) => Bound::Unbounded,
        }
    }

    let mut sides = range.split("..");
    let lower = sides.next().map_or(Bound::Unbounded, to_bound);
    // A missing upper side (no `..` at all) mirrors the lower bound.
    let upper = sides.next().map_or(lower, to_bound);
    (lower, upper)
}
208
#[zenoh_macros::unstable]
/// Bounded in-memory cache of samples that answers queries through a queryable.
pub struct AdvancedCache {
    /// Cached samples, oldest at the front; shared with the queryable callback.
    cache: Arc<RwLock<VecDeque<Sample>>>,
    /// Capacity threshold checked by `cache_sample` before storing a new sample.
    max_samples: usize,
    /// Queryable serving the cached samples.
    // NOTE(review): never read in the visible code — presumably held only so the queryable
    // stays declared for the lifetime of the cache; confirm.
    _queryable: Queryable<()>,
    /// Liveliness token declared when the builder's `liveliness` flag is set.
    // NOTE(review): never read in the visible code — presumably held only to keep the token
    // alive; confirm.
    _token: Option<LivelinessToken>,
}
217
218#[zenoh_macros::unstable]
219impl AdvancedCache {
220 #[zenoh_macros::unstable]
221 fn new(conf: AdvancedCacheBuilder<'_, '_, '_>) -> ZResult<AdvancedCache> {
222 let key_expr = conf.pub_key_expr?.into_owned();
223 let queryable_key_expr = match conf.queryable_suffix {
225 None => key_expr.clone(),
226 Some(Ok(ke)) => &key_expr / &ke,
227 Some(Err(e)) => bail!("Invalid key expression for queryable_suffix: {}", e),
228 };
229 tracing::debug!(
230 "Create AdvancedCache{{key_expr: {}, max_samples: {:?}}}",
231 &key_expr,
232 conf.history,
233 );
234 let cache = Arc::new(RwLock::new(VecDeque::<Sample>::new()));
235
236 let queryable = conf
238 .session
239 .declare_queryable(&queryable_key_expr)
240 .allowed_origin(conf.queryable_origin)
241 .callback({
242 let cache = cache.clone();
243 move |query| {
244 tracing::trace!("AdvancedCache{{}} Handle query {}", query.selector());
245 let range = query
246 .parameters()
247 .get("_sn")
248 .map(decode_range)
249 .unwrap_or((Bound::Unbounded, Bound::Unbounded));
250 let max = query
251 .parameters()
252 .get("_max")
253 .and_then(|s| s.parse::<u32>().ok());
254 if let Ok(queue) = cache.read() {
255 if let Some(max) = max {
256 let mut samples = VecDeque::new();
257 for sample in queue.iter() {
258 if range == (Bound::Unbounded, Bound::Unbounded)
259 || sample
260 .source_info()
261 .source_sn()
262 .is_some_and(|sn| range.contains(&sn))
263 {
264 if let (Some(Ok(time_range)), Some(timestamp)) =
265 (query.parameters().time_range(), sample.timestamp())
266 {
267 if !time_range
268 .contains(timestamp.get_time().to_system_time())
269 {
270 continue;
271 }
272 }
273 samples.push_front(sample);
274 samples.truncate(max as usize);
275 }
276 }
277 for sample in samples.drain(..).rev() {
278 if let Err(e) = query
279 .reply_sample(
280 SampleBuilder::from(sample.clone())
281 .congestion_control(
282 conf.history.replies_config.congestion_control,
283 )
284 .priority(conf.history.replies_config.priority)
285 .express(conf.history.replies_config.is_express)
286 .into(),
287 )
288 .wait()
289 {
290 tracing::warn!("AdvancedCache{{}} Error replying to query: {}", e);
291 } else {
292 tracing::trace!(
293 "AdvancedCache{{}} Replied to query {} with Sample{{info:{:?}, ts:{:?}}}",
294 query.selector(),
295 sample.source_info(),
296 sample.timestamp()
297 );
298 }
299 }
300 } else {
301 for sample in queue.iter() {
302 if range == (Bound::Unbounded, Bound::Unbounded)
303 || sample
304 .source_info()
305 .source_sn()
306 .is_some_and(|sn| range.contains(&sn))
307 {
308 if let (Some(Ok(time_range)), Some(timestamp)) =
309 (query.parameters().time_range(), sample.timestamp())
310 {
311 if !time_range
312 .contains(timestamp.get_time().to_system_time())
313 {
314 continue;
315 }
316 }
317 if let Err(e) = query
318 .reply_sample(
319 SampleBuilder::from(sample.clone())
320 .congestion_control(
321 conf.history.replies_config.congestion_control,
322 )
323 .priority(conf.history.replies_config.priority)
324 .express(conf.history.replies_config.is_express)
325 .into(),
326 )
327 .wait()
328 {
329 tracing::warn!("AdvancedCache{{}} Error replying to query: {}", e);
330 } else {
331 tracing::trace!(
332 "AdvancedCache{{}} Replied to query {} with Sample{{info:{:?}, ts:{:?}}}",
333 query.selector(),
334 sample.source_info(),
335 sample.timestamp()
336 );
337 }
338 }
339 }
340 }
341 } else {
342 tracing::error!("AdvancedCache{{}} Unable to take AdvancedPublisher cache read lock");
343 }
344 }
345 })
346 .wait()?;
347
348 let token = if conf.liveliness {
349 Some(
350 conf.session
351 .liveliness()
352 .declare_token(queryable_key_expr)
353 .wait()?,
354 )
355 } else {
356 None
357 };
358
359 Ok(AdvancedCache {
360 cache,
361 max_samples: conf.history.max_samples,
362 _queryable: queryable,
363 _token: token,
364 })
365 }
366
367 #[zenoh_macros::unstable]
368 pub(crate) fn cache_sample(&self, sample: Sample) {
369 if let Ok(mut queue) = self.cache.write() {
370 if queue.len() >= self.max_samples {
371 queue.pop_front();
372 }
373 queue.push_back(sample);
374 } else {
375 tracing::error!("AdvancedCache{{}} Unable to take AdvancedPublisher cache write lock");
376 }
377 }
378}