zenoh_ext/subscriber_ext.rs
1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::time::Duration;
15
16use futures::stream::{Forward, Map};
17use zenoh::{
18 handlers::{fifo, FifoChannelHandler},
19 liveliness::LivelinessSubscriberBuilder,
20 pubsub::{Subscriber, SubscriberBuilder},
21 query::{QueryConsolidation, QueryTarget, ReplyKeyExpr},
22 sample::{Locality, Sample},
23 Result as ZResult,
24};
25
26#[allow(deprecated)]
27use crate::{
28 advanced_subscriber::HistoryConfig, querying_subscriber::QueryingSubscriberBuilder,
29 AdvancedSubscriberBuilder, ExtractSample, FetchingSubscriberBuilder, RecoveryConfig,
30};
31
32/// Allows writing `subscriber.forward(receiver)` instead of `subscriber.stream().map(Ok).forward(publisher)`
33#[zenoh_macros::unstable]
34pub trait SubscriberForward<'a, S> {
35 type Output;
36 fn forward(&'a mut self, sink: S) -> Self::Output;
37}
38impl<'a, S> SubscriberForward<'a, S> for Subscriber<FifoChannelHandler<Sample>>
39where
40 S: futures::sink::Sink<Sample>,
41{
42 #[zenoh_macros::unstable]
43 type Output =
44 Forward<Map<fifo::RecvStream<'a, Sample>, fn(Sample) -> Result<Sample, S::Error>>, S>;
45 fn forward(&'a mut self, sink: S) -> Self::Output {
46 futures::StreamExt::forward(futures::StreamExt::map(self.stream(), Ok), sink)
47 }
48}
49
50/// Some extensions to the [`zenoh::subscriber::SubscriberBuilder`](zenoh::pubsub::SubscriberBuilder)
51#[zenoh_macros::unstable]
52#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
53#[allow(deprecated)]
54pub trait SubscriberBuilderExt<'a, 'b, Handler> {
55 type KeySpace;
56
57 /// Create a [`FetchingSubscriber`](super::FetchingSubscriber).
58 ///
59 /// This operation returns a [`FetchingSubscriberBuilder`](FetchingSubscriberBuilder) that can be used to finely configure the subscriber.
60 /// As soon as built (calling `.wait()` or `.await` on the `FetchingSubscriberBuilder`), the `FetchingSubscriber`
61 /// will run the given `fetch` function. The user defined `fetch` function should fetch some samples and return them
62 /// through the callback function. Those samples will be merged with the received publications and made available in the receiver.
63 /// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
64 ///
65 /// A typical usage of the `FetchingSubscriber` is to retrieve publications that were made in the past, but stored in some zenoh Storage.
66 ///
67 /// # Examples
68 /// ```no_run
69 /// # #[tokio::main]
70 /// # async fn main() {
71 /// use zenoh::Wait;
72 /// use zenoh_ext::*;
73 ///
74 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
75 /// let subscriber = session
76 /// .declare_subscriber("key/expr")
77 /// .fetching( |cb| {
78 /// session
79 /// .get("key/expr")
80 /// .callback(cb)
81 /// .wait()
82 /// })
83 /// .await
84 /// .unwrap();
85 /// while let Ok(sample) = subscriber.recv_async().await {
86 /// println!("Received: {:?}", sample);
87 /// }
88 /// # }
89 /// ```
90 #[zenoh_macros::unstable]
91 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
92 fn fetching<
93 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
94 TryIntoSample,
95 >(
96 self,
97 fetch: Fetch,
98 ) -> FetchingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler, Fetch, TryIntoSample>
99 where
100 TryIntoSample: ExtractSample;
101
102 /// Create a [`FetchingSubscriber`](super::FetchingSubscriber) that will perform a query (`session.get()`) as it's initial fetch.
103 ///
104 /// This operation returns a [`QueryingSubscriberBuilder`](QueryingSubscriberBuilder) that can be used to finely configure the subscriber.
105 /// As soon as built (calling `.wait()` or `.await` on the `QueryingSubscriberBuilder`), the `FetchingSubscriber`
106 /// will issue a query on a given key expression (by default it uses the same key expression than it subscribes to).
107 /// The results of the query will be merged with the received publications and made available in the receiver.
108 /// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
109 ///
110 /// A typical usage of the `FetchingSubscriber` is to retrieve publications that were made in the past, but stored in some zenoh Storage.
111 ///
112 /// # Examples
113 /// ```no_run
114 /// # #[tokio::main]
115 /// # async fn main() {
116 /// use zenoh_ext::*;
117 ///
118 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
119 /// let subscriber = session
120 /// .declare_subscriber("key/expr")
121 /// .querying()
122 /// .await
123 /// .unwrap();
124 /// while let Ok(sample) = subscriber.recv_async().await {
125 /// println!("Received: {:?}", sample);
126 /// }
127 /// # }
128 /// ```
129 #[zenoh_macros::unstable]
130 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
131 fn querying(self) -> QueryingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler>;
132}
133
134/// Some extensions to the [`zenoh::subscriber::SubscriberBuilder`](zenoh::pubsub::SubscriberBuilder)
135#[zenoh_macros::unstable]
136pub trait AdvancedSubscriberBuilderExt<'a, 'b, 'c, Handler> {
137 /// Enable query for historical data.
138 ///
139 /// History can only be retransmitted by [`AdvancedPublishers`](crate::AdvancedPublisher) that enable [`cache`](crate::AdvancedPublisherBuilder::cache).
140 #[zenoh_macros::unstable]
141 fn history(self, config: HistoryConfig) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>;
142
143 /// Ask for retransmission of detected lost Samples.
144 ///
145 /// Retransmission can only be achieved by [`AdvancedPublishers`](crate::AdvancedPublisher)
146 /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
147 /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
148 #[zenoh_macros::unstable]
149 fn recovery(self, conf: RecoveryConfig) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>;
150
151 /// Allow this subscriber to be detected through liveliness.
152 #[zenoh_macros::unstable]
153 fn subscriber_detection(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>;
154
155 /// Turn this [`Subscriber`](zenoh::subscriber::Subscriber) into an [`AdvancedSubscriber`](crate::AdvancedSubscriber).
156 #[zenoh_macros::unstable]
157 fn advanced(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>;
158}
159
160#[zenoh_macros::unstable]
161#[allow(deprecated)]
162impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> for SubscriberBuilder<'a, 'b, Handler> {
163 type KeySpace = crate::UserSpace;
164
165 /// Create a [`FetchingSubscriber`](super::FetchingSubscriber).
166 ///
167 /// This operation returns a [`FetchingSubscriberBuilder`](FetchingSubscriberBuilder) that can be used to finely configure the subscriber.
168 /// As soon as built (calling `.wait()` or `.await` on the `FetchingSubscriberBuilder`), the `FetchingSubscriber`
169 /// will run the given `fetch` function. The user defined `fetch` function should fetch some samples and return them
170 /// through the callback function. Those samples will be merged with the received publications and made available in the receiver.
171 /// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
172 ///
173 /// A typical usage of the `FetchingSubscriber` is to retrieve publications that were made in the past, but stored in some zenoh Storage.
174 ///
175 /// # Examples
176 /// ```no_run
177 /// # #[tokio::main]
178 /// # async fn main() {
179 /// use zenoh::Wait;
180 /// use zenoh_ext::*;
181 ///
182 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
183 /// let subscriber = session
184 /// .declare_subscriber("key/expr")
185 /// .fetching( |cb| {
186 /// session
187 /// .get("key/expr")
188 /// .callback(cb)
189 /// .wait()
190 /// })
191 /// .await
192 /// .unwrap();
193 /// while let Ok(sample) = subscriber.recv_async().await {
194 /// println!("Received: {:?}", sample);
195 /// }
196 /// # }
197 /// ```
198 #[zenoh_macros::unstable]
199 fn fetching<
200 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
201 TryIntoSample,
202 >(
203 self,
204 fetch: Fetch,
205 ) -> FetchingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler, Fetch, TryIntoSample>
206 where
207 TryIntoSample: ExtractSample,
208 {
209 FetchingSubscriberBuilder {
210 session: self.session,
211 key_expr: self.key_expr,
212 key_space: crate::UserSpace,
213 origin: self.origin,
214 fetch,
215 handler: self.handler,
216 phantom: std::marker::PhantomData,
217 }
218 }
219
220 /// Create a [`FetchingSubscriber`](super::FetchingSubscriber) that will perform a query (`session.get()`) as it's initial fetch.
221 ///
222 /// This operation returns a [`QueryingSubscriberBuilder`](QueryingSubscriberBuilder) that can be used to finely configure the subscriber.
223 /// As soon as built (calling `.wait()` or `.await` on the `QueryingSubscriberBuilder`), the `FetchingSubscriber`
224 /// will issue a query on a given key expression (by default it uses the same key expression than it subscribes to).
225 /// The results of the query will be merged with the received publications and made available in the receiver.
226 /// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
227 ///
228 /// A typical usage of the `FetchingSubscriber` is to retrieve publications that were made in the past, but stored in some zenoh Storage.
229 ///
230 /// # Examples
231 /// ```no_run
232 /// # #[tokio::main]
233 /// # async fn main() {
234 /// use zenoh_ext::*;
235 ///
236 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
237 /// let subscriber = session
238 /// .declare_subscriber("key/expr")
239 /// .querying()
240 /// .await
241 /// .unwrap();
242 /// while let Ok(sample) = subscriber.recv_async().await {
243 /// println!("Received: {:?}", sample);
244 /// }
245 /// # }
246 /// ```
247 #[zenoh_macros::unstable]
248 fn querying(self) -> QueryingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler> {
249 QueryingSubscriberBuilder {
250 session: self.session,
251 key_expr: self.key_expr,
252 key_space: crate::UserSpace,
253 origin: self.origin,
254 query_selector: None,
255 // By default query all matching publication caches and storages
256 query_target: QueryTarget::All,
257 // By default no query consolidation, to receive more than 1 sample per-resource
258 // (if history of publications is available)
259 query_consolidation: QueryConsolidation::from(zenoh::query::ConsolidationMode::None),
260 query_accept_replies: ReplyKeyExpr::default(),
261 query_timeout: Duration::from_secs(10),
262 handler: self.handler,
263 }
264 }
265}
266
267#[zenoh_macros::unstable]
268impl<'a, 'b, 'c, Handler> AdvancedSubscriberBuilderExt<'a, 'b, 'c, Handler>
269 for SubscriberBuilder<'a, 'b, Handler>
270{
271 /// Enable query for historical data.
272 ///
273 /// History can only be retransmitted by [`AdvancedPublishers`](crate::AdvancedPublisher) that enable [`cache`](crate::AdvancedPublisherBuilder::cache).
274 #[zenoh_macros::unstable]
275 fn history(self, config: HistoryConfig) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler> {
276 AdvancedSubscriberBuilder::new(self).history(config)
277 }
278
279 /// Ask for retransmission of detected lost Samples.
280 ///
281 /// Retransmission can only be achieved by [`AdvancedPublishers`](crate::AdvancedPublisher)
282 /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
283 /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
284 #[zenoh_macros::unstable]
285 fn recovery(self, conf: RecoveryConfig) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler> {
286 AdvancedSubscriberBuilder::new(self).recovery(conf)
287 }
288
289 /// Allow this subscriber to be detected through liveliness.
290 #[zenoh_macros::unstable]
291 fn subscriber_detection(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler> {
292 AdvancedSubscriberBuilder::new(self).subscriber_detection()
293 }
294
295 /// Turn this [`Subscriber`](zenoh::subscriber::Subscriber) into an [`AdvancedSubscriber`](crate::AdvancedSubscriber).
296 #[zenoh_macros::unstable]
297 fn advanced(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler> {
298 AdvancedSubscriberBuilder::new(self)
299 }
300}
301
302#[zenoh_macros::unstable]
303#[allow(deprecated)]
304impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler>
305 for LivelinessSubscriberBuilder<'a, 'b, Handler>
306{
307 type KeySpace = crate::LivelinessSpace;
308
309 /// Create a fetching liveliness subscriber ([`FetchingSubscriber`](super::FetchingSubscriber)).
310 ///
311 /// This operation returns a [`FetchingSubscriberBuilder`](FetchingSubscriberBuilder) that can be used to finely configure the subscriber.
312 /// As soon as built (calling `.wait()` or `.await` on the `FetchingSubscriberBuilder`), the `FetchingSubscriber`
313 /// will run the given `fetch` function. The user defined `fetch` function should fetch some samples and return them
314 /// through the callback function. Those samples will be merged with the received publications and made available in the receiver.
315 /// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
316 ///
317 /// A typical usage of the fetching liveliness subscriber is to retrieve existing liveliness tokens while subscribing to
318 /// new liveness changes.
319 ///
320 /// # Examples
321 /// ```no_run
322 /// # #[tokio::main]
323 /// # async fn main() {
324 /// use zenoh::Wait;
325 /// use zenoh_ext::*;
326 ///
327 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
328 /// let subscriber = session
329 /// .liveliness()
330 /// .declare_subscriber("key/expr")
331 /// .fetching( |cb| {
332 /// session
333 /// .liveliness()
334 /// .get("key/expr")
335 /// .callback(cb)
336 /// .wait()
337 /// })
338 /// .await
339 /// .unwrap();
340 /// while let Ok(sample) = subscriber.recv_async().await {
341 /// println!("Received: {:?}", sample);
342 /// }
343 /// # }
344 /// ```
345 #[zenoh_macros::unstable]
346 fn fetching<
347 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
348 TryIntoSample,
349 >(
350 self,
351 fetch: Fetch,
352 ) -> FetchingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler, Fetch, TryIntoSample>
353 where
354 TryIntoSample: ExtractSample,
355 {
356 FetchingSubscriberBuilder {
357 session: self.session,
358 key_expr: self.key_expr,
359 key_space: crate::LivelinessSpace,
360 origin: Locality::default(),
361 fetch,
362 handler: self.handler,
363 phantom: std::marker::PhantomData,
364 }
365 }
366
367 /// Create a fetching liveliness subscriber ([`FetchingSubscriber`](super::FetchingSubscriber)) that will perform a liveliness query (`session.liveliness().get()`) as it's initial fetch.
368 ///
369 /// This operation returns a [`QueryingSubscriberBuilder`](QueryingSubscriberBuilder) that can be used to finely configure the subscriber.
370 /// As soon as built (calling `.wait()` or `.await` on the `QueryingSubscriberBuilder`), the `FetchingSubscriber`
371 /// will issue a liveliness query on a given key expression (by default it uses the same key expression than it subscribes to).
372 /// The results of the query will be merged with the received publications and made available in the receiver.
373 /// Later on, new fetches can be performed again, calling [`FetchingSubscriber::fetch()`](super::FetchingSubscriber::fetch()).
374 ///
375 /// A typical usage of the fetching liveliness subscriber is to retrieve existing liveliness tokens while subscribing to
376 /// new liveness changes.
377 ///
378 /// # Examples
379 /// ```no_run
380 /// # #[tokio::main]
381 /// # async fn main() {
382 /// use zenoh_ext::*;
383 ///
384 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
385 /// let subscriber = session
386 /// .liveliness()
387 /// .declare_subscriber("key/expr")
388 /// .querying()
389 /// .await
390 /// .unwrap();
391 /// while let Ok(sample) = subscriber.recv_async().await {
392 /// println!("Received: {:?}", sample);
393 /// }
394 /// # }
395 /// ```
396 #[zenoh_macros::unstable]
397 fn querying(self) -> QueryingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler> {
398 QueryingSubscriberBuilder {
399 session: self.session,
400 key_expr: self.key_expr,
401 key_space: crate::LivelinessSpace,
402 origin: Locality::default(),
403 query_selector: None,
404 query_target: QueryTarget::DEFAULT,
405 query_consolidation: QueryConsolidation::DEFAULT,
406 query_accept_replies: ReplyKeyExpr::MatchingQuery,
407 query_timeout: Duration::from_secs(10),
408 handler: self.handler,
409 }
410 }
411}