Skip to main content

zenoh_ext/
advanced_cache.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{
15    collections::VecDeque,
16    future::{IntoFuture, Ready},
17    ops::{Bound, RangeBounds},
18    sync::{Arc, RwLock},
19};
20
21use zenoh::{
22    internal::{bail, traits::QoSBuilderTrait},
23    key_expr::{
24        format::{ke, kedefine},
25        keyexpr, KeyExpr,
26    },
27    liveliness::LivelinessToken,
28    qos::{CongestionControl, Priority},
29    query::{Queryable, ZenohParameters},
30    sample::{Locality, Sample, SampleBuilder},
31    Resolvable, Result as ZResult, Session, Wait, KE_ADV_PREFIX, KE_STARSTAR,
32};
33
34use crate::utils::WrappingSn;
35
36pub(crate) static KE_UHLC: &keyexpr = ke!("uhlc");
37#[zenoh_macros::unstable]
38kedefine!(
39    pub(crate) ke_liveliness: "${remaining:**}/@adv/${entity:*}/${zid:*}/${eid:*}/${meta:**}",
40);
41
42#[zenoh_macros::unstable]
43/// Configure replies.
44#[derive(Clone, Debug)]
45pub struct RepliesConfig {
46    priority: Priority,
47    congestion_control: CongestionControl,
48    is_express: bool,
49}
50
51#[zenoh_macros::unstable]
52impl Default for RepliesConfig {
53    fn default() -> Self {
54        Self {
55            priority: Priority::Data,
56            congestion_control: CongestionControl::Block,
57            is_express: false,
58        }
59    }
60}
61
62#[zenoh_macros::internal_trait]
63#[zenoh_macros::unstable]
64impl QoSBuilderTrait for RepliesConfig {
65    #[allow(unused_mut)]
66    #[zenoh_macros::unstable]
67    /// Changes the [`CongestionControl`] to apply when routing the data.
68    fn congestion_control(mut self, congestion_control: CongestionControl) -> Self {
69        self.congestion_control = congestion_control;
70        self
71    }
72
73    #[allow(unused_mut)]
74    #[zenoh_macros::unstable]
75    /// Changes the [`Priority`] to apply when routing the data.
76    fn priority(mut self, priority: Priority) -> Self {
77        self.priority = priority;
78        self
79    }
80
81    #[allow(unused_mut)]
82    #[zenoh_macros::unstable]
83    /// Changes the Express policy to apply when routing the data.
84    ///
85    /// When express is set to `true`, then the message will not be batched.
86    /// This usually has a positive impact on latency but a negative impact on throughput.
87    fn express(mut self, is_express: bool) -> Self {
88        self.is_express = is_express;
89        self
90    }
91}
92
93#[derive(Debug, Clone)]
94/// Configure an [`AdvancedPublisher`](crate::AdvancedPublisher) cache.
95#[zenoh_macros::unstable]
96pub struct CacheConfig {
97    max_samples: usize,
98    replies_config: RepliesConfig,
99}
100
101#[zenoh_macros::unstable]
102impl Default for CacheConfig {
103    fn default() -> Self {
104        Self {
105            max_samples: 1,
106            replies_config: RepliesConfig::default(),
107        }
108    }
109}
110
111#[zenoh_macros::unstable]
112impl CacheConfig {
113    /// Specify how many samples to keep for each resource.
114    #[zenoh_macros::unstable]
115    pub fn max_samples(mut self, depth: usize) -> Self {
116        self.max_samples = depth;
117        self
118    }
119
120    /// The QoS to apply to replies.
121    #[zenoh_macros::unstable]
122    pub fn replies_config(mut self, qos: RepliesConfig) -> Self {
123        self.replies_config = qos;
124        self
125    }
126}
127
128/// The builder of an [`AdvancedCache`], allowing to configure it.
129#[zenoh_macros::unstable]
130pub struct AdvancedCacheBuilder<'a, 'b, 'c> {
131    session: &'a Session,
132    pub_key_expr: ZResult<KeyExpr<'b>>,
133    queryable_suffix: Option<ZResult<KeyExpr<'c>>>,
134    queryable_origin: Locality,
135    history: CacheConfig,
136    liveliness: bool,
137}
138
139#[zenoh_macros::unstable]
140impl<'a, 'b, 'c> AdvancedCacheBuilder<'a, 'b, 'c> {
141    #[zenoh_macros::unstable]
142    pub(crate) fn new(
143        session: &'a Session,
144        pub_key_expr: ZResult<KeyExpr<'b>>,
145    ) -> AdvancedCacheBuilder<'a, 'b, 'c> {
146        AdvancedCacheBuilder {
147            session,
148            pub_key_expr,
149            queryable_suffix: Some(Ok((KE_ADV_PREFIX / KE_STARSTAR).into())),
150            queryable_origin: Locality::default(),
151            history: CacheConfig::default(),
152            liveliness: false,
153        }
154    }
155
156    /// Change the suffix used for queryable.
157    #[zenoh_macros::unstable]
158    pub fn queryable_suffix<TryIntoKeyExpr>(mut self, queryable_suffix: TryIntoKeyExpr) -> Self
159    where
160        TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
161        <TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<zenoh::Error>,
162    {
163        self.queryable_suffix = Some(queryable_suffix.try_into().map_err(Into::into));
164        self
165    }
166
167    /// Change the history size for each resource.
168    #[zenoh_macros::unstable]
169    pub fn history(mut self, history: CacheConfig) -> Self {
170        self.history = history;
171        self
172    }
173}
174
175#[zenoh_macros::unstable]
176impl Resolvable for AdvancedCacheBuilder<'_, '_, '_> {
177    type To = ZResult<AdvancedCache>;
178}
179
180#[zenoh_macros::unstable]
181impl Wait for AdvancedCacheBuilder<'_, '_, '_> {
182    fn wait(self) -> <Self as Resolvable>::To {
183        AdvancedCache::new(self)
184    }
185}
186
187#[zenoh_macros::unstable]
188impl IntoFuture for AdvancedCacheBuilder<'_, '_, '_> {
189    type Output = <Self as Resolvable>::To;
190    type IntoFuture = Ready<<Self as Resolvable>::To>;
191
192    #[zenoh_macros::unstable]
193    fn into_future(self) -> Self::IntoFuture {
194        std::future::ready(self.wait())
195    }
196}
197
198#[zenoh_macros::unstable]
199fn decode_sn_range(range: &str) -> (Bound<WrappingSn>, Bound<WrappingSn>) {
200    let mut split = range.split("..");
201    let start = split
202        .next()
203        .and_then(|s| s.parse::<WrappingSn>().ok().map(Bound::Included))
204        .unwrap_or(Bound::Unbounded);
205    let end = split
206        .next()
207        .map(|s| {
208            s.parse::<WrappingSn>()
209                .ok()
210                .map(Bound::Included)
211                .unwrap_or(Bound::Unbounded)
212        })
213        .unwrap_or(start);
214    (start, end)
215}
216
217/// [`AdvancedCache`].
218#[zenoh_macros::unstable]
219pub struct AdvancedCache {
220    cache: Arc<RwLock<VecDeque<Sample>>>,
221    max_samples: usize,
222    _queryable: Queryable<()>,
223    _token: Option<LivelinessToken>,
224}
225
226#[zenoh_macros::unstable]
227impl AdvancedCache {
228    #[zenoh_macros::unstable]
229    fn new(conf: AdvancedCacheBuilder<'_, '_, '_>) -> ZResult<AdvancedCache> {
230        let key_expr = conf.pub_key_expr?.into_owned();
231        // the queryable_suffix (optional), and the key_expr for AdvancedCache's queryable ("<pub_key_expr>/[<queryable_suffix>]")
232        let queryable_key_expr = match conf.queryable_suffix {
233            None => key_expr.clone(),
234            Some(Ok(ke)) => &key_expr / &ke,
235            Some(Err(e)) => bail!("Invalid key expression for queryable_suffix: {}", e),
236        };
237        tracing::debug!(
238            "Create AdvancedCache{{key_expr: {}, max_samples: {:?}}}",
239            &key_expr,
240            conf.history,
241        );
242        let cache = Arc::new(RwLock::new(VecDeque::<Sample>::new()));
243
244        // declare the queryable that will answer to queries on cache
245        let queryable = conf
246            .session
247            .declare_queryable(&queryable_key_expr)
248            .allowed_origin(conf.queryable_origin)
249            .callback({
250                let cache = cache.clone();
251                move |query| {
252                    tracing::trace!("AdvancedCache{{}} Handle query {}", query.selector());
253                    let range = query
254                        .parameters()
255                        .get("_sn")
256                        .map(decode_sn_range)
257                        .unwrap_or((Bound::Unbounded, Bound::Unbounded));
258                    let max = query
259                        .parameters()
260                        .get("_max")
261                        .and_then(|s| s.parse::<u32>().ok());
262                    if let Ok(queue) = cache.read() {
263                        if let Some(max) = max {
264                            let mut samples = VecDeque::new();
265                            for sample in queue.iter() {
266                                if range == (Bound::Unbounded, Bound::Unbounded)
267                                    || sample
268                                        .source_info()
269                                        .is_some_and(|si| range.contains(&si.source_sn()))
270                                {
271                                    if let (Some(Ok(time_range)), Some(timestamp)) =
272                                        (query.parameters().time_range(), sample.timestamp())
273                                    {
274                                        if !time_range
275                                            .contains(timestamp.get_time().to_system_time())
276                                        {
277                                            continue;
278                                        }
279                                    }
280                                    samples.push_front(sample);
281                                    samples.truncate(max as usize);
282                                }
283                            }
284                            for sample in samples.drain(..).rev() {
285                                if let Err(e) = query
286                                    .reply_sample(
287                                        SampleBuilder::from(sample.clone())
288                                            .congestion_control(
289                                                conf.history.replies_config.congestion_control,
290                                            )
291                                            .priority(conf.history.replies_config.priority)
292                                            .express(conf.history.replies_config.is_express)
293                                            .into(),
294                                    )
295                                    .wait()
296                                {
297                                    tracing::warn!("AdvancedCache{{}} Error replying to query: {}", e);
298                                } else {
299                                    tracing::trace!(
300                                        "AdvancedCache{{}} Replied to query {} with Sample{{info:{:?}, ts:{:?}}}",
301                                        query.selector(),
302                                        sample.source_info(),
303                                        sample.timestamp()
304                                    );
305                                }
306                            }
307                        } else {
308                            for sample in queue.iter() {
309                                if range == (Bound::Unbounded, Bound::Unbounded)
310                                    || sample
311                                        .source_info()
312                                        .is_some_and(|si| range.contains(&si.source_sn()))
313                                {
314                                    if let (Some(Ok(time_range)), Some(timestamp)) =
315                                        (query.parameters().time_range(), sample.timestamp())
316                                    {
317                                        if !time_range
318                                            .contains(timestamp.get_time().to_system_time())
319                                        {
320                                            continue;
321                                        }
322                                    }
323                                    if let Err(e) = query
324                                        .reply_sample(
325                                            SampleBuilder::from(sample.clone())
326                                                .congestion_control(
327                                                    conf.history.replies_config.congestion_control,
328                                                )
329                                                .priority(conf.history.replies_config.priority)
330                                                .express(conf.history.replies_config.is_express)
331                                                .into(),
332                                        )
333                                        .wait()
334                                    {
335                                        tracing::warn!("AdvancedCache{{}} Error replying to query: {}", e);
336                                    } else {
337                                        tracing::trace!(
338                                            "AdvancedCache{{}} Replied to query {} with Sample{{info:{:?}, ts:{:?}}}",
339                                            query.selector(),
340                                            sample.source_info(),
341                                            sample.timestamp()
342                                        );
343                                    }
344                                }
345                            }
346                        }
347                    } else {
348                        tracing::error!("AdvancedCache{{}} Unable to take AdvancedPublisher cache read lock");
349                    }
350                }
351            })
352            .wait()?;
353
354        let token = if conf.liveliness {
355            Some(
356                conf.session
357                    .liveliness()
358                    .declare_token(queryable_key_expr)
359                    .wait()?,
360            )
361        } else {
362            None
363        };
364
365        Ok(AdvancedCache {
366            cache,
367            max_samples: conf.history.max_samples,
368            _queryable: queryable,
369            _token: token,
370        })
371    }
372
373    #[zenoh_macros::unstable]
374    pub(crate) fn cache_sample(&self, sample: Sample) {
375        if let Ok(mut queue) = self.cache.write() {
376            if queue.len() >= self.max_samples {
377                queue.pop_front();
378            }
379            queue.push_back(sample);
380        } else {
381            tracing::error!("AdvancedCache{{}} Unable to take AdvancedPublisher cache write lock");
382        }
383    }
384}