zenoh_ext/subscriber_ext.rs
1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::time::Duration;
15
16use futures::stream::{Forward, Map};
17use zenoh::{
18 handlers::{fifo, FifoChannelHandler},
19 liveliness::LivelinessSubscriberBuilder,
20 pubsub::{Subscriber, SubscriberBuilder},
21 query::{QueryConsolidation, QueryTarget, ReplyKeyExpr},
22 sample::{Locality, Sample},
23 Result as ZResult,
24};
25
26#[allow(deprecated)]
27use crate::{
28 advanced_subscriber::HistoryConfig, querying_subscriber::QueryingSubscriberBuilder,
29 AdvancedSubscriberBuilder, ExtractSample, FetchingSubscriberBuilder, RecoveryConfig,
30};
31
/// Extension trait that lets a subscriber be forwarded directly into a sink.
///
/// Allows writing `subscriber.forward(sink)` instead of
/// `subscriber.stream().map(Ok).forward(sink)`.
#[zenoh_macros::unstable]
pub trait SubscriberForward<'a, S> {
    /// Value returned by [`forward`](SubscriberForward::forward); each implementation picks its own type.
    type Output;
    /// Connects `self` to `sink` and returns the implementation-defined [`Output`](SubscriberForward::Output).
    ///
    /// `self` stays mutably borrowed for the lifetime `'a`.
    fn forward(&'a mut self, sink: S) -> Self::Output;
}
38impl<'a, S> SubscriberForward<'a, S> for Subscriber<FifoChannelHandler<Sample>>
39where
40 S: futures::sink::Sink<Sample>,
41{
42 #[zenoh_macros::unstable]
43 type Output =
44 Forward<Map<fifo::RecvStream<'a, Sample>, fn(Sample) -> Result<Sample, S::Error>>, S>;
45 fn forward(&'a mut self, sink: S) -> Self::Output {
46 futures::StreamExt::forward(futures::StreamExt::map(self.stream(), Ok), sink)
47 }
48}
49
50/// Some extensions to the [`zenoh::subscriber::SubscriberBuilder`](zenoh::pubsub::SubscriberBuilder)
51#[zenoh_macros::unstable]
52#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
53#[allow(deprecated)]
54pub trait SubscriberBuilderExt<'a, 'b, Handler> {
55 type KeySpace;
56
57 /// Create a [`FetchingSubscriber`](super::FetchingSubscriber).
58 ///
59 /// This operation returns a [`FetchingSubscriberBuilder`](FetchingSubscriberBuilder) that can be used to finely configure the subscriber.
60 /// As soon as built (calling `.wait()` or `.await` on the `FetchingSubscriberBuilder`), the `FetchingSubscriber`
61 /// will run the given `fetch` function. The user defined `fetch` function should fetch some samples and return them
62 /// through the callback function. Those samples will be merged with the received publications and made available in the receiver.
63 /// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
64 ///
65 /// A typical usage of the `FetchingSubscriber` is to retrieve publications that were made in the past, but stored in some zenoh Storage.
66 ///
67 /// # Examples
68 /// ```no_run
69 /// # #[tokio::main]
70 /// # async fn main() {
71 /// use zenoh::Wait;
72 /// use zenoh_ext::*;
73 ///
74 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
75 /// let subscriber = session
76 /// .declare_subscriber("key/expr")
77 /// .fetching( |cb| {
78 /// session
79 /// .get("key/expr")
80 /// .callback(cb)
81 /// .wait()
82 /// })
83 /// .await
84 /// .unwrap();
85 /// while let Ok(sample) = subscriber.recv_async().await {
86 /// println!("Received: {:?}", sample);
87 /// }
88 /// # }
89 /// ```
90 #[zenoh_macros::unstable]
91 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
92 fn fetching<
93 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
94 TryIntoSample,
95 >(
96 self,
97 fetch: Fetch,
98 ) -> FetchingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler, Fetch, TryIntoSample>
99 where
100 TryIntoSample: ExtractSample;
101
102 /// Create a [`FetchingSubscriber`](super::FetchingSubscriber) that will perform a query (`session.get()`) as it's
103 /// initial fetch.
104 ///
105 /// This operation returns a [`QueryingSubscriberBuilder`](QueryingSubscriberBuilder) that can be used to finely configure the subscriber.
106 /// As soon as built (calling `.wait()` or `.await` on the `QueryingSubscriberBuilder`), the `FetchingSubscriber`
107 /// will issue a query on a given key expression (by default it uses the same key expression than it subscribes to).
108 /// The results of the query will be merged with the received publications and made available in the receiver.
109 /// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
110 ///
111 /// A typical usage of the `FetchingSubscriber` is to retrieve publications that were made in the past, but stored in some zenoh Storage.
112 ///
113 /// # Examples
114 /// ```no_run
115 /// # #[tokio::main]
116 /// # async fn main() {
117 /// use zenoh_ext::*;
118 ///
119 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
120 /// let subscriber = session
121 /// .declare_subscriber("key/expr")
122 /// .querying()
123 /// .await
124 /// .unwrap();
125 /// while let Ok(sample) = subscriber.recv_async().await {
126 /// println!("Received: {:?}", sample);
127 /// }
128 /// # }
129 /// ```
130 #[zenoh_macros::unstable]
131 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
132 fn querying(self) -> QueryingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler>;
133}
134
/// Extension methods for [`zenoh::pubsub::SubscriberBuilder`](zenoh::pubsub::SubscriberBuilder).
///
/// Every method consumes the builder and returns an [`AdvancedSubscriberBuilder`].
#[zenoh_macros::unstable]
pub trait AdvancedSubscriberBuilderExt<'a, 'b, 'c, Handler> {
    /// Enable query for historical data.
    ///
    /// History can only be retransmitted by [`AdvancedPublishers`](crate::AdvancedPublisher) that enable [`cache`](crate::AdvancedPublisherBuilder::cache).
    #[zenoh_macros::unstable]
    fn history(self, config: HistoryConfig) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>;

    /// Ask for retransmission of detected lost Samples.
    ///
    /// Retransmission can only be achieved by [`AdvancedPublishers`](crate::AdvancedPublisher)
    /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
    /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
    #[zenoh_macros::unstable]
    fn recovery(self, conf: RecoveryConfig) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>;

    /// Allow this subscriber to be detected through liveliness.
    #[zenoh_macros::unstable]
    fn subscriber_detection(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>;

    /// Turn this builder of a [`Subscriber`](zenoh::pubsub::Subscriber) into the builder of an
    /// [`AdvancedSubscriber`](crate::AdvancedSubscriber), without changing any other setting.
    #[zenoh_macros::unstable]
    fn advanced(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>;
}
160
161#[zenoh_macros::unstable]
162#[allow(deprecated)]
163impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> for SubscriberBuilder<'a, 'b, Handler> {
164 type KeySpace = crate::UserSpace;
165
166 /// Create a [`FetchingSubscriber`](super::FetchingSubscriber).
167 ///
168 /// This operation returns a [`FetchingSubscriberBuilder`](FetchingSubscriberBuilder) that can be used to finely configure the subscriber.
169 /// As soon as built (calling `.wait()` or `.await` on the `FetchingSubscriberBuilder`), the `FetchingSubscriber`
170 /// will run the given `fetch` function. The user defined `fetch` function should fetch some samples and return them
171 /// through the callback function. Those samples will be merged with the received publications and made available in the receiver.
172 /// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
173 ///
174 /// A typical usage of the `FetchingSubscriber` is to retrieve publications that were made in the past, but stored in some zenoh Storage.
175 ///
176 /// # Examples
177 /// ```no_run
178 /// # #[tokio::main]
179 /// # async fn main() {
180 /// use zenoh::Wait;
181 /// use zenoh_ext::*;
182 ///
183 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
184 /// let subscriber = session
185 /// .declare_subscriber("key/expr")
186 /// .fetching( |cb| {
187 /// session
188 /// .get("key/expr")
189 /// .callback(cb)
190 /// .wait()
191 /// })
192 /// .await
193 /// .unwrap();
194 /// while let Ok(sample) = subscriber.recv_async().await {
195 /// println!("Received: {:?}", sample);
196 /// }
197 /// # }
198 /// ```
199 #[zenoh_macros::unstable]
200 fn fetching<
201 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
202 TryIntoSample,
203 >(
204 self,
205 fetch: Fetch,
206 ) -> FetchingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler, Fetch, TryIntoSample>
207 where
208 TryIntoSample: ExtractSample,
209 {
210 FetchingSubscriberBuilder {
211 session: self.session,
212 key_expr: self.key_expr,
213 key_space: crate::UserSpace,
214 origin: self.origin,
215 fetch,
216 handler: self.handler,
217 phantom: std::marker::PhantomData,
218 }
219 }
220
221 /// Create a [`FetchingSubscriber`](super::FetchingSubscriber) that will perform a query (`session.get()`) as it's
222 /// initial fetch.
223 ///
224 /// This operation returns a [`QueryingSubscriberBuilder`](QueryingSubscriberBuilder) that can be used to finely configure the subscriber.
225 /// As soon as built (calling `.wait()` or `.await` on the `QueryingSubscriberBuilder`), the `FetchingSubscriber`
226 /// will issue a query on a given key expression (by default it uses the same key expression than it subscribes to).
227 /// The results of the query will be merged with the received publications and made available in the receiver.
228 /// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
229 ///
230 /// A typical usage of the `FetchingSubscriber` is to retrieve publications that were made in the past, but stored in some zenoh Storage.
231 ///
232 /// # Examples
233 /// ```no_run
234 /// # #[tokio::main]
235 /// # async fn main() {
236 /// use zenoh_ext::*;
237 ///
238 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
239 /// let subscriber = session
240 /// .declare_subscriber("key/expr")
241 /// .querying()
242 /// .await
243 /// .unwrap();
244 /// while let Ok(sample) = subscriber.recv_async().await {
245 /// println!("Received: {:?}", sample);
246 /// }
247 /// # }
248 /// ```
249 #[zenoh_macros::unstable]
250 fn querying(self) -> QueryingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler> {
251 QueryingSubscriberBuilder {
252 session: self.session,
253 key_expr: self.key_expr,
254 key_space: crate::UserSpace,
255 origin: self.origin,
256 query_selector: None,
257 // By default query all matching publication caches and storages
258 query_target: QueryTarget::All,
259 // By default no query consolidation, to receive more than 1 sample per-resource
260 // (if history of publications is available)
261 query_consolidation: QueryConsolidation::from(zenoh::query::ConsolidationMode::None),
262 query_accept_replies: ReplyKeyExpr::default(),
263 query_timeout: Duration::from_secs(10),
264 handler: self.handler,
265 }
266 }
267}
268
269#[zenoh_macros::unstable]
270impl<'a, 'b, 'c, Handler> AdvancedSubscriberBuilderExt<'a, 'b, 'c, Handler>
271 for SubscriberBuilder<'a, 'b, Handler>
272{
273 /// Enable query for historical data.
274 ///
275 /// History can only be retransmitted by [`AdvancedPublishers`](crate::AdvancedPublisher) that enable [`cache`](crate::AdvancedPublisherBuilder::cache).
276 #[zenoh_macros::unstable]
277 fn history(self, config: HistoryConfig) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler> {
278 AdvancedSubscriberBuilder::new(self).history(config)
279 }
280
281 /// Ask for retransmission of detected lost Samples.
282 ///
283 /// Retransmission can only be achieved by [`AdvancedPublishers`](crate::AdvancedPublisher)
284 /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
285 /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
286 #[zenoh_macros::unstable]
287 fn recovery(self, conf: RecoveryConfig) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler> {
288 AdvancedSubscriberBuilder::new(self).recovery(conf)
289 }
290
291 /// Allow this subscriber to be detected through liveliness.
292 #[zenoh_macros::unstable]
293 fn subscriber_detection(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler> {
294 AdvancedSubscriberBuilder::new(self).subscriber_detection()
295 }
296
297 /// Turn this [`Subscriber`](zenoh::subscriber::Subscriber) into an [`AdvancedSubscriber`](crate::AdvancedSubscriber).
298 #[zenoh_macros::unstable]
299 fn advanced(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler> {
300 AdvancedSubscriberBuilder::new(self)
301 }
302}
303
304#[zenoh_macros::unstable]
305#[allow(deprecated)]
306impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler>
307 for LivelinessSubscriberBuilder<'a, 'b, Handler>
308{
309 type KeySpace = crate::LivelinessSpace;
310
311 /// Create a fetching liveliness subscriber ([`FetchingSubscriber`](super::FetchingSubscriber)).
312 ///
313 /// This operation returns a [`FetchingSubscriberBuilder`](FetchingSubscriberBuilder) that can be used to finely configure the subscriber.
314 /// As soon as built (calling `.wait()` or `.await` on the `FetchingSubscriberBuilder`), the `FetchingSubscriber`
315 /// will run the given `fetch` function. The user defined `fetch` function should fetch some samples and return them
316 /// through the callback function. Those samples will be merged with the received publications and made available in the receiver.
317 /// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
318 ///
319 /// A typical usage of the fetching liveliness subscriber is to retrieve existing liveliness tokens while subscribing to
320 /// new liveness changes.
321 ///
322 /// # Examples
323 /// ```no_run
324 /// # #[tokio::main]
325 /// # async fn main() {
326 /// use zenoh::Wait;
327 /// use zenoh_ext::*;
328 ///
329 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
330 /// let subscriber = session
331 /// .liveliness()
332 /// .declare_subscriber("key/expr")
333 /// .fetching( |cb| {
334 /// session
335 /// .liveliness()
336 /// .get("key/expr")
337 /// .callback(cb)
338 /// .wait()
339 /// })
340 /// .await
341 /// .unwrap();
342 /// while let Ok(sample) = subscriber.recv_async().await {
343 /// println!("Received: {:?}", sample);
344 /// }
345 /// # }
346 /// ```
347 #[zenoh_macros::unstable]
348 fn fetching<
349 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
350 TryIntoSample,
351 >(
352 self,
353 fetch: Fetch,
354 ) -> FetchingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler, Fetch, TryIntoSample>
355 where
356 TryIntoSample: ExtractSample,
357 {
358 FetchingSubscriberBuilder {
359 session: self.session,
360 key_expr: self.key_expr,
361 key_space: crate::LivelinessSpace,
362 origin: Locality::default(),
363 fetch,
364 handler: self.handler,
365 phantom: std::marker::PhantomData,
366 }
367 }
368
369 /// Create a fetching liveliness subscriber ([`FetchingSubscriber`](super::FetchingSubscriber)) that will perform a
370 /// liveliness query (`session.liveliness().get()`) as it's initial fetch.
371 ///
372 /// This operation returns a [`QueryingSubscriberBuilder`](QueryingSubscriberBuilder) that can be used to finely configure the subscriber.
373 /// As soon as built (calling `.wait()` or `.await` on the `QueryingSubscriberBuilder`), the `FetchingSubscriber`
374 /// will issue a liveliness query on a given key expression (by default it uses the same key expression than it subscribes to).
375 /// The results of the query will be merged with the received publications and made available in the receiver.
376 /// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
377 ///
378 /// A typical usage of the fetching liveliness subscriber is to retrieve existing liveliness tokens while subscribing to
379 /// new liveness changes.
380 ///
381 /// # Examples
382 /// ```no_run
383 /// # #[tokio::main]
384 /// # async fn main() {
385 /// use zenoh_ext::*;
386 ///
387 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
388 /// let subscriber = session
389 /// .liveliness()
390 /// .declare_subscriber("key/expr")
391 /// .querying()
392 /// .await
393 /// .unwrap();
394 /// while let Ok(sample) = subscriber.recv_async().await {
395 /// println!("Received: {:?}", sample);
396 /// }
397 /// # }
398 /// ```
399 #[zenoh_macros::unstable]
400 fn querying(self) -> QueryingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler> {
401 QueryingSubscriberBuilder {
402 session: self.session,
403 key_expr: self.key_expr,
404 key_space: crate::LivelinessSpace,
405 origin: Locality::default(),
406 query_selector: None,
407 query_target: QueryTarget::DEFAULT,
408 query_consolidation: QueryConsolidation::DEFAULT,
409 query_accept_replies: ReplyKeyExpr::MatchingQuery,
410 query_timeout: Duration::from_secs(10),
411 handler: self.handler,
412 }
413 }
414}