Skip to main content

zenoh_ext/
subscriber_ext.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::time::Duration;
15
16use futures::stream::{Forward, Map};
17use zenoh::{
18    handlers::{fifo, FifoChannelHandler},
19    liveliness::LivelinessSubscriberBuilder,
20    pubsub::{Subscriber, SubscriberBuilder},
21    query::{QueryConsolidation, QueryTarget, ReplyKeyExpr},
22    sample::{Locality, Sample},
23    Result as ZResult,
24};
25
26#[allow(deprecated)]
27use crate::{
28    advanced_subscriber::HistoryConfig, querying_subscriber::QueryingSubscriberBuilder,
29    AdvancedSubscriberBuilder, ExtractSample, FetchingSubscriberBuilder, RecoveryConfig,
30};
31
32/// Allows writing `subscriber.forward(receiver)` instead of `subscriber.stream().map(Ok).forward(publisher)`
33#[zenoh_macros::unstable]
34pub trait SubscriberForward<'a, S> {
35    type Output;
36    fn forward(&'a mut self, sink: S) -> Self::Output;
37}
38impl<'a, S> SubscriberForward<'a, S> for Subscriber<FifoChannelHandler<Sample>>
39where
40    S: futures::sink::Sink<Sample>,
41{
42    #[zenoh_macros::unstable]
43    type Output =
44        Forward<Map<fifo::RecvStream<'a, Sample>, fn(Sample) -> Result<Sample, S::Error>>, S>;
45    fn forward(&'a mut self, sink: S) -> Self::Output {
46        futures::StreamExt::forward(futures::StreamExt::map(self.stream(), Ok), sink)
47    }
48}
49
50/// Some extensions to the [`zenoh::subscriber::SubscriberBuilder`](zenoh::pubsub::SubscriberBuilder)
51#[zenoh_macros::unstable]
52#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
53#[allow(deprecated)]
54pub trait SubscriberBuilderExt<'a, 'b, Handler> {
55    type KeySpace;
56
57    /// Create a [`FetchingSubscriber`](super::FetchingSubscriber).
58    ///
59    /// This operation returns a [`FetchingSubscriberBuilder`](FetchingSubscriberBuilder) that can be used to finely configure the subscriber.
60    /// As soon as built (calling `.wait()` or `.await` on the `FetchingSubscriberBuilder`), the `FetchingSubscriber`
61    /// will run the given `fetch` function. The user defined `fetch` function should fetch some samples and return them
62    /// through the callback function. Those samples will be merged with the received publications and made available in the receiver.
63    /// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
64    ///
65    /// A typical usage of the `FetchingSubscriber` is to retrieve publications that were made in the past, but stored in some zenoh Storage.
66    ///
67    /// # Examples
68    /// ```no_run
69    /// # #[tokio::main]
70    /// # async fn main() {
71    /// use zenoh::Wait;
72    /// use zenoh_ext::*;
73    ///
74    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
75    /// let subscriber = session
76    ///     .declare_subscriber("key/expr")
77    ///     .fetching( |cb| {
78    ///         session
79    ///             .get("key/expr")
80    ///             .callback(cb)
81    ///             .wait()
82    ///     })
83    ///     .await
84    ///     .unwrap();
85    /// while let Ok(sample) = subscriber.recv_async().await {
86    ///     println!("Received: {:?}", sample);
87    /// }
88    /// # }
89    /// ```
90    #[zenoh_macros::unstable]
91    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
92    fn fetching<
93        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
94        TryIntoSample,
95    >(
96        self,
97        fetch: Fetch,
98    ) -> FetchingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler, Fetch, TryIntoSample>
99    where
100        TryIntoSample: ExtractSample;
101
102    /// Create a [`FetchingSubscriber`](super::FetchingSubscriber) that will perform a query (`session.get()`) as it's initial fetch.
103    ///
104    /// This operation returns a [`QueryingSubscriberBuilder`](QueryingSubscriberBuilder) that can be used to finely configure the subscriber.
105    /// As soon as built (calling `.wait()` or `.await` on the `QueryingSubscriberBuilder`), the `FetchingSubscriber`
106    /// will issue a query on a given key expression (by default it uses the same key expression than it subscribes to).
107    /// The results of the query will be merged with the received publications and made available in the receiver.
108    /// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
109    ///
110    /// A typical usage of the `FetchingSubscriber` is to retrieve publications that were made in the past, but stored in some zenoh Storage.
111    ///
112    /// # Examples
113    /// ```no_run
114    /// # #[tokio::main]
115    /// # async fn main() {
116    /// use zenoh_ext::*;
117    ///
118    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
119    /// let subscriber = session
120    ///     .declare_subscriber("key/expr")
121    ///     .querying()
122    ///     .await
123    ///     .unwrap();
124    /// while let Ok(sample) = subscriber.recv_async().await {
125    ///     println!("Received: {:?}", sample);
126    /// }
127    /// # }
128    /// ```
129    #[zenoh_macros::unstable]
130    #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
131    fn querying(self) -> QueryingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler>;
132}
133
134/// Some extensions to the [`zenoh::subscriber::SubscriberBuilder`](zenoh::pubsub::SubscriberBuilder)
135#[zenoh_macros::unstable]
136pub trait AdvancedSubscriberBuilderExt<'a, 'b, 'c, Handler> {
137    /// Enable query for historical data.
138    ///
139    /// History can only be retransmitted by [`AdvancedPublishers`](crate::AdvancedPublisher) that enable [`cache`](crate::AdvancedPublisherBuilder::cache).
140    #[zenoh_macros::unstable]
141    fn history(self, config: HistoryConfig) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>;
142
143    /// Ask for retransmission of detected lost Samples.
144    ///
145    /// Retransmission can only be achieved by [`AdvancedPublishers`](crate::AdvancedPublisher)
146    /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
147    /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
148    #[zenoh_macros::unstable]
149    fn recovery(self, conf: RecoveryConfig) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>;
150
151    /// Allow this subscriber to be detected through liveliness.
152    #[zenoh_macros::unstable]
153    fn subscriber_detection(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>;
154
155    /// Turn this [`Subscriber`](zenoh::subscriber::Subscriber) into an [`AdvancedSubscriber`](crate::AdvancedSubscriber).
156    #[zenoh_macros::unstable]
157    fn advanced(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>;
158}
159
160#[zenoh_macros::unstable]
161#[allow(deprecated)]
162impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> for SubscriberBuilder<'a, 'b, Handler> {
163    type KeySpace = crate::UserSpace;
164
165    /// Create a [`FetchingSubscriber`](super::FetchingSubscriber).
166    ///
167    /// This operation returns a [`FetchingSubscriberBuilder`](FetchingSubscriberBuilder) that can be used to finely configure the subscriber.
168    /// As soon as built (calling `.wait()` or `.await` on the `FetchingSubscriberBuilder`), the `FetchingSubscriber`
169    /// will run the given `fetch` function. The user defined `fetch` function should fetch some samples and return them
170    /// through the callback function. Those samples will be merged with the received publications and made available in the receiver.
171    /// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
172    ///
173    /// A typical usage of the `FetchingSubscriber` is to retrieve publications that were made in the past, but stored in some zenoh Storage.
174    ///
175    /// # Examples
176    /// ```no_run
177    /// # #[tokio::main]
178    /// # async fn main() {
179    /// use zenoh::Wait;
180    /// use zenoh_ext::*;
181    ///
182    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
183    /// let subscriber = session
184    ///     .declare_subscriber("key/expr")
185    ///     .fetching( |cb| {
186    ///         session
187    ///             .get("key/expr")
188    ///             .callback(cb)
189    ///             .wait()
190    ///     })
191    ///     .await
192    ///     .unwrap();
193    /// while let Ok(sample) = subscriber.recv_async().await {
194    ///     println!("Received: {:?}", sample);
195    /// }
196    /// # }
197    /// ```
198    #[zenoh_macros::unstable]
199    fn fetching<
200        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
201        TryIntoSample,
202    >(
203        self,
204        fetch: Fetch,
205    ) -> FetchingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler, Fetch, TryIntoSample>
206    where
207        TryIntoSample: ExtractSample,
208    {
209        FetchingSubscriberBuilder {
210            session: self.session,
211            key_expr: self.key_expr,
212            key_space: crate::UserSpace,
213            origin: self.origin,
214            fetch,
215            handler: self.handler,
216            phantom: std::marker::PhantomData,
217        }
218    }
219
220    /// Create a [`FetchingSubscriber`](super::FetchingSubscriber) that will perform a query (`session.get()`) as it's initial fetch.
221    ///
222    /// This operation returns a [`QueryingSubscriberBuilder`](QueryingSubscriberBuilder) that can be used to finely configure the subscriber.
223    /// As soon as built (calling `.wait()` or `.await` on the `QueryingSubscriberBuilder`), the `FetchingSubscriber`
224    /// will issue a query on a given key expression (by default it uses the same key expression than it subscribes to).
225    /// The results of the query will be merged with the received publications and made available in the receiver.
226    /// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
227    ///
228    /// A typical usage of the `FetchingSubscriber` is to retrieve publications that were made in the past, but stored in some zenoh Storage.
229    ///
230    /// # Examples
231    /// ```no_run
232    /// # #[tokio::main]
233    /// # async fn main() {
234    /// use zenoh_ext::*;
235    ///
236    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
237    /// let subscriber = session
238    ///     .declare_subscriber("key/expr")
239    ///     .querying()
240    ///     .await
241    ///     .unwrap();
242    /// while let Ok(sample) = subscriber.recv_async().await {
243    ///     println!("Received: {:?}", sample);
244    /// }
245    /// # }
246    /// ```
247    #[zenoh_macros::unstable]
248    fn querying(self) -> QueryingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler> {
249        QueryingSubscriberBuilder {
250            session: self.session,
251            key_expr: self.key_expr,
252            key_space: crate::UserSpace,
253            origin: self.origin,
254            query_selector: None,
255            // By default query all matching publication caches and storages
256            query_target: QueryTarget::All,
257            // By default no query consolidation, to receive more than 1 sample per-resource
258            // (if history of publications is available)
259            query_consolidation: QueryConsolidation::from(zenoh::query::ConsolidationMode::None),
260            query_accept_replies: ReplyKeyExpr::default(),
261            query_timeout: Duration::from_secs(10),
262            handler: self.handler,
263        }
264    }
265}
266
267#[zenoh_macros::unstable]
268impl<'a, 'b, 'c, Handler> AdvancedSubscriberBuilderExt<'a, 'b, 'c, Handler>
269    for SubscriberBuilder<'a, 'b, Handler>
270{
271    /// Enable query for historical data.
272    ///
273    /// History can only be retransmitted by [`AdvancedPublishers`](crate::AdvancedPublisher) that enable [`cache`](crate::AdvancedPublisherBuilder::cache).
274    #[zenoh_macros::unstable]
275    fn history(self, config: HistoryConfig) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler> {
276        AdvancedSubscriberBuilder::new(self).history(config)
277    }
278
279    /// Ask for retransmission of detected lost Samples.
280    ///
281    /// Retransmission can only be achieved by [`AdvancedPublishers`](crate::AdvancedPublisher)
282    /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
283    /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
284    #[zenoh_macros::unstable]
285    fn recovery(self, conf: RecoveryConfig) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler> {
286        AdvancedSubscriberBuilder::new(self).recovery(conf)
287    }
288
289    /// Allow this subscriber to be detected through liveliness.
290    #[zenoh_macros::unstable]
291    fn subscriber_detection(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler> {
292        AdvancedSubscriberBuilder::new(self).subscriber_detection()
293    }
294
295    /// Turn this [`Subscriber`](zenoh::subscriber::Subscriber) into an [`AdvancedSubscriber`](crate::AdvancedSubscriber).
296    #[zenoh_macros::unstable]
297    fn advanced(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler> {
298        AdvancedSubscriberBuilder::new(self)
299    }
300}
301
302#[zenoh_macros::unstable]
303#[allow(deprecated)]
304impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler>
305    for LivelinessSubscriberBuilder<'a, 'b, Handler>
306{
307    type KeySpace = crate::LivelinessSpace;
308
309    /// Create a fetching liveliness subscriber ([`FetchingSubscriber`](super::FetchingSubscriber)).
310    ///
311    /// This operation returns a [`FetchingSubscriberBuilder`](FetchingSubscriberBuilder) that can be used to finely configure the subscriber.
312    /// As soon as built (calling `.wait()` or `.await` on the `FetchingSubscriberBuilder`), the `FetchingSubscriber`
313    /// will run the given `fetch` function. The user defined `fetch` function should fetch some samples and return them
314    /// through the callback function. Those samples will be merged with the received publications and made available in the receiver.
315    /// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
316    ///
317    /// A typical usage of the fetching liveliness subscriber is to retrieve existing liveliness tokens while subscribing to
318    /// new liveness changes.
319    ///
320    /// # Examples
321    /// ```no_run
322    /// # #[tokio::main]
323    /// # async fn main() {
324    /// use zenoh::Wait;
325    /// use zenoh_ext::*;
326    ///
327    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
328    /// let subscriber = session
329    ///     .liveliness()
330    ///     .declare_subscriber("key/expr")
331    ///     .fetching( |cb| {
332    ///         session
333    ///             .liveliness()
334    ///             .get("key/expr")
335    ///             .callback(cb)
336    ///             .wait()
337    ///     })
338    ///     .await
339    ///     .unwrap();
340    /// while let Ok(sample) = subscriber.recv_async().await {
341    ///     println!("Received: {:?}", sample);
342    /// }
343    /// # }
344    /// ```
345    #[zenoh_macros::unstable]
346    fn fetching<
347        Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
348        TryIntoSample,
349    >(
350        self,
351        fetch: Fetch,
352    ) -> FetchingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler, Fetch, TryIntoSample>
353    where
354        TryIntoSample: ExtractSample,
355    {
356        FetchingSubscriberBuilder {
357            session: self.session,
358            key_expr: self.key_expr,
359            key_space: crate::LivelinessSpace,
360            origin: Locality::default(),
361            fetch,
362            handler: self.handler,
363            phantom: std::marker::PhantomData,
364        }
365    }
366
367    /// Create a fetching liveliness subscriber ([`FetchingSubscriber`](super::FetchingSubscriber)) that will perform a liveliness query (`session.liveliness().get()`) as it's initial fetch.
368    ///
369    /// This operation returns a [`QueryingSubscriberBuilder`](QueryingSubscriberBuilder) that can be used to finely configure the subscriber.
370    /// As soon as built (calling `.wait()` or `.await` on the `QueryingSubscriberBuilder`), the `FetchingSubscriber`
371    /// will issue a liveliness query on a given key expression (by default it uses the same key expression than it subscribes to).
372    /// The results of the query will be merged with the received publications and made available in the receiver.
373    /// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
374    ///
375    /// A typical usage of the fetching liveliness subscriber is to retrieve existing liveliness tokens while subscribing to
376    /// new liveness changes.
377    ///
378    /// # Examples
379    /// ```no_run
380    /// # #[tokio::main]
381    /// # async fn main() {
382    /// use zenoh_ext::*;
383    ///
384    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
385    /// let subscriber = session
386    ///     .liveliness()
387    ///     .declare_subscriber("key/expr")
388    ///     .querying()
389    ///     .await
390    ///     .unwrap();
391    /// while let Ok(sample) = subscriber.recv_async().await {
392    ///     println!("Received: {:?}", sample);
393    /// }
394    /// # }
395    /// ```
396    #[zenoh_macros::unstable]
397    fn querying(self) -> QueryingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler> {
398        QueryingSubscriberBuilder {
399            session: self.session,
400            key_expr: self.key_expr,
401            key_space: crate::LivelinessSpace,
402            origin: Locality::default(),
403            query_selector: None,
404            query_target: QueryTarget::DEFAULT,
405            query_consolidation: QueryConsolidation::DEFAULT,
406            query_accept_replies: ReplyKeyExpr::MatchingQuery,
407            query_timeout: Duration::from_secs(10),
408            handler: self.handler,
409        }
410    }
411}