zenoh_ext/
subscriber_ext.rs

//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
14use std::time::Duration;
15
16use futures::stream::{Forward, Map};
17use zenoh::{
18    handlers::{fifo, FifoChannelHandler},
19    liveliness::LivelinessSubscriberBuilder,
20    pubsub::{Subscriber, SubscriberBuilder},
21    query::{QueryConsolidation, QueryTarget, ReplyKeyExpr},
22    sample::{Locality, Sample},
23    Result as ZResult,
24};
25
26#[allow(deprecated)]
27use crate::{
28    advanced_subscriber::HistoryConfig, querying_subscriber::QueryingSubscriberBuilder,
29    AdvancedSubscriberBuilder, ExtractSample, FetchingSubscriberBuilder, RecoveryConfig,
30};
31
32/// Allows writing `subscriber.forward(receiver)` instead of `subscriber.stream().map(Ok).forward(publisher)`
33#[zenoh_macros::unstable]
34pub trait SubscriberForward<'a, S> {
35    type Output;
36    fn forward(&'a mut self, sink: S) -> Self::Output;
37}
38impl<'a, S> SubscriberForward<'a, S> for Subscriber<FifoChannelHandler<Sample>>
39where
40    S: futures::sink::Sink<Sample>,
41{
42    #[zenoh_macros::unstable]
43    type Output =
44        Forward<Map<fifo::RecvStream<'a, Sample>, fn(Sample) -> Result<Sample, S::Error>>, S>;
45    fn forward(&'a mut self, sink: S) -> Self::Output {
46        futures::StreamExt::forward(futures::StreamExt::map(self.stream(), Ok), sink)
47    }
48}
49
50/// Some extensions to the [`zenoh::subscriber::SubscriberBuilder`](zenoh::pubsub::SubscriberBuilder)
51#[zenoh_macros::unstable]
52#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
53#[allow(deprecated)]
54pub trait SubscriberBuilderExt<'a, 'b, Handler> {
55    type KeySpace;
56
57    /// Create a [`FetchingSubscriber`](super::FetchingSubscriber).
58    ///
59    /// This operation returns a [`FetchingSubscriberBuilder`](FetchingSubscriberBuilder) that can be used to finely configure the subscriber.
60    /// As soon as built (calling `.wait()` or `.await` on the `FetchingSubscriberBuilder`), the `FetchingSubscriber`
61    /// will run the given `fetch` function. The user defined `fetch` function should fetch some samples and return them
62    /// through the callback function. Those samples will be merged with the received publications and made available in the receiver.
63    /// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
64    ///
65    /// A typical usage of the `FetchingSubscriber` is to retrieve publications that were made in the past, but stored in some zenoh Storage.
66    ///
67    /// # Examples
68    /// ```no_run
69    /// # #[tokio::main]
70    /// # async fn main() {
71    /// use zenoh::Wait;
72    /// use zenoh_ext::*;
73    ///
74    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
75    /// let subscriber = session
76    ///     .declare_subscriber("key/expr")
77    ///     .fetching( |cb| {
78    ///         session
79    ///             .get("key/expr")
80    ///             .callback(cb)
81    ///             .wait()
82    ///     })
83    ///     .await
84    ///     .unwrap();
85    /// while let Ok(sample) = subscriber.recv_async().await {
86    ///     println!("Received: {:?}", sample);
87    /// }
88    /// # }
89    /// ```
90    #[zenoh_macros::unstable]
91    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
92    fn fetching<
93        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
94        TryIntoSample,
95    >(
96        self,
97        fetch: Fetch,
98    ) -> FetchingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler, Fetch, TryIntoSample>
99    where
100        TryIntoSample: ExtractSample;
101
102    /// Create a [`FetchingSubscriber`](super::FetchingSubscriber) that will perform a query (`session.get()`) as it's
103    /// initial fetch.
104    ///
105    /// This operation returns a [`QueryingSubscriberBuilder`](QueryingSubscriberBuilder) that can be used to finely configure the subscriber.
106    /// As soon as built (calling `.wait()` or `.await` on the `QueryingSubscriberBuilder`), the `FetchingSubscriber`
107    /// will issue a query on a given key expression (by default it uses the same key expression than it subscribes to).
108    /// The results of the query will be merged with the received publications and made available in the receiver.
109    /// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
110    ///
111    /// A typical usage of the `FetchingSubscriber` is to retrieve publications that were made in the past, but stored in some zenoh Storage.
112    ///
113    /// # Examples
114    /// ```no_run
115    /// # #[tokio::main]
116    /// # async fn main() {
117    /// use zenoh_ext::*;
118    ///
119    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
120    /// let subscriber = session
121    ///     .declare_subscriber("key/expr")
122    ///     .querying()
123    ///     .await
124    ///     .unwrap();
125    /// while let Ok(sample) = subscriber.recv_async().await {
126    ///     println!("Received: {:?}", sample);
127    /// }
128    /// # }
129    /// ```
130    #[zenoh_macros::unstable]
131    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
132    fn querying(self) -> QueryingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler>;
133}
134
135/// Some extensions to the [`zenoh::subscriber::SubscriberBuilder`](zenoh::pubsub::SubscriberBuilder)
136#[zenoh_macros::unstable]
137pub trait AdvancedSubscriberBuilderExt<'a, 'b, 'c, Handler> {
138    /// Enable query for historical data.
139    ///
140    /// History can only be retransmitted by [`AdvancedPublishers`](crate::AdvancedPublisher) that enable [`cache`](crate::AdvancedPublisherBuilder::cache).
141    #[zenoh_macros::unstable]
142    fn history(self, config: HistoryConfig) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>;
143
144    /// Ask for retransmission of detected lost Samples.
145    ///
146    /// Retransmission can only be achieved by [`AdvancedPublishers`](crate::AdvancedPublisher)
147    /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
148    /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
149    #[zenoh_macros::unstable]
150    fn recovery(self, conf: RecoveryConfig) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>;
151
152    /// Allow this subscriber to be detected through liveliness.
153    #[zenoh_macros::unstable]
154    fn subscriber_detection(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>;
155
156    /// Turn this [`Subscriber`](zenoh::subscriber::Subscriber) into an [`AdvancedSubscriber`](crate::AdvancedSubscriber).
157    #[zenoh_macros::unstable]
158    fn advanced(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>;
159}
160
161#[zenoh_macros::unstable]
162#[allow(deprecated)]
163impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> for SubscriberBuilder<'a, 'b, Handler> {
164    type KeySpace = crate::UserSpace;
165
166    /// Create a [`FetchingSubscriber`](super::FetchingSubscriber).
167    ///
168    /// This operation returns a [`FetchingSubscriberBuilder`](FetchingSubscriberBuilder) that can be used to finely configure the subscriber.
169    /// As soon as built (calling `.wait()` or `.await` on the `FetchingSubscriberBuilder`), the `FetchingSubscriber`
170    /// will run the given `fetch` function. The user defined `fetch` function should fetch some samples and return them
171    /// through the callback function. Those samples will be merged with the received publications and made available in the receiver.
172    /// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
173    ///
174    /// A typical usage of the `FetchingSubscriber` is to retrieve publications that were made in the past, but stored in some zenoh Storage.
175    ///
176    /// # Examples
177    /// ```no_run
178    /// # #[tokio::main]
179    /// # async fn main() {
180    /// use zenoh::Wait;
181    /// use zenoh_ext::*;
182    ///
183    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
184    /// let subscriber = session
185    ///     .declare_subscriber("key/expr")
186    ///     .fetching( |cb| {
187    ///         session
188    ///             .get("key/expr")
189    ///             .callback(cb)
190    ///             .wait()
191    ///     })
192    ///     .await
193    ///     .unwrap();
194    /// while let Ok(sample) = subscriber.recv_async().await {
195    ///     println!("Received: {:?}", sample);
196    /// }
197    /// # }
198    /// ```
199    #[zenoh_macros::unstable]
200    fn fetching<
201        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
202        TryIntoSample,
203    >(
204        self,
205        fetch: Fetch,
206    ) -> FetchingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler, Fetch, TryIntoSample>
207    where
208        TryIntoSample: ExtractSample,
209    {
210        FetchingSubscriberBuilder {
211            session: self.session,
212            key_expr: self.key_expr,
213            key_space: crate::UserSpace,
214            origin: self.origin,
215            fetch,
216            handler: self.handler,
217            phantom: std::marker::PhantomData,
218        }
219    }
220
221    /// Create a [`FetchingSubscriber`](super::FetchingSubscriber) that will perform a query (`session.get()`) as it's
222    /// initial fetch.
223    ///
224    /// This operation returns a [`QueryingSubscriberBuilder`](QueryingSubscriberBuilder) that can be used to finely configure the subscriber.
225    /// As soon as built (calling `.wait()` or `.await` on the `QueryingSubscriberBuilder`), the `FetchingSubscriber`
226    /// will issue a query on a given key expression (by default it uses the same key expression than it subscribes to).
227    /// The results of the query will be merged with the received publications and made available in the receiver.
228    /// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
229    ///
230    /// A typical usage of the `FetchingSubscriber` is to retrieve publications that were made in the past, but stored in some zenoh Storage.
231    ///
232    /// # Examples
233    /// ```no_run
234    /// # #[tokio::main]
235    /// # async fn main() {
236    /// use zenoh_ext::*;
237    ///
238    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
239    /// let subscriber = session
240    ///     .declare_subscriber("key/expr")
241    ///     .querying()
242    ///     .await
243    ///     .unwrap();
244    /// while let Ok(sample) = subscriber.recv_async().await {
245    ///     println!("Received: {:?}", sample);
246    /// }
247    /// # }
248    /// ```
249    #[zenoh_macros::unstable]
250    fn querying(self) -> QueryingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler> {
251        QueryingSubscriberBuilder {
252            session: self.session,
253            key_expr: self.key_expr,
254            key_space: crate::UserSpace,
255            origin: self.origin,
256            query_selector: None,
257            // By default query all matching publication caches and storages
258            query_target: QueryTarget::All,
259            // By default no query consolidation, to receive more than 1 sample per-resource
260            // (if history of publications is available)
261            query_consolidation: QueryConsolidation::from(zenoh::query::ConsolidationMode::None),
262            query_accept_replies: ReplyKeyExpr::default(),
263            query_timeout: Duration::from_secs(10),
264            handler: self.handler,
265        }
266    }
267}
268
269#[zenoh_macros::unstable]
270impl<'a, 'b, 'c, Handler> AdvancedSubscriberBuilderExt<'a, 'b, 'c, Handler>
271    for SubscriberBuilder<'a, 'b, Handler>
272{
273    /// Enable query for historical data.
274    ///
275    /// History can only be retransmitted by [`AdvancedPublishers`](crate::AdvancedPublisher) that enable [`cache`](crate::AdvancedPublisherBuilder::cache).
276    #[zenoh_macros::unstable]
277    fn history(self, config: HistoryConfig) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler> {
278        AdvancedSubscriberBuilder::new(self).history(config)
279    }
280
281    /// Ask for retransmission of detected lost Samples.
282    ///
283    /// Retransmission can only be achieved by [`AdvancedPublishers`](crate::AdvancedPublisher)
284    /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
285    /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
286    #[zenoh_macros::unstable]
287    fn recovery(self, conf: RecoveryConfig) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler> {
288        AdvancedSubscriberBuilder::new(self).recovery(conf)
289    }
290
291    /// Allow this subscriber to be detected through liveliness.
292    #[zenoh_macros::unstable]
293    fn subscriber_detection(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler> {
294        AdvancedSubscriberBuilder::new(self).subscriber_detection()
295    }
296
297    /// Turn this [`Subscriber`](zenoh::subscriber::Subscriber) into an [`AdvancedSubscriber`](crate::AdvancedSubscriber).
298    #[zenoh_macros::unstable]
299    fn advanced(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler> {
300        AdvancedSubscriberBuilder::new(self)
301    }
302}
303
304#[zenoh_macros::unstable]
305#[allow(deprecated)]
306impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler>
307    for LivelinessSubscriberBuilder<'a, 'b, Handler>
308{
309    type KeySpace = crate::LivelinessSpace;
310
311    /// Create a fetching liveliness subscriber ([`FetchingSubscriber`](super::FetchingSubscriber)).
312    ///
313    /// This operation returns a [`FetchingSubscriberBuilder`](FetchingSubscriberBuilder) that can be used to finely configure the subscriber.
314    /// As soon as built (calling `.wait()` or `.await` on the `FetchingSubscriberBuilder`), the `FetchingSubscriber`
315    /// will run the given `fetch` function. The user defined `fetch` function should fetch some samples and return them
316    /// through the callback function. Those samples will be merged with the received publications and made available in the receiver.
317    /// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
318    ///
319    /// A typical usage of the fetching liveliness subscriber is to retrieve existing liveliness tokens while subscribing to
320    /// new liveness changes.
321    ///
322    /// # Examples
323    /// ```no_run
324    /// # #[tokio::main]
325    /// # async fn main() {
326    /// use zenoh::Wait;
327    /// use zenoh_ext::*;
328    ///
329    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
330    /// let subscriber = session
331    ///     .liveliness()
332    ///     .declare_subscriber("key/expr")
333    ///     .fetching( |cb| {
334    ///         session
335    ///             .liveliness()
336    ///             .get("key/expr")
337    ///             .callback(cb)
338    ///             .wait()
339    ///     })
340    ///     .await
341    ///     .unwrap();
342    /// while let Ok(sample) = subscriber.recv_async().await {
343    ///     println!("Received: {:?}", sample);
344    /// }
345    /// # }
346    /// ```
347    #[zenoh_macros::unstable]
348    fn fetching<
349        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
350        TryIntoSample,
351    >(
352        self,
353        fetch: Fetch,
354    ) -> FetchingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler, Fetch, TryIntoSample>
355    where
356        TryIntoSample: ExtractSample,
357    {
358        FetchingSubscriberBuilder {
359            session: self.session,
360            key_expr: self.key_expr,
361            key_space: crate::LivelinessSpace,
362            origin: Locality::default(),
363            fetch,
364            handler: self.handler,
365            phantom: std::marker::PhantomData,
366        }
367    }
368
369    /// Create a fetching liveliness subscriber ([`FetchingSubscriber`](super::FetchingSubscriber)) that will perform a
370    /// liveliness query (`session.liveliness().get()`) as it's initial fetch.
371    ///
372    /// This operation returns a [`QueryingSubscriberBuilder`](QueryingSubscriberBuilder) that can be used to finely configure the subscriber.
373    /// As soon as built (calling `.wait()` or `.await` on the `QueryingSubscriberBuilder`), the `FetchingSubscriber`
374    /// will issue a liveliness query on a given key expression (by default it uses the same key expression than it subscribes to).
375    /// The results of the query will be merged with the received publications and made available in the receiver.
376    /// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
377    ///
378    /// A typical usage of the fetching liveliness subscriber is to retrieve existing liveliness tokens while subscribing to
379    /// new liveness changes.
380    ///
381    /// # Examples
382    /// ```no_run
383    /// # #[tokio::main]
384    /// # async fn main() {
385    /// use zenoh_ext::*;
386    ///
387    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
388    /// let subscriber = session
389    ///     .liveliness()
390    ///     .declare_subscriber("key/expr")
391    ///     .querying()
392    ///     .await
393    ///     .unwrap();
394    /// while let Ok(sample) = subscriber.recv_async().await {
395    ///     println!("Received: {:?}", sample);
396    /// }
397    /// # }
398    /// ```
399    #[zenoh_macros::unstable]
400    fn querying(self) -> QueryingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler> {
401        QueryingSubscriberBuilder {
402            session: self.session,
403            key_expr: self.key_expr,
404            key_space: crate::LivelinessSpace,
405            origin: Locality::default(),
406            query_selector: None,
407            query_target: QueryTarget::DEFAULT,
408            query_consolidation: QueryConsolidation::DEFAULT,
409            query_accept_replies: ReplyKeyExpr::MatchingQuery,
410            query_timeout: Duration::from_secs(10),
411            handler: self.handler,
412        }
413    }
414}