zenoh_ext/
querying_subscriber.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{
15    collections::{btree_map, BTreeMap, VecDeque},
16    convert::TryInto,
17    future::{IntoFuture, Ready},
18    mem::swap,
19    sync::{Arc, Mutex},
20    time::{Duration, SystemTime, UNIX_EPOCH},
21};
22
23use zenoh::{
24    handlers::{locked, Callback, DefaultHandler, IntoHandler},
25    internal::{zerror, zlock},
26    key_expr::KeyExpr,
27    pubsub::Subscriber,
28    query::{QueryConsolidation, QueryTarget, Reply, ReplyKeyExpr, Selector},
29    sample::{Locality, Sample, SampleBuilder},
30    time::Timestamp,
31    Error, Resolvable, Resolve, Result as ZResult, Session, Wait,
32};
33
34/// The space of keys to use in a [`FetchingSubscriber`].
35#[zenoh_macros::unstable]
36#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
37pub enum KeySpace {
38    User,
39    Liveliness,
40}
41
42/// The key space for user data.
43#[zenoh_macros::unstable]
44#[non_exhaustive]
45#[derive(Debug, Clone, Copy)]
46#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
47pub struct UserSpace;
48
49#[allow(deprecated)]
50impl From<UserSpace> for KeySpace {
51    fn from(_: UserSpace) -> Self {
52        KeySpace::User
53    }
54}
55
56/// The key space for liveliness tokens.
57#[zenoh_macros::unstable]
58#[non_exhaustive]
59#[derive(Debug, Clone, Copy)]
60#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
61pub struct LivelinessSpace;
62
63#[zenoh_macros::unstable]
64#[allow(deprecated)]
65impl From<LivelinessSpace> for KeySpace {
66    #[zenoh_macros::unstable]
67    fn from(_: LivelinessSpace) -> Self {
68        KeySpace::Liveliness
69    }
70}
71
72/// The builder of [`FetchingSubscriber`], allowing to configure it.
73#[zenoh_macros::unstable]
74#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
75#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
76pub struct QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler, const BACKGROUND: bool = false> {
77    pub(crate) session: &'a Session,
78    pub(crate) key_expr: ZResult<KeyExpr<'b>>,
79    pub(crate) key_space: KeySpace,
80    pub(crate) origin: Locality,
81    pub(crate) query_selector: Option<ZResult<Selector<'b>>>,
82    pub(crate) query_target: QueryTarget,
83    pub(crate) query_consolidation: QueryConsolidation,
84    pub(crate) query_accept_replies: ReplyKeyExpr,
85    pub(crate) query_timeout: Duration,
86    pub(crate) handler: Handler,
87}
88
89#[zenoh_macros::unstable]
90#[allow(deprecated)]
91impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandler> {
92    /// Add callback to [`FetchingSubscriber`].
93    #[zenoh_macros::unstable]
94    #[inline]
95    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
96    pub fn callback<F>(
97        self,
98        callback: F,
99    ) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>>
100    where
101        F: Fn(Sample) + Send + Sync + 'static,
102    {
103        self.with(Callback::from(callback))
104    }
105
106    /// Add callback to [`FetchingSubscriber`].
107    ///
108    /// Using this guarantees that your callback will never be called concurrently.
109    /// If your callback is also accepted by the [`callback`](QueryingSubscriberBuilder::callback)
110    /// method, we suggest you use it instead of `callback_mut`.
111    ///
112    /// Subscriber will not be undeclared when dropped, with the callback running
113    /// in background until the session is closed.
114    #[zenoh_macros::unstable]
115    #[inline]
116    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
117    pub fn callback_mut<F>(
118        self,
119        callback: F,
120    ) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>>
121    where
122        F: FnMut(Sample) + Send + Sync + 'static,
123    {
124        self.callback(locked(callback))
125    }
126
127    /// Use the given handler to receive Samples.
128    #[zenoh_macros::unstable]
129    #[inline]
130    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
131    pub fn with<Handler>(
132        self,
133        handler: Handler,
134    ) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler>
135    where
136        Handler: IntoHandler<Sample>,
137    {
138        let QueryingSubscriberBuilder {
139            session,
140            key_expr,
141            key_space,
142            origin,
143            query_selector,
144            query_target,
145            query_consolidation,
146            query_accept_replies,
147            query_timeout,
148            handler: _,
149        } = self;
150        QueryingSubscriberBuilder {
151            session,
152            key_expr,
153            key_space,
154            origin,
155            query_selector,
156            query_target,
157            query_consolidation,
158            query_accept_replies,
159            query_timeout,
160            handler,
161        }
162    }
163}
164
165#[zenoh_macros::unstable]
166#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
167#[allow(deprecated)]
168impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>> {
169    /// Make the subscriber to run in background until the session is closed.
170    ///
171    /// Background builder doesn't return a `FetchingSubscriber` object anymore.
172    #[zenoh_macros::unstable]
173    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
174    pub fn background(self) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>, true> {
175        QueryingSubscriberBuilder {
176            session: self.session,
177            key_expr: self.key_expr,
178            key_space: self.key_space,
179            origin: self.origin,
180            query_selector: self.query_selector,
181            query_target: self.query_target,
182            query_consolidation: self.query_consolidation,
183            query_accept_replies: self.query_accept_replies,
184            query_timeout: self.query_timeout,
185            handler: self.handler,
186        }
187    }
188}
189
190#[zenoh_macros::unstable]
191#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
192#[allow(deprecated)]
193impl<'b, Handler, const BACKGROUND: bool>
194    QueryingSubscriberBuilder<'_, 'b, UserSpace, Handler, BACKGROUND>
195{
196    ///
197    ///
198    /// Restrict the matching publications that will be receive by this [`Subscriber`]
199    /// to the ones that have the given [`Locality`](Locality).
200    #[zenoh_macros::unstable]
201    #[inline]
202    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
203    pub fn allowed_origin(mut self, origin: Locality) -> Self {
204        self.origin = origin;
205        self
206    }
207
208    /// Change the selector to be used for queries.
209    #[zenoh_macros::unstable]
210    #[inline]
211    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
212    pub fn query_selector<IntoSelector>(mut self, query_selector: IntoSelector) -> Self
213    where
214        IntoSelector: TryInto<Selector<'b>>,
215        <IntoSelector as TryInto<Selector<'b>>>::Error: Into<Error>,
216    {
217        self.query_selector = Some(query_selector.try_into().map_err(Into::into));
218        self
219    }
220
221    /// Change the target to be used for queries.
222    #[zenoh_macros::unstable]
223    #[inline]
224    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
225    pub fn query_target(mut self, query_target: QueryTarget) -> Self {
226        self.query_target = query_target;
227        self
228    }
229
230    /// Change the consolidation mode to be used for queries.
231    #[zenoh_macros::unstable]
232    #[inline]
233    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
234    pub fn query_consolidation<QC: Into<QueryConsolidation>>(
235        mut self,
236        query_consolidation: QC,
237    ) -> Self {
238        self.query_consolidation = query_consolidation.into();
239        self
240    }
241
242    /// Change the accepted replies for queries.
243    #[zenoh_macros::unstable]
244    #[inline]
245    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
246    pub fn query_accept_replies(mut self, accept_replies: ReplyKeyExpr) -> Self {
247        self.query_accept_replies = accept_replies;
248        self
249    }
250}
251
252#[zenoh_macros::unstable]
253#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
254#[allow(deprecated)]
255impl<'a, 'b, KeySpace, Handler, const BACKGROUND: bool>
256    QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler, BACKGROUND>
257{
258    /// Change the timeout to be used for queries.
259    #[zenoh_macros::unstable]
260    #[inline]
261    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
262    pub fn query_timeout(mut self, query_timeout: Duration) -> Self {
263        self.query_timeout = query_timeout;
264        self
265    }
266
267    #[zenoh_macros::unstable]
268    #[allow(clippy::type_complexity)]
269    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
270    fn into_fetching_subscriber_builder(
271        self,
272    ) -> ZResult<
273        FetchingSubscriberBuilder<
274            'a,
275            'b,
276            KeySpace,
277            Handler,
278            impl FnOnce(Box<dyn Fn(Reply) + Send + Sync>) -> ZResult<()>,
279            Reply,
280            BACKGROUND,
281        >,
282    >
283    where
284        KeySpace: Into<self::KeySpace> + Clone,
285        Handler: IntoHandler<Sample>,
286        Handler::Handler: Send,
287    {
288        let session = self.session.clone();
289        let key_expr = self.key_expr?.into_owned();
290        let key_space = self.key_space.clone().into();
291        let query_selector = match self.query_selector {
292            Some(s) => Some(s?.into_owned()),
293            None => None,
294        };
295        let query_target = self.query_target;
296        let query_consolidation = self.query_consolidation;
297        let query_accept_replies = self.query_accept_replies;
298        let query_timeout = self.query_timeout;
299        Ok(FetchingSubscriberBuilder {
300            session: self.session,
301            key_expr: Ok(key_expr.clone()),
302            key_space: self.key_space,
303            origin: self.origin,
304            fetch: move |cb| match key_space {
305                self::KeySpace::User => match query_selector {
306                    Some(s) => session.get(s),
307                    None => session.get(key_expr),
308                }
309                .callback(cb)
310                .target(query_target)
311                .consolidation(query_consolidation)
312                .accept_replies(query_accept_replies)
313                .timeout(query_timeout)
314                .wait(),
315                self::KeySpace::Liveliness => session
316                    .liveliness()
317                    .get(key_expr)
318                    .callback(cb)
319                    .timeout(query_timeout)
320                    .wait(),
321            },
322            handler: self.handler,
323            phantom: std::marker::PhantomData,
324        })
325    }
326}
327
328#[zenoh_macros::unstable]
329#[allow(deprecated)]
330impl<KeySpace, Handler> Resolvable for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
331where
332    Handler: IntoHandler<Sample>,
333    Handler::Handler: Send,
334{
335    type To = ZResult<FetchingSubscriber<Handler::Handler>>;
336}
337
338#[zenoh_macros::unstable]
339#[allow(deprecated)]
340impl<KeySpace, Handler> Wait for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
341where
342    KeySpace: Into<self::KeySpace> + Clone,
343    Handler: IntoHandler<Sample> + Send,
344    Handler::Handler: Send,
345{
346    #[zenoh_macros::unstable]
347    fn wait(self) -> <Self as Resolvable>::To {
348        self.into_fetching_subscriber_builder()?.wait()
349    }
350}
351
352#[zenoh_macros::unstable]
353#[allow(deprecated)]
354impl<KeySpace, Handler> IntoFuture for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
355where
356    KeySpace: Into<self::KeySpace> + Clone,
357    Handler: IntoHandler<Sample> + Send,
358    Handler::Handler: Send,
359{
360    type Output = <Self as Resolvable>::To;
361    type IntoFuture = Ready<<Self as Resolvable>::To>;
362
363    #[zenoh_macros::unstable]
364    fn into_future(self) -> Self::IntoFuture {
365        std::future::ready(self.wait())
366    }
367}
368
369#[zenoh_macros::unstable]
370#[allow(deprecated)]
371impl<KeySpace> Resolvable for QueryingSubscriberBuilder<'_, '_, KeySpace, Callback<Sample>, true> {
372    type To = ZResult<()>;
373}
374
375#[zenoh_macros::unstable]
376#[allow(deprecated)]
377impl<KeySpace> Wait for QueryingSubscriberBuilder<'_, '_, KeySpace, Callback<Sample>, true>
378where
379    KeySpace: Into<self::KeySpace> + Clone,
380{
381    #[zenoh_macros::unstable]
382    fn wait(self) -> <Self as Resolvable>::To {
383        self.into_fetching_subscriber_builder()?.wait()
384    }
385}
386
387#[zenoh_macros::unstable]
388#[allow(deprecated)]
389impl<KeySpace> IntoFuture for QueryingSubscriberBuilder<'_, '_, KeySpace, Callback<Sample>, true>
390where
391    KeySpace: Into<self::KeySpace> + Clone,
392{
393    type Output = <Self as Resolvable>::To;
394    type IntoFuture = Ready<<Self as Resolvable>::To>;
395
396    #[zenoh_macros::unstable]
397    fn into_future(self) -> Self::IntoFuture {
398        std::future::ready(self.wait())
399    }
400}
401
402// Collects samples in their Timestamp order, if any,
403// and ignores repeating samples with duplicate timestamps.
404// Samples without Timestamps are kept in a separate Vector,
405// and are considered as older than any sample with Timestamp.
406struct MergeQueue {
407    untimestamped: VecDeque<Sample>,
408    timestamped: BTreeMap<Timestamp, Sample>,
409}
410
411impl MergeQueue {
412    fn new() -> Self {
413        MergeQueue {
414            untimestamped: VecDeque::new(),
415            timestamped: BTreeMap::new(),
416        }
417    }
418
419    fn len(&self) -> usize {
420        self.untimestamped.len() + self.timestamped.len()
421    }
422
423    fn push(&mut self, sample: Sample) {
424        if let Some(ts) = sample.timestamp() {
425            self.timestamped.entry(*ts).or_insert(sample);
426        } else {
427            self.untimestamped.push_back(sample);
428        }
429    }
430
431    fn drain(&mut self) -> MergeQueueValues {
432        let mut vec = VecDeque::new();
433        let mut queue = BTreeMap::new();
434        swap(&mut self.untimestamped, &mut vec);
435        swap(&mut self.timestamped, &mut queue);
436        MergeQueueValues {
437            untimestamped: vec,
438            timestamped: queue.into_values(),
439        }
440    }
441}
442
443struct MergeQueueValues {
444    untimestamped: VecDeque<Sample>,
445    timestamped: btree_map::IntoValues<Timestamp, Sample>,
446}
447
448impl Iterator for MergeQueueValues {
449    type Item = Sample;
450    fn next(&mut self) -> Option<Self::Item> {
451        self.untimestamped
452            .pop_front()
453            .or_else(|| self.timestamped.next())
454    }
455}
456
457struct InnerState {
458    pending_fetches: u64,
459    merge_queue: MergeQueue,
460}
461
462/// The builder of [`FetchingSubscriber`], allowing to configure it.
463#[zenoh_macros::unstable]
464#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
465#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
466#[allow(deprecated)]
467pub struct FetchingSubscriberBuilder<
468    'a,
469    'b,
470    KeySpace,
471    Handler,
472    Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
473    TryIntoSample,
474    const BACKGROUND: bool = false,
475> where
476    TryIntoSample: ExtractSample,
477{
478    pub(crate) session: &'a Session,
479    pub(crate) key_expr: ZResult<KeyExpr<'b>>,
480    pub(crate) key_space: KeySpace,
481    pub(crate) origin: Locality,
482    pub(crate) fetch: Fetch,
483    pub(crate) handler: Handler,
484    pub(crate) phantom: std::marker::PhantomData<TryIntoSample>,
485}
486
487#[zenoh_macros::unstable]
488#[allow(deprecated)]
489impl<
490        'a,
491        KeySpace,
492        Handler,
493        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
494        TryIntoSample,
495        const BACKGROUND: bool,
496    > FetchingSubscriberBuilder<'a, '_, KeySpace, Handler, Fetch, TryIntoSample, BACKGROUND>
497where
498    TryIntoSample: ExtractSample,
499{
500    #[zenoh_macros::unstable]
501    fn with_static_keys(
502        self,
503    ) -> FetchingSubscriberBuilder<'a, 'static, KeySpace, Handler, Fetch, TryIntoSample> {
504        FetchingSubscriberBuilder {
505            session: self.session,
506            key_expr: self.key_expr.map(|s| s.into_owned()),
507            key_space: self.key_space,
508            origin: self.origin,
509            fetch: self.fetch,
510            handler: self.handler,
511            phantom: std::marker::PhantomData,
512        }
513    }
514}
515
516#[zenoh_macros::unstable]
517#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
518#[allow(deprecated)]
519impl<
520        'a,
521        'b,
522        KeySpace,
523        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
524        TryIntoSample,
525    > FetchingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandler, Fetch, TryIntoSample>
526where
527    TryIntoSample: ExtractSample,
528{
529    /// Add callback to [`FetchingSubscriber`].
530    #[zenoh_macros::unstable]
531    #[inline]
532    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
533    pub fn callback<F>(
534        self,
535        callback: F,
536    ) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>, Fetch, TryIntoSample>
537    where
538        F: Fn(Sample) + Send + Sync + 'static,
539    {
540        self.with(Callback::from(callback))
541    }
542
543    /// Add callback to [`FetchingSubscriber`].
544    ///
545    /// Using this guarantees that your callback will never be called concurrently.
546    /// If your callback is also accepted by the [`callback`](FetchingSubscriberBuilder::callback)
547    /// method, we suggest you use it instead of `callback_mut`.
548    ///
549    /// Subscriber will not be undeclared when dropped, with the callback running
550    /// in background until the session is closed.
551    #[zenoh_macros::unstable]
552    #[inline]
553    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
554    pub fn callback_mut<F>(
555        self,
556        callback: F,
557    ) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>, Fetch, TryIntoSample>
558    where
559        F: FnMut(Sample) + Send + Sync + 'static,
560    {
561        self.callback(locked(callback))
562    }
563
564    /// Use the given handler to receive Samples.
565    #[zenoh_macros::unstable]
566    #[inline]
567    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
568    pub fn with<Handler>(
569        self,
570        handler: Handler,
571    ) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Handler, Fetch, TryIntoSample>
572    where
573        Handler: IntoHandler<Sample>,
574    {
575        let FetchingSubscriberBuilder {
576            session,
577            key_expr,
578            key_space,
579            origin,
580            fetch,
581            handler: _,
582            phantom,
583        } = self;
584        FetchingSubscriberBuilder {
585            session,
586            key_expr,
587            key_space,
588            origin,
589            fetch,
590            handler,
591            phantom,
592        }
593    }
594}
595
596#[zenoh_macros::unstable]
597#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
598#[allow(deprecated)]
599impl<
600        'a,
601        'b,
602        KeySpace,
603        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
604        TryIntoSample,
605    > FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>, Fetch, TryIntoSample>
606where
607    TryIntoSample: ExtractSample,
608{
609    /// Make the subscriber to run in background until the session is closed.
610    ///
611    /// Background builder doesn't return a `FetchingSubscriber` object anymore.
612    #[zenoh_macros::unstable]
613    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
614    pub fn background(
615        self,
616    ) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>, Fetch, TryIntoSample, true>
617    {
618        FetchingSubscriberBuilder {
619            session: self.session,
620            key_expr: self.key_expr,
621            key_space: self.key_space,
622            origin: self.origin,
623            fetch: self.fetch,
624            handler: self.handler,
625            phantom: self.phantom,
626        }
627    }
628}
629
630#[zenoh_macros::unstable]
631#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
632#[allow(deprecated)]
633impl<
634        Handler,
635        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
636        TryIntoSample,
637        const BACKGROUND: bool,
638    > FetchingSubscriberBuilder<'_, '_, UserSpace, Handler, Fetch, TryIntoSample, BACKGROUND>
639where
640    TryIntoSample: ExtractSample,
641{
642    /// Restrict the matching publications that will be received by this [`FetchingSubscriber`]
643    /// to the ones that have the given [`Locality`](Locality).
644    #[zenoh_macros::unstable]
645    #[inline]
646    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
647    pub fn allowed_origin(mut self, origin: Locality) -> Self {
648        self.origin = origin;
649        self
650    }
651}
652
653#[zenoh_macros::unstable]
654#[allow(deprecated)]
655impl<
656        KeySpace,
657        Handler,
658        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
659        TryIntoSample,
660    > Resolvable for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
661where
662    Handler: IntoHandler<Sample>,
663    Handler::Handler: Send,
664    TryIntoSample: ExtractSample,
665{
666    type To = ZResult<FetchingSubscriber<Handler::Handler>>;
667}
668
669#[zenoh_macros::unstable]
670#[allow(deprecated)]
671impl<
672        KeySpace,
673        Handler,
674        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
675        TryIntoSample,
676    > Wait for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
677where
678    KeySpace: Into<self::KeySpace>,
679    Handler: IntoHandler<Sample> + Send,
680    Handler::Handler: Send,
681    TryIntoSample: ExtractSample + Send + Sync,
682{
683    #[zenoh_macros::unstable]
684    fn wait(self) -> <Self as Resolvable>::To {
685        FetchingSubscriber::new(self.with_static_keys())
686    }
687}
688
689#[zenoh_macros::unstable]
690#[allow(deprecated)]
691impl<
692        KeySpace,
693        Handler,
694        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
695        TryIntoSample,
696    > IntoFuture for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
697where
698    KeySpace: Into<self::KeySpace>,
699    Handler: IntoHandler<Sample> + Send,
700    Handler::Handler: Send,
701    TryIntoSample: ExtractSample + Send + Sync,
702{
703    type Output = <Self as Resolvable>::To;
704    type IntoFuture = Ready<<Self as Resolvable>::To>;
705
706    #[zenoh_macros::unstable]
707    fn into_future(self) -> Self::IntoFuture {
708        std::future::ready(self.wait())
709    }
710}
711
712#[zenoh_macros::unstable]
713#[allow(deprecated)]
714impl<
715        KeySpace,
716        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
717        TryIntoSample,
718    > Resolvable
719    for FetchingSubscriberBuilder<'_, '_, KeySpace, Callback<Sample>, Fetch, TryIntoSample, true>
720where
721    TryIntoSample: ExtractSample,
722{
723    type To = ZResult<()>;
724}
725
726#[zenoh_macros::unstable]
727#[allow(deprecated)]
728impl<
729        KeySpace,
730        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
731        TryIntoSample,
732    > Wait
733    for FetchingSubscriberBuilder<'_, '_, KeySpace, Callback<Sample>, Fetch, TryIntoSample, true>
734where
735    KeySpace: Into<self::KeySpace>,
736    TryIntoSample: ExtractSample + Send + Sync,
737{
738    #[zenoh_macros::unstable]
739    fn wait(self) -> <Self as Resolvable>::To {
740        FetchingSubscriber::new(self.with_static_keys())?
741            .subscriber
742            .set_background(true);
743        Ok(())
744    }
745}
746
747#[zenoh_macros::unstable]
748#[allow(deprecated)]
749impl<
750        KeySpace,
751        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
752        TryIntoSample,
753    > IntoFuture
754    for FetchingSubscriberBuilder<'_, '_, KeySpace, Callback<Sample>, Fetch, TryIntoSample, true>
755where
756    KeySpace: Into<self::KeySpace>,
757    TryIntoSample: ExtractSample + Send + Sync,
758{
759    type Output = <Self as Resolvable>::To;
760    type IntoFuture = Ready<<Self as Resolvable>::To>;
761
762    #[zenoh_macros::unstable]
763    fn into_future(self) -> Self::IntoFuture {
764        std::future::ready(self.wait())
765    }
766}
767
768/// A Subscriber that will run the given user defined `fetch` function at startup.
769///
770/// The user defined `fetch` function should fetch some samples and return them through the callback function
771/// (it could typically be a Session::get()). Those samples will be merged with the received publications and made available in the receiver.
772/// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
773///
774/// A typical usage of the `FetchingSubscriber` is to retrieve publications that were made in the past, but stored in some zenoh Storage.
775///
776/// # Examples
777/// ```no_run
778/// # #[tokio::main]
779/// # async fn main() {
780/// use zenoh::Wait;
781/// use zenoh_ext::*;
782///
783/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
784/// let subscriber = session
785///     .declare_subscriber("key/expr")
786///     .fetching( |cb| {
787///         session
788///             .get("key/expr")
789///             .callback(cb)
790///             .wait()
791///     })
792///     .await
793///     .unwrap();
794/// while let Ok(sample) = subscriber.recv_async().await {
795///     println!("Received: {:?}", sample);
796/// }
797/// # }
798/// ```
799#[zenoh_macros::unstable]
800#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
801pub struct FetchingSubscriber<Handler> {
802    subscriber: Subscriber<()>,
803    callback: Callback<Sample>,
804    state: Arc<Mutex<InnerState>>,
805    handler: Handler,
806}
807
808#[zenoh_macros::unstable]
809#[allow(deprecated)]
810impl<Handler> std::ops::Deref for FetchingSubscriber<Handler> {
811    type Target = Handler;
812    #[zenoh_macros::unstable]
813    fn deref(&self) -> &Self::Target {
814        &self.handler
815    }
816}
817
818#[zenoh_macros::unstable]
819#[allow(deprecated)]
820impl<Handler> std::ops::DerefMut for FetchingSubscriber<Handler> {
821    #[zenoh_macros::unstable]
822    fn deref_mut(&mut self) -> &mut Self::Target {
823        &mut self.handler
824    }
825}
826
827#[zenoh_macros::unstable]
828#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
829#[allow(deprecated)]
830impl<Handler> FetchingSubscriber<Handler> {
831    fn new<
832        'a,
833        KeySpace,
834        InputHandler,
835        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
836        TryIntoSample,
837    >(
838        conf: FetchingSubscriberBuilder<'a, 'a, KeySpace, InputHandler, Fetch, TryIntoSample>,
839    ) -> ZResult<Self>
840    where
841        KeySpace: Into<self::KeySpace>,
842        InputHandler: IntoHandler<Sample, Handler = Handler> + Send,
843        TryIntoSample: ExtractSample + Send + Sync,
844    {
845        let session_id = conf.session.zid();
846
847        let state = Arc::new(Mutex::new(InnerState {
848            pending_fetches: 0,
849            merge_queue: MergeQueue::new(),
850        }));
851        let (callback, receiver) = conf.handler.into_handler();
852
853        let sub_callback = {
854            let state = state.clone();
855            let callback = callback.clone();
856            move |s| {
857                let state = &mut zlock!(state);
858                if state.pending_fetches == 0 {
859                    callback.call(s);
860                } else {
861                    tracing::trace!(
862                        "Sample received while fetch in progress: push it to merge_queue"
863                    );
864
865                    // ensure the sample has a timestamp, thus it will always be sorted into the MergeQueue
866                    // after any timestamped Sample possibly coming from a fetch reply.
867                    let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().into(); // UNIX_EPOCH is Returns a Timespec::zero(), Unwrap Should be permissible here
868                    let timestamp = s
869                        .timestamp()
870                        .cloned()
871                        .unwrap_or(Timestamp::new(now, session_id.into()));
872                    state
873                        .merge_queue
874                        .push(SampleBuilder::from(s).timestamp(timestamp).into());
875                }
876            }
877        };
878
879        let key_expr = conf.key_expr?;
880
881        // register fetch handler
882        let handler = register_handler(state.clone(), callback.clone());
883        // declare subscriber
884        let subscriber = match conf.key_space.into() {
885            self::KeySpace::User => conf
886                .session
887                .declare_subscriber(&key_expr)
888                .callback(sub_callback)
889                .allowed_origin(conf.origin)
890                .wait()?,
891            self::KeySpace::Liveliness => conf
892                .session
893                .liveliness()
894                .declare_subscriber(&key_expr)
895                .callback(sub_callback)
896                .wait()?,
897        };
898
899        let fetch_subscriber = FetchingSubscriber {
900            subscriber,
901            callback,
902            state,
903            handler: receiver,
904        };
905
906        // run fetch
907        run_fetch(conf.fetch, handler)?;
908
909        Ok(fetch_subscriber)
910    }
911
912    /// Undeclare this [`FetchingSubscriber`]`.
913    #[zenoh_macros::unstable]
914    #[inline]
915    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
916    pub fn undeclare(self) -> impl Resolve<ZResult<()>> {
917        self.subscriber.undeclare()
918    }
919
920    #[zenoh_macros::unstable]
921    #[zenoh_macros::internal]
922    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
923    pub fn set_background(&mut self, background: bool) {
924        self.subscriber.set_background(background)
925    }
926
927    /// Return the key expression of this FetchingSubscriber
928    #[zenoh_macros::unstable]
929    #[inline]
930    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
931    pub fn key_expr(&self) -> &KeyExpr<'static> {
932        self.subscriber.key_expr()
933    }
934
935    /// Perform an additional `fetch`.
936    ///
937    /// The provided `fetch` function should fetch some samples and return them through the callback function
938    /// (it could typically be a Session::get()). Those samples will be merged with the received publications and made available in the receiver.
939    ///
940    /// # Examples
941    /// ```no_run
942    /// # #[tokio::main]
943    /// # async fn main() {
944    /// use zenoh::Wait;
945    /// use zenoh_ext::*;
946    ///
947    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
948    /// let mut subscriber = session
949    ///     .declare_subscriber("key/expr")
950    ///     .fetching( |cb| {
951    ///         session
952    ///             .get("key/expr")
953    ///             .callback(cb)
954    ///             .wait()
955    ///     })
956    ///     .await
957    ///     .unwrap();
958    ///
959    /// // perform an additional fetch
960    /// subscriber
961    ///     .fetch( |cb| {
962    ///         session
963    ///             .get("key/expr")
964    ///             .callback(cb)
965    ///             .wait()
966    ///     })
967    ///     .await
968    ///     .unwrap();
969    /// # }
970    /// ```
971    #[zenoh_macros::unstable]
972    #[inline]
973    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
974    pub fn fetch<
975        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
976        TryIntoSample,
977    >(
978        &self,
979        fetch: Fetch,
980    ) -> impl Resolve<ZResult<()>>
981    where
982        TryIntoSample: ExtractSample + Send + Sync,
983    {
984        FetchBuilder {
985            fetch,
986            phantom: std::marker::PhantomData,
987            state: self.state.clone(),
988            callback: self.callback.clone(),
989        }
990    }
991}
992
// Token representing one in-progress fetch.
//
// It is created by `register_handler` (which increments `pending_fetches`) and its `Drop`
// implementation decrements the counter again, flushing the merge queue to `callback`
// once no fetch remains pending.
struct RepliesHandler {
    // Shared state holding the pending-fetch counter and the merge queue.
    state: Arc<Mutex<InnerState>>,
    // Callback receiving the merged samples when the last pending fetch is done.
    callback: Callback<Sample>,
}
997
998impl Drop for RepliesHandler {
999    fn drop(&mut self) {
1000        let mut state = zlock!(self.state);
1001        state.pending_fetches -= 1;
1002        tracing::trace!(
1003            "Fetch done - {} fetches still in progress",
1004            state.pending_fetches
1005        );
1006        if state.pending_fetches == 0 {
1007            tracing::debug!(
1008                "All fetches done. Replies and live publications merged - {} samples to propagate",
1009                state.merge_queue.len()
1010            );
1011            for s in state.merge_queue.drain() {
1012                self.callback.call(s);
1013            }
1014        }
1015    }
1016}
1017
1018/// The builder returned by [`FetchingSubscriber::fetch`](FetchingSubscriber::fetch).
1019///
1020/// # Examples
1021/// ```no_run
1022/// # #[tokio::main]
1023/// # async fn main() {
1024/// # use zenoh::Wait;
1025/// # use zenoh_ext::*;
1026/// #
1027/// # let session = zenoh::open(zenoh::Config::default()).await.unwrap();
1028/// # let mut fetching_subscriber = session
1029/// #     .declare_subscriber("key/expr")
1030/// #     .fetching( |cb| {
1031/// #         session
1032/// #             .get("key/expr")
1033/// #             .callback(cb)
1034/// #            .wait()
1035/// #     })
1036/// #     .await
1037/// #     .unwrap();
1038/// #
1039/// fetching_subscriber
1040///     .fetch( |cb| {
1041///         session
1042///             .get("key/expr")
1043///             .callback(cb)
1044///             .wait()
1045///     })
1046///     .await
1047///     .unwrap();
1048/// # }
1049/// ```
1050#[zenoh_macros::unstable]
1051#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
1052#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
1053#[allow(deprecated)]
1054pub struct FetchBuilder<
1055    Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
1056    TryIntoSample,
1057> where
1058    TryIntoSample: ExtractSample,
1059{
1060    fetch: Fetch,
1061    phantom: std::marker::PhantomData<TryIntoSample>,
1062    state: Arc<Mutex<InnerState>>,
1063    callback: Callback<Sample>,
1064}
1065
1066#[zenoh_macros::unstable]
1067#[allow(deprecated)]
1068impl<Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample>
1069    Resolvable for FetchBuilder<Fetch, TryIntoSample>
1070where
1071    TryIntoSample: ExtractSample,
1072{
1073    type To = ZResult<()>;
1074}
1075
1076#[zenoh_macros::unstable]
1077#[allow(deprecated)]
1078impl<Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample> Wait
1079    for FetchBuilder<Fetch, TryIntoSample>
1080where
1081    TryIntoSample: ExtractSample,
1082{
1083    #[zenoh_macros::unstable]
1084    fn wait(self) -> <Self as Resolvable>::To {
1085        let handler = register_handler(self.state, self.callback);
1086        run_fetch(self.fetch, handler)
1087    }
1088}
1089
1090#[zenoh_macros::unstable]
1091#[allow(deprecated)]
1092impl<Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample>
1093    IntoFuture for FetchBuilder<Fetch, TryIntoSample>
1094where
1095    TryIntoSample: ExtractSample,
1096{
1097    type Output = <Self as Resolvable>::To;
1098    type IntoFuture = Ready<<Self as Resolvable>::To>;
1099
1100    #[zenoh_macros::unstable]
1101    fn into_future(self) -> Self::IntoFuture {
1102        std::future::ready(self.wait())
1103    }
1104}
1105
1106fn register_handler(state: Arc<Mutex<InnerState>>, callback: Callback<Sample>) -> RepliesHandler {
1107    zlock!(state).pending_fetches += 1;
1108    // pending fetches will be decremented in RepliesHandler drop()
1109    RepliesHandler { state, callback }
1110}
1111
1112#[zenoh_macros::unstable]
1113#[allow(deprecated)]
1114fn run_fetch<
1115    Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
1116    TryIntoSample,
1117>(
1118    fetch: Fetch,
1119    handler: RepliesHandler,
1120) -> ZResult<()>
1121where
1122    TryIntoSample: ExtractSample,
1123{
1124    tracing::debug!("Fetch data for FetchingSubscriber");
1125    (fetch)(Box::new(move |s: TryIntoSample| match s.extract() {
1126        Ok(s) => {
1127            let mut state = zlock!(handler.state);
1128            tracing::trace!("Fetched sample received: push it to merge_queue");
1129            state.merge_queue.push(s);
1130        }
1131        Err(e) => tracing::debug!("Received error fetching data: {}", e),
1132    }))
1133}
1134
/// Conversion of a fetched item into a [`Sample`].
///
/// Implemented by the item types that a `fetch` function passes to its callback: each item
/// goes through [`ExtractSample::extract`], and only successfully extracted samples are
/// queued for merging.
#[zenoh_macros::unstable]
#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
#[allow(deprecated)]
pub trait ExtractSample {
    /// Consume `self` and return the [`Sample`] it carries, or an error when no sample
    /// can be extracted from it.
    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
    fn extract(self) -> ZResult<Sample>;
}
1143
1144#[allow(deprecated)]
1145impl ExtractSample for Reply {
1146    fn extract(self) -> ZResult<Sample> {
1147        self.into_result().map_err(|e| zerror!("{:?}", e).into())
1148    }
1149}