zenoh_ext/
advanced_cache.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{
15    collections::VecDeque,
16    future::{IntoFuture, Ready},
17    ops::{Bound, RangeBounds},
18    sync::{Arc, RwLock},
19};
20
21use zenoh::{
22    internal::{bail, traits::QoSBuilderTrait},
23    key_expr::{
24        format::{ke, kedefine},
25        keyexpr, KeyExpr,
26    },
27    liveliness::LivelinessToken,
28    qos::{CongestionControl, Priority},
29    query::{Queryable, ZenohParameters},
30    sample::{Locality, Sample, SampleBuilder},
31    Resolvable, Result as ZResult, Session, Wait, KE_ADV_PREFIX, KE_STARSTAR,
32};
33
34use crate::utils::WrappingSn;
35
/// The literal key expression `uhlc`.
// NOTE(review): not referenced anywhere in this file; presumably consumed by
// other modules of the crate — confirm before changing or removing it.
pub(crate) static KE_UHLC: &keyexpr = ke!("uhlc");
// Key expression format `ke_liveliness`, made of the following parts, in order:
//   - `remaining`: any number of leading chunks (`**`),
//   - the literal chunk `@adv`,
//   - `entity`, `zid`, `eid`: exactly one chunk each (`*`),
//   - `meta`: any number of trailing chunks (`**`).
// Plain comments are used on purpose: doc comments on a macro invocation are
// not rendered by rustdoc.
#[zenoh_macros::unstable]
kedefine!(
    pub(crate) ke_liveliness: "${remaining:**}/@adv/${entity:*}/${zid:*}/${eid:*}/${meta:**}",
);
41
#[zenoh_macros::unstable]
/// QoS settings applied to the samples an [`AdvancedCache`] sends in reply to
/// queries.
///
/// The values are set through the `QoSBuilderTrait` methods
/// (`congestion_control`, `priority`, `express`); the `Default` implementation
/// provides the initial values.
#[derive(Clone, Debug)]
pub struct RepliesConfig {
    /// Priority set on each reply sample.
    priority: Priority,
    /// Congestion control strategy set on each reply sample.
    congestion_control: CongestionControl,
    /// Value of the express flag set on each reply sample.
    is_express: bool,
}
50
51#[zenoh_macros::unstable]
52impl Default for RepliesConfig {
53    fn default() -> Self {
54        Self {
55            priority: Priority::Data,
56            congestion_control: CongestionControl::Block,
57            is_express: false,
58        }
59    }
60}
61
62#[zenoh_macros::internal_trait]
63#[zenoh_macros::unstable]
64impl QoSBuilderTrait for RepliesConfig {
65    #[allow(unused_mut)]
66    #[zenoh_macros::unstable]
67    fn congestion_control(mut self, congestion_control: CongestionControl) -> Self {
68        self.congestion_control = congestion_control;
69        self
70    }
71
72    #[allow(unused_mut)]
73    #[zenoh_macros::unstable]
74    fn priority(mut self, priority: Priority) -> Self {
75        self.priority = priority;
76        self
77    }
78
79    #[allow(unused_mut)]
80    #[zenoh_macros::unstable]
81    fn express(mut self, is_express: bool) -> Self {
82        self.is_express = is_express;
83        self
84    }
85}
86
#[derive(Debug, Clone)]
/// Configure an [`AdvancedPublisher`](crate::AdvancedPublisher) cache.
///
/// Holds the cache depth and the QoS used when the cache replies to queries.
/// The `Default` implementation provides the initial values.
#[zenoh_macros::unstable]
pub struct CacheConfig {
    /// Number of samples the cache keeps; see `CacheConfig::max_samples`.
    max_samples: usize,
    /// QoS applied to the replies sent by the cache.
    replies_config: RepliesConfig,
}
94
95#[zenoh_macros::unstable]
96impl Default for CacheConfig {
97    fn default() -> Self {
98        Self {
99            max_samples: 1,
100            replies_config: RepliesConfig::default(),
101        }
102    }
103}
104
105#[zenoh_macros::unstable]
106impl CacheConfig {
107    /// Specify how many samples to keep for each resource.
108    #[zenoh_macros::unstable]
109    pub fn max_samples(mut self, depth: usize) -> Self {
110        self.max_samples = depth;
111        self
112    }
113
114    /// The QoS to apply to replies.
115    #[zenoh_macros::unstable]
116    pub fn replies_config(mut self, qos: RepliesConfig) -> Self {
117        self.replies_config = qos;
118        self
119    }
120}
121
/// The builder of an [`AdvancedCache`], allowing to configure it.
///
/// The cache itself is created when the builder is resolved, either with
/// `wait()` or by awaiting it.
#[zenoh_macros::unstable]
pub struct AdvancedCacheBuilder<'a, 'b, 'c> {
    /// Session used to declare the queryable and, optionally, the liveliness token.
    session: &'a Session,
    /// Key expression of the publisher being cached. Kept as a result so that
    /// an invalid key expression is reported when the builder is resolved.
    pub_key_expr: ZResult<KeyExpr<'b>>,
    /// Suffix appended to `pub_key_expr` to form the queryable key expression.
    /// `None` means the queryable is declared on `pub_key_expr` itself; an
    /// `Err` is reported when the builder is resolved.
    queryable_suffix: Option<ZResult<KeyExpr<'c>>>,
    /// Origin of the queries the queryable accepts.
    queryable_origin: Locality,
    /// Cache depth and replies QoS.
    history: CacheConfig,
    /// When `true`, a liveliness token is declared on the queryable key expression.
    liveliness: bool,
}
132
133#[zenoh_macros::unstable]
134impl<'a, 'b, 'c> AdvancedCacheBuilder<'a, 'b, 'c> {
135    #[zenoh_macros::unstable]
136    pub(crate) fn new(
137        session: &'a Session,
138        pub_key_expr: ZResult<KeyExpr<'b>>,
139    ) -> AdvancedCacheBuilder<'a, 'b, 'c> {
140        AdvancedCacheBuilder {
141            session,
142            pub_key_expr,
143            queryable_suffix: Some(Ok((KE_ADV_PREFIX / KE_STARSTAR).into())),
144            queryable_origin: Locality::default(),
145            history: CacheConfig::default(),
146            liveliness: false,
147        }
148    }
149
150    /// Change the suffix used for queryable.
151    #[zenoh_macros::unstable]
152    pub fn queryable_suffix<TryIntoKeyExpr>(mut self, queryable_suffix: TryIntoKeyExpr) -> Self
153    where
154        TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
155        <TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<zenoh::Error>,
156    {
157        self.queryable_suffix = Some(queryable_suffix.try_into().map_err(Into::into));
158        self
159    }
160
161    /// Change the history size for each resource.
162    #[zenoh_macros::unstable]
163    pub fn history(mut self, history: CacheConfig) -> Self {
164        self.history = history;
165        self
166    }
167}
168
#[zenoh_macros::unstable]
impl Resolvable for AdvancedCacheBuilder<'_, '_, '_> {
    /// Resolving the builder yields the cache, or the error that prevented its creation.
    type To = ZResult<AdvancedCache>;
}
173
#[zenoh_macros::unstable]
impl Wait for AdvancedCacheBuilder<'_, '_, '_> {
    /// Consumes the builder and creates the cache by delegating to `AdvancedCache::new`.
    fn wait(self) -> <Self as Resolvable>::To {
        AdvancedCache::new(self)
    }
}
180
181#[zenoh_macros::unstable]
182impl IntoFuture for AdvancedCacheBuilder<'_, '_, '_> {
183    type Output = <Self as Resolvable>::To;
184    type IntoFuture = Ready<<Self as Resolvable>::To>;
185
186    #[zenoh_macros::unstable]
187    fn into_future(self) -> Self::IntoFuture {
188        std::future::ready(self.wait())
189    }
190}
191
192#[zenoh_macros::unstable]
193fn decode_sn_range(range: &str) -> (Bound<WrappingSn>, Bound<WrappingSn>) {
194    let mut split = range.split("..");
195    let start = split
196        .next()
197        .and_then(|s| s.parse::<WrappingSn>().ok().map(Bound::Included))
198        .unwrap_or(Bound::Unbounded);
199    let end = split
200        .next()
201        .map(|s| {
202            s.parse::<WrappingSn>()
203                .ok()
204                .map(Bound::Included)
205                .unwrap_or(Bound::Unbounded)
206        })
207        .unwrap_or(start);
208    (start, end)
209}
210
/// A bounded, in-memory cache of samples that is served through a queryable.
///
/// Samples are appended with `cache_sample`; the queryable declared when the
/// cache is created replies to queries with the cached samples that match
/// the query parameters.
#[zenoh_macros::unstable]
pub struct AdvancedCache {
    /// Cached samples, oldest at the front. Shared with the queryable callback.
    cache: Arc<RwLock<VecDeque<Sample>>>,
    /// Cache depth, taken from the `CacheConfig` the cache was built with.
    max_samples: usize,
    /// Queryable answering queries on the cache. Never read after creation;
    /// presumably held so that it stays declared as long as the cache lives.
    _queryable: Queryable<()>,
    /// Liveliness token, present only when liveliness was enabled on the
    /// builder. Never read after creation; presumably held for the same reason.
    _token: Option<LivelinessToken>,
}
219
220#[zenoh_macros::unstable]
221impl AdvancedCache {
222    #[zenoh_macros::unstable]
223    fn new(conf: AdvancedCacheBuilder<'_, '_, '_>) -> ZResult<AdvancedCache> {
224        let key_expr = conf.pub_key_expr?.into_owned();
225        // the queryable_suffix (optional), and the key_expr for AdvancedCache's queryable ("<pub_key_expr>/[<queryable_suffix>]")
226        let queryable_key_expr = match conf.queryable_suffix {
227            None => key_expr.clone(),
228            Some(Ok(ke)) => &key_expr / &ke,
229            Some(Err(e)) => bail!("Invalid key expression for queryable_suffix: {}", e),
230        };
231        tracing::debug!(
232            "Create AdvancedCache{{key_expr: {}, max_samples: {:?}}}",
233            &key_expr,
234            conf.history,
235        );
236        let cache = Arc::new(RwLock::new(VecDeque::<Sample>::new()));
237
238        // declare the queryable that will answer to queries on cache
239        let queryable = conf
240            .session
241            .declare_queryable(&queryable_key_expr)
242            .allowed_origin(conf.queryable_origin)
243            .callback({
244                let cache = cache.clone();
245                move |query| {
246                    tracing::trace!("AdvancedCache{{}} Handle query {}", query.selector());
247                    let range = query
248                        .parameters()
249                        .get("_sn")
250                        .map(decode_sn_range)
251                        .unwrap_or((Bound::Unbounded, Bound::Unbounded));
252                    let max = query
253                        .parameters()
254                        .get("_max")
255                        .and_then(|s| s.parse::<u32>().ok());
256                    if let Ok(queue) = cache.read() {
257                        if let Some(max) = max {
258                            let mut samples = VecDeque::new();
259                            for sample in queue.iter() {
260                                if range == (Bound::Unbounded, Bound::Unbounded)
261                                    || sample
262                                        .source_info()
263                                        .is_some_and(|si| range.contains(&si.source_sn()))
264                                {
265                                    if let (Some(Ok(time_range)), Some(timestamp)) =
266                                        (query.parameters().time_range(), sample.timestamp())
267                                    {
268                                        if !time_range
269                                            .contains(timestamp.get_time().to_system_time())
270                                        {
271                                            continue;
272                                        }
273                                    }
274                                    samples.push_front(sample);
275                                    samples.truncate(max as usize);
276                                }
277                            }
278                            for sample in samples.drain(..).rev() {
279                                if let Err(e) = query
280                                    .reply_sample(
281                                        SampleBuilder::from(sample.clone())
282                                            .congestion_control(
283                                                conf.history.replies_config.congestion_control,
284                                            )
285                                            .priority(conf.history.replies_config.priority)
286                                            .express(conf.history.replies_config.is_express)
287                                            .into(),
288                                    )
289                                    .wait()
290                                {
291                                    tracing::warn!("AdvancedCache{{}} Error replying to query: {}", e);
292                                } else {
293                                    tracing::trace!(
294                                        "AdvancedCache{{}} Replied to query {} with Sample{{info:{:?}, ts:{:?}}}",
295                                        query.selector(),
296                                        sample.source_info(),
297                                        sample.timestamp()
298                                    );
299                                }
300                            }
301                        } else {
302                            for sample in queue.iter() {
303                                if range == (Bound::Unbounded, Bound::Unbounded)
304                                    || sample
305                                        .source_info()
306                                        .is_some_and(|si| range.contains(&si.source_sn()))
307                                {
308                                    if let (Some(Ok(time_range)), Some(timestamp)) =
309                                        (query.parameters().time_range(), sample.timestamp())
310                                    {
311                                        if !time_range
312                                            .contains(timestamp.get_time().to_system_time())
313                                        {
314                                            continue;
315                                        }
316                                    }
317                                    if let Err(e) = query
318                                        .reply_sample(
319                                            SampleBuilder::from(sample.clone())
320                                                .congestion_control(
321                                                    conf.history.replies_config.congestion_control,
322                                                )
323                                                .priority(conf.history.replies_config.priority)
324                                                .express(conf.history.replies_config.is_express)
325                                                .into(),
326                                        )
327                                        .wait()
328                                    {
329                                        tracing::warn!("AdvancedCache{{}} Error replying to query: {}", e);
330                                    } else {
331                                        tracing::trace!(
332                                            "AdvancedCache{{}} Replied to query {} with Sample{{info:{:?}, ts:{:?}}}",
333                                            query.selector(),
334                                            sample.source_info(),
335                                            sample.timestamp()
336                                        );
337                                    }
338                                }
339                            }
340                        }
341                    } else {
342                        tracing::error!("AdvancedCache{{}} Unable to take AdvancedPublisher cache read lock");
343                    }
344                }
345            })
346            .wait()?;
347
348        let token = if conf.liveliness {
349            Some(
350                conf.session
351                    .liveliness()
352                    .declare_token(queryable_key_expr)
353                    .wait()?,
354            )
355        } else {
356            None
357        };
358
359        Ok(AdvancedCache {
360            cache,
361            max_samples: conf.history.max_samples,
362            _queryable: queryable,
363            _token: token,
364        })
365    }
366
367    #[zenoh_macros::unstable]
368    pub(crate) fn cache_sample(&self, sample: Sample) {
369        if let Ok(mut queue) = self.cache.write() {
370            if queue.len() >= self.max_samples {
371                queue.pop_front();
372            }
373            queue.push_back(sample);
374        } else {
375            tracing::error!("AdvancedCache{{}} Unable to take AdvancedPublisher cache write lock");
376        }
377    }
378}