zenoh_ext/
publication_cache.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{
15    collections::{HashMap, VecDeque},
16    convert::TryInto,
17    future::{IntoFuture, Ready},
18    time::Duration,
19};
20
21use zenoh::{
22    handlers::FifoChannelHandler,
23    internal::{bail, runtime::ZRuntime, ResolveFuture, TerminatableTask},
24    key_expr::{keyexpr, KeyExpr, OwnedKeyExpr},
25    pubsub::Subscriber,
26    query::{Query, Queryable, ZenohParameters},
27    sample::{Locality, Sample},
28    Error, Resolvable, Resolve, Result as ZResult, Session, Wait,
29};
30
/// The builder of PublicationCache, allowing to configure it.
///
/// The `BACKGROUND` const parameter selects what resolving the builder yields:
/// with the default (`false`) it resolves to a `ZResult<PublicationCache>`, while
/// the `true` variant (obtained through `background()`) resolves to `ZResult<()>`.
#[zenoh_macros::unstable]
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
pub struct PublicationCacheBuilder<'a, 'b, 'c, const BACKGROUND: bool = false> {
    // Session used to declare the local subscriber and the queryable.
    session: &'a Session,
    // Key expression of the publications to cache. Stored as a result so that a
    // conversion failure is reported when the builder is resolved.
    pub_key_expr: ZResult<KeyExpr<'b>>,
    // Optional suffix appended to the publication key expression to form the
    // queryable's key expression (`None` means the queryable uses `pub_key_expr`).
    queryable_suffix: Option<ZResult<KeyExpr<'c>>>,
    // Optional restriction on the origin of the queries the queryable accepts.
    queryable_origin: Option<Locality>,
    // Optional completeness flag forwarded to the queryable.
    complete: Option<bool>,
    // Number of samples kept per resource (set to 1 by `new`).
    history: usize,
    // Optional maximum number of distinct resources kept in the cache.
    resources_limit: Option<usize>,
}
44
45#[allow(deprecated)]
46#[zenoh_macros::unstable]
47impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> {
48    pub(crate) fn new(
49        session: &'a Session,
50        pub_key_expr: ZResult<KeyExpr<'b>>,
51    ) -> PublicationCacheBuilder<'a, 'b, 'c> {
52        PublicationCacheBuilder {
53            session,
54            pub_key_expr,
55            queryable_suffix: None,
56            queryable_origin: None,
57            complete: None,
58            history: 1,
59            resources_limit: None,
60        }
61    }
62
63    /// Change the suffix used for queryable.
64    #[zenoh_macros::unstable]
65    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
66    pub fn queryable_suffix<TryIntoKeyExpr>(mut self, queryable_suffix: TryIntoKeyExpr) -> Self
67    where
68        TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
69        <TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<Error>,
70    {
71        self.queryable_suffix = Some(queryable_suffix.try_into().map_err(Into::into));
72        self
73    }
74
75    /// Restrict the matching queries that will be receive by this [`PublicationCache`]'s queryable
76    /// to the ones that have the given [`Locality`](zenoh::prelude::Locality).
77    #[zenoh_macros::unstable]
78    #[inline]
79    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
80    pub fn queryable_allowed_origin(mut self, origin: Locality) -> Self {
81        self.queryable_origin = Some(origin);
82        self
83    }
84
85    /// Set completeness option for the queryable.
86    #[zenoh_macros::unstable]
87    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
88    pub fn queryable_complete(mut self, complete: bool) -> Self {
89        self.complete = Some(complete);
90        self
91    }
92
93    /// Change the history size for each resource.
94    #[zenoh_macros::unstable]
95    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
96    pub fn history(mut self, history: usize) -> Self {
97        self.history = history;
98        self
99    }
100
101    /// Change the limit number of cached resources.
102    #[zenoh_macros::unstable]
103    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
104    pub fn resources_limit(mut self, limit: usize) -> Self {
105        self.resources_limit = Some(limit);
106        self
107    }
108
109    #[zenoh_macros::unstable]
110    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
111    pub fn background(self) -> PublicationCacheBuilder<'a, 'b, 'c, true> {
112        PublicationCacheBuilder {
113            session: self.session,
114            pub_key_expr: self.pub_key_expr,
115            queryable_suffix: self.queryable_suffix,
116            queryable_origin: self.queryable_origin,
117            complete: self.complete,
118            history: self.history,
119            resources_limit: self.resources_limit,
120        }
121    }
122}
123
124#[zenoh_macros::unstable]
125#[allow(deprecated)]
126impl Resolvable for PublicationCacheBuilder<'_, '_, '_> {
127    type To = ZResult<PublicationCache>;
128}
129
130#[zenoh_macros::unstable]
131#[allow(deprecated)]
132impl Wait for PublicationCacheBuilder<'_, '_, '_> {
133    #[zenoh_macros::unstable]
134    fn wait(self) -> <Self as Resolvable>::To {
135        PublicationCache::new(self)
136    }
137}
138
139#[zenoh_macros::unstable]
140#[allow(deprecated)]
141impl IntoFuture for PublicationCacheBuilder<'_, '_, '_> {
142    type Output = <Self as Resolvable>::To;
143    type IntoFuture = Ready<<Self as Resolvable>::To>;
144
145    #[zenoh_macros::unstable]
146    fn into_future(self) -> Self::IntoFuture {
147        std::future::ready(self.wait())
148    }
149}
150
151#[zenoh_macros::unstable]
152#[allow(deprecated)]
153impl Resolvable for PublicationCacheBuilder<'_, '_, '_, true> {
154    type To = ZResult<()>;
155}
156
157#[zenoh_macros::unstable]
158#[allow(deprecated)]
159impl Wait for PublicationCacheBuilder<'_, '_, '_, true> {
160    #[zenoh_macros::unstable]
161    fn wait(self) -> <Self as Resolvable>::To {
162        PublicationCache::new(self).map(|_| ())
163    }
164}
165
166#[zenoh_macros::unstable]
167#[allow(deprecated)]
168impl IntoFuture for PublicationCacheBuilder<'_, '_, '_, true> {
169    type Output = <Self as Resolvable>::To;
170    type IntoFuture = Ready<<Self as Resolvable>::To>;
171
172    #[zenoh_macros::unstable]
173    fn into_future(self) -> Self::IntoFuture {
174        std::future::ready(self.wait())
175    }
176}
177
/// A cache of the samples published locally on a key expression, which replies to
/// matching queries with the cached samples.
#[zenoh_macros::unstable]
#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
pub struct PublicationCache {
    // Session-local subscriber receiving the publications to store in the cache.
    local_sub: Subscriber<FifoChannelHandler<Sample>>,
    // Queryable receiving the queries that are answered from the cache.
    _queryable: Queryable<FifoChannelHandler<Query>>,
    // Task that stores the received samples and replies to the received queries.
    task: TerminatableTask,
}
186
187#[zenoh_macros::unstable]
188#[allow(deprecated)]
189impl PublicationCache {
190    #[zenoh_macros::unstable]
191    fn new<const BACKGROUND: bool>(
192        conf: PublicationCacheBuilder<'_, '_, '_, BACKGROUND>,
193    ) -> ZResult<PublicationCache> {
194        let key_expr = conf.pub_key_expr?;
195        // the queryable_suffix (optional), and the key_expr for PublicationCache's queryable ("<pub_key_expr>/[<queryable_suffix>]")
196        let (queryable_suffix, queryable_key_expr): (Option<OwnedKeyExpr>, KeyExpr) =
197            match conf.queryable_suffix {
198                None => (None, key_expr.clone()),
199                Some(Ok(ke)) => {
200                    let queryable_key_expr = &key_expr / &ke;
201                    (Some(ke.into()), queryable_key_expr)
202                }
203                Some(Err(e)) => bail!("Invalid key expression for queryable_suffix: {}", e),
204            };
205        tracing::debug!(
206            "Create PublicationCache on {} with history={} resource_limit={:?}",
207            &key_expr,
208            conf.history,
209            conf.resources_limit
210        );
211
212        if conf.session.hlc().is_none() {
213            bail!(
214                "Failed requirement for PublicationCache on {}: \
215                     the 'timestamping' setting must be enabled in the Zenoh configuration",
216                key_expr,
217            )
218        }
219
220        // declare the local subscriber that will store the local publications
221        let mut local_sub = conf
222            .session
223            .declare_subscriber(&key_expr)
224            .allowed_origin(Locality::SessionLocal)
225            .wait()?;
226        if BACKGROUND {
227            local_sub.set_background(true);
228        }
229
230        // declare the queryable which returns the cached publications
231        let mut queryable = conf.session.declare_queryable(&queryable_key_expr);
232        if let Some(origin) = conf.queryable_origin {
233            queryable = queryable.allowed_origin(origin);
234        }
235        if let Some(complete) = conf.complete {
236            queryable = queryable.complete(complete);
237        }
238        let mut queryable = queryable.wait()?;
239        if BACKGROUND {
240            queryable.set_background(true);
241        }
242
243        // take local ownership of stuff to be moved into task
244        let sub_recv = local_sub.handler().clone();
245        let quer_recv = queryable.handler().clone();
246        let pub_key_expr = key_expr.into_owned();
247        let resources_limit = conf.resources_limit;
248        let history = conf.history;
249
250        // TODO(yuyuan): use CancellationToken to manage it
251        let token = TerminatableTask::create_cancellation_token();
252        let token2 = token.clone();
253        let task = TerminatableTask::spawn(
254            ZRuntime::Application,
255            async move {
256                let mut cache: HashMap<OwnedKeyExpr, VecDeque<Sample>> =
257                    HashMap::with_capacity(resources_limit.unwrap_or(32));
258                let limit = resources_limit.unwrap_or(usize::MAX);
259                loop {
260                    tokio::select! {
261                        // on publication received by the local subscriber, store it
262                        sample = sub_recv.recv_async() => {
263                            if let Ok(sample) = sample {
264                                let queryable_key_expr: KeyExpr<'_> = if let Some(suffix) = &queryable_suffix {
265                                    sample.key_expr() / suffix
266                                } else {
267                                    sample.key_expr().clone()
268                                };
269
270                                if let Some(queue) = cache.get_mut(queryable_key_expr.as_keyexpr()) {
271                                    if queue.len() >= history {
272                                        queue.pop_front();
273                                    }
274                                    queue.push_back(sample);
275                                } else if cache.len() >= limit {
276                                    tracing::error!("PublicationCache on {}: resource_limit exceeded - can't cache publication for a new resource",
277                                    pub_key_expr);
278                                } else {
279                                    let mut queue: VecDeque<Sample> = VecDeque::new();
280                                    queue.push_back(sample);
281                                    cache.insert(queryable_key_expr.into(), queue);
282                                }
283                            }
284                        },
285
286                        // on query, reply with cached content
287                        query = quer_recv.recv_async() => {
288                            if let Ok(query) = query {
289                                if !query.key_expr().as_str().contains('*') {
290                                    if let Some(queue) = cache.get(query.key_expr().as_keyexpr()) {
291                                        for sample in queue {
292                                            if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) {
293                                                if !time_range.contains(timestamp.get_time().to_system_time()){
294                                                    continue;
295                                                }
296                                            }
297                                            if let Err(e) = query.reply_sample(sample.clone()).await {
298                                                tracing::warn!("Error replying to query: {}", e);
299                                            }
300                                        }
301                                    }
302                                } else {
303                                    for (key_expr, queue) in cache.iter() {
304                                        if query.key_expr().intersects(unsafe{ keyexpr::from_str_unchecked(key_expr) }) {
305                                            for sample in queue {
306                                                if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) {
307                                                    if !time_range.contains(timestamp.get_time().to_system_time()){
308                                                        continue;
309                                                    }
310                                                }
311                                                if let Err(e) = query.reply_sample(sample.clone()).await {
312                                                    tracing::warn!("Error replying to query: {}", e);
313                                                }
314                                            }
315                                        }
316                                    }
317                                }
318                            }
319                        },
320                        _ = token2.cancelled() => return
321                    }
322                }
323            },
324            token,
325        );
326
327        Ok(PublicationCache {
328            local_sub,
329            _queryable: queryable,
330            task,
331        })
332    }
333
334    /// Undeclare this [`PublicationCache`]`.
335    #[zenoh_macros::unstable]
336    #[inline]
337    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
338    pub fn undeclare(self) -> impl Resolve<ZResult<()>> {
339        ResolveFuture::new(async move {
340            let PublicationCache {
341                _queryable,
342                local_sub,
343                mut task,
344            } = self;
345            _queryable.undeclare().await?;
346            local_sub.undeclare().await?;
347            task.terminate(Duration::from_secs(10));
348            Ok(())
349        })
350    }
351
352    #[zenoh_macros::internal]
353    #[zenoh_macros::unstable]
354    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
355    pub fn set_background(&mut self, background: bool) {
356        self.local_sub.set_background(background);
357        self._queryable.set_background(background);
358    }
359
360    #[zenoh_macros::unstable]
361    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
362    pub fn key_expr(&self) -> &KeyExpr<'static> {
363        self.local_sub.key_expr()
364    }
365}