1use std::{
15 collections::{btree_map, BTreeMap, VecDeque},
16 convert::TryInto,
17 future::{IntoFuture, Ready},
18 mem::swap,
19 sync::{Arc, Mutex},
20 time::{Duration, SystemTime, UNIX_EPOCH},
21};
22
23use zenoh::{
24 handlers::{locked, Callback, DefaultHandler, IntoHandler},
25 internal::{zerror, zlock},
26 key_expr::KeyExpr,
27 pubsub::Subscriber,
28 query::{QueryConsolidation, QueryTarget, Reply, ReplyKeyExpr, Selector},
29 sample::{Locality, Sample, SampleBuilder},
30 time::Timestamp,
31 Error, Resolvable, Resolve, Result as ZResult, Session, Wait,
32};
33
34#[zenoh_macros::unstable]
36#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
37pub enum KeySpace {
38 User,
39 Liveliness,
40}
41
42#[zenoh_macros::unstable]
44#[non_exhaustive]
45#[derive(Debug, Clone, Copy)]
46#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
47pub struct UserSpace;
48
49#[allow(deprecated)]
50impl From<UserSpace> for KeySpace {
51 fn from(_: UserSpace) -> Self {
52 KeySpace::User
53 }
54}
55
56#[zenoh_macros::unstable]
58#[non_exhaustive]
59#[derive(Debug, Clone, Copy)]
60#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
61pub struct LivelinessSpace;
62
63#[zenoh_macros::unstable]
64#[allow(deprecated)]
65impl From<LivelinessSpace> for KeySpace {
66 #[zenoh_macros::unstable]
67 fn from(_: LivelinessSpace) -> Self {
68 KeySpace::Liveliness
69 }
70}
71
72#[zenoh_macros::unstable]
74#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
75#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
76pub struct QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler, const BACKGROUND: bool = false> {
77 pub(crate) session: &'a Session,
78 pub(crate) key_expr: ZResult<KeyExpr<'b>>,
79 pub(crate) key_space: KeySpace,
80 pub(crate) origin: Locality,
81 pub(crate) query_selector: Option<ZResult<Selector<'b>>>,
82 pub(crate) query_target: QueryTarget,
83 pub(crate) query_consolidation: QueryConsolidation,
84 pub(crate) query_accept_replies: ReplyKeyExpr,
85 pub(crate) query_timeout: Duration,
86 pub(crate) handler: Handler,
87}
88
89#[zenoh_macros::unstable]
90#[allow(deprecated)]
91impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandler> {
92 #[zenoh_macros::unstable]
94 #[inline]
95 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
96 pub fn callback<F>(
97 self,
98 callback: F,
99 ) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>>
100 where
101 F: Fn(Sample) + Send + Sync + 'static,
102 {
103 self.with(Callback::from(callback))
104 }
105
106 #[zenoh_macros::unstable]
115 #[inline]
116 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
117 pub fn callback_mut<F>(
118 self,
119 callback: F,
120 ) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>>
121 where
122 F: FnMut(Sample) + Send + Sync + 'static,
123 {
124 self.callback(locked(callback))
125 }
126
127 #[zenoh_macros::unstable]
129 #[inline]
130 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
131 pub fn with<Handler>(
132 self,
133 handler: Handler,
134 ) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler>
135 where
136 Handler: IntoHandler<Sample>,
137 {
138 let QueryingSubscriberBuilder {
139 session,
140 key_expr,
141 key_space,
142 origin,
143 query_selector,
144 query_target,
145 query_consolidation,
146 query_accept_replies,
147 query_timeout,
148 handler: _,
149 } = self;
150 QueryingSubscriberBuilder {
151 session,
152 key_expr,
153 key_space,
154 origin,
155 query_selector,
156 query_target,
157 query_consolidation,
158 query_accept_replies,
159 query_timeout,
160 handler,
161 }
162 }
163}
164
165#[zenoh_macros::unstable]
166#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
167#[allow(deprecated)]
168impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>> {
169 #[zenoh_macros::unstable]
173 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
174 pub fn background(self) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>, true> {
175 QueryingSubscriberBuilder {
176 session: self.session,
177 key_expr: self.key_expr,
178 key_space: self.key_space,
179 origin: self.origin,
180 query_selector: self.query_selector,
181 query_target: self.query_target,
182 query_consolidation: self.query_consolidation,
183 query_accept_replies: self.query_accept_replies,
184 query_timeout: self.query_timeout,
185 handler: self.handler,
186 }
187 }
188}
189
190#[zenoh_macros::unstable]
191#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
192#[allow(deprecated)]
193impl<'b, Handler, const BACKGROUND: bool>
194 QueryingSubscriberBuilder<'_, 'b, UserSpace, Handler, BACKGROUND>
195{
196 #[zenoh_macros::unstable]
201 #[inline]
202 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
203 pub fn allowed_origin(mut self, origin: Locality) -> Self {
204 self.origin = origin;
205 self
206 }
207
208 #[zenoh_macros::unstable]
210 #[inline]
211 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
212 pub fn query_selector<IntoSelector>(mut self, query_selector: IntoSelector) -> Self
213 where
214 IntoSelector: TryInto<Selector<'b>>,
215 <IntoSelector as TryInto<Selector<'b>>>::Error: Into<Error>,
216 {
217 self.query_selector = Some(query_selector.try_into().map_err(Into::into));
218 self
219 }
220
221 #[zenoh_macros::unstable]
223 #[inline]
224 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
225 pub fn query_target(mut self, query_target: QueryTarget) -> Self {
226 self.query_target = query_target;
227 self
228 }
229
230 #[zenoh_macros::unstable]
232 #[inline]
233 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
234 pub fn query_consolidation<QC: Into<QueryConsolidation>>(
235 mut self,
236 query_consolidation: QC,
237 ) -> Self {
238 self.query_consolidation = query_consolidation.into();
239 self
240 }
241
242 #[zenoh_macros::unstable]
244 #[inline]
245 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
246 pub fn query_accept_replies(mut self, accept_replies: ReplyKeyExpr) -> Self {
247 self.query_accept_replies = accept_replies;
248 self
249 }
250}
251
252#[zenoh_macros::unstable]
253#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
254#[allow(deprecated)]
255impl<'a, 'b, KeySpace, Handler, const BACKGROUND: bool>
256 QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler, BACKGROUND>
257{
258 #[zenoh_macros::unstable]
260 #[inline]
261 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
262 pub fn query_timeout(mut self, query_timeout: Duration) -> Self {
263 self.query_timeout = query_timeout;
264 self
265 }
266
267 #[zenoh_macros::unstable]
268 #[allow(clippy::type_complexity)]
269 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
270 fn into_fetching_subscriber_builder(
271 self,
272 ) -> ZResult<
273 FetchingSubscriberBuilder<
274 'a,
275 'b,
276 KeySpace,
277 Handler,
278 impl FnOnce(Box<dyn Fn(Reply) + Send + Sync>) -> ZResult<()>,
279 Reply,
280 BACKGROUND,
281 >,
282 >
283 where
284 KeySpace: Into<self::KeySpace> + Clone,
285 Handler: IntoHandler<Sample>,
286 Handler::Handler: Send,
287 {
288 let session = self.session.downgrade();
289 let key_expr = self.key_expr?.into_owned();
290 let key_space = self.key_space.clone().into();
291 let query_selector = match self.query_selector {
292 Some(s) => Some(s?.into_owned()),
293 None => None,
294 };
295 let query_target = self.query_target;
296 let query_consolidation = self.query_consolidation;
297 let query_accept_replies = self.query_accept_replies;
298 let query_timeout = self.query_timeout;
299 Ok(FetchingSubscriberBuilder {
300 session: self.session,
301 key_expr: Ok(key_expr.clone()),
302 key_space: self.key_space,
303 origin: self.origin,
304 fetch: move |cb| match key_space {
305 self::KeySpace::User => match query_selector {
306 Some(s) => session.get(s),
307 None => session.get(key_expr),
308 }
309 .callback(cb)
310 .target(query_target)
311 .consolidation(query_consolidation)
312 .accept_replies(query_accept_replies)
313 .timeout(query_timeout)
314 .wait(),
315 self::KeySpace::Liveliness => session
316 .liveliness()
317 .get(key_expr)
318 .callback(cb)
319 .timeout(query_timeout)
320 .wait(),
321 },
322 handler: self.handler,
323 phantom: std::marker::PhantomData,
324 })
325 }
326}
327
328#[zenoh_macros::unstable]
329#[allow(deprecated)]
330impl<KeySpace, Handler> Resolvable for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
331where
332 Handler: IntoHandler<Sample>,
333 Handler::Handler: Send,
334{
335 type To = ZResult<FetchingSubscriber<Handler::Handler>>;
336}
337
338#[zenoh_macros::unstable]
339#[allow(deprecated)]
340impl<KeySpace, Handler> Wait for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
341where
342 KeySpace: Into<self::KeySpace> + Clone,
343 Handler: IntoHandler<Sample> + Send,
344 Handler::Handler: Send,
345{
346 #[zenoh_macros::unstable]
347 fn wait(self) -> <Self as Resolvable>::To {
348 self.into_fetching_subscriber_builder()?.wait()
349 }
350}
351
352#[zenoh_macros::unstable]
353#[allow(deprecated)]
354impl<KeySpace, Handler> IntoFuture for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
355where
356 KeySpace: Into<self::KeySpace> + Clone,
357 Handler: IntoHandler<Sample> + Send,
358 Handler::Handler: Send,
359{
360 type Output = <Self as Resolvable>::To;
361 type IntoFuture = Ready<<Self as Resolvable>::To>;
362
363 #[zenoh_macros::unstable]
364 fn into_future(self) -> Self::IntoFuture {
365 std::future::ready(self.wait())
366 }
367}
368
369#[zenoh_macros::unstable]
370#[allow(deprecated)]
371impl<KeySpace> Resolvable for QueryingSubscriberBuilder<'_, '_, KeySpace, Callback<Sample>, true> {
372 type To = ZResult<()>;
373}
374
375#[zenoh_macros::unstable]
376#[allow(deprecated)]
377impl<KeySpace> Wait for QueryingSubscriberBuilder<'_, '_, KeySpace, Callback<Sample>, true>
378where
379 KeySpace: Into<self::KeySpace> + Clone,
380{
381 #[zenoh_macros::unstable]
382 fn wait(self) -> <Self as Resolvable>::To {
383 self.into_fetching_subscriber_builder()?.wait()
384 }
385}
386
387#[zenoh_macros::unstable]
388#[allow(deprecated)]
389impl<KeySpace> IntoFuture for QueryingSubscriberBuilder<'_, '_, KeySpace, Callback<Sample>, true>
390where
391 KeySpace: Into<self::KeySpace> + Clone,
392{
393 type Output = <Self as Resolvable>::To;
394 type IntoFuture = Ready<<Self as Resolvable>::To>;
395
396 #[zenoh_macros::unstable]
397 fn into_future(self) -> Self::IntoFuture {
398 std::future::ready(self.wait())
399 }
400}
401
402struct MergeQueue {
407 untimestamped: VecDeque<Sample>,
408 timestamped: BTreeMap<Timestamp, Sample>,
409}
410
411impl MergeQueue {
412 fn new() -> Self {
413 MergeQueue {
414 untimestamped: VecDeque::new(),
415 timestamped: BTreeMap::new(),
416 }
417 }
418
419 fn len(&self) -> usize {
420 self.untimestamped.len() + self.timestamped.len()
421 }
422
423 fn push(&mut self, sample: Sample) {
424 if let Some(ts) = sample.timestamp() {
425 self.timestamped.entry(*ts).or_insert(sample);
426 } else {
427 self.untimestamped.push_back(sample);
428 }
429 }
430
431 fn drain(&mut self) -> MergeQueueValues {
432 let mut vec = VecDeque::new();
433 let mut queue = BTreeMap::new();
434 swap(&mut self.untimestamped, &mut vec);
435 swap(&mut self.timestamped, &mut queue);
436 MergeQueueValues {
437 untimestamped: vec,
438 timestamped: queue.into_values(),
439 }
440 }
441}
442
443struct MergeQueueValues {
444 untimestamped: VecDeque<Sample>,
445 timestamped: btree_map::IntoValues<Timestamp, Sample>,
446}
447
448impl Iterator for MergeQueueValues {
449 type Item = Sample;
450 fn next(&mut self) -> Option<Self::Item> {
451 self.untimestamped
452 .pop_front()
453 .or_else(|| self.timestamped.next())
454 }
455}
456
457struct InnerState {
458 pending_fetches: u64,
459 merge_queue: MergeQueue,
460}
461
462#[zenoh_macros::unstable]
464#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
465#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
466#[allow(deprecated)]
467pub struct FetchingSubscriberBuilder<
468 'a,
469 'b,
470 KeySpace,
471 Handler,
472 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
473 TryIntoSample,
474 const BACKGROUND: bool = false,
475> where
476 TryIntoSample: ExtractSample,
477{
478 pub(crate) session: &'a Session,
479 pub(crate) key_expr: ZResult<KeyExpr<'b>>,
480 pub(crate) key_space: KeySpace,
481 pub(crate) origin: Locality,
482 pub(crate) fetch: Fetch,
483 pub(crate) handler: Handler,
484 pub(crate) phantom: std::marker::PhantomData<TryIntoSample>,
485}
486
487#[zenoh_macros::unstable]
488#[allow(deprecated)]
489impl<
490 'a,
491 KeySpace,
492 Handler,
493 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
494 TryIntoSample,
495 const BACKGROUND: bool,
496 > FetchingSubscriberBuilder<'a, '_, KeySpace, Handler, Fetch, TryIntoSample, BACKGROUND>
497where
498 TryIntoSample: ExtractSample,
499{
500 #[zenoh_macros::unstable]
501 fn with_static_keys(
502 self,
503 ) -> FetchingSubscriberBuilder<'a, 'static, KeySpace, Handler, Fetch, TryIntoSample> {
504 FetchingSubscriberBuilder {
505 session: self.session,
506 key_expr: self.key_expr.map(|s| s.into_owned()),
507 key_space: self.key_space,
508 origin: self.origin,
509 fetch: self.fetch,
510 handler: self.handler,
511 phantom: std::marker::PhantomData,
512 }
513 }
514}
515
516#[zenoh_macros::unstable]
517#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
518#[allow(deprecated)]
519impl<
520 'a,
521 'b,
522 KeySpace,
523 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
524 TryIntoSample,
525 > FetchingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandler, Fetch, TryIntoSample>
526where
527 TryIntoSample: ExtractSample,
528{
529 #[zenoh_macros::unstable]
531 #[inline]
532 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
533 pub fn callback<F>(
534 self,
535 callback: F,
536 ) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>, Fetch, TryIntoSample>
537 where
538 F: Fn(Sample) + Send + Sync + 'static,
539 {
540 self.with(Callback::from(callback))
541 }
542
543 #[zenoh_macros::unstable]
552 #[inline]
553 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
554 pub fn callback_mut<F>(
555 self,
556 callback: F,
557 ) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>, Fetch, TryIntoSample>
558 where
559 F: FnMut(Sample) + Send + Sync + 'static,
560 {
561 self.callback(locked(callback))
562 }
563
564 #[zenoh_macros::unstable]
566 #[inline]
567 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
568 pub fn with<Handler>(
569 self,
570 handler: Handler,
571 ) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Handler, Fetch, TryIntoSample>
572 where
573 Handler: IntoHandler<Sample>,
574 {
575 let FetchingSubscriberBuilder {
576 session,
577 key_expr,
578 key_space,
579 origin,
580 fetch,
581 handler: _,
582 phantom,
583 } = self;
584 FetchingSubscriberBuilder {
585 session,
586 key_expr,
587 key_space,
588 origin,
589 fetch,
590 handler,
591 phantom,
592 }
593 }
594}
595
596#[zenoh_macros::unstable]
597#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
598#[allow(deprecated)]
599impl<
600 'a,
601 'b,
602 KeySpace,
603 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
604 TryIntoSample,
605 > FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>, Fetch, TryIntoSample>
606where
607 TryIntoSample: ExtractSample,
608{
609 #[zenoh_macros::unstable]
613 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
614 pub fn background(
615 self,
616 ) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>, Fetch, TryIntoSample, true>
617 {
618 FetchingSubscriberBuilder {
619 session: self.session,
620 key_expr: self.key_expr,
621 key_space: self.key_space,
622 origin: self.origin,
623 fetch: self.fetch,
624 handler: self.handler,
625 phantom: self.phantom,
626 }
627 }
628}
629
630#[zenoh_macros::unstable]
631#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
632#[allow(deprecated)]
633impl<
634 Handler,
635 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
636 TryIntoSample,
637 const BACKGROUND: bool,
638 > FetchingSubscriberBuilder<'_, '_, UserSpace, Handler, Fetch, TryIntoSample, BACKGROUND>
639where
640 TryIntoSample: ExtractSample,
641{
642 #[zenoh_macros::unstable]
644 #[inline]
645 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
646 pub fn allowed_origin(mut self, origin: Locality) -> Self {
647 self.origin = origin;
648 self
649 }
650}
651
652#[zenoh_macros::unstable]
653#[allow(deprecated)]
654impl<
655 KeySpace,
656 Handler,
657 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
658 TryIntoSample,
659 > Resolvable for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
660where
661 Handler: IntoHandler<Sample>,
662 Handler::Handler: Send,
663 TryIntoSample: ExtractSample,
664{
665 type To = ZResult<FetchingSubscriber<Handler::Handler>>;
666}
667
668#[zenoh_macros::unstable]
669#[allow(deprecated)]
670impl<
671 KeySpace,
672 Handler,
673 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
674 TryIntoSample,
675 > Wait for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
676where
677 KeySpace: Into<self::KeySpace>,
678 Handler: IntoHandler<Sample> + Send,
679 Handler::Handler: Send,
680 TryIntoSample: ExtractSample + Send + Sync,
681{
682 #[zenoh_macros::unstable]
683 fn wait(self) -> <Self as Resolvable>::To {
684 FetchingSubscriber::new(self.with_static_keys())
685 }
686}
687
688#[zenoh_macros::unstable]
689#[allow(deprecated)]
690impl<
691 KeySpace,
692 Handler,
693 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
694 TryIntoSample,
695 > IntoFuture for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
696where
697 KeySpace: Into<self::KeySpace>,
698 Handler: IntoHandler<Sample> + Send,
699 Handler::Handler: Send,
700 TryIntoSample: ExtractSample + Send + Sync,
701{
702 type Output = <Self as Resolvable>::To;
703 type IntoFuture = Ready<<Self as Resolvable>::To>;
704
705 #[zenoh_macros::unstable]
706 fn into_future(self) -> Self::IntoFuture {
707 std::future::ready(self.wait())
708 }
709}
710
711#[zenoh_macros::unstable]
712#[allow(deprecated)]
713impl<
714 KeySpace,
715 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
716 TryIntoSample,
717 > Resolvable
718 for FetchingSubscriberBuilder<'_, '_, KeySpace, Callback<Sample>, Fetch, TryIntoSample, true>
719where
720 TryIntoSample: ExtractSample,
721{
722 type To = ZResult<()>;
723}
724
725#[zenoh_macros::unstable]
726#[allow(deprecated)]
727impl<
728 KeySpace,
729 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
730 TryIntoSample,
731 > Wait
732 for FetchingSubscriberBuilder<'_, '_, KeySpace, Callback<Sample>, Fetch, TryIntoSample, true>
733where
734 KeySpace: Into<self::KeySpace>,
735 TryIntoSample: ExtractSample + Send + Sync,
736{
737 #[zenoh_macros::unstable]
738 fn wait(self) -> <Self as Resolvable>::To {
739 FetchingSubscriber::new(self.with_static_keys())?
740 .subscriber
741 .set_background(true);
742 Ok(())
743 }
744}
745
746#[zenoh_macros::unstable]
747#[allow(deprecated)]
748impl<
749 KeySpace,
750 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
751 TryIntoSample,
752 > IntoFuture
753 for FetchingSubscriberBuilder<'_, '_, KeySpace, Callback<Sample>, Fetch, TryIntoSample, true>
754where
755 KeySpace: Into<self::KeySpace>,
756 TryIntoSample: ExtractSample + Send + Sync,
757{
758 type Output = <Self as Resolvable>::To;
759 type IntoFuture = Ready<<Self as Resolvable>::To>;
760
761 #[zenoh_macros::unstable]
762 fn into_future(self) -> Self::IntoFuture {
763 std::future::ready(self.wait())
764 }
765}
766
767#[zenoh_macros::unstable]
799#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
800pub struct FetchingSubscriber<Handler> {
801 subscriber: Subscriber<()>,
802 callback: Callback<Sample>,
803 state: Arc<Mutex<InnerState>>,
804 handler: Handler,
805}
806
807#[zenoh_macros::unstable]
808#[allow(deprecated)]
809impl<Handler> std::ops::Deref for FetchingSubscriber<Handler> {
810 type Target = Handler;
811 #[zenoh_macros::unstable]
812 fn deref(&self) -> &Self::Target {
813 &self.handler
814 }
815}
816
817#[zenoh_macros::unstable]
818#[allow(deprecated)]
819impl<Handler> std::ops::DerefMut for FetchingSubscriber<Handler> {
820 #[zenoh_macros::unstable]
821 fn deref_mut(&mut self) -> &mut Self::Target {
822 &mut self.handler
823 }
824}
825
826#[zenoh_macros::unstable]
827#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
828#[allow(deprecated)]
829impl<Handler> FetchingSubscriber<Handler> {
830 fn new<
831 'a,
832 KeySpace,
833 InputHandler,
834 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
835 TryIntoSample,
836 >(
837 conf: FetchingSubscriberBuilder<'a, 'a, KeySpace, InputHandler, Fetch, TryIntoSample>,
838 ) -> ZResult<Self>
839 where
840 KeySpace: Into<self::KeySpace>,
841 InputHandler: IntoHandler<Sample, Handler = Handler> + Send,
842 TryIntoSample: ExtractSample + Send + Sync,
843 {
844 let session_id = conf.session.zid();
845
846 let state = Arc::new(Mutex::new(InnerState {
847 pending_fetches: 0,
848 merge_queue: MergeQueue::new(),
849 }));
850 let (callback, receiver) = conf.handler.into_handler();
851
852 let sub_callback = {
853 let state = state.clone();
854 let callback = callback.clone();
855 move |s| {
856 let state = &mut zlock!(state);
857 if state.pending_fetches == 0 {
858 callback.call(s);
859 } else {
860 tracing::trace!(
861 "Sample received while fetch in progress: push it to merge_queue"
862 );
863
864 let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().into(); let timestamp = s
868 .timestamp()
869 .cloned()
870 .unwrap_or(Timestamp::new(now, session_id.into()));
871 state
872 .merge_queue
873 .push(SampleBuilder::from(s).timestamp(timestamp).into());
874 }
875 }
876 };
877
878 let key_expr = conf.key_expr?;
879
880 let handler = register_handler(state.clone(), callback.clone());
882 let subscriber = match conf.key_space.into() {
884 self::KeySpace::User => conf
885 .session
886 .declare_subscriber(&key_expr)
887 .callback(sub_callback)
888 .allowed_origin(conf.origin)
889 .wait()?,
890 self::KeySpace::Liveliness => conf
891 .session
892 .liveliness()
893 .declare_subscriber(&key_expr)
894 .callback(sub_callback)
895 .wait()?,
896 };
897
898 let fetch_subscriber = FetchingSubscriber {
899 subscriber,
900 callback,
901 state,
902 handler: receiver,
903 };
904
905 run_fetch(conf.fetch, handler)?;
907
908 Ok(fetch_subscriber)
909 }
910
911 #[zenoh_macros::unstable]
913 #[inline]
914 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
915 pub fn undeclare(self) -> impl Resolve<ZResult<()>> {
916 self.subscriber.undeclare()
917 }
918
919 #[zenoh_macros::unstable]
920 #[zenoh_macros::internal]
921 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
922 pub fn set_background(&mut self, background: bool) {
923 self.subscriber.set_background(background)
924 }
925
926 #[zenoh_macros::unstable]
928 #[inline]
929 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
930 pub fn key_expr(&self) -> &KeyExpr<'static> {
931 self.subscriber.key_expr()
932 }
933
934 #[zenoh_macros::unstable]
971 #[inline]
972 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
973 pub fn fetch<
974 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
975 TryIntoSample,
976 >(
977 &self,
978 fetch: Fetch,
979 ) -> impl Resolve<ZResult<()>>
980 where
981 TryIntoSample: ExtractSample + Send + Sync,
982 {
983 FetchBuilder {
984 fetch,
985 phantom: std::marker::PhantomData,
986 state: self.state.clone(),
987 callback: self.callback.clone(),
988 }
989 }
990}
991
992struct RepliesHandler {
993 state: Arc<Mutex<InnerState>>,
994 callback: Callback<Sample>,
995}
996
997impl Drop for RepliesHandler {
998 fn drop(&mut self) {
999 let mut state = zlock!(self.state);
1000 state.pending_fetches -= 1;
1001 tracing::trace!(
1002 "Fetch done - {} fetches still in progress",
1003 state.pending_fetches
1004 );
1005 if state.pending_fetches == 0 {
1006 tracing::debug!(
1007 "All fetches done. Replies and live publications merged - {} samples to propagate",
1008 state.merge_queue.len()
1009 );
1010 for s in state.merge_queue.drain() {
1011 self.callback.call(s);
1012 }
1013 }
1014 }
1015}
1016
1017#[zenoh_macros::unstable]
1050#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
1051#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
1052#[allow(deprecated)]
1053pub struct FetchBuilder<
1054 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
1055 TryIntoSample,
1056> where
1057 TryIntoSample: ExtractSample,
1058{
1059 fetch: Fetch,
1060 phantom: std::marker::PhantomData<TryIntoSample>,
1061 state: Arc<Mutex<InnerState>>,
1062 callback: Callback<Sample>,
1063}
1064
1065#[zenoh_macros::unstable]
1066#[allow(deprecated)]
1067impl<Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample>
1068 Resolvable for FetchBuilder<Fetch, TryIntoSample>
1069where
1070 TryIntoSample: ExtractSample,
1071{
1072 type To = ZResult<()>;
1073}
1074
1075#[zenoh_macros::unstable]
1076#[allow(deprecated)]
1077impl<Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample> Wait
1078 for FetchBuilder<Fetch, TryIntoSample>
1079where
1080 TryIntoSample: ExtractSample,
1081{
1082 #[zenoh_macros::unstable]
1083 fn wait(self) -> <Self as Resolvable>::To {
1084 let handler = register_handler(self.state, self.callback);
1085 run_fetch(self.fetch, handler)
1086 }
1087}
1088
1089#[zenoh_macros::unstable]
1090#[allow(deprecated)]
1091impl<Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample>
1092 IntoFuture for FetchBuilder<Fetch, TryIntoSample>
1093where
1094 TryIntoSample: ExtractSample,
1095{
1096 type Output = <Self as Resolvable>::To;
1097 type IntoFuture = Ready<<Self as Resolvable>::To>;
1098
1099 #[zenoh_macros::unstable]
1100 fn into_future(self) -> Self::IntoFuture {
1101 std::future::ready(self.wait())
1102 }
1103}
1104
1105fn register_handler(state: Arc<Mutex<InnerState>>, callback: Callback<Sample>) -> RepliesHandler {
1106 zlock!(state).pending_fetches += 1;
1107 RepliesHandler { state, callback }
1109}
1110
1111#[zenoh_macros::unstable]
1112#[allow(deprecated)]
1113fn run_fetch<
1114 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
1115 TryIntoSample,
1116>(
1117 fetch: Fetch,
1118 handler: RepliesHandler,
1119) -> ZResult<()>
1120where
1121 TryIntoSample: ExtractSample,
1122{
1123 tracing::debug!("Fetch data for FetchingSubscriber");
1124 (fetch)(Box::new(move |s: TryIntoSample| match s.extract() {
1125 Ok(s) => {
1126 let mut state = zlock!(handler.state);
1127 tracing::trace!("Fetched sample received: push it to merge_queue");
1128 state.merge_queue.push(s);
1129 }
1130 Err(e) => tracing::debug!("Received error fetching data: {}", e),
1131 }))
1132}
1133
1134#[zenoh_macros::unstable]
1136#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
1137#[allow(deprecated)]
1138pub trait ExtractSample {
1139 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
1140 fn extract(self) -> ZResult<Sample>;
1141}
1142
1143#[allow(deprecated)]
1144impl ExtractSample for Reply {
1145 fn extract(self) -> ZResult<Sample> {
1146 self.into_result().map_err(|e| zerror!("{:?}", e).into())
1147 }
1148}