Skip to main content

zenoh_ext/
querying_subscriber.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{
15    collections::{btree_map, BTreeMap, VecDeque},
16    convert::TryInto,
17    future::{IntoFuture, Ready},
18    mem::swap,
19    sync::{Arc, Mutex},
20    time::{Duration, SystemTime, UNIX_EPOCH},
21};
22
23use zenoh::{
24    handlers::{locked, Callback, DefaultHandler, IntoHandler},
25    internal::{zerror, zlock},
26    key_expr::KeyExpr,
27    pubsub::Subscriber,
28    query::{QueryConsolidation, QueryTarget, Reply, ReplyKeyExpr, Selector},
29    sample::{Locality, Sample, SampleBuilder},
30    time::Timestamp,
31    Error, Resolvable, Resolve, Result as ZResult, Session, Wait,
32};
33
34/// The space of keys to use in a [`FetchingSubscriber`].
35#[zenoh_macros::unstable]
36#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
37pub enum KeySpace {
38    User,
39    Liveliness,
40}
41
42/// The key space for user data.
43#[zenoh_macros::unstable]
44#[non_exhaustive]
45#[derive(Debug, Clone, Copy)]
46#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
47pub struct UserSpace;
48
49#[allow(deprecated)]
50impl From<UserSpace> for KeySpace {
51    fn from(_: UserSpace) -> Self {
52        KeySpace::User
53    }
54}
55
56/// The key space for liveliness tokens.
57#[zenoh_macros::unstable]
58#[non_exhaustive]
59#[derive(Debug, Clone, Copy)]
60#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
61pub struct LivelinessSpace;
62
63#[zenoh_macros::unstable]
64#[allow(deprecated)]
65impl From<LivelinessSpace> for KeySpace {
66    #[zenoh_macros::unstable]
67    fn from(_: LivelinessSpace) -> Self {
68        KeySpace::Liveliness
69    }
70}
71
72/// The builder of [`FetchingSubscriber`], allowing to configure it.
73#[zenoh_macros::unstable]
74#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
75#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
76pub struct QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler, const BACKGROUND: bool = false> {
77    pub(crate) session: &'a Session,
78    pub(crate) key_expr: ZResult<KeyExpr<'b>>,
79    pub(crate) key_space: KeySpace,
80    pub(crate) origin: Locality,
81    pub(crate) query_selector: Option<ZResult<Selector<'b>>>,
82    pub(crate) query_target: QueryTarget,
83    pub(crate) query_consolidation: QueryConsolidation,
84    pub(crate) query_accept_replies: ReplyKeyExpr,
85    pub(crate) query_timeout: Duration,
86    pub(crate) handler: Handler,
87}
88
89#[zenoh_macros::unstable]
90#[allow(deprecated)]
91impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandler> {
92    /// Add callback to [`FetchingSubscriber`].
93    #[zenoh_macros::unstable]
94    #[inline]
95    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
96    pub fn callback<F>(
97        self,
98        callback: F,
99    ) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>>
100    where
101        F: Fn(Sample) + Send + Sync + 'static,
102    {
103        self.with(Callback::from(callback))
104    }
105
106    /// Add callback to [`FetchingSubscriber`].
107    ///
108    /// Using this guarantees that your callback will never be called concurrently.
109    /// If your callback is also accepted by the [`callback`](QueryingSubscriberBuilder::callback)
110    /// method, we suggest you use it instead of `callback_mut`.
111    ///
112    /// Subscriber will not be undeclared when dropped, with the callback running
113    /// in background until the session is closed.
114    #[zenoh_macros::unstable]
115    #[inline]
116    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
117    pub fn callback_mut<F>(
118        self,
119        callback: F,
120    ) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>>
121    where
122        F: FnMut(Sample) + Send + Sync + 'static,
123    {
124        self.callback(locked(callback))
125    }
126
127    /// Use the given handler to receive Samples.
128    #[zenoh_macros::unstable]
129    #[inline]
130    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
131    pub fn with<Handler>(
132        self,
133        handler: Handler,
134    ) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler>
135    where
136        Handler: IntoHandler<Sample>,
137    {
138        let QueryingSubscriberBuilder {
139            session,
140            key_expr,
141            key_space,
142            origin,
143            query_selector,
144            query_target,
145            query_consolidation,
146            query_accept_replies,
147            query_timeout,
148            handler: _,
149        } = self;
150        QueryingSubscriberBuilder {
151            session,
152            key_expr,
153            key_space,
154            origin,
155            query_selector,
156            query_target,
157            query_consolidation,
158            query_accept_replies,
159            query_timeout,
160            handler,
161        }
162    }
163}
164
165#[zenoh_macros::unstable]
166#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
167#[allow(deprecated)]
168impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>> {
169    /// Make the subscriber to run in background until the session is closed.
170    ///
171    /// Background builder doesn't return a `FetchingSubscriber` object anymore.
172    #[zenoh_macros::unstable]
173    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
174    pub fn background(self) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>, true> {
175        QueryingSubscriberBuilder {
176            session: self.session,
177            key_expr: self.key_expr,
178            key_space: self.key_space,
179            origin: self.origin,
180            query_selector: self.query_selector,
181            query_target: self.query_target,
182            query_consolidation: self.query_consolidation,
183            query_accept_replies: self.query_accept_replies,
184            query_timeout: self.query_timeout,
185            handler: self.handler,
186        }
187    }
188}
189
190#[zenoh_macros::unstable]
191#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
192#[allow(deprecated)]
193impl<'b, Handler, const BACKGROUND: bool>
194    QueryingSubscriberBuilder<'_, 'b, UserSpace, Handler, BACKGROUND>
195{
196    ///
197    ///
198    /// Restrict the matching publications that will be receive by this [`Subscriber`]
199    /// to the ones that have the given [`Locality`](Locality).
200    #[zenoh_macros::unstable]
201    #[inline]
202    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
203    pub fn allowed_origin(mut self, origin: Locality) -> Self {
204        self.origin = origin;
205        self
206    }
207
208    /// Change the selector to be used for queries.
209    #[zenoh_macros::unstable]
210    #[inline]
211    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
212    pub fn query_selector<IntoSelector>(mut self, query_selector: IntoSelector) -> Self
213    where
214        IntoSelector: TryInto<Selector<'b>>,
215        <IntoSelector as TryInto<Selector<'b>>>::Error: Into<Error>,
216    {
217        self.query_selector = Some(query_selector.try_into().map_err(Into::into));
218        self
219    }
220
221    /// Change the target to be used for queries.
222    #[zenoh_macros::unstable]
223    #[inline]
224    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
225    pub fn query_target(mut self, query_target: QueryTarget) -> Self {
226        self.query_target = query_target;
227        self
228    }
229
230    /// Change the consolidation mode to be used for queries.
231    #[zenoh_macros::unstable]
232    #[inline]
233    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
234    pub fn query_consolidation<QC: Into<QueryConsolidation>>(
235        mut self,
236        query_consolidation: QC,
237    ) -> Self {
238        self.query_consolidation = query_consolidation.into();
239        self
240    }
241
242    /// Change the accepted replies for queries.
243    #[zenoh_macros::unstable]
244    #[inline]
245    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
246    pub fn query_accept_replies(mut self, accept_replies: ReplyKeyExpr) -> Self {
247        self.query_accept_replies = accept_replies;
248        self
249    }
250}
251
252#[zenoh_macros::unstable]
253#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
254#[allow(deprecated)]
255impl<'a, 'b, KeySpace, Handler, const BACKGROUND: bool>
256    QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler, BACKGROUND>
257{
258    /// Change the timeout to be used for queries.
259    #[zenoh_macros::unstable]
260    #[inline]
261    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
262    pub fn query_timeout(mut self, query_timeout: Duration) -> Self {
263        self.query_timeout = query_timeout;
264        self
265    }
266
267    #[zenoh_macros::unstable]
268    #[allow(clippy::type_complexity)]
269    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
270    fn into_fetching_subscriber_builder(
271        self,
272    ) -> ZResult<
273        FetchingSubscriberBuilder<
274            'a,
275            'b,
276            KeySpace,
277            Handler,
278            impl FnOnce(Box<dyn Fn(Reply) + Send + Sync>) -> ZResult<()>,
279            Reply,
280            BACKGROUND,
281        >,
282    >
283    where
284        KeySpace: Into<self::KeySpace> + Clone,
285        Handler: IntoHandler<Sample>,
286        Handler::Handler: Send,
287    {
288        let session = self.session.downgrade();
289        let key_expr = self.key_expr?.into_owned();
290        let key_space = self.key_space.clone().into();
291        let query_selector = match self.query_selector {
292            Some(s) => Some(s?.into_owned()),
293            None => None,
294        };
295        let query_target = self.query_target;
296        let query_consolidation = self.query_consolidation;
297        let query_accept_replies = self.query_accept_replies;
298        let query_timeout = self.query_timeout;
299        Ok(FetchingSubscriberBuilder {
300            session: self.session,
301            key_expr: Ok(key_expr.clone()),
302            key_space: self.key_space,
303            origin: self.origin,
304            fetch: move |cb| match key_space {
305                self::KeySpace::User => match query_selector {
306                    Some(s) => session.get(s),
307                    None => session.get(key_expr),
308                }
309                .callback(cb)
310                .target(query_target)
311                .consolidation(query_consolidation)
312                .accept_replies(query_accept_replies)
313                .timeout(query_timeout)
314                .wait(),
315                self::KeySpace::Liveliness => session
316                    .liveliness()
317                    .get(key_expr)
318                    .callback(cb)
319                    .timeout(query_timeout)
320                    .wait(),
321            },
322            handler: self.handler,
323            phantom: std::marker::PhantomData,
324        })
325    }
326}
327
328#[zenoh_macros::unstable]
329#[allow(deprecated)]
330impl<KeySpace, Handler> Resolvable for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
331where
332    Handler: IntoHandler<Sample>,
333    Handler::Handler: Send,
334{
335    type To = ZResult<FetchingSubscriber<Handler::Handler>>;
336}
337
338#[zenoh_macros::unstable]
339#[allow(deprecated)]
340impl<KeySpace, Handler> Wait for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
341where
342    KeySpace: Into<self::KeySpace> + Clone,
343    Handler: IntoHandler<Sample> + Send,
344    Handler::Handler: Send,
345{
346    #[zenoh_macros::unstable]
347    fn wait(self) -> <Self as Resolvable>::To {
348        self.into_fetching_subscriber_builder()?.wait()
349    }
350}
351
352#[zenoh_macros::unstable]
353#[allow(deprecated)]
354impl<KeySpace, Handler> IntoFuture for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
355where
356    KeySpace: Into<self::KeySpace> + Clone,
357    Handler: IntoHandler<Sample> + Send,
358    Handler::Handler: Send,
359{
360    type Output = <Self as Resolvable>::To;
361    type IntoFuture = Ready<<Self as Resolvable>::To>;
362
363    #[zenoh_macros::unstable]
364    fn into_future(self) -> Self::IntoFuture {
365        std::future::ready(self.wait())
366    }
367}
368
369#[zenoh_macros::unstable]
370#[allow(deprecated)]
371impl<KeySpace> Resolvable for QueryingSubscriberBuilder<'_, '_, KeySpace, Callback<Sample>, true> {
372    type To = ZResult<()>;
373}
374
375#[zenoh_macros::unstable]
376#[allow(deprecated)]
377impl<KeySpace> Wait for QueryingSubscriberBuilder<'_, '_, KeySpace, Callback<Sample>, true>
378where
379    KeySpace: Into<self::KeySpace> + Clone,
380{
381    #[zenoh_macros::unstable]
382    fn wait(self) -> <Self as Resolvable>::To {
383        self.into_fetching_subscriber_builder()?.wait()
384    }
385}
386
387#[zenoh_macros::unstable]
388#[allow(deprecated)]
389impl<KeySpace> IntoFuture for QueryingSubscriberBuilder<'_, '_, KeySpace, Callback<Sample>, true>
390where
391    KeySpace: Into<self::KeySpace> + Clone,
392{
393    type Output = <Self as Resolvable>::To;
394    type IntoFuture = Ready<<Self as Resolvable>::To>;
395
396    #[zenoh_macros::unstable]
397    fn into_future(self) -> Self::IntoFuture {
398        std::future::ready(self.wait())
399    }
400}
401
402// Collects samples in their Timestamp order, if any,
403// and ignores repeating samples with duplicate timestamps.
404// Samples without Timestamps are kept in a separate Vector,
405// and are considered as older than any sample with Timestamp.
406struct MergeQueue {
407    untimestamped: VecDeque<Sample>,
408    timestamped: BTreeMap<Timestamp, Sample>,
409}
410
411impl MergeQueue {
412    fn new() -> Self {
413        MergeQueue {
414            untimestamped: VecDeque::new(),
415            timestamped: BTreeMap::new(),
416        }
417    }
418
419    fn len(&self) -> usize {
420        self.untimestamped.len() + self.timestamped.len()
421    }
422
423    fn push(&mut self, sample: Sample) {
424        if let Some(ts) = sample.timestamp() {
425            self.timestamped.entry(*ts).or_insert(sample);
426        } else {
427            self.untimestamped.push_back(sample);
428        }
429    }
430
431    fn drain(&mut self) -> MergeQueueValues {
432        let mut vec = VecDeque::new();
433        let mut queue = BTreeMap::new();
434        swap(&mut self.untimestamped, &mut vec);
435        swap(&mut self.timestamped, &mut queue);
436        MergeQueueValues {
437            untimestamped: vec,
438            timestamped: queue.into_values(),
439        }
440    }
441}
442
443struct MergeQueueValues {
444    untimestamped: VecDeque<Sample>,
445    timestamped: btree_map::IntoValues<Timestamp, Sample>,
446}
447
448impl Iterator for MergeQueueValues {
449    type Item = Sample;
450    fn next(&mut self) -> Option<Self::Item> {
451        self.untimestamped
452            .pop_front()
453            .or_else(|| self.timestamped.next())
454    }
455}
456
457struct InnerState {
458    pending_fetches: u64,
459    merge_queue: MergeQueue,
460}
461
462/// The builder of [`FetchingSubscriber`], allowing to configure it.
463#[zenoh_macros::unstable]
464#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
465#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
466#[allow(deprecated)]
467pub struct FetchingSubscriberBuilder<
468    'a,
469    'b,
470    KeySpace,
471    Handler,
472    Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
473    TryIntoSample,
474    const BACKGROUND: bool = false,
475> where
476    TryIntoSample: ExtractSample,
477{
478    pub(crate) session: &'a Session,
479    pub(crate) key_expr: ZResult<KeyExpr<'b>>,
480    pub(crate) key_space: KeySpace,
481    pub(crate) origin: Locality,
482    pub(crate) fetch: Fetch,
483    pub(crate) handler: Handler,
484    pub(crate) phantom: std::marker::PhantomData<TryIntoSample>,
485}
486
487#[zenoh_macros::unstable]
488#[allow(deprecated)]
489impl<
490        'a,
491        KeySpace,
492        Handler,
493        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
494        TryIntoSample,
495        const BACKGROUND: bool,
496    > FetchingSubscriberBuilder<'a, '_, KeySpace, Handler, Fetch, TryIntoSample, BACKGROUND>
497where
498    TryIntoSample: ExtractSample,
499{
500    #[zenoh_macros::unstable]
501    fn with_static_keys(
502        self,
503    ) -> FetchingSubscriberBuilder<'a, 'static, KeySpace, Handler, Fetch, TryIntoSample> {
504        FetchingSubscriberBuilder {
505            session: self.session,
506            key_expr: self.key_expr.map(|s| s.into_owned()),
507            key_space: self.key_space,
508            origin: self.origin,
509            fetch: self.fetch,
510            handler: self.handler,
511            phantom: std::marker::PhantomData,
512        }
513    }
514}
515
516#[zenoh_macros::unstable]
517#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
518#[allow(deprecated)]
519impl<
520        'a,
521        'b,
522        KeySpace,
523        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
524        TryIntoSample,
525    > FetchingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandler, Fetch, TryIntoSample>
526where
527    TryIntoSample: ExtractSample,
528{
529    /// Add callback to [`FetchingSubscriber`].
530    #[zenoh_macros::unstable]
531    #[inline]
532    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
533    pub fn callback<F>(
534        self,
535        callback: F,
536    ) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>, Fetch, TryIntoSample>
537    where
538        F: Fn(Sample) + Send + Sync + 'static,
539    {
540        self.with(Callback::from(callback))
541    }
542
543    /// Add callback to [`FetchingSubscriber`].
544    ///
545    /// Using this guarantees that your callback will never be called concurrently.
546    /// If your callback is also accepted by the [`callback`](FetchingSubscriberBuilder::callback)
547    /// method, we suggest you use it instead of `callback_mut`.
548    ///
549    /// Subscriber will not be undeclared when dropped, with the callback running
550    /// in background until the session is closed.
551    #[zenoh_macros::unstable]
552    #[inline]
553    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
554    pub fn callback_mut<F>(
555        self,
556        callback: F,
557    ) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>, Fetch, TryIntoSample>
558    where
559        F: FnMut(Sample) + Send + Sync + 'static,
560    {
561        self.callback(locked(callback))
562    }
563
564    /// Use the given handler to receive Samples.
565    #[zenoh_macros::unstable]
566    #[inline]
567    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
568    pub fn with<Handler>(
569        self,
570        handler: Handler,
571    ) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Handler, Fetch, TryIntoSample>
572    where
573        Handler: IntoHandler<Sample>,
574    {
575        let FetchingSubscriberBuilder {
576            session,
577            key_expr,
578            key_space,
579            origin,
580            fetch,
581            handler: _,
582            phantom,
583        } = self;
584        FetchingSubscriberBuilder {
585            session,
586            key_expr,
587            key_space,
588            origin,
589            fetch,
590            handler,
591            phantom,
592        }
593    }
594}
595
596#[zenoh_macros::unstable]
597#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
598#[allow(deprecated)]
599impl<
600        'a,
601        'b,
602        KeySpace,
603        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
604        TryIntoSample,
605    > FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>, Fetch, TryIntoSample>
606where
607    TryIntoSample: ExtractSample,
608{
609    /// Make the subscriber to run in background until the session is closed.
610    ///
611    /// Background builder doesn't return a `FetchingSubscriber` object anymore.
612    #[zenoh_macros::unstable]
613    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
614    pub fn background(
615        self,
616    ) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>, Fetch, TryIntoSample, true>
617    {
618        FetchingSubscriberBuilder {
619            session: self.session,
620            key_expr: self.key_expr,
621            key_space: self.key_space,
622            origin: self.origin,
623            fetch: self.fetch,
624            handler: self.handler,
625            phantom: self.phantom,
626        }
627    }
628}
629
630#[zenoh_macros::unstable]
631#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
632#[allow(deprecated)]
633impl<
634        Handler,
635        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
636        TryIntoSample,
637        const BACKGROUND: bool,
638    > FetchingSubscriberBuilder<'_, '_, UserSpace, Handler, Fetch, TryIntoSample, BACKGROUND>
639where
640    TryIntoSample: ExtractSample,
641{
642    /// Restrict the matching publications that will be received by this [`FetchingSubscriber`] to the ones that have the given [`Locality`](Locality).
643    #[zenoh_macros::unstable]
644    #[inline]
645    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
646    pub fn allowed_origin(mut self, origin: Locality) -> Self {
647        self.origin = origin;
648        self
649    }
650}
651
652#[zenoh_macros::unstable]
653#[allow(deprecated)]
654impl<
655        KeySpace,
656        Handler,
657        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
658        TryIntoSample,
659    > Resolvable for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
660where
661    Handler: IntoHandler<Sample>,
662    Handler::Handler: Send,
663    TryIntoSample: ExtractSample,
664{
665    type To = ZResult<FetchingSubscriber<Handler::Handler>>;
666}
667
668#[zenoh_macros::unstable]
669#[allow(deprecated)]
670impl<
671        KeySpace,
672        Handler,
673        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
674        TryIntoSample,
675    > Wait for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
676where
677    KeySpace: Into<self::KeySpace>,
678    Handler: IntoHandler<Sample> + Send,
679    Handler::Handler: Send,
680    TryIntoSample: ExtractSample + Send + Sync,
681{
682    #[zenoh_macros::unstable]
683    fn wait(self) -> <Self as Resolvable>::To {
684        FetchingSubscriber::new(self.with_static_keys())
685    }
686}
687
688#[zenoh_macros::unstable]
689#[allow(deprecated)]
690impl<
691        KeySpace,
692        Handler,
693        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
694        TryIntoSample,
695    > IntoFuture for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
696where
697    KeySpace: Into<self::KeySpace>,
698    Handler: IntoHandler<Sample> + Send,
699    Handler::Handler: Send,
700    TryIntoSample: ExtractSample + Send + Sync,
701{
702    type Output = <Self as Resolvable>::To;
703    type IntoFuture = Ready<<Self as Resolvable>::To>;
704
705    #[zenoh_macros::unstable]
706    fn into_future(self) -> Self::IntoFuture {
707        std::future::ready(self.wait())
708    }
709}
710
711#[zenoh_macros::unstable]
712#[allow(deprecated)]
713impl<
714        KeySpace,
715        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
716        TryIntoSample,
717    > Resolvable
718    for FetchingSubscriberBuilder<'_, '_, KeySpace, Callback<Sample>, Fetch, TryIntoSample, true>
719where
720    TryIntoSample: ExtractSample,
721{
722    type To = ZResult<()>;
723}
724
725#[zenoh_macros::unstable]
726#[allow(deprecated)]
727impl<
728        KeySpace,
729        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
730        TryIntoSample,
731    > Wait
732    for FetchingSubscriberBuilder<'_, '_, KeySpace, Callback<Sample>, Fetch, TryIntoSample, true>
733where
734    KeySpace: Into<self::KeySpace>,
735    TryIntoSample: ExtractSample + Send + Sync,
736{
737    #[zenoh_macros::unstable]
738    fn wait(self) -> <Self as Resolvable>::To {
739        FetchingSubscriber::new(self.with_static_keys())?
740            .subscriber
741            .set_background(true);
742        Ok(())
743    }
744}
745
746#[zenoh_macros::unstable]
747#[allow(deprecated)]
748impl<
749        KeySpace,
750        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
751        TryIntoSample,
752    > IntoFuture
753    for FetchingSubscriberBuilder<'_, '_, KeySpace, Callback<Sample>, Fetch, TryIntoSample, true>
754where
755    KeySpace: Into<self::KeySpace>,
756    TryIntoSample: ExtractSample + Send + Sync,
757{
758    type Output = <Self as Resolvable>::To;
759    type IntoFuture = Ready<<Self as Resolvable>::To>;
760
761    #[zenoh_macros::unstable]
762    fn into_future(self) -> Self::IntoFuture {
763        std::future::ready(self.wait())
764    }
765}
766
767/// A Subscriber that will run the given user defined `fetch` function at startup.
768///
769/// The user defined `fetch` function should fetch some samples and return them through the callback function
770/// (it could typically be a Session::get()). Those samples will be merged with the received publications and made available in the receiver.
771/// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
772///
773/// A typical usage of the `FetchingSubscriber` is to retrieve publications that were made in the past, but stored in some zenoh Storage.
774///
775/// # Examples
776/// ```no_run
777/// # #[tokio::main]
778/// # async fn main() {
779/// use zenoh::Wait;
780/// use zenoh_ext::*;
781///
782/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
783/// let subscriber = session
784///     .declare_subscriber("key/expr")
785///     .fetching( |cb| {
786///         session
787///             .get("key/expr")
788///             .callback(cb)
789///             .wait()
790///     })
791///     .await
792///     .unwrap();
793/// while let Ok(sample) = subscriber.recv_async().await {
794///     println!("Received: {:?}", sample);
795/// }
796/// # }
797/// ```
798#[zenoh_macros::unstable]
799#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
800pub struct FetchingSubscriber<Handler> {
801    subscriber: Subscriber<()>,
802    callback: Callback<Sample>,
803    state: Arc<Mutex<InnerState>>,
804    handler: Handler,
805}
806
807#[zenoh_macros::unstable]
808#[allow(deprecated)]
809impl<Handler> std::ops::Deref for FetchingSubscriber<Handler> {
810    type Target = Handler;
811    #[zenoh_macros::unstable]
812    fn deref(&self) -> &Self::Target {
813        &self.handler
814    }
815}
816
817#[zenoh_macros::unstable]
818#[allow(deprecated)]
819impl<Handler> std::ops::DerefMut for FetchingSubscriber<Handler> {
820    #[zenoh_macros::unstable]
821    fn deref_mut(&mut self) -> &mut Self::Target {
822        &mut self.handler
823    }
824}
825
826#[zenoh_macros::unstable]
827#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
828#[allow(deprecated)]
829impl<Handler> FetchingSubscriber<Handler> {
830    fn new<
831        'a,
832        KeySpace,
833        InputHandler,
834        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
835        TryIntoSample,
836    >(
837        conf: FetchingSubscriberBuilder<'a, 'a, KeySpace, InputHandler, Fetch, TryIntoSample>,
838    ) -> ZResult<Self>
839    where
840        KeySpace: Into<self::KeySpace>,
841        InputHandler: IntoHandler<Sample, Handler = Handler> + Send,
842        TryIntoSample: ExtractSample + Send + Sync,
843    {
844        let session_id = conf.session.zid();
845
846        let state = Arc::new(Mutex::new(InnerState {
847            pending_fetches: 0,
848            merge_queue: MergeQueue::new(),
849        }));
850        let (callback, receiver) = conf.handler.into_handler();
851
852        let sub_callback = {
853            let state = state.clone();
854            let callback = callback.clone();
855            move |s| {
856                let state = &mut zlock!(state);
857                if state.pending_fetches == 0 {
858                    callback.call(s);
859                } else {
860                    tracing::trace!(
861                        "Sample received while fetch in progress: push it to merge_queue"
862                    );
863
864                    // ensure the sample has a timestamp, thus it will always be sorted into the MergeQueue
865                    // after any timestamped Sample possibly coming from a fetch reply.
866                    let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().into(); // UNIX_EPOCH is Returns a Timespec::zero(), Unwrap Should be permissible here
867                    let timestamp = s
868                        .timestamp()
869                        .cloned()
870                        .unwrap_or(Timestamp::new(now, session_id.into()));
871                    state
872                        .merge_queue
873                        .push(SampleBuilder::from(s).timestamp(timestamp).into());
874                }
875            }
876        };
877
878        let key_expr = conf.key_expr?;
879
880        // register fetch handler
881        let handler = register_handler(state.clone(), callback.clone());
882        // declare subscriber
883        let subscriber = match conf.key_space.into() {
884            self::KeySpace::User => conf
885                .session
886                .declare_subscriber(&key_expr)
887                .callback(sub_callback)
888                .allowed_origin(conf.origin)
889                .wait()?,
890            self::KeySpace::Liveliness => conf
891                .session
892                .liveliness()
893                .declare_subscriber(&key_expr)
894                .callback(sub_callback)
895                .wait()?,
896        };
897
898        let fetch_subscriber = FetchingSubscriber {
899            subscriber,
900            callback,
901            state,
902            handler: receiver,
903        };
904
905        // run fetch
906        run_fetch(conf.fetch, handler)?;
907
908        Ok(fetch_subscriber)
909    }
910
911    /// Undeclare this [`FetchingSubscriber`]`.
912    #[zenoh_macros::unstable]
913    #[inline]
914    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
915    pub fn undeclare(self) -> impl Resolve<ZResult<()>> {
916        self.subscriber.undeclare()
917    }
918
919    #[zenoh_macros::unstable]
920    #[zenoh_macros::internal]
921    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
922    pub fn set_background(&mut self, background: bool) {
923        self.subscriber.set_background(background)
924    }
925
926    /// Return the key expression of this FetchingSubscriber
927    #[zenoh_macros::unstable]
928    #[inline]
929    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
930    pub fn key_expr(&self) -> &KeyExpr<'static> {
931        self.subscriber.key_expr()
932    }
933
934    /// Perform an additional `fetch`.
935    ///
936    /// The provided `fetch` function should fetch some samples and return them through the callback function
937    /// (it could typically be a Session::get()). Those samples will be merged with the received publications and made available in the receiver.
938    ///
939    /// # Examples
940    /// ```no_run
941    /// # #[tokio::main]
942    /// # async fn main() {
943    /// use zenoh::Wait;
944    /// use zenoh_ext::*;
945    ///
946    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
947    /// let mut subscriber = session
948    ///     .declare_subscriber("key/expr")
949    ///     .fetching( |cb| {
950    ///         session
951    ///             .get("key/expr")
952    ///             .callback(cb)
953    ///             .wait()
954    ///     })
955    ///     .await
956    ///     .unwrap();
957    ///
958    /// // perform an additional fetch
959    /// subscriber
960    ///     .fetch( |cb| {
961    ///         session
962    ///             .get("key/expr")
963    ///             .callback(cb)
964    ///             .wait()
965    ///     })
966    ///     .await
967    ///     .unwrap();
968    /// # }
969    /// ```
970    #[zenoh_macros::unstable]
971    #[inline]
972    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
973    pub fn fetch<
974        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
975        TryIntoSample,
976    >(
977        &self,
978        fetch: Fetch,
979    ) -> impl Resolve<ZResult<()>>
980    where
981        TryIntoSample: ExtractSample + Send + Sync,
982    {
983        FetchBuilder {
984            fetch,
985            phantom: std::marker::PhantomData,
986            state: self.state.clone(),
987            callback: self.callback.clone(),
988        }
989    }
990}
991
992struct RepliesHandler {
993    state: Arc<Mutex<InnerState>>,
994    callback: Callback<Sample>,
995}
996
997impl Drop for RepliesHandler {
998    fn drop(&mut self) {
999        let mut state = zlock!(self.state);
1000        state.pending_fetches -= 1;
1001        tracing::trace!(
1002            "Fetch done - {} fetches still in progress",
1003            state.pending_fetches
1004        );
1005        if state.pending_fetches == 0 {
1006            tracing::debug!(
1007                "All fetches done. Replies and live publications merged - {} samples to propagate",
1008                state.merge_queue.len()
1009            );
1010            for s in state.merge_queue.drain() {
1011                self.callback.call(s);
1012            }
1013        }
1014    }
1015}
1016
1017/// The builder returned by [`FetchingSubscriber::fetch`](FetchingSubscriber::fetch).
1018///
1019/// # Examples
1020/// ```no_run
1021/// # #[tokio::main]
1022/// # async fn main() {
1023/// # use zenoh::Wait;
1024/// # use zenoh_ext::*;
1025/// #
1026/// # let session = zenoh::open(zenoh::Config::default()).await.unwrap();
1027/// # let mut fetching_subscriber = session
1028/// #     .declare_subscriber("key/expr")
1029/// #     .fetching( |cb| {
1030/// #         session
1031/// #             .get("key/expr")
1032/// #             .callback(cb)
1033/// #            .wait()
1034/// #     })
1035/// #     .await
1036/// #     .unwrap();
1037/// #
1038/// fetching_subscriber
1039///     .fetch( |cb| {
1040///         session
1041///             .get("key/expr")
1042///             .callback(cb)
1043///             .wait()
1044///     })
1045///     .await
1046///     .unwrap();
1047/// # }
1048/// ```
1049#[zenoh_macros::unstable]
1050#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
1051#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
1052#[allow(deprecated)]
1053pub struct FetchBuilder<
1054    Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
1055    TryIntoSample,
1056> where
1057    TryIntoSample: ExtractSample,
1058{
1059    fetch: Fetch,
1060    phantom: std::marker::PhantomData<TryIntoSample>,
1061    state: Arc<Mutex<InnerState>>,
1062    callback: Callback<Sample>,
1063}
1064
1065#[zenoh_macros::unstable]
1066#[allow(deprecated)]
1067impl<Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample>
1068    Resolvable for FetchBuilder<Fetch, TryIntoSample>
1069where
1070    TryIntoSample: ExtractSample,
1071{
1072    type To = ZResult<()>;
1073}
1074
1075#[zenoh_macros::unstable]
1076#[allow(deprecated)]
1077impl<Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample> Wait
1078    for FetchBuilder<Fetch, TryIntoSample>
1079where
1080    TryIntoSample: ExtractSample,
1081{
1082    #[zenoh_macros::unstable]
1083    fn wait(self) -> <Self as Resolvable>::To {
1084        let handler = register_handler(self.state, self.callback);
1085        run_fetch(self.fetch, handler)
1086    }
1087}
1088
1089#[zenoh_macros::unstable]
1090#[allow(deprecated)]
1091impl<Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample>
1092    IntoFuture for FetchBuilder<Fetch, TryIntoSample>
1093where
1094    TryIntoSample: ExtractSample,
1095{
1096    type Output = <Self as Resolvable>::To;
1097    type IntoFuture = Ready<<Self as Resolvable>::To>;
1098
1099    #[zenoh_macros::unstable]
1100    fn into_future(self) -> Self::IntoFuture {
1101        std::future::ready(self.wait())
1102    }
1103}
1104
1105fn register_handler(state: Arc<Mutex<InnerState>>, callback: Callback<Sample>) -> RepliesHandler {
1106    zlock!(state).pending_fetches += 1;
1107    // pending fetches will be decremented in RepliesHandler drop()
1108    RepliesHandler { state, callback }
1109}
1110
1111#[zenoh_macros::unstable]
1112#[allow(deprecated)]
1113fn run_fetch<
1114    Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
1115    TryIntoSample,
1116>(
1117    fetch: Fetch,
1118    handler: RepliesHandler,
1119) -> ZResult<()>
1120where
1121    TryIntoSample: ExtractSample,
1122{
1123    tracing::debug!("Fetch data for FetchingSubscriber");
1124    (fetch)(Box::new(move |s: TryIntoSample| match s.extract() {
1125        Ok(s) => {
1126            let mut state = zlock!(handler.state);
1127            tracing::trace!("Fetched sample received: push it to merge_queue");
1128            state.merge_queue.push(s);
1129        }
1130        Err(e) => tracing::debug!("Received error fetching data: {}", e),
1131    }))
1132}
1133
1134/// [`ExtractSample`].
1135#[zenoh_macros::unstable]
1136#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
1137#[allow(deprecated)]
1138pub trait ExtractSample {
1139    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
1140    fn extract(self) -> ZResult<Sample>;
1141}
1142
1143#[allow(deprecated)]
1144impl ExtractSample for Reply {
1145    fn extract(self) -> ZResult<Sample> {
1146        self.into_result().map_err(|e| zerror!("{:?}", e).into())
1147    }
1148}