1use std::{
15 collections::{btree_map, BTreeMap, VecDeque},
16 convert::TryInto,
17 future::{IntoFuture, Ready},
18 mem::swap,
19 sync::{Arc, Mutex},
20 time::{Duration, SystemTime, UNIX_EPOCH},
21};
22
23use zenoh::{
24 handlers::{locked, Callback, DefaultHandler, IntoHandler},
25 internal::{zerror, zlock},
26 key_expr::KeyExpr,
27 pubsub::Subscriber,
28 query::{QueryConsolidation, QueryTarget, Reply, ReplyKeyExpr, Selector},
29 sample::{Locality, Sample, SampleBuilder},
30 time::Timestamp,
31 Error, Resolvable, Resolve, Result as ZResult, Session, Wait,
32};
33
34#[zenoh_macros::unstable]
36#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
37pub enum KeySpace {
38 User,
39 Liveliness,
40}
41
42#[zenoh_macros::unstable]
44#[non_exhaustive]
45#[derive(Debug, Clone, Copy)]
46#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
47pub struct UserSpace;
48
49#[allow(deprecated)]
50impl From<UserSpace> for KeySpace {
51 fn from(_: UserSpace) -> Self {
52 KeySpace::User
53 }
54}
55
56#[zenoh_macros::unstable]
58#[non_exhaustive]
59#[derive(Debug, Clone, Copy)]
60#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
61pub struct LivelinessSpace;
62
63#[zenoh_macros::unstable]
64#[allow(deprecated)]
65impl From<LivelinessSpace> for KeySpace {
66 #[zenoh_macros::unstable]
67 fn from(_: LivelinessSpace) -> Self {
68 KeySpace::Liveliness
69 }
70}
71
72#[zenoh_macros::unstable]
74#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
75#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
76pub struct QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler, const BACKGROUND: bool = false> {
77 pub(crate) session: &'a Session,
78 pub(crate) key_expr: ZResult<KeyExpr<'b>>,
79 pub(crate) key_space: KeySpace,
80 pub(crate) origin: Locality,
81 pub(crate) query_selector: Option<ZResult<Selector<'b>>>,
82 pub(crate) query_target: QueryTarget,
83 pub(crate) query_consolidation: QueryConsolidation,
84 pub(crate) query_accept_replies: ReplyKeyExpr,
85 pub(crate) query_timeout: Duration,
86 pub(crate) handler: Handler,
87}
88
89#[zenoh_macros::unstable]
90#[allow(deprecated)]
91impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandler> {
92 #[zenoh_macros::unstable]
94 #[inline]
95 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
96 pub fn callback<F>(
97 self,
98 callback: F,
99 ) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>>
100 where
101 F: Fn(Sample) + Send + Sync + 'static,
102 {
103 self.with(Callback::from(callback))
104 }
105
106 #[zenoh_macros::unstable]
115 #[inline]
116 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
117 pub fn callback_mut<F>(
118 self,
119 callback: F,
120 ) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>>
121 where
122 F: FnMut(Sample) + Send + Sync + 'static,
123 {
124 self.callback(locked(callback))
125 }
126
127 #[zenoh_macros::unstable]
129 #[inline]
130 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
131 pub fn with<Handler>(
132 self,
133 handler: Handler,
134 ) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler>
135 where
136 Handler: IntoHandler<Sample>,
137 {
138 let QueryingSubscriberBuilder {
139 session,
140 key_expr,
141 key_space,
142 origin,
143 query_selector,
144 query_target,
145 query_consolidation,
146 query_accept_replies,
147 query_timeout,
148 handler: _,
149 } = self;
150 QueryingSubscriberBuilder {
151 session,
152 key_expr,
153 key_space,
154 origin,
155 query_selector,
156 query_target,
157 query_consolidation,
158 query_accept_replies,
159 query_timeout,
160 handler,
161 }
162 }
163}
164
165#[zenoh_macros::unstable]
166#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
167#[allow(deprecated)]
168impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>> {
169 #[zenoh_macros::unstable]
173 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
174 pub fn background(self) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>, true> {
175 QueryingSubscriberBuilder {
176 session: self.session,
177 key_expr: self.key_expr,
178 key_space: self.key_space,
179 origin: self.origin,
180 query_selector: self.query_selector,
181 query_target: self.query_target,
182 query_consolidation: self.query_consolidation,
183 query_accept_replies: self.query_accept_replies,
184 query_timeout: self.query_timeout,
185 handler: self.handler,
186 }
187 }
188}
189
190#[zenoh_macros::unstable]
191#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
192#[allow(deprecated)]
193impl<'b, Handler, const BACKGROUND: bool>
194 QueryingSubscriberBuilder<'_, 'b, UserSpace, Handler, BACKGROUND>
195{
196 #[zenoh_macros::unstable]
201 #[inline]
202 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
203 pub fn allowed_origin(mut self, origin: Locality) -> Self {
204 self.origin = origin;
205 self
206 }
207
208 #[zenoh_macros::unstable]
210 #[inline]
211 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
212 pub fn query_selector<IntoSelector>(mut self, query_selector: IntoSelector) -> Self
213 where
214 IntoSelector: TryInto<Selector<'b>>,
215 <IntoSelector as TryInto<Selector<'b>>>::Error: Into<Error>,
216 {
217 self.query_selector = Some(query_selector.try_into().map_err(Into::into));
218 self
219 }
220
221 #[zenoh_macros::unstable]
223 #[inline]
224 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
225 pub fn query_target(mut self, query_target: QueryTarget) -> Self {
226 self.query_target = query_target;
227 self
228 }
229
230 #[zenoh_macros::unstable]
232 #[inline]
233 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
234 pub fn query_consolidation<QC: Into<QueryConsolidation>>(
235 mut self,
236 query_consolidation: QC,
237 ) -> Self {
238 self.query_consolidation = query_consolidation.into();
239 self
240 }
241
242 #[zenoh_macros::unstable]
244 #[inline]
245 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
246 pub fn query_accept_replies(mut self, accept_replies: ReplyKeyExpr) -> Self {
247 self.query_accept_replies = accept_replies;
248 self
249 }
250}
251
252#[zenoh_macros::unstable]
253#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
254#[allow(deprecated)]
255impl<'a, 'b, KeySpace, Handler, const BACKGROUND: bool>
256 QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler, BACKGROUND>
257{
258 #[zenoh_macros::unstable]
260 #[inline]
261 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
262 pub fn query_timeout(mut self, query_timeout: Duration) -> Self {
263 self.query_timeout = query_timeout;
264 self
265 }
266
267 #[zenoh_macros::unstable]
268 #[allow(clippy::type_complexity)]
269 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
270 fn into_fetching_subscriber_builder(
271 self,
272 ) -> ZResult<
273 FetchingSubscriberBuilder<
274 'a,
275 'b,
276 KeySpace,
277 Handler,
278 impl FnOnce(Box<dyn Fn(Reply) + Send + Sync>) -> ZResult<()>,
279 Reply,
280 BACKGROUND,
281 >,
282 >
283 where
284 KeySpace: Into<self::KeySpace> + Clone,
285 Handler: IntoHandler<Sample>,
286 Handler::Handler: Send,
287 {
288 let session = self.session.clone();
289 let key_expr = self.key_expr?.into_owned();
290 let key_space = self.key_space.clone().into();
291 let query_selector = match self.query_selector {
292 Some(s) => Some(s?.into_owned()),
293 None => None,
294 };
295 let query_target = self.query_target;
296 let query_consolidation = self.query_consolidation;
297 let query_accept_replies = self.query_accept_replies;
298 let query_timeout = self.query_timeout;
299 Ok(FetchingSubscriberBuilder {
300 session: self.session,
301 key_expr: Ok(key_expr.clone()),
302 key_space: self.key_space,
303 origin: self.origin,
304 fetch: move |cb| match key_space {
305 self::KeySpace::User => match query_selector {
306 Some(s) => session.get(s),
307 None => session.get(key_expr),
308 }
309 .callback(cb)
310 .target(query_target)
311 .consolidation(query_consolidation)
312 .accept_replies(query_accept_replies)
313 .timeout(query_timeout)
314 .wait(),
315 self::KeySpace::Liveliness => session
316 .liveliness()
317 .get(key_expr)
318 .callback(cb)
319 .timeout(query_timeout)
320 .wait(),
321 },
322 handler: self.handler,
323 phantom: std::marker::PhantomData,
324 })
325 }
326}
327
328#[zenoh_macros::unstable]
329#[allow(deprecated)]
330impl<KeySpace, Handler> Resolvable for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
331where
332 Handler: IntoHandler<Sample>,
333 Handler::Handler: Send,
334{
335 type To = ZResult<FetchingSubscriber<Handler::Handler>>;
336}
337
338#[zenoh_macros::unstable]
339#[allow(deprecated)]
340impl<KeySpace, Handler> Wait for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
341where
342 KeySpace: Into<self::KeySpace> + Clone,
343 Handler: IntoHandler<Sample> + Send,
344 Handler::Handler: Send,
345{
346 #[zenoh_macros::unstable]
347 fn wait(self) -> <Self as Resolvable>::To {
348 self.into_fetching_subscriber_builder()?.wait()
349 }
350}
351
352#[zenoh_macros::unstable]
353#[allow(deprecated)]
354impl<KeySpace, Handler> IntoFuture for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
355where
356 KeySpace: Into<self::KeySpace> + Clone,
357 Handler: IntoHandler<Sample> + Send,
358 Handler::Handler: Send,
359{
360 type Output = <Self as Resolvable>::To;
361 type IntoFuture = Ready<<Self as Resolvable>::To>;
362
363 #[zenoh_macros::unstable]
364 fn into_future(self) -> Self::IntoFuture {
365 std::future::ready(self.wait())
366 }
367}
368
369#[zenoh_macros::unstable]
370#[allow(deprecated)]
371impl<KeySpace> Resolvable for QueryingSubscriberBuilder<'_, '_, KeySpace, Callback<Sample>, true> {
372 type To = ZResult<()>;
373}
374
375#[zenoh_macros::unstable]
376#[allow(deprecated)]
377impl<KeySpace> Wait for QueryingSubscriberBuilder<'_, '_, KeySpace, Callback<Sample>, true>
378where
379 KeySpace: Into<self::KeySpace> + Clone,
380{
381 #[zenoh_macros::unstable]
382 fn wait(self) -> <Self as Resolvable>::To {
383 self.into_fetching_subscriber_builder()?.wait()
384 }
385}
386
387#[zenoh_macros::unstable]
388#[allow(deprecated)]
389impl<KeySpace> IntoFuture for QueryingSubscriberBuilder<'_, '_, KeySpace, Callback<Sample>, true>
390where
391 KeySpace: Into<self::KeySpace> + Clone,
392{
393 type Output = <Self as Resolvable>::To;
394 type IntoFuture = Ready<<Self as Resolvable>::To>;
395
396 #[zenoh_macros::unstable]
397 fn into_future(self) -> Self::IntoFuture {
398 std::future::ready(self.wait())
399 }
400}
401
402struct MergeQueue {
407 untimestamped: VecDeque<Sample>,
408 timestamped: BTreeMap<Timestamp, Sample>,
409}
410
411impl MergeQueue {
412 fn new() -> Self {
413 MergeQueue {
414 untimestamped: VecDeque::new(),
415 timestamped: BTreeMap::new(),
416 }
417 }
418
419 fn len(&self) -> usize {
420 self.untimestamped.len() + self.timestamped.len()
421 }
422
423 fn push(&mut self, sample: Sample) {
424 if let Some(ts) = sample.timestamp() {
425 self.timestamped.entry(*ts).or_insert(sample);
426 } else {
427 self.untimestamped.push_back(sample);
428 }
429 }
430
431 fn drain(&mut self) -> MergeQueueValues {
432 let mut vec = VecDeque::new();
433 let mut queue = BTreeMap::new();
434 swap(&mut self.untimestamped, &mut vec);
435 swap(&mut self.timestamped, &mut queue);
436 MergeQueueValues {
437 untimestamped: vec,
438 timestamped: queue.into_values(),
439 }
440 }
441}
442
443struct MergeQueueValues {
444 untimestamped: VecDeque<Sample>,
445 timestamped: btree_map::IntoValues<Timestamp, Sample>,
446}
447
448impl Iterator for MergeQueueValues {
449 type Item = Sample;
450 fn next(&mut self) -> Option<Self::Item> {
451 self.untimestamped
452 .pop_front()
453 .or_else(|| self.timestamped.next())
454 }
455}
456
457struct InnerState {
458 pending_fetches: u64,
459 merge_queue: MergeQueue,
460}
461
462#[zenoh_macros::unstable]
464#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
465#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
466#[allow(deprecated)]
467pub struct FetchingSubscriberBuilder<
468 'a,
469 'b,
470 KeySpace,
471 Handler,
472 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
473 TryIntoSample,
474 const BACKGROUND: bool = false,
475> where
476 TryIntoSample: ExtractSample,
477{
478 pub(crate) session: &'a Session,
479 pub(crate) key_expr: ZResult<KeyExpr<'b>>,
480 pub(crate) key_space: KeySpace,
481 pub(crate) origin: Locality,
482 pub(crate) fetch: Fetch,
483 pub(crate) handler: Handler,
484 pub(crate) phantom: std::marker::PhantomData<TryIntoSample>,
485}
486
487#[zenoh_macros::unstable]
488#[allow(deprecated)]
489impl<
490 'a,
491 KeySpace,
492 Handler,
493 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
494 TryIntoSample,
495 const BACKGROUND: bool,
496 > FetchingSubscriberBuilder<'a, '_, KeySpace, Handler, Fetch, TryIntoSample, BACKGROUND>
497where
498 TryIntoSample: ExtractSample,
499{
500 #[zenoh_macros::unstable]
501 fn with_static_keys(
502 self,
503 ) -> FetchingSubscriberBuilder<'a, 'static, KeySpace, Handler, Fetch, TryIntoSample> {
504 FetchingSubscriberBuilder {
505 session: self.session,
506 key_expr: self.key_expr.map(|s| s.into_owned()),
507 key_space: self.key_space,
508 origin: self.origin,
509 fetch: self.fetch,
510 handler: self.handler,
511 phantom: std::marker::PhantomData,
512 }
513 }
514}
515
516#[zenoh_macros::unstable]
517#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
518#[allow(deprecated)]
519impl<
520 'a,
521 'b,
522 KeySpace,
523 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
524 TryIntoSample,
525 > FetchingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandler, Fetch, TryIntoSample>
526where
527 TryIntoSample: ExtractSample,
528{
529 #[zenoh_macros::unstable]
531 #[inline]
532 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
533 pub fn callback<F>(
534 self,
535 callback: F,
536 ) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>, Fetch, TryIntoSample>
537 where
538 F: Fn(Sample) + Send + Sync + 'static,
539 {
540 self.with(Callback::from(callback))
541 }
542
543 #[zenoh_macros::unstable]
552 #[inline]
553 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
554 pub fn callback_mut<F>(
555 self,
556 callback: F,
557 ) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>, Fetch, TryIntoSample>
558 where
559 F: FnMut(Sample) + Send + Sync + 'static,
560 {
561 self.callback(locked(callback))
562 }
563
564 #[zenoh_macros::unstable]
566 #[inline]
567 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
568 pub fn with<Handler>(
569 self,
570 handler: Handler,
571 ) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Handler, Fetch, TryIntoSample>
572 where
573 Handler: IntoHandler<Sample>,
574 {
575 let FetchingSubscriberBuilder {
576 session,
577 key_expr,
578 key_space,
579 origin,
580 fetch,
581 handler: _,
582 phantom,
583 } = self;
584 FetchingSubscriberBuilder {
585 session,
586 key_expr,
587 key_space,
588 origin,
589 fetch,
590 handler,
591 phantom,
592 }
593 }
594}
595
596#[zenoh_macros::unstable]
597#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
598#[allow(deprecated)]
599impl<
600 'a,
601 'b,
602 KeySpace,
603 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
604 TryIntoSample,
605 > FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>, Fetch, TryIntoSample>
606where
607 TryIntoSample: ExtractSample,
608{
609 #[zenoh_macros::unstable]
613 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
614 pub fn background(
615 self,
616 ) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>, Fetch, TryIntoSample, true>
617 {
618 FetchingSubscriberBuilder {
619 session: self.session,
620 key_expr: self.key_expr,
621 key_space: self.key_space,
622 origin: self.origin,
623 fetch: self.fetch,
624 handler: self.handler,
625 phantom: self.phantom,
626 }
627 }
628}
629
630#[zenoh_macros::unstable]
631#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
632#[allow(deprecated)]
633impl<
634 Handler,
635 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
636 TryIntoSample,
637 const BACKGROUND: bool,
638 > FetchingSubscriberBuilder<'_, '_, UserSpace, Handler, Fetch, TryIntoSample, BACKGROUND>
639where
640 TryIntoSample: ExtractSample,
641{
642 #[zenoh_macros::unstable]
645 #[inline]
646 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
647 pub fn allowed_origin(mut self, origin: Locality) -> Self {
648 self.origin = origin;
649 self
650 }
651}
652
653#[zenoh_macros::unstable]
654#[allow(deprecated)]
655impl<
656 KeySpace,
657 Handler,
658 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
659 TryIntoSample,
660 > Resolvable for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
661where
662 Handler: IntoHandler<Sample>,
663 Handler::Handler: Send,
664 TryIntoSample: ExtractSample,
665{
666 type To = ZResult<FetchingSubscriber<Handler::Handler>>;
667}
668
669#[zenoh_macros::unstable]
670#[allow(deprecated)]
671impl<
672 KeySpace,
673 Handler,
674 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
675 TryIntoSample,
676 > Wait for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
677where
678 KeySpace: Into<self::KeySpace>,
679 Handler: IntoHandler<Sample> + Send,
680 Handler::Handler: Send,
681 TryIntoSample: ExtractSample + Send + Sync,
682{
683 #[zenoh_macros::unstable]
684 fn wait(self) -> <Self as Resolvable>::To {
685 FetchingSubscriber::new(self.with_static_keys())
686 }
687}
688
689#[zenoh_macros::unstable]
690#[allow(deprecated)]
691impl<
692 KeySpace,
693 Handler,
694 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
695 TryIntoSample,
696 > IntoFuture for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
697where
698 KeySpace: Into<self::KeySpace>,
699 Handler: IntoHandler<Sample> + Send,
700 Handler::Handler: Send,
701 TryIntoSample: ExtractSample + Send + Sync,
702{
703 type Output = <Self as Resolvable>::To;
704 type IntoFuture = Ready<<Self as Resolvable>::To>;
705
706 #[zenoh_macros::unstable]
707 fn into_future(self) -> Self::IntoFuture {
708 std::future::ready(self.wait())
709 }
710}
711
712#[zenoh_macros::unstable]
713#[allow(deprecated)]
714impl<
715 KeySpace,
716 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
717 TryIntoSample,
718 > Resolvable
719 for FetchingSubscriberBuilder<'_, '_, KeySpace, Callback<Sample>, Fetch, TryIntoSample, true>
720where
721 TryIntoSample: ExtractSample,
722{
723 type To = ZResult<()>;
724}
725
726#[zenoh_macros::unstable]
727#[allow(deprecated)]
728impl<
729 KeySpace,
730 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
731 TryIntoSample,
732 > Wait
733 for FetchingSubscriberBuilder<'_, '_, KeySpace, Callback<Sample>, Fetch, TryIntoSample, true>
734where
735 KeySpace: Into<self::KeySpace>,
736 TryIntoSample: ExtractSample + Send + Sync,
737{
738 #[zenoh_macros::unstable]
739 fn wait(self) -> <Self as Resolvable>::To {
740 FetchingSubscriber::new(self.with_static_keys())?
741 .subscriber
742 .set_background(true);
743 Ok(())
744 }
745}
746
747#[zenoh_macros::unstable]
748#[allow(deprecated)]
749impl<
750 KeySpace,
751 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
752 TryIntoSample,
753 > IntoFuture
754 for FetchingSubscriberBuilder<'_, '_, KeySpace, Callback<Sample>, Fetch, TryIntoSample, true>
755where
756 KeySpace: Into<self::KeySpace>,
757 TryIntoSample: ExtractSample + Send + Sync,
758{
759 type Output = <Self as Resolvable>::To;
760 type IntoFuture = Ready<<Self as Resolvable>::To>;
761
762 #[zenoh_macros::unstable]
763 fn into_future(self) -> Self::IntoFuture {
764 std::future::ready(self.wait())
765 }
766}
767
768#[zenoh_macros::unstable]
800#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
801pub struct FetchingSubscriber<Handler> {
802 subscriber: Subscriber<()>,
803 callback: Callback<Sample>,
804 state: Arc<Mutex<InnerState>>,
805 handler: Handler,
806}
807
808#[zenoh_macros::unstable]
809#[allow(deprecated)]
810impl<Handler> std::ops::Deref for FetchingSubscriber<Handler> {
811 type Target = Handler;
812 #[zenoh_macros::unstable]
813 fn deref(&self) -> &Self::Target {
814 &self.handler
815 }
816}
817
818#[zenoh_macros::unstable]
819#[allow(deprecated)]
820impl<Handler> std::ops::DerefMut for FetchingSubscriber<Handler> {
821 #[zenoh_macros::unstable]
822 fn deref_mut(&mut self) -> &mut Self::Target {
823 &mut self.handler
824 }
825}
826
827#[zenoh_macros::unstable]
828#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
829#[allow(deprecated)]
830impl<Handler> FetchingSubscriber<Handler> {
831 fn new<
832 'a,
833 KeySpace,
834 InputHandler,
835 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
836 TryIntoSample,
837 >(
838 conf: FetchingSubscriberBuilder<'a, 'a, KeySpace, InputHandler, Fetch, TryIntoSample>,
839 ) -> ZResult<Self>
840 where
841 KeySpace: Into<self::KeySpace>,
842 InputHandler: IntoHandler<Sample, Handler = Handler> + Send,
843 TryIntoSample: ExtractSample + Send + Sync,
844 {
845 let session_id = conf.session.zid();
846
847 let state = Arc::new(Mutex::new(InnerState {
848 pending_fetches: 0,
849 merge_queue: MergeQueue::new(),
850 }));
851 let (callback, receiver) = conf.handler.into_handler();
852
853 let sub_callback = {
854 let state = state.clone();
855 let callback = callback.clone();
856 move |s| {
857 let state = &mut zlock!(state);
858 if state.pending_fetches == 0 {
859 callback.call(s);
860 } else {
861 tracing::trace!(
862 "Sample received while fetch in progress: push it to merge_queue"
863 );
864
865 let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().into(); let timestamp = s
869 .timestamp()
870 .cloned()
871 .unwrap_or(Timestamp::new(now, session_id.into()));
872 state
873 .merge_queue
874 .push(SampleBuilder::from(s).timestamp(timestamp).into());
875 }
876 }
877 };
878
879 let key_expr = conf.key_expr?;
880
881 let handler = register_handler(state.clone(), callback.clone());
883 let subscriber = match conf.key_space.into() {
885 self::KeySpace::User => conf
886 .session
887 .declare_subscriber(&key_expr)
888 .callback(sub_callback)
889 .allowed_origin(conf.origin)
890 .wait()?,
891 self::KeySpace::Liveliness => conf
892 .session
893 .liveliness()
894 .declare_subscriber(&key_expr)
895 .callback(sub_callback)
896 .wait()?,
897 };
898
899 let fetch_subscriber = FetchingSubscriber {
900 subscriber,
901 callback,
902 state,
903 handler: receiver,
904 };
905
906 run_fetch(conf.fetch, handler)?;
908
909 Ok(fetch_subscriber)
910 }
911
912 #[zenoh_macros::unstable]
914 #[inline]
915 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
916 pub fn undeclare(self) -> impl Resolve<ZResult<()>> {
917 self.subscriber.undeclare()
918 }
919
920 #[zenoh_macros::unstable]
921 #[zenoh_macros::internal]
922 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
923 pub fn set_background(&mut self, background: bool) {
924 self.subscriber.set_background(background)
925 }
926
927 #[zenoh_macros::unstable]
929 #[inline]
930 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
931 pub fn key_expr(&self) -> &KeyExpr<'static> {
932 self.subscriber.key_expr()
933 }
934
935 #[zenoh_macros::unstable]
972 #[inline]
973 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
974 pub fn fetch<
975 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
976 TryIntoSample,
977 >(
978 &self,
979 fetch: Fetch,
980 ) -> impl Resolve<ZResult<()>>
981 where
982 TryIntoSample: ExtractSample + Send + Sync,
983 {
984 FetchBuilder {
985 fetch,
986 phantom: std::marker::PhantomData,
987 state: self.state.clone(),
988 callback: self.callback.clone(),
989 }
990 }
991}
992
993struct RepliesHandler {
994 state: Arc<Mutex<InnerState>>,
995 callback: Callback<Sample>,
996}
997
998impl Drop for RepliesHandler {
999 fn drop(&mut self) {
1000 let mut state = zlock!(self.state);
1001 state.pending_fetches -= 1;
1002 tracing::trace!(
1003 "Fetch done - {} fetches still in progress",
1004 state.pending_fetches
1005 );
1006 if state.pending_fetches == 0 {
1007 tracing::debug!(
1008 "All fetches done. Replies and live publications merged - {} samples to propagate",
1009 state.merge_queue.len()
1010 );
1011 for s in state.merge_queue.drain() {
1012 self.callback.call(s);
1013 }
1014 }
1015 }
1016}
1017
1018#[zenoh_macros::unstable]
1051#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
1052#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
1053#[allow(deprecated)]
1054pub struct FetchBuilder<
1055 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
1056 TryIntoSample,
1057> where
1058 TryIntoSample: ExtractSample,
1059{
1060 fetch: Fetch,
1061 phantom: std::marker::PhantomData<TryIntoSample>,
1062 state: Arc<Mutex<InnerState>>,
1063 callback: Callback<Sample>,
1064}
1065
1066#[zenoh_macros::unstable]
1067#[allow(deprecated)]
1068impl<Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample>
1069 Resolvable for FetchBuilder<Fetch, TryIntoSample>
1070where
1071 TryIntoSample: ExtractSample,
1072{
1073 type To = ZResult<()>;
1074}
1075
1076#[zenoh_macros::unstable]
1077#[allow(deprecated)]
1078impl<Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample> Wait
1079 for FetchBuilder<Fetch, TryIntoSample>
1080where
1081 TryIntoSample: ExtractSample,
1082{
1083 #[zenoh_macros::unstable]
1084 fn wait(self) -> <Self as Resolvable>::To {
1085 let handler = register_handler(self.state, self.callback);
1086 run_fetch(self.fetch, handler)
1087 }
1088}
1089
1090#[zenoh_macros::unstable]
1091#[allow(deprecated)]
1092impl<Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample>
1093 IntoFuture for FetchBuilder<Fetch, TryIntoSample>
1094where
1095 TryIntoSample: ExtractSample,
1096{
1097 type Output = <Self as Resolvable>::To;
1098 type IntoFuture = Ready<<Self as Resolvable>::To>;
1099
1100 #[zenoh_macros::unstable]
1101 fn into_future(self) -> Self::IntoFuture {
1102 std::future::ready(self.wait())
1103 }
1104}
1105
1106fn register_handler(state: Arc<Mutex<InnerState>>, callback: Callback<Sample>) -> RepliesHandler {
1107 zlock!(state).pending_fetches += 1;
1108 RepliesHandler { state, callback }
1110}
1111
1112#[zenoh_macros::unstable]
1113#[allow(deprecated)]
1114fn run_fetch<
1115 Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
1116 TryIntoSample,
1117>(
1118 fetch: Fetch,
1119 handler: RepliesHandler,
1120) -> ZResult<()>
1121where
1122 TryIntoSample: ExtractSample,
1123{
1124 tracing::debug!("Fetch data for FetchingSubscriber");
1125 (fetch)(Box::new(move |s: TryIntoSample| match s.extract() {
1126 Ok(s) => {
1127 let mut state = zlock!(handler.state);
1128 tracing::trace!("Fetched sample received: push it to merge_queue");
1129 state.merge_queue.push(s);
1130 }
1131 Err(e) => tracing::debug!("Received error fetching data: {}", e),
1132 }))
1133}
1134
1135#[zenoh_macros::unstable]
1137#[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
1138#[allow(deprecated)]
1139pub trait ExtractSample {
1140 #[deprecated = "Use `AdvancedPublisher` and `AdvancedSubscriber` instead."]
1141 fn extract(self) -> ZResult<Sample>;
1142}
1143
1144#[allow(deprecated)]
1145impl ExtractSample for Reply {
1146 fn extract(self) -> ZResult<Sample> {
1147 self.into_result().map_err(|e| zerror!("{:?}", e).into())
1148 }
1149}