Skip to main content

zenoh_ext/
publication_cache.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{
15    collections::{HashMap, VecDeque},
16    convert::TryInto,
17    future::{IntoFuture, Ready},
18    time::Duration,
19};
20
21use zenoh::{
22    handlers::FifoChannelHandler,
23    internal::{bail, runtime::ZRuntime, ResolveFuture, TerminatableTask},
24    key_expr::{keyexpr, KeyExpr, OwnedKeyExpr},
25    pubsub::Subscriber,
26    query::{Query, Queryable, ZenohParameters},
27    sample::{Locality, Sample},
28    Error, Resolvable, Resolve, Result as ZResult, Session, Wait,
29};
30
31/// The builder of PublicationCache, allowing to configure it.
32#[zenoh_macros::unstable]
33#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
34#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
35pub struct PublicationCacheBuilder<'a, 'b, 'c, const BACKGROUND: bool = false> {
36    session: &'a Session,
37    pub_key_expr: ZResult<KeyExpr<'b>>,
38    queryable_suffix: Option<ZResult<KeyExpr<'c>>>,
39    queryable_origin: Option<Locality>,
40    complete: Option<bool>,
41    history: usize,
42    resources_limit: Option<usize>,
43}
44
45#[allow(deprecated)]
46#[zenoh_macros::unstable]
47impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> {
48    pub(crate) fn new(
49        session: &'a Session,
50        pub_key_expr: ZResult<KeyExpr<'b>>,
51    ) -> PublicationCacheBuilder<'a, 'b, 'c> {
52        PublicationCacheBuilder {
53            session,
54            pub_key_expr,
55            queryable_suffix: None,
56            queryable_origin: None,
57            complete: None,
58            history: 1,
59            resources_limit: None,
60        }
61    }
62
63    /// Change the suffix used for queryable.
64    #[zenoh_macros::unstable]
65    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
66    pub fn queryable_suffix<TryIntoKeyExpr>(mut self, queryable_suffix: TryIntoKeyExpr) -> Self
67    where
68        TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
69        <TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<Error>,
70    {
71        self.queryable_suffix = Some(queryable_suffix.try_into().map_err(Into::into));
72        self
73    }
74
75    /// Restrict the matching queries that will be received by this [`PublicationCache`]'s queryable to the ones that have the given [`Locality`](zenoh::sample::Locality).
76    #[zenoh_macros::unstable]
77    #[inline]
78    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
79    pub fn queryable_allowed_origin(mut self, origin: Locality) -> Self {
80        self.queryable_origin = Some(origin);
81        self
82    }
83
84    /// Set completeness option for the queryable.
85    #[zenoh_macros::unstable]
86    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
87    pub fn queryable_complete(mut self, complete: bool) -> Self {
88        self.complete = Some(complete);
89        self
90    }
91
92    /// Change the history size for each resource.
93    #[zenoh_macros::unstable]
94    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
95    pub fn history(mut self, history: usize) -> Self {
96        self.history = history;
97        self
98    }
99
100    /// Change the limit number of cached resources.
101    #[zenoh_macros::unstable]
102    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
103    pub fn resources_limit(mut self, limit: usize) -> Self {
104        self.resources_limit = Some(limit);
105        self
106    }
107
108    #[zenoh_macros::unstable]
109    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
110    pub fn background(self) -> PublicationCacheBuilder<'a, 'b, 'c, true> {
111        PublicationCacheBuilder {
112            session: self.session,
113            pub_key_expr: self.pub_key_expr,
114            queryable_suffix: self.queryable_suffix,
115            queryable_origin: self.queryable_origin,
116            complete: self.complete,
117            history: self.history,
118            resources_limit: self.resources_limit,
119        }
120    }
121}
122
123#[zenoh_macros::unstable]
124#[allow(deprecated)]
125impl Resolvable for PublicationCacheBuilder<'_, '_, '_> {
126    type To = ZResult<PublicationCache>;
127}
128
129#[zenoh_macros::unstable]
130#[allow(deprecated)]
131impl Wait for PublicationCacheBuilder<'_, '_, '_> {
132    #[zenoh_macros::unstable]
133    fn wait(self) -> <Self as Resolvable>::To {
134        PublicationCache::new(self)
135    }
136}
137
138#[zenoh_macros::unstable]
139#[allow(deprecated)]
140impl IntoFuture for PublicationCacheBuilder<'_, '_, '_> {
141    type Output = <Self as Resolvable>::To;
142    type IntoFuture = Ready<<Self as Resolvable>::To>;
143
144    #[zenoh_macros::unstable]
145    fn into_future(self) -> Self::IntoFuture {
146        std::future::ready(self.wait())
147    }
148}
149
150#[zenoh_macros::unstable]
151#[allow(deprecated)]
152impl Resolvable for PublicationCacheBuilder<'_, '_, '_, true> {
153    type To = ZResult<()>;
154}
155
156#[zenoh_macros::unstable]
157#[allow(deprecated)]
158impl Wait for PublicationCacheBuilder<'_, '_, '_, true> {
159    #[zenoh_macros::unstable]
160    fn wait(self) -> <Self as Resolvable>::To {
161        PublicationCache::new(self).map(|_| ())
162    }
163}
164
165#[zenoh_macros::unstable]
166#[allow(deprecated)]
167impl IntoFuture for PublicationCacheBuilder<'_, '_, '_, true> {
168    type Output = <Self as Resolvable>::To;
169    type IntoFuture = Ready<<Self as Resolvable>::To>;
170
171    #[zenoh_macros::unstable]
172    fn into_future(self) -> Self::IntoFuture {
173        std::future::ready(self.wait())
174    }
175}
176
177/// [`PublicationCache`].
178#[zenoh_macros::unstable]
179#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
180pub struct PublicationCache {
181    local_sub: Subscriber<FifoChannelHandler<Sample>>,
182    _queryable: Queryable<FifoChannelHandler<Query>>,
183    task: TerminatableTask,
184}
185
186#[zenoh_macros::unstable]
187#[allow(deprecated)]
188impl PublicationCache {
189    #[zenoh_macros::unstable]
190    fn new<const BACKGROUND: bool>(
191        conf: PublicationCacheBuilder<'_, '_, '_, BACKGROUND>,
192    ) -> ZResult<PublicationCache> {
193        let key_expr = conf.pub_key_expr?;
194        // the queryable_suffix (optional), and the key_expr for PublicationCache's queryable ("<pub_key_expr>/[<queryable_suffix>]")
195        let (queryable_suffix, queryable_key_expr): (Option<OwnedKeyExpr>, KeyExpr) =
196            match conf.queryable_suffix {
197                None => (None, key_expr.clone()),
198                Some(Ok(ke)) => {
199                    let queryable_key_expr = &key_expr / &ke;
200                    (Some(ke.into()), queryable_key_expr)
201                }
202                Some(Err(e)) => bail!("Invalid key expression for queryable_suffix: {}", e),
203            };
204        tracing::debug!(
205            "Create PublicationCache on {} with history={} resource_limit={:?}",
206            &key_expr,
207            conf.history,
208            conf.resources_limit
209        );
210
211        if conf.session.hlc().is_none() {
212            bail!(
213                "Failed requirement for PublicationCache on {}: \
214                     the 'timestamping' setting must be enabled in the Zenoh configuration",
215                key_expr,
216            )
217        }
218
219        // declare the local subscriber that will store the local publications
220        let mut local_sub = conf
221            .session
222            .declare_subscriber(&key_expr)
223            .allowed_origin(Locality::SessionLocal)
224            .wait()?;
225        if BACKGROUND {
226            local_sub.set_background(true);
227        }
228
229        // declare the queryable which returns the cached publications
230        let mut queryable = conf.session.declare_queryable(&queryable_key_expr);
231        if let Some(origin) = conf.queryable_origin {
232            queryable = queryable.allowed_origin(origin);
233        }
234        if let Some(complete) = conf.complete {
235            queryable = queryable.complete(complete);
236        }
237        let mut queryable = queryable.wait()?;
238        if BACKGROUND {
239            queryable.set_background(true);
240        }
241
242        // take local ownership of stuff to be moved into task
243        let sub_recv = local_sub.handler().clone();
244        let quer_recv = queryable.handler().clone();
245        let pub_key_expr = key_expr.into_owned();
246        let resources_limit = conf.resources_limit;
247        let history = conf.history;
248
249        // TODO(yuyuan): use CancellationToken to manage it
250        let token = TerminatableTask::create_cancellation_token();
251        let token2 = token.clone();
252        let task = TerminatableTask::spawn(
253            ZRuntime::Application,
254            async move {
255                let mut cache: HashMap<OwnedKeyExpr, VecDeque<Sample>> =
256                    HashMap::with_capacity(resources_limit.unwrap_or(32));
257                let limit = resources_limit.unwrap_or(usize::MAX);
258                loop {
259                    tokio::select! {
260                        // on publication received by the local subscriber, store it
261                        sample = sub_recv.recv_async() => {
262                            if let Ok(sample) = sample {
263                                let queryable_key_expr: KeyExpr<'_> = if let Some(suffix) = &queryable_suffix {
264                                    sample.key_expr() / suffix
265                                } else {
266                                    sample.key_expr().clone()
267                                };
268
269                                if let Some(queue) = cache.get_mut(queryable_key_expr.as_keyexpr()) {
270                                    if queue.len() >= history {
271                                        queue.pop_front();
272                                    }
273                                    queue.push_back(sample);
274                                } else if cache.len() >= limit {
275                                    tracing::error!("PublicationCache on {}: resource_limit exceeded - can't cache publication for a new resource",
276                                    pub_key_expr);
277                                } else {
278                                    let mut queue: VecDeque<Sample> = VecDeque::new();
279                                    queue.push_back(sample);
280                                    cache.insert(queryable_key_expr.into(), queue);
281                                }
282                            }
283                        },
284
285                        // on query, reply with cached content
286                        query = quer_recv.recv_async() => {
287                            if let Ok(query) = query {
288                                if !query.key_expr().as_str().contains('*') {
289                                    if let Some(queue) = cache.get(query.key_expr().as_keyexpr()) {
290                                        for sample in queue {
291                                            if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) {
292                                                if !time_range.contains(timestamp.get_time().to_system_time()){
293                                                    continue;
294                                                }
295                                            }
296                                            if let Err(e) = query.reply_sample(sample.clone()).await {
297                                                tracing::warn!("Error replying to query: {}", e);
298                                            }
299                                        }
300                                    }
301                                } else {
302                                    for (key_expr, queue) in cache.iter() {
303                                        if query.key_expr().intersects(unsafe{ keyexpr::from_str_unchecked(key_expr) }) {
304                                            for sample in queue {
305                                                if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) {
306                                                    if !time_range.contains(timestamp.get_time().to_system_time()){
307                                                        continue;
308                                                    }
309                                                }
310                                                if let Err(e) = query.reply_sample(sample.clone()).await {
311                                                    tracing::warn!("Error replying to query: {}", e);
312                                                }
313                                            }
314                                        }
315                                    }
316                                }
317                            }
318                        },
319                        _ = token2.cancelled() => return
320                    }
321                }
322            },
323            token,
324        );
325
326        Ok(PublicationCache {
327            local_sub,
328            _queryable: queryable,
329            task,
330        })
331    }
332
333    /// Undeclare this [`PublicationCache`]`.
334    #[zenoh_macros::unstable]
335    #[inline]
336    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
337    pub fn undeclare(self) -> impl Resolve<ZResult<()>> {
338        ResolveFuture::new(async move {
339            let PublicationCache {
340                _queryable,
341                local_sub,
342                mut task,
343            } = self;
344            _queryable.undeclare().await?;
345            local_sub.undeclare().await?;
346            task.terminate(Duration::from_secs(10));
347            Ok(())
348        })
349    }
350
351    #[zenoh_macros::internal]
352    #[zenoh_macros::unstable]
353    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
354    pub fn set_background(&mut self, background: bool) {
355        self.local_sub.set_background(background);
356        self._queryable.set_background(background);
357    }
358
359    #[zenoh_macros::unstable]
360    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
361    pub fn key_expr(&self) -> &KeyExpr<'static> {
362        self.local_sub.key_expr()
363    }
364}