zenoh_ext/
advanced_cache.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{
15    collections::VecDeque,
16    future::{IntoFuture, Ready},
17    ops::{Bound, RangeBounds},
18    sync::{Arc, RwLock},
19};
20
21use zenoh::{
22    internal::{bail, traits::QoSBuilderTrait},
23    key_expr::{
24        format::{ke, kedefine},
25        keyexpr, KeyExpr,
26    },
27    liveliness::LivelinessToken,
28    qos::{CongestionControl, Priority},
29    query::{Queryable, ZenohParameters},
30    sample::{Locality, Sample, SampleBuilder},
31    Resolvable, Result as ZResult, Session, Wait, KE_ADV_PREFIX, KE_STARSTAR,
32};
33
34pub(crate) static KE_UHLC: &keyexpr = ke!("uhlc");
35#[zenoh_macros::unstable]
36kedefine!(
37    pub(crate) ke_liveliness: "${remaining:**}/@adv/${entity:*}/${zid:*}/${eid:*}/${meta:**}",
38);
39
40#[zenoh_macros::unstable]
41/// Configure replies.
42#[derive(Clone, Debug)]
43pub struct RepliesConfig {
44    priority: Priority,
45    congestion_control: CongestionControl,
46    is_express: bool,
47}
48
49#[zenoh_macros::unstable]
50impl Default for RepliesConfig {
51    fn default() -> Self {
52        Self {
53            priority: Priority::Data,
54            congestion_control: CongestionControl::Block,
55            is_express: false,
56        }
57    }
58}
59
60#[zenoh_macros::internal_trait]
61#[zenoh_macros::unstable]
62impl QoSBuilderTrait for RepliesConfig {
63    #[allow(unused_mut)]
64    #[zenoh_macros::unstable]
65    fn congestion_control(mut self, congestion_control: CongestionControl) -> Self {
66        self.congestion_control = congestion_control;
67        self
68    }
69
70    #[allow(unused_mut)]
71    #[zenoh_macros::unstable]
72    fn priority(mut self, priority: Priority) -> Self {
73        self.priority = priority;
74        self
75    }
76
77    #[allow(unused_mut)]
78    #[zenoh_macros::unstable]
79    fn express(mut self, is_express: bool) -> Self {
80        self.is_express = is_express;
81        self
82    }
83}
84
85#[derive(Debug, Clone)]
86/// Configure an [`AdvancedPublisher`](crate::AdvancedPublisher) cache.
87#[zenoh_macros::unstable]
88pub struct CacheConfig {
89    max_samples: usize,
90    replies_config: RepliesConfig,
91}
92
93#[zenoh_macros::unstable]
94impl Default for CacheConfig {
95    fn default() -> Self {
96        Self {
97            max_samples: 1,
98            replies_config: RepliesConfig::default(),
99        }
100    }
101}
102
103#[zenoh_macros::unstable]
104impl CacheConfig {
105    /// Specify how many samples to keep for each resource.
106    #[zenoh_macros::unstable]
107    pub fn max_samples(mut self, depth: usize) -> Self {
108        self.max_samples = depth;
109        self
110    }
111
112    /// The QoS to apply to replies.
113    #[zenoh_macros::unstable]
114    pub fn replies_config(mut self, qos: RepliesConfig) -> Self {
115        self.replies_config = qos;
116        self
117    }
118}
119
120/// The builder of an [`AdvancedCache`], allowing to configure it.
121#[zenoh_macros::unstable]
122pub struct AdvancedCacheBuilder<'a, 'b, 'c> {
123    session: &'a Session,
124    pub_key_expr: ZResult<KeyExpr<'b>>,
125    queryable_suffix: Option<ZResult<KeyExpr<'c>>>,
126    queryable_origin: Locality,
127    history: CacheConfig,
128    liveliness: bool,
129}
130
131#[zenoh_macros::unstable]
132impl<'a, 'b, 'c> AdvancedCacheBuilder<'a, 'b, 'c> {
133    #[zenoh_macros::unstable]
134    pub(crate) fn new(
135        session: &'a Session,
136        pub_key_expr: ZResult<KeyExpr<'b>>,
137    ) -> AdvancedCacheBuilder<'a, 'b, 'c> {
138        AdvancedCacheBuilder {
139            session,
140            pub_key_expr,
141            queryable_suffix: Some(Ok((KE_ADV_PREFIX / KE_STARSTAR).into())),
142            queryable_origin: Locality::default(),
143            history: CacheConfig::default(),
144            liveliness: false,
145        }
146    }
147
148    /// Change the suffix used for queryable.
149    #[zenoh_macros::unstable]
150    pub fn queryable_suffix<TryIntoKeyExpr>(mut self, queryable_suffix: TryIntoKeyExpr) -> Self
151    where
152        TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
153        <TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<zenoh::Error>,
154    {
155        self.queryable_suffix = Some(queryable_suffix.try_into().map_err(Into::into));
156        self
157    }
158
159    /// Change the history size for each resource.
160    #[zenoh_macros::unstable]
161    pub fn history(mut self, history: CacheConfig) -> Self {
162        self.history = history;
163        self
164    }
165}
166
167#[zenoh_macros::unstable]
168impl Resolvable for AdvancedCacheBuilder<'_, '_, '_> {
169    type To = ZResult<AdvancedCache>;
170}
171
172#[zenoh_macros::unstable]
173impl Wait for AdvancedCacheBuilder<'_, '_, '_> {
174    fn wait(self) -> <Self as Resolvable>::To {
175        AdvancedCache::new(self)
176    }
177}
178
179#[zenoh_macros::unstable]
180impl IntoFuture for AdvancedCacheBuilder<'_, '_, '_> {
181    type Output = <Self as Resolvable>::To;
182    type IntoFuture = Ready<<Self as Resolvable>::To>;
183
184    #[zenoh_macros::unstable]
185    fn into_future(self) -> Self::IntoFuture {
186        std::future::ready(self.wait())
187    }
188}
189
190#[zenoh_macros::unstable]
191fn decode_range(range: &str) -> (Bound<u32>, Bound<u32>) {
192    let mut split = range.split("..");
193    let start = split
194        .next()
195        .and_then(|s| s.parse::<u32>().ok().map(Bound::Included))
196        .unwrap_or(Bound::Unbounded);
197    let end = split
198        .next()
199        .map(|s| {
200            s.parse::<u32>()
201                .ok()
202                .map(Bound::Included)
203                .unwrap_or(Bound::Unbounded)
204        })
205        .unwrap_or(start);
206    (start, end)
207}
208
209/// [`AdvancedCache`].
210#[zenoh_macros::unstable]
211pub struct AdvancedCache {
212    cache: Arc<RwLock<VecDeque<Sample>>>,
213    max_samples: usize,
214    _queryable: Queryable<()>,
215    _token: Option<LivelinessToken>,
216}
217
218#[zenoh_macros::unstable]
219impl AdvancedCache {
220    #[zenoh_macros::unstable]
221    fn new(conf: AdvancedCacheBuilder<'_, '_, '_>) -> ZResult<AdvancedCache> {
222        let key_expr = conf.pub_key_expr?.into_owned();
223        // the queryable_suffix (optional), and the key_expr for AdvancedCache's queryable ("<pub_key_expr>/[<queryable_suffix>]")
224        let queryable_key_expr = match conf.queryable_suffix {
225            None => key_expr.clone(),
226            Some(Ok(ke)) => &key_expr / &ke,
227            Some(Err(e)) => bail!("Invalid key expression for queryable_suffix: {}", e),
228        };
229        tracing::debug!(
230            "Create AdvancedCache{{key_expr: {}, max_samples: {:?}}}",
231            &key_expr,
232            conf.history,
233        );
234        let cache = Arc::new(RwLock::new(VecDeque::<Sample>::new()));
235
236        // declare the queryable that will answer to queries on cache
237        let queryable = conf
238            .session
239            .declare_queryable(&queryable_key_expr)
240            .allowed_origin(conf.queryable_origin)
241            .callback({
242                let cache = cache.clone();
243                move |query| {
244                    tracing::trace!("AdvancedCache{{}} Handle query {}", query.selector());
245                    let range = query
246                        .parameters()
247                        .get("_sn")
248                        .map(decode_range)
249                        .unwrap_or((Bound::Unbounded, Bound::Unbounded));
250                    let max = query
251                        .parameters()
252                        .get("_max")
253                        .and_then(|s| s.parse::<u32>().ok());
254                    if let Ok(queue) = cache.read() {
255                        if let Some(max) = max {
256                            let mut samples = VecDeque::new();
257                            for sample in queue.iter() {
258                                if range == (Bound::Unbounded, Bound::Unbounded)
259                                    || sample
260                                        .source_info()
261                                        .source_sn()
262                                        .is_some_and(|sn| range.contains(&sn))
263                                {
264                                    if let (Some(Ok(time_range)), Some(timestamp)) =
265                                        (query.parameters().time_range(), sample.timestamp())
266                                    {
267                                        if !time_range
268                                            .contains(timestamp.get_time().to_system_time())
269                                        {
270                                            continue;
271                                        }
272                                    }
273                                    samples.push_front(sample);
274                                    samples.truncate(max as usize);
275                                }
276                            }
277                            for sample in samples.drain(..).rev() {
278                                if let Err(e) = query
279                                    .reply_sample(
280                                        SampleBuilder::from(sample.clone())
281                                            .congestion_control(
282                                                conf.history.replies_config.congestion_control,
283                                            )
284                                            .priority(conf.history.replies_config.priority)
285                                            .express(conf.history.replies_config.is_express)
286                                            .into(),
287                                    )
288                                    .wait()
289                                {
290                                    tracing::warn!("AdvancedCache{{}} Error replying to query: {}", e);
291                                } else {
292                                    tracing::trace!(
293                                        "AdvancedCache{{}} Replied to query {} with Sample{{info:{:?}, ts:{:?}}}",
294                                        query.selector(),
295                                        sample.source_info(),
296                                        sample.timestamp()
297                                    );
298                                }
299                            }
300                        } else {
301                            for sample in queue.iter() {
302                                if range == (Bound::Unbounded, Bound::Unbounded)
303                                    || sample
304                                        .source_info()
305                                        .source_sn()
306                                        .is_some_and(|sn| range.contains(&sn))
307                                {
308                                    if let (Some(Ok(time_range)), Some(timestamp)) =
309                                        (query.parameters().time_range(), sample.timestamp())
310                                    {
311                                        if !time_range
312                                            .contains(timestamp.get_time().to_system_time())
313                                        {
314                                            continue;
315                                        }
316                                    }
317                                    if let Err(e) = query
318                                        .reply_sample(
319                                            SampleBuilder::from(sample.clone())
320                                                .congestion_control(
321                                                    conf.history.replies_config.congestion_control,
322                                                )
323                                                .priority(conf.history.replies_config.priority)
324                                                .express(conf.history.replies_config.is_express)
325                                                .into(),
326                                        )
327                                        .wait()
328                                    {
329                                        tracing::warn!("AdvancedCache{{}} Error replying to query: {}", e);
330                                    } else {
331                                        tracing::trace!(
332                                            "AdvancedCache{{}} Replied to query {} with Sample{{info:{:?}, ts:{:?}}}",
333                                            query.selector(),
334                                            sample.source_info(),
335                                            sample.timestamp()
336                                        );
337                                    }
338                                }
339                            }
340                        }
341                    } else {
342                        tracing::error!("AdvancedCache{{}} Unable to take AdvancedPublisher cache read lock");
343                    }
344                }
345            })
346            .wait()?;
347
348        let token = if conf.liveliness {
349            Some(
350                conf.session
351                    .liveliness()
352                    .declare_token(queryable_key_expr)
353                    .wait()?,
354            )
355        } else {
356            None
357        };
358
359        Ok(AdvancedCache {
360            cache,
361            max_samples: conf.history.max_samples,
362            _queryable: queryable,
363            _token: token,
364        })
365    }
366
367    #[zenoh_macros::unstable]
368    pub(crate) fn cache_sample(&self, sample: Sample) {
369        if let Ok(mut queue) = self.cache.write() {
370            if queue.len() >= self.max_samples {
371                queue.pop_front();
372            }
373            queue.push_back(sample);
374        } else {
375            tracing::error!("AdvancedCache{{}} Unable to take AdvancedPublisher cache write lock");
376        }
377    }
378}