1use std::{collections::BTreeMap, future::IntoFuture, str::FromStr};
15
16use zenoh::{
17 config::ZenohId,
18 handlers::{Callback, CallbackParameter, IntoHandler},
19 key_expr::KeyExpr,
20 liveliness::{LivelinessSubscriberBuilder, LivelinessToken},
21 pubsub::SubscriberBuilder,
22 query::{
23 ConsolidationMode, Parameters, Selector, TimeBound, TimeExpr, TimeRange, ZenohParameters,
24 },
25 sample::{Locality, Sample, SampleKind, SourceSn},
26 session::{EntityGlobalId, EntityId},
27 Resolvable, Resolve, Session, Wait, KE_ADV_PREFIX, KE_EMPTY, KE_PUB, KE_STAR, KE_STARSTAR,
28 KE_SUB,
29};
30use zenoh_util::{Timed, TimedEvent, Timer};
31#[zenoh_macros::unstable]
32use {
33 async_trait::async_trait,
34 std::collections::hash_map::Entry,
35 std::collections::HashMap,
36 std::convert::TryFrom,
37 std::future::Ready,
38 std::sync::{Arc, Mutex},
39 std::time::Duration,
40 uhlc::ID,
41 zenoh::handlers::{locked, DefaultHandler},
42 zenoh::internal::{runtime::ZRuntime, zlock},
43 zenoh::pubsub::Subscriber,
44 zenoh::query::{QueryTarget, Reply, ReplyKeyExpr},
45 zenoh::time::Timestamp,
46 zenoh::Result as ZResult,
47};
48
49use crate::{
50 advanced_cache::{ke_liveliness, KE_UHLC},
51 z_deserialize,
52};
53
#[derive(Debug, Default, Clone)]
#[zenoh_macros::unstable]
/// Configuration of the historical-data queries performed by an `AdvancedSubscriber`.
pub struct HistoryConfig {
    /// When `true`, a liveliness subscriber is declared to detect publishers that show up
    /// later, and their history is queried as well.
    liveliness: bool,
    /// Maximum number of samples to request; sent as the `_max` selector parameter.
    sample_depth: Option<usize>,
    /// Maximum age (in seconds) of the requested samples; sent as a time range starting
    /// at `now - age` and unbounded at the end.
    age: Option<f64>,
}
62
63#[zenoh_macros::unstable]
64impl HistoryConfig {
65 #[inline]
70 #[zenoh_macros::unstable]
71 pub fn detect_late_publishers(mut self) -> Self {
72 self.liveliness = true;
73 self
74 }
75
76 #[zenoh_macros::unstable]
78 pub fn max_samples(mut self, depth: usize) -> Self {
79 self.sample_depth = Some(depth);
80 self
81 }
82
83 #[zenoh_macros::unstable]
85 pub fn max_age(mut self, seconds: f64) -> Self {
86 self.age = Some(seconds);
87 self
88 }
89}
90
#[derive(Debug, Default, Clone, Copy)]
#[zenoh_macros::unstable]
/// Configuration of the recovery (retransmission) of missed samples.
///
/// The `CONFIGURED` const parameter acts as a type-state: the configuration methods are only
/// implemented for `RecoveryConfig<false>` and return a `RecoveryConfig<true>`.
pub struct RecoveryConfig<const CONFIGURED: bool = true> {
    /// Period of the periodic queries for undelivered samples, if enabled.
    periodic_queries: Option<Duration>,
    /// Whether a heartbeat subscriber is declared to detect missed samples.
    heartbeat: bool,
}
98
99#[zenoh_macros::unstable]
100impl RecoveryConfig<false> {
101 #[zenoh_macros::unstable]
110 #[inline]
111 pub fn periodic_queries(self, period: Duration) -> RecoveryConfig<true> {
112 RecoveryConfig {
113 periodic_queries: Some(period),
114 heartbeat: false,
115 }
116 }
117
118 #[zenoh_macros::unstable]
127 #[inline]
128 pub fn heartbeat(self) -> RecoveryConfig<true> {
129 RecoveryConfig {
130 periodic_queries: None,
131 heartbeat: true,
132 }
133 }
134}
135
#[zenoh_macros::unstable]
/// Builder collecting the options used to create an `AdvancedSubscriber`.
///
/// `BACKGROUND` selects the resolved type: the default (`false`) resolves to an
/// `AdvancedSubscriber`, while `true` resolves to `()`.
pub struct AdvancedSubscriberBuilder<'a, 'b, 'c, Handler, const BACKGROUND: bool = false> {
    /// Session used to declare the subscriber(s) and issue queries.
    pub(crate) session: &'a Session,
    /// Key expression to subscribe to (or the error produced while building it).
    pub(crate) key_expr: ZResult<KeyExpr<'b>>,
    /// Allowed origin of the received samples.
    pub(crate) origin: Locality,
    /// Recovery configuration; `None` disables retransmission queries.
    pub(crate) retransmission: Option<RecoveryConfig>,
    /// Target of the history/recovery queries.
    pub(crate) query_target: QueryTarget,
    /// Timeout of the history/recovery queries.
    pub(crate) query_timeout: Duration,
    /// History configuration; `None` disables the initial history query.
    pub(crate) history: Option<HistoryConfig>,
    /// Whether a liveliness token is declared to advertise this subscriber.
    pub(crate) liveliness: bool,
    /// Optional key expression appended to the liveliness token.
    pub(crate) meta_key_expr: Option<ZResult<KeyExpr<'c>>>,
    /// Handler (or callback) receiving the samples.
    pub(crate) handler: Handler,
}
150
151#[zenoh_macros::unstable]
152impl<'a, 'b, Handler> AdvancedSubscriberBuilder<'a, 'b, '_, Handler> {
153 #[zenoh_macros::unstable]
154 pub(crate) fn new(builder: SubscriberBuilder<'a, 'b, Handler>) -> Self {
155 AdvancedSubscriberBuilder {
156 session: builder.session,
157 key_expr: builder.key_expr,
158 origin: builder.origin,
159 handler: builder.handler,
160 retransmission: None,
161 query_target: QueryTarget::All,
162 query_timeout: Duration::from_secs(10),
163 history: None,
164 liveliness: false,
165 meta_key_expr: None,
166 }
167 }
168}
169
170#[zenoh_macros::unstable]
171impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, DefaultHandler> {
172 #[inline]
174 #[zenoh_macros::unstable]
175 pub fn callback<F>(self, callback: F) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>>
176 where
177 F: Fn(Sample) + Send + Sync + 'static,
178 {
179 self.with(Callback::from(callback))
180 }
181
182 #[inline]
187 #[zenoh_macros::unstable]
188 pub fn callback_mut<F>(
189 self,
190 callback: F,
191 ) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>>
192 where
193 F: FnMut(Sample) + Send + Sync + 'static,
194 {
195 self.callback(locked(callback))
196 }
197
198 #[inline]
200 #[zenoh_macros::unstable]
201 pub fn with<Handler>(self, handler: Handler) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>
202 where
203 Handler: IntoHandler<Sample>,
204 {
205 AdvancedSubscriberBuilder {
206 session: self.session,
207 key_expr: self.key_expr,
208 origin: self.origin,
209 retransmission: self.retransmission,
210 query_target: self.query_target,
211 query_timeout: self.query_timeout,
212 history: self.history,
213 liveliness: self.liveliness,
214 meta_key_expr: self.meta_key_expr,
215 handler,
216 }
217 }
218}
219
220#[zenoh_macros::unstable]
221impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>> {
222 pub fn background(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>, true> {
226 AdvancedSubscriberBuilder {
227 session: self.session,
228 key_expr: self.key_expr,
229 origin: self.origin,
230 retransmission: self.retransmission,
231 query_target: self.query_target,
232 query_timeout: self.query_timeout,
233 history: self.history,
234 liveliness: self.liveliness,
235 meta_key_expr: self.meta_key_expr,
236 handler: self.handler,
237 }
238 }
239}
240
241#[zenoh_macros::unstable]
242impl<'a, 'c, Handler, const BACKGROUND: bool>
243 AdvancedSubscriberBuilder<'a, '_, 'c, Handler, BACKGROUND>
244{
245 #[zenoh_macros::unstable]
248 #[inline]
249 pub fn allowed_origin(mut self, origin: Locality) -> Self {
250 self.origin = origin;
251 self
252 }
253
254 #[zenoh_macros::unstable]
260 #[inline]
261 pub fn recovery(mut self, conf: RecoveryConfig) -> Self {
262 self.retransmission = Some(conf);
263 self
264 }
265
266 #[zenoh_macros::unstable]
276 #[inline]
277 pub fn query_timeout(mut self, query_timeout: Duration) -> Self {
278 self.query_timeout = query_timeout;
279 self
280 }
281
282 #[zenoh_macros::unstable]
286 #[inline]
287 pub fn history(mut self, config: HistoryConfig) -> Self {
288 self.history = Some(config);
289 self
290 }
291
292 #[zenoh_macros::unstable]
294 pub fn subscriber_detection(mut self) -> Self {
295 self.liveliness = true;
296 self
297 }
298
299 #[zenoh_macros::unstable]
302 pub fn subscriber_detection_metadata<TryIntoKeyExpr>(mut self, meta: TryIntoKeyExpr) -> Self
303 where
304 TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
305 <TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<zenoh::Error>,
306 {
307 self.meta_key_expr = Some(meta.try_into().map_err(Into::into));
308 self
309 }
310
311 #[zenoh_macros::unstable]
312 fn with_static_keys(self) -> AdvancedSubscriberBuilder<'a, 'static, 'static, Handler> {
313 AdvancedSubscriberBuilder {
314 session: self.session,
315 key_expr: self.key_expr.map(|s| s.into_owned()),
316 origin: self.origin,
317 retransmission: self.retransmission,
318 query_target: self.query_target,
319 query_timeout: self.query_timeout,
320 history: self.history,
321 liveliness: self.liveliness,
322 meta_key_expr: self.meta_key_expr.map(|s| s.map(|s| s.into_owned())),
323 handler: self.handler,
324 }
325 }
326}
327
#[zenoh_macros::unstable]
// The default (non-background) builder resolves to an `AdvancedSubscriber` wrapping the
// receiver half produced by the handler.
impl<Handler> Resolvable for AdvancedSubscriberBuilder<'_, '_, '_, Handler>
where
    Handler: IntoHandler<Sample>,
    Handler::Handler: Send,
{
    type To = ZResult<AdvancedSubscriber<Handler::Handler>>;
}
336
337#[zenoh_macros::unstable]
338impl<Handler> Wait for AdvancedSubscriberBuilder<'_, '_, '_, Handler>
339where
340 Handler: IntoHandler<Sample> + Send,
341 Handler::Handler: Send,
342{
343 #[zenoh_macros::unstable]
344 fn wait(self) -> <Self as Resolvable>::To {
345 AdvancedSubscriber::new(self.with_static_keys())
346 }
347}
348
349#[zenoh_macros::unstable]
350impl<Handler> IntoFuture for AdvancedSubscriberBuilder<'_, '_, '_, Handler>
351where
352 Handler: IntoHandler<Sample> + Send,
353 Handler::Handler: Send,
354{
355 type Output = <Self as Resolvable>::To;
356 type IntoFuture = Ready<<Self as Resolvable>::To>;
357
358 #[zenoh_macros::unstable]
359 fn into_future(self) -> Self::IntoFuture {
360 std::future::ready(self.wait())
361 }
362}
363
#[zenoh_macros::unstable]
// The background builder resolves to `()`: no subscriber handle is returned to the caller.
impl Resolvable for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
    type To = ZResult<()>;
}
368
369#[zenoh_macros::unstable]
370impl Wait for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
371 #[zenoh_macros::unstable]
372 fn wait(self) -> <Self as Resolvable>::To {
373 let mut sub = AdvancedSubscriber::new(self.with_static_keys())?;
374 sub.set_background_impl(true);
375 Ok(())
376 }
377}
378
379#[zenoh_macros::unstable]
380impl IntoFuture for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
381 type Output = <Self as Resolvable>::To;
382 type IntoFuture = Ready<<Self as Resolvable>::To>;
383
384 #[zenoh_macros::unstable]
385 fn into_future(self) -> Self::IntoFuture {
386 std::future::ready(self.wait())
387 }
388}
389
#[zenoh_macros::unstable]
/// Timer and period used to schedule the periodic recovery queries.
struct Period {
    /// Timer the `PeriodicQuery` events are added to.
    timer: Timer,
    /// Interval between two periodic queries for the same source.
    period: Duration,
}
395
#[zenoh_macros::unstable]
/// Mutable state of an `AdvancedSubscriber`, shared (behind an `Arc<Mutex<_>>`) between the
/// subscriber callbacks, the query reply callbacks and the periodic queries.
struct State {
    /// Identifier handed out to the next registered miss callback.
    next_id: usize,
    /// Number of in-flight queries that are not tied to a single source; while non-zero,
    /// samples from not-yet-delivered sources are buffered instead of delivered.
    global_pending_queries: u64,
    /// Per-source state for samples carrying source info (ordered by sequence number).
    sequenced_states: HashMap<EntityGlobalId, SourceState<u32>>,
    /// Per-source state for samples carrying only a timestamp (ordered by timestamp).
    timestamped_states: HashMap<ID, SourceState<Timestamp>>,
    /// Session used to issue queries.
    session: Session,
    /// Key expression the subscriber is declared on.
    key_expr: KeyExpr<'static>,
    /// Whether a recovery configuration was provided.
    retransmission: bool,
    /// Periodic query scheduling, when periodic queries are configured.
    period: Option<Period>,
    /// Configured history depth (`0` when not configured).
    history_depth: usize,
    /// Target of the queries.
    query_target: QueryTarget,
    /// Timeout of the queries.
    query_timeout: Duration,
    /// Callback delivering samples to the user.
    callback: Callback<Sample>,
    /// Callbacks notified when samples are detected as missed, keyed by registration id.
    miss_handlers: HashMap<usize, Callback<Miss>>,
    /// Liveliness token advertising this subscriber, if declared.
    token: Option<LivelinessToken>,
}
413
414#[zenoh_macros::unstable]
415impl State {
416 #[zenoh_macros::unstable]
417 fn register_miss_callback(&mut self, callback: Callback<Miss>) -> usize {
418 let id = self.next_id;
419 self.next_id += 1;
420 self.miss_handlers.insert(id, callback);
421 id
422 }
423 #[zenoh_macros::unstable]
424 fn unregister_miss_callback(&mut self, id: &usize) {
425 self.miss_handlers.remove(id);
426 }
427}
428
// Schedules a periodic `PeriodicQuery` for one source.
//
// - `$p`: expression giving access to a `State` (its `period` field is read);
// - `$s`: the source id (`EntityGlobalId`) to query;
// - `$r`: the shared state reference (`Arc<Mutex<State>>`) handed to the query.
//
// Does nothing when no period is configured.
macro_rules! spawn_periodic_queries {
    ($p:expr,$s:expr,$r:expr) => {{
        if let Some(period) = &$p.period {
            period.timer.add(TimedEvent::periodic(
                period.period,
                PeriodicQuery {
                    source_id: $s,
                    statesref: $r,
                },
            ))
        }
    }};
}
442
#[zenoh_macros::unstable]
/// Delivery state tracked for a single source; `T` is the ordering key of its samples
/// (a sequence number or a timestamp).
struct SourceState<T> {
    /// Key of the last sample delivered to the user, if any.
    last_delivered: Option<T>,
    /// Number of in-flight queries targeting this source.
    pending_queries: u64,
    /// Samples received but not yet delivered, ordered by key.
    pending_samples: BTreeMap<T, Sample>,
}
449
#[zenoh_macros::unstable]
/// Subscriber that can query historical data and recover missed samples, depending on the
/// options it was built with.
pub struct AdvancedSubscriber<Receiver> {
    /// State shared with the callbacks.
    statesref: Arc<Mutex<State>>,
    /// Underlying subscriber on the user key expression.
    subscriber: Subscriber<()>,
    /// Receiver half of the user handler.
    receiver: Receiver,
    /// Liveliness subscriber detecting late publishers, when enabled.
    liveliness_subscriber: Option<Subscriber<()>>,
    /// Subscriber receiving publisher heartbeats, when enabled.
    heartbeat_subscriber: Option<Subscriber<()>>,
}
528
529#[zenoh_macros::unstable]
530impl<Receiver> std::ops::Deref for AdvancedSubscriber<Receiver> {
531 type Target = Receiver;
532 fn deref(&self) -> &Self::Target {
533 &self.receiver
534 }
535}
536
537#[zenoh_macros::unstable]
538impl<Receiver> std::ops::DerefMut for AdvancedSubscriber<Receiver> {
539 fn deref_mut(&mut self) -> &mut Self::Target {
540 &mut self.receiver
541 }
542}
543
#[zenoh_macros::unstable]
/// Processes one incoming sample (live or query reply): delivers it to the user callback,
/// buffers it, or drops it, depending on the per-source delivery state.
///
/// Three cases are distinguished:
/// - the sample carries source info: ordering is based on its sequence number;
/// - otherwise, the sample carries a timestamp: ordering is based on the timestamp;
/// - otherwise the sample is delivered as is.
///
/// Returns `true` only when the sample carries source info from a source that had no entry
/// in `states.sequenced_states` yet.
fn handle_sample(states: &mut State, sample: Sample) -> bool {
    if let Some(source_info) = sample.source_info().cloned() {
        // Delivers `sample`, then every buffered sample directly following it in sequence.
        #[inline]
        fn deliver_and_flush(
            sample: Sample,
            mut source_sn: SourceSn,
            callback: &Callback<Sample>,
            state: &mut SourceState<u32>,
        ) {
            callback.call(sample);
            state.last_delivered = Some(source_sn);
            while let Some(sample) = state.pending_samples.remove(&(source_sn + 1)) {
                callback.call(sample);
                source_sn += 1;
                state.last_delivered = Some(source_sn);
            }
        }

        let entry = states.sequenced_states.entry(*source_info.source_id());
        let new = matches!(&entry, Entry::Vacant(_));
        let state = entry.or_insert(SourceState::<u32> {
            last_delivered: None,
            pending_queries: 0,
            pending_samples: BTreeMap::new(),
        });
        if state.last_delivered.is_none() && states.global_pending_queries != 0 {
            // Nothing delivered yet for this source while a global query is in flight.
            if states.history_depth == 1 {
                // With a depth of 1 there is nothing to reorder: deliver directly.
                state.last_delivered = Some(source_info.source_sn());
                states.callback.call(sample);
            } else {
                state
                    .pending_samples
                    .insert(source_info.source_sn(), sample);
                // Once enough samples are buffered, start delivering from the oldest one.
                if state.pending_samples.len() >= states.history_depth {
                    if let Some((sn, sample)) = state.pending_samples.pop_first() {
                        deliver_and_flush(sample, sn, &states.callback, state);
                    }
                }
            }
        } else if state.last_delivered.is_some()
            && source_info.source_sn() != state.last_delivered.unwrap() + 1
        {
            // Not the next expected sample. Samples at or below the last delivered sequence
            // number are silently dropped.
            if source_info.source_sn() > state.last_delivered.unwrap() {
                if states.retransmission {
                    // Gap detected: keep the sample until the missing ones are recovered.
                    state
                        .pending_samples
                        .insert(source_info.source_sn(), sample);
                } else {
                    // No recovery configured: report the miss and deliver anyway.
                    tracing::info!(
                        "Sample missed: missed {} samples from {:?}.",
                        source_info.source_sn() - state.last_delivered.unwrap() - 1,
                        source_info.source_id(),
                    );
                    for miss_callback in states.miss_handlers.values() {
                        miss_callback.call(Miss {
                            source: *source_info.source_id(),
                            nb: source_info.source_sn() - state.last_delivered.unwrap() - 1,
                        });
                    }
                    states.callback.call(sample);
                    state.last_delivered = Some(source_info.source_sn());
                }
            }
        } else {
            // Either the next expected sample, or the first sample of this source while no
            // global query is pending.
            deliver_and_flush(sample, source_info.source_sn(), &states.callback, state);
        }
        new
    } else if let Some(timestamp) = sample.timestamp() {
        let entry = states.timestamped_states.entry(*timestamp.get_id());
        let state = entry.or_insert(SourceState::<Timestamp> {
            last_delivered: None,
            pending_queries: 0,
            pending_samples: BTreeMap::new(),
        });
        // Samples not newer than the last delivered one are dropped.
        if state.last_delivered.map(|t| t < *timestamp).unwrap_or(true) {
            if (states.global_pending_queries == 0 && state.pending_queries == 0)
                || states.history_depth == 1
            {
                state.last_delivered = Some(*timestamp);
                states.callback.call(sample);
            } else {
                // A query is in flight: buffer (keeping the first sample seen for a given
                // timestamp) and flush once enough samples are buffered.
                state.pending_samples.entry(*timestamp).or_insert(sample);
                if state.pending_samples.len() >= states.history_depth {
                    flush_timestamped_source(state, &states.callback);
                }
            }
        }
        false
    } else {
        // Neither source info nor timestamp: no ordering is possible.
        states.callback.call(sample);
        false
    }
}
639
640#[zenoh_macros::unstable]
/// Formats the `_sn` selector parameter describing a range of sequence numbers.
///
/// A missing bound is rendered as an empty string, e.g. `_sn=3..`, `_sn=..7` or `_sn=..`.
fn seq_num_range(start: Option<u32>, end: Option<u32>) -> String {
    let lower = start.map(|sn| sn.to_string()).unwrap_or_default();
    let upper = end.map(|sn| sn.to_string()).unwrap_or_default();
    format!("_sn={lower}..{upper}")
}
649
#[zenoh_macros::unstable]
#[derive(Clone)]
/// Timed event querying the undelivered samples of one source.
struct PeriodicQuery {
    /// Source whose samples are queried.
    source_id: EntityGlobalId,
    /// Shared subscriber state.
    statesref: Arc<Mutex<State>>,
}
656
#[zenoh_macros::unstable]
#[async_trait]
impl Timed for PeriodicQuery {
    /// Queries every sample of `source_id` following the last delivered one and feeds the
    /// replies to `handle_sample`.
    ///
    /// Does nothing if the source has no entry in the sequenced states.
    async fn run(&mut self) {
        let mut lock = zlock!(self.statesref);
        let states = &mut *lock;
        if let Some(state) = states.sequenced_states.get_mut(&self.source_id) {
            state.pending_queries += 1;
            // <key_expr>/<adv prefix>/*/<zid>/<eid>/**
            let query_expr = &states.key_expr
                / KE_ADV_PREFIX
                / KE_STAR
                / &self.source_id.zid().into_keyexpr()
                / &KeyExpr::try_from(self.source_id.eid().to_string()).unwrap()
                / KE_STARSTAR;
            let seq_num_range = seq_num_range(state.last_delivered.map(|s| s + 1), None);

            // Copy what the query needs so that the lock can be released before issuing it.
            let session = states.session.clone();
            let key_expr = states.key_expr.clone().into_owned();
            let query_target = states.query_target;
            let query_timeout = states.query_timeout;

            tracing::trace!(
                "AdvancedSubscriber{{key_expr: {}}}: Querying undelivered samples {}?{}",
                states.key_expr,
                query_expr,
                seq_num_range
            );
            drop(lock);

            // The handler is moved into the reply callback and therefore lives as long as it.
            // NOTE(review): `SequencedRepliesHandler` is defined outside this view; presumably
            // it releases `pending_queries` when dropped — confirm.
            let handler = SequencedRepliesHandler {
                source_id: self.source_id,
                statesref: self.statesref.clone(),
            };
            // Best effort: a failure to issue the query is ignored.
            let _ = session
                .get(Selector::from((query_expr, seq_num_range)))
                .callback({
                    move |r: Reply| {
                        if let Ok(s) = r.into_result() {
                            if key_expr.intersects(s.key_expr()) {
                                let states = &mut *zlock!(handler.statesref);
                                tracing::trace!(
                                    "AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}",
                                    states.key_expr,
                                    s.source_info(),
                                    s.timestamp()
                                );
                                handle_sample(states, s);
                            }
                        }
                    }
                })
                .consolidation(ConsolidationMode::None)
                .accept_replies(ReplyKeyExpr::Any)
                .target(query_target)
                .timeout(query_timeout)
                .wait();
        }
    }
}
716
717#[zenoh_macros::unstable]
718impl<Handler> AdvancedSubscriber<Handler> {
    /// Creates the subscriber described by `conf`.
    ///
    /// In order, this:
    /// 1. builds the shared `State`;
    /// 2. declares the main subscriber, whose callback delivers/buffers samples and queries
    ///    missing ones when recovery is configured;
    /// 3. issues the initial history query, when history is configured;
    /// 4. declares the liveliness subscriber detecting late publishers, when requested;
    /// 5. declares the heartbeat subscriber, when heartbeat recovery is configured;
    /// 6. declares the liveliness token advertising this subscriber, when requested.
    ///
    /// Returns an error if a key expression is invalid or if a declaration fails. Failures
    /// to issue a query are ignored.
    fn new<H>(conf: AdvancedSubscriberBuilder<'_, '_, '_, H>) -> ZResult<Self>
    where
        H: IntoHandler<Sample, Handler = Handler> + Send,
    {
        let (callback, receiver) = conf.handler.into_handler();
        let key_expr = conf.key_expr?;
        let meta = match conf.meta_key_expr {
            Some(meta) => Some(meta?),
            None => None,
        };
        let retransmission = conf.retransmission;
        let query_target = conf.query_target;
        let query_timeout = conf.query_timeout;
        let session = conf.session.clone();
        let statesref = Arc::new(Mutex::new(State {
            next_id: 0,
            sequenced_states: HashMap::new(),
            timestamped_states: HashMap::new(),
            // The initial history query (issued below) counts as one global pending query.
            global_pending_queries: if conf.history.is_some() { 1 } else { 0 },
            session,
            period: retransmission.as_ref().and_then(|r| {
                // NOTE(review): the guard is held while the timer is created; presumably the
                // timer needs a runtime context — confirm.
                let _rt = ZRuntime::Application.enter();
                r.periodic_queries.map(|p| Period {
                    timer: Timer::new(false),
                    period: p,
                })
            }),
            key_expr: key_expr.clone().into_owned(),
            retransmission: retransmission.is_some(),
            history_depth: conf
                .history
                .as_ref()
                .and_then(|h| h.sample_depth)
                .unwrap_or_default(),
            query_target: conf.query_target,
            query_timeout: conf.query_timeout,
            callback: callback.clone(),
            miss_handlers: HashMap::new(),
            token: None,
        }));

        // Callback of the main subscriber.
        let sub_callback = {
            let statesref = statesref.clone();
            let session = conf.session.clone();
            let key_expr = key_expr.clone().into_owned();

            move |s: Sample| {
                let mut lock = zlock!(statesref);
                let states = &mut *lock;
                let source_id = s.source_info().map(|si| *si.source_id());
                let new = handle_sample(states, s);

                if let Some(source_id) = source_id {
                    if new {
                        // First sample from this source: start its periodic queries (if any).
                        spawn_periodic_queries!(states, source_id, statesref.clone());
                    }
                    if let Some(state) = states.sequenced_states.get_mut(&source_id) {
                        // Samples are buffered and no query is in flight for this source:
                        // query the missing ones.
                        if retransmission.is_some()
                            && state.pending_queries == 0
                            && !state.pending_samples.is_empty()
                        {
                            state.pending_queries += 1;
                            let query_expr = &key_expr
                                / KE_ADV_PREFIX
                                / KE_STAR
                                / &source_id.zid().into_keyexpr()
                                / &KeyExpr::try_from(source_id.eid().to_string()).unwrap()
                                / KE_STARSTAR;
                            let seq_num_range =
                                seq_num_range(state.last_delivered.map(|s| s + 1), None);
                            tracing::trace!(
                                "AdvancedSubscriber{{key_expr: {}}}: Querying missing samples {}?{}",
                                states.key_expr,
                                query_expr,
                                seq_num_range
                            );
                            // Release the lock before issuing the query.
                            drop(lock);
                            let handler = SequencedRepliesHandler {
                                source_id,
                                statesref: statesref.clone(),
                            };
                            let _ = session
                                .get(Selector::from((query_expr, seq_num_range)))
                                .callback({
                                    let key_expr = key_expr.clone().into_owned();
                                    move |r: Reply| {
                                        if let Ok(s) = r.into_result() {
                                            if key_expr.intersects(s.key_expr()) {
                                                let states = &mut *zlock!(handler.statesref);
                                                tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
                                                handle_sample(states, s);
                                            }
                                        }
                                    }
                                })
                                .consolidation(ConsolidationMode::None)
                                .accept_replies(ReplyKeyExpr::Any)
                                .target(query_target)
                                .timeout(query_timeout)
                                .wait();
                        }
                    }
                }
            }
        };

        let subscriber = conf
            .session
            .declare_subscriber(&key_expr)
            .callback(sub_callback)
            .allowed_origin(conf.origin)
            .wait()?;

        tracing::debug!("Create AdvancedSubscriber{{key_expr: {}}}", key_expr,);

        // Initial history query on <key_expr>/<adv prefix>/**.
        if let Some(historyconf) = conf.history.as_ref() {
            let handler = InitialRepliesHandler {
                statesref: statesref.clone(),
            };
            let mut params = Parameters::empty();
            if let Some(max) = historyconf.sample_depth {
                params.insert("_max", max.to_string());
            }
            if let Some(age) = historyconf.age {
                params.set_time_range(TimeRange {
                    start: TimeBound::Inclusive(TimeExpr::Now { offset_secs: -age }),
                    end: TimeBound::Unbounded,
                });
            }
            tracing::trace!(
                "AdvancedSubscriber{{key_expr: {}}} Querying historical samples {}?{}",
                key_expr,
                &key_expr / KE_ADV_PREFIX / KE_STARSTAR,
                params
            );
            let _ = conf
                .session
                .get(Selector::from((
                    &key_expr / KE_ADV_PREFIX / KE_STARSTAR,
                    params,
                )))
                .callback({
                    let key_expr = key_expr.clone().into_owned();
                    move |r: Reply| {
                        if let Ok(s) = r.into_result() {
                            if key_expr.intersects(s.key_expr()) {
                                let states = &mut *zlock!(handler.statesref);
                                tracing::trace!(
                                    "AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}",
                                    states.key_expr,
                                    s.source_info(),
                                    s.timestamp()
                                );
                                handle_sample(states, s);
                            }
                        }
                    }
                })
                .consolidation(ConsolidationMode::None)
                .accept_replies(ReplyKeyExpr::Any)
                .target(query_target)
                .timeout(query_timeout)
                .wait();
        }

        // Liveliness subscriber detecting late publishers and querying their history.
        let liveliness_subscriber = if let Some(historyconf) = conf.history.as_ref() {
            if historyconf.liveliness {
                let live_callback = {
                    let session = conf.session.clone();
                    let statesref = statesref.clone();
                    let key_expr = key_expr.clone().into_owned();
                    let historyconf = historyconf.clone();
                    move |s: Sample| {
                        // Only token declarations (Put) are of interest.
                        if s.kind() == SampleKind::Put {
                            if let Ok(parsed) = ke_liveliness::parse(s.key_expr().as_keyexpr()) {
                                if let Ok(zid) = ZenohId::from_str(parsed.zid().as_str()) {
                                    if parsed.eid() == KE_UHLC {
                                        // The entity chunk is `KE_UHLC`: the publisher is
                                        // tracked in the timestamped states.
                                        let mut lock = zlock!(statesref);
                                        let states = &mut *lock;
                                        tracing::trace!(
                                            "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers with zid={}",
                                            states.key_expr,
                                            parsed.zid().as_str()
                                        );
                                        let entry = states.timestamped_states.entry(ID::from(zid));
                                        let state = entry.or_insert(SourceState::<Timestamp> {
                                            last_delivered: None,
                                            pending_queries: 0,
                                            pending_samples: BTreeMap::new(),
                                        });
                                        state.pending_queries += 1;

                                        let mut params = Parameters::empty();
                                        if let Some(max) = historyconf.sample_depth {
                                            params.insert("_max", max.to_string());
                                        }
                                        if let Some(age) = historyconf.age {
                                            params.set_time_range(TimeRange {
                                                start: TimeBound::Inclusive(TimeExpr::Now {
                                                    offset_secs: -age,
                                                }),
                                                end: TimeBound::Unbounded,
                                            });
                                        }
                                        tracing::trace!(
                                            "AdvancedSubscriber{{key_expr: {}}}: Querying historical samples {}?{}",
                                            states.key_expr,
                                            s.key_expr(),
                                            params
                                        );
                                        drop(lock);

                                        let handler = TimestampedRepliesHandler {
                                            id: ID::from(zid),
                                            statesref: statesref.clone(),
                                            callback: callback.clone(),
                                        };
                                        let _ = session
                                            .get(Selector::from((s.key_expr(), params)))
                                            .callback({
                                                let key_expr = key_expr.clone().into_owned();
                                                move |r: Reply| {
                                                    if let Ok(s) = r.into_result() {
                                                        if key_expr.intersects(s.key_expr()) {
                                                            let states =
                                                                &mut *zlock!(handler.statesref);
                                                            tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
                                                            handle_sample(states, s);
                                                        }
                                                    }
                                                }
                                            })
                                            .consolidation(ConsolidationMode::None)
                                            .accept_replies(ReplyKeyExpr::Any)
                                            .target(query_target)
                                            .timeout(query_timeout)
                                            .wait();
                                    } else if let Ok(eid) =
                                        EntityId::from_str(parsed.eid().as_str())
                                    {
                                        // Fully identified publisher: tracked in the sequenced states.
                                        let source_id = EntityGlobalId::new(zid, eid);
                                        let mut lock = zlock!(statesref);
                                        let states = &mut *lock;
                                        tracing::trace!(
                                            "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers with zid={}",
                                            states.key_expr,
                                            parsed.zid().as_str()
                                        );
                                        let entry = states.sequenced_states.entry(source_id);
                                        let new = matches!(&entry, Entry::Vacant(_));
                                        let state = entry.or_insert(SourceState::<u32> {
                                            last_delivered: None,
                                            pending_queries: 0,
                                            pending_samples: BTreeMap::new(),
                                        });
                                        state.pending_queries += 1;

                                        let mut params = Parameters::empty();
                                        if let Some(max) = historyconf.sample_depth {
                                            params.insert("_max", max.to_string());
                                        }
                                        if let Some(age) = historyconf.age {
                                            params.set_time_range(TimeRange {
                                                start: TimeBound::Inclusive(TimeExpr::Now {
                                                    offset_secs: -age,
                                                }),
                                                end: TimeBound::Unbounded,
                                            });
                                        }
                                        tracing::trace!(
                                            "AdvancedSubscriber{{key_expr: {}}}: Querying historical samples {}?{}",
                                            states.key_expr,
                                            s.key_expr(),
                                            params,
                                        );
                                        drop(lock);

                                        let handler = SequencedRepliesHandler {
                                            source_id,
                                            statesref: statesref.clone(),
                                        };
                                        let _ = session
                                            .get(Selector::from((s.key_expr(), params)))
                                            .callback({
                                                let key_expr = key_expr.clone().into_owned();
                                                move |r: Reply| {
                                                    if let Ok(s) = r.into_result() {
                                                        if key_expr.intersects(s.key_expr()) {
                                                            let states =
                                                                &mut *zlock!(handler.statesref);
                                                            tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
                                                            handle_sample(states, s);
                                                        }
                                                    }
                                                }
                                            })
                                            .consolidation(ConsolidationMode::None)
                                            .accept_replies(ReplyKeyExpr::Any)
                                            .target(query_target)
                                            .timeout(query_timeout)
                                            .wait();

                                        // The lock was released above: take it again to
                                        // schedule the periodic queries of this new source.
                                        if new {
                                            spawn_periodic_queries!(
                                                zlock!(statesref),
                                                source_id,
                                                statesref.clone()
                                            );
                                        }
                                    }
                                } else {
                                    // The zid chunk is not a valid `ZenohId`: the query is
                                    // accounted for as a global one.
                                    let mut lock = zlock!(statesref);
                                    let states = &mut *lock;
                                    tracing::trace!(
                                        "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers with zid={}",
                                        states.key_expr,
                                        parsed.zid().as_str()
                                    );
                                    states.global_pending_queries += 1;

                                    let mut params = Parameters::empty();
                                    if let Some(max) = historyconf.sample_depth {
                                        params.insert("_max", max.to_string());
                                    }
                                    if let Some(age) = historyconf.age {
                                        params.set_time_range(TimeRange {
                                            start: TimeBound::Inclusive(TimeExpr::Now {
                                                offset_secs: -age,
                                            }),
                                            end: TimeBound::Unbounded,
                                        });
                                    }
                                    tracing::trace!(
                                        "AdvancedSubscriber{{key_expr: {}}}: Querying historical samples {}?{}",
                                        states.key_expr,
                                        s.key_expr(),
                                        params,
                                    );
                                    drop(lock);

                                    let handler = InitialRepliesHandler {
                                        statesref: statesref.clone(),
                                    };
                                    let _ = session
                                        .get(Selector::from((s.key_expr(), params)))
                                        .callback({
                                            let key_expr = key_expr.clone().into_owned();
                                            move |r: Reply| {
                                                if let Ok(s) = r.into_result() {
                                                    if key_expr.intersects(s.key_expr()) {
                                                        let states =
                                                            &mut *zlock!(handler.statesref);
                                                        tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
                                                        handle_sample(states, s);
                                                    }
                                                }
                                            }
                                        })
                                        .consolidation(ConsolidationMode::None)
                                        .accept_replies(ReplyKeyExpr::Any)
                                        .target(query_target)
                                        .timeout(query_timeout)
                                        .wait();
                                }
                            } else {
                                tracing::warn!(
                                    "AdvancedSubscriber{{}}: Received malformed liveliness token key expression: {}",
                                    s.key_expr()
                                );
                            }
                        }
                    }
                };

                tracing::debug!(
                    "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers on {}",
                    key_expr,
                    &key_expr / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR
                );
                Some(
                    conf.session
                        .liveliness()
                        .declare_subscriber(&key_expr / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR)
                        .history(true)
                        .callback(live_callback)
                        .wait()?,
                )
            } else {
                None
            }
        } else {
            None
        };

        // Heartbeat subscriber: a heartbeat carries the last sequence number published by a
        // source (deserialized as a `u32`), which allows detecting missed samples.
        let heartbeat_subscriber = if retransmission.is_some_and(|r| r.heartbeat) {
            let ke_heartbeat_sub = &key_expr / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR;
            let statesref = statesref.clone();
            tracing::debug!(
                "AdvancedSubscriber{{key_expr: {}}}: Enable heartbeat subscriber on {}",
                key_expr,
                ke_heartbeat_sub
            );
            let heartbeat_sub = conf
                .session
                .declare_subscriber(ke_heartbeat_sub)
                .callback(move |sample_hb| {
                    if sample_hb.kind() != SampleKind::Put {
                        return;
                    }

                    // Heartbeats whose key expression cannot be parsed are ignored silently.
                    let heartbeat_keyexpr = sample_hb.key_expr().as_keyexpr();
                    let Ok(parsed_keyexpr) = ke_liveliness::parse(heartbeat_keyexpr) else {
                        return;
                    };
                    let source_id = {
                        let Ok(zid) = ZenohId::from_str(parsed_keyexpr.zid().as_str()) else {
                            return;
                        };
                        let Ok(eid) = EntityId::from_str(parsed_keyexpr.eid().as_str()) else {
                            return;
                        };
                        EntityGlobalId::new(zid, eid)
                    };

                    let Ok(heartbeat_sn) = z_deserialize::<u32>(sample_hb.payload()) else {
                        tracing::debug!(
                            "AdvancedSubscriber{{}}: Skipping invalid heartbeat payload on '{}'",
                            heartbeat_keyexpr
                        );
                        return;
                    };

                    let mut lock = zlock!(statesref);
                    let states = &mut *lock;
                    let entry = states.sequenced_states.entry(source_id);
                    if matches!(&entry, Entry::Vacant(_)) {
                        // `RecoveryConfig` enables either periodic queries or heartbeat, so
                        // no period is expected to be set on this path.
                        spawn_periodic_queries!(states, source_id, statesref.clone());
                        if states.global_pending_queries > 0 {
                            tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Skipping heartbeat on '{}' from publisher that is currently being pulled by global query", states.key_expr, heartbeat_keyexpr);
                            return;
                        }
                    }

                    let state = entry.or_insert(SourceState::<u32> {
                        last_delivered: None,
                        pending_queries: 0,
                        pending_samples: BTreeMap::new(),
                    });

                    // Query only if the heartbeat announces samples not delivered yet and no
                    // query is already in flight for this source.
                    if (state.last_delivered.is_none()
                        || state.last_delivered.is_some_and(|sn| heartbeat_sn > sn))
                        && state.pending_queries == 0
                    {
                        let seq_num_range = seq_num_range(
                            state.last_delivered.map(|s| s + 1),
                            Some(heartbeat_sn),
                        );

                        let session = states.session.clone();
                        let key_expr = states.key_expr.clone().into_owned();
                        let query_target = states.query_target;
                        let query_timeout = states.query_timeout;
                        state.pending_queries += 1;

                        tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Querying missing samples {}?{}", states.key_expr, heartbeat_keyexpr, seq_num_range);
                        drop(lock);

                        let handler = SequencedRepliesHandler {
                            source_id,
                            statesref: statesref.clone(),
                        };
                        let _ = session
                            .get(Selector::from((heartbeat_keyexpr, seq_num_range)))
                            .callback({
                                move |r: Reply| {
                                    if let Ok(s) = r.into_result() {
                                        if key_expr.intersects(s.key_expr()) {
                                            let states = &mut *zlock!(handler.statesref);
                                            tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
                                            handle_sample(states, s);
                                        }
                                    }
                                }
                            })
                            .consolidation(ConsolidationMode::None)
                            .accept_replies(ReplyKeyExpr::Any)
                            .target(query_target)
                            .timeout(query_timeout)
                            .wait();
                    }
                })
                .allowed_origin(conf.origin)
                .wait()?;
            Some(heartbeat_sub)
        } else {
            None
        };

        // Liveliness token advertising this subscriber:
        // <key_expr>/<adv prefix>/<sub>/<zid>/<eid>/<meta or empty chunk>
        if conf.liveliness {
            let suffix = KE_ADV_PREFIX
                / KE_SUB
                / &subscriber.id().zid().into_keyexpr()
                / &KeyExpr::try_from(subscriber.id().eid().to_string()).unwrap();
            let suffix = match meta {
                Some(meta) => suffix / &meta,
                _ => suffix / KE_EMPTY,
            };
            tracing::debug!(
                "AdvancedSubscriber{{key_expr: {}}}: Declare liveliness token {}",
                key_expr,
                &key_expr / &suffix,
            );
            let token = conf
                .session
                .liveliness()
                .declare_token(&key_expr / &suffix)
                .wait()?;
            zlock!(statesref).token = Some(token)
        }

        let reliable_subscriber = AdvancedSubscriber {
            statesref,
            subscriber,
            receiver,
            liveliness_subscriber,
            heartbeat_subscriber,
        };

        Ok(reliable_subscriber)
    }
1255
#[zenoh_macros::unstable]
/// Returns the [`EntityGlobalId`] reported by the underlying subscriber.
pub fn id(&self) -> EntityGlobalId {
    self.subscriber.id()
}
1261
#[zenoh_macros::unstable]
/// Returns the key expression of the underlying subscriber.
pub fn key_expr(&self) -> &KeyExpr<'static> {
    self.subscriber.key_expr()
}
1267
#[zenoh_macros::unstable]
/// Returns a shared reference to the handler stored in this subscriber
/// (the `receiver` field).
pub fn handler(&self) -> &Handler {
    &self.receiver
}
1275
#[zenoh_macros::unstable]
/// Returns a mutable reference to the handler stored in this subscriber
/// (the `receiver` field).
pub fn handler_mut(&mut self) -> &mut Handler {
    &mut self.receiver
}
1283
#[zenoh_macros::unstable]
/// Starts building a [`SampleMissListener`] attached to this subscriber.
///
/// The returned builder borrows this subscriber's shared state and starts with
/// a [`DefaultHandler`]; nothing is registered until the builder is resolved.
pub fn sample_miss_listener(&self) -> SampleMissListenerBuilder<'_, DefaultHandler> {
    SampleMissListenerBuilder {
        statesref: &self.statesref,
        handler: DefaultHandler::default(),
    }
}
1295
#[zenoh_macros::unstable]
/// Returns a builder for a liveliness subscriber declared through the
/// underlying subscriber's session.
///
/// The subscribed key expression is this subscriber's key expression followed
/// by `KE_ADV_PREFIX / KE_PUB / KE_STARSTAR`.
// NOTE(review): presumably this is the key space where advanced publishers
// declare their liveliness tokens — confirm against the publisher side.
pub fn detect_publishers(&self) -> LivelinessSubscriberBuilder<'_, '_, DefaultHandler> {
    self.subscriber
        .session()
        .liveliness()
        .declare_subscriber(self.subscriber.key_expr() / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR)
}
1307
#[inline]
/// Consumes this subscriber and undeclares the underlying subscriber.
///
/// A debug trace carrying the key expression is emitted first; the returned
/// value is whatever `self.subscriber.undeclare()` produces and must be
/// resolved by the caller.
#[zenoh_macros::unstable]
pub fn undeclare(self) -> impl Resolve<ZResult<()>> {
    tracing::debug!(
        "AdvancedSubscriber{{key_expr: {}}}: Undeclare",
        self.key_expr()
    );
    self.subscriber.undeclare()
}
1318
1319 fn set_background_impl(&mut self, background: bool) {
1320 self.subscriber.set_background(background);
1321 if let Some(mut liveliness_sub) = self.liveliness_subscriber.take() {
1322 liveliness_sub.set_background(background);
1323 }
1324 if let Some(mut heartbeat_sub) = self.heartbeat_subscriber.take() {
1325 heartbeat_sub.set_background(background);
1326 }
1327 }
1328
/// Internal entry point that forwards `background` to `set_background_impl`.
#[zenoh_macros::internal]
pub fn set_background(&mut self, background: bool) {
    self.set_background_impl(background)
}
1333}
1334
1335#[zenoh_macros::unstable]
1336#[inline]
1337fn flush_sequenced_source(
1338 state: &mut SourceState<u32>,
1339 callback: &Callback<Sample>,
1340 source_id: &EntityGlobalId,
1341 miss_handlers: &HashMap<usize, Callback<Miss>>,
1342) {
1343 if state.pending_queries == 0 && !state.pending_samples.is_empty() {
1344 let mut pending_samples = BTreeMap::new();
1345 std::mem::swap(&mut state.pending_samples, &mut pending_samples);
1346 for (seq_num, sample) in pending_samples {
1347 match state.last_delivered {
1348 None => {
1349 state.last_delivered = Some(seq_num);
1350 callback.call(sample);
1351 }
1352 Some(last) if seq_num == last + 1 => {
1353 state.last_delivered = Some(seq_num);
1354 callback.call(sample);
1355 }
1356 Some(last) if seq_num > last + 1 => {
1357 tracing::warn!(
1358 "Sample missed: missed {} samples from {:?}.",
1359 seq_num - last - 1,
1360 source_id,
1361 );
1362 for miss_callback in miss_handlers.values() {
1363 miss_callback.call(Miss {
1364 source: *source_id,
1365 nb: seq_num - last - 1,
1366 })
1367 }
1368 state.last_delivered = Some(seq_num);
1369 callback.call(sample);
1370 }
1371 _ => {
1372 }
1374 }
1375 }
1376 }
1377}
1378
1379#[zenoh_macros::unstable]
1380#[inline]
1381fn flush_timestamped_source(state: &mut SourceState<Timestamp>, callback: &Callback<Sample>) {
1382 if state.pending_queries == 0 && !state.pending_samples.is_empty() {
1383 let mut pending_samples = BTreeMap::new();
1384 std::mem::swap(&mut state.pending_samples, &mut pending_samples);
1385 for (timestamp, sample) in pending_samples {
1386 if state
1387 .last_delivered
1388 .map(|last| timestamp > last)
1389 .unwrap_or(true)
1390 {
1391 state.last_delivered = Some(timestamp);
1392 callback.call(sample);
1393 }
1394 }
1395 }
1396}
1397
#[zenoh_macros::unstable]
#[derive(Clone)]
/// Guard holding the shared subscriber state; its `Drop` implementation
/// decrements `global_pending_queries` and flushes buffered samples once that
/// counter reaches zero.
struct InitialRepliesHandler {
    /// Shared subscriber state, locked when the guard is dropped.
    statesref: Arc<Mutex<State>>,
}
1403
#[zenoh_macros::unstable]
impl Drop for InitialRepliesHandler {
    /// Decrements the global pending-query counter and, when it reaches zero,
    /// flushes the buffered samples of every known source.
    fn drop(&mut self) {
        let states = &mut *zlock!(self.statesref);
        // `saturating_sub` keeps the counter at 0 instead of underflowing.
        states.global_pending_queries = states.global_pending_queries.saturating_sub(1);
        // Logged on every drop, whether or not a flush actually follows.
        tracing::trace!(
            "AdvancedSubscriber{{key_expr: {}}}: Flush initial replies",
            states.key_expr
        );

        // Only flush once no global query is outstanding.
        if states.global_pending_queries == 0 {
            for (source_id, state) in states.sequenced_states.iter_mut() {
                flush_sequenced_source(state, &states.callback, source_id, &states.miss_handlers);
                // Invoked for each sequenced source right after its flush.
                spawn_periodic_queries!(states, *source_id, self.statesref.clone());
            }
            for state in states.timestamped_states.values_mut() {
                flush_timestamped_source(state, &states.callback);
            }
        }
    }
}
1425
#[zenoh_macros::unstable]
#[derive(Clone)]
/// Guard tied to one sequence-numbered source; its `Drop` implementation
/// decrements that source's `pending_queries` and may flush its buffered
/// samples.
struct SequencedRepliesHandler {
    /// Key of the source entry in `State::sequenced_states`.
    source_id: EntityGlobalId,
    /// Shared subscriber state, locked when the guard is dropped.
    statesref: Arc<Mutex<State>>,
}
1432
1433#[zenoh_macros::unstable]
1434impl Drop for SequencedRepliesHandler {
1435 fn drop(&mut self) {
1436 let states = &mut *zlock!(self.statesref);
1437 if let Some(state) = states.sequenced_states.get_mut(&self.source_id) {
1438 state.pending_queries = state.pending_queries.saturating_sub(1);
1439 if states.global_pending_queries == 0 {
1440 tracing::trace!(
1441 "AdvancedSubscriber{{key_expr: {}}}: Flush sequenced samples",
1442 states.key_expr
1443 );
1444 flush_sequenced_source(
1445 state,
1446 &states.callback,
1447 &self.source_id,
1448 &states.miss_handlers,
1449 )
1450 }
1451 }
1452 }
1453}
1454
#[zenoh_macros::unstable]
#[derive(Clone)]
/// Guard tied to one timestamp-ordered source; its `Drop` implementation
/// decrements that source's `pending_queries` and may flush its buffered
/// samples.
struct TimestampedRepliesHandler {
    /// Key of the source entry in `State::timestamped_states`.
    id: ID,
    /// Shared subscriber state, locked when the guard is dropped.
    statesref: Arc<Mutex<State>>,
    /// Callback that receives the flushed samples.
    callback: Callback<Sample>,
}
1462
1463#[zenoh_macros::unstable]
1464impl Drop for TimestampedRepliesHandler {
1465 fn drop(&mut self) {
1466 let states = &mut *zlock!(self.statesref);
1467 if let Some(state) = states.timestamped_states.get_mut(&self.id) {
1468 state.pending_queries = state.pending_queries.saturating_sub(1);
1469 if states.global_pending_queries == 0 {
1470 tracing::trace!(
1471 "AdvancedSubscriber{{key_expr: {}}}: Flush timestamped samples",
1472 states.key_expr
1473 );
1474 flush_timestamped_source(state, &self.callback);
1475 }
1476 }
1477 }
1478}
1479
#[zenoh_macros::unstable]
/// Notification describing a gap in the samples received from one source.
///
/// Built by `flush_sequenced_source` when a buffered sample's sequence number
/// is more than one ahead of the last delivered one.
#[derive(Debug, Clone)]
pub struct Miss {
    /// Source whose samples were missed.
    source: EntityGlobalId,
    /// Number of missed samples.
    nb: u32,
}
1487
impl Miss {
    /// Returns the source of the missed samples.
    pub fn source(&self) -> EntityGlobalId {
        self.source
    }

    /// Returns the number of missed samples.
    pub fn nb(&self) -> u32 {
        self.nb
    }
}
1499
/// Lets [`Miss`] be used as a callback argument; the message type is `Miss`
/// itself, so the conversion is the identity.
impl CallbackParameter for Miss {
    type Message<'a> = Self;

    /// Returns `msg` unchanged.
    fn from_message(msg: Self::Message<'_>) -> Self {
        msg
    }
}
1507
#[zenoh_macros::unstable]
/// Handle on a registered miss callback.
///
/// Dereferences to `handler`; unless put in background, the callback is
/// unregistered when the listener is dropped.
pub struct SampleMissListener<Handler> {
    /// Identifier returned by `register_miss_callback`, used to unregister.
    id: usize,
    /// Shared subscriber state holding the registered callback.
    statesref: Arc<Mutex<State>>,
    /// Handler half produced by `IntoHandler::into_handler`.
    handler: Handler,
    /// Whether dropping the listener unregisters the callback.
    undeclare_on_drop: bool,
}
1519
#[zenoh_macros::unstable]
impl<Handler> SampleMissListener<Handler> {
    /// Wraps this listener in a [`SampleMissHandlerUndeclaration`]; the
    /// callback is unregistered when that value is resolved.
    #[inline]
    pub fn undeclare(self) -> SampleMissHandlerUndeclaration<Handler>
    where
        Handler: Send,
    {
        SampleMissHandlerUndeclaration(self)
    }

    /// Unregisters the miss callback from the shared state. Always returns `Ok(())`.
    fn undeclare_impl(&mut self) -> ZResult<()> {
        // Clear the flag first so the later `Drop` does not unregister again.
        self.undeclare_on_drop = false;
        zlock!(self.statesref).unregister_miss_callback(&self.id);
        Ok(())
    }

    /// Sets whether the callback survives the drop of this listener:
    /// `background == true` disables unregistration on drop.
    #[zenoh_macros::internal]
    pub fn set_background(&mut self, background: bool) {
        self.undeclare_on_drop = !background;
    }
}
1543
#[cfg(feature = "unstable")]
impl<Handler> Drop for SampleMissListener<Handler> {
    /// Unregisters the miss callback unless the listener was put in background
    /// or already undeclared; a failure is logged, not propagated.
    fn drop(&mut self) {
        if !self.undeclare_on_drop {
            return;
        }
        if let Err(error) = self.undeclare_impl() {
            tracing::error!(error);
        }
    }
}
1554
#[zenoh_macros::unstable]
/// Gives shared access to the wrapped handler through dereferencing.
impl<Handler> std::ops::Deref for SampleMissListener<Handler> {
    type Target = Handler;

    /// Returns a reference to the `handler` field.
    fn deref(&self) -> &Self::Target {
        &self.handler
    }
}
#[zenoh_macros::unstable]
/// Gives mutable access to the wrapped handler through dereferencing.
impl<Handler> std::ops::DerefMut for SampleMissListener<Handler> {
    /// Returns a mutable reference to the `handler` field.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.handler
    }
}
1578
#[zenoh_macros::unstable]
/// Resolvable returned by [`SampleMissListener::undeclare`]; resolving it
/// unregisters the wrapped listener's callback.
pub struct SampleMissHandlerUndeclaration<Handler>(SampleMissListener<Handler>);
1582
#[zenoh_macros::unstable]
/// Resolving the undeclaration yields `ZResult<()>`.
impl<Handler> Resolvable for SampleMissHandlerUndeclaration<Handler> {
    type To = ZResult<()>;
}
1587
#[zenoh_macros::unstable]
/// Synchronous resolution of the undeclaration.
impl<Handler> Wait for SampleMissHandlerUndeclaration<Handler> {
    /// Unregisters the wrapped listener's callback via `undeclare_impl`.
    fn wait(mut self) -> <Self as Resolvable>::To {
        self.0.undeclare_impl()
    }
}
1594
#[zenoh_macros::unstable]
/// Asynchronous resolution of the undeclaration.
impl<Handler> IntoFuture for SampleMissHandlerUndeclaration<Handler> {
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<<Self as Resolvable>::To>;

    /// Performs the undeclaration immediately through `wait` and wraps the
    /// result in an already-completed future.
    fn into_future(self) -> Self::IntoFuture {
        std::future::ready(self.wait())
    }
}
1604
#[zenoh_macros::unstable]
/// Builder for a [`SampleMissListener`].
///
/// `BACKGROUND` selects the resolution flavour: with `false` (the default)
/// resolving yields a listener handle, with `true` it yields `()`.
pub struct SampleMissListenerBuilder<'a, Handler, const BACKGROUND: bool = false> {
    /// Shared subscriber state in which the callback will be registered.
    statesref: &'a Arc<Mutex<State>>,
    /// Handler that will receive [`Miss`] notifications.
    handler: Handler,
}
1611
#[zenoh_macros::unstable]
impl<'a> SampleMissListenerBuilder<'a, DefaultHandler> {
    #[inline]
    /// Uses `callback` to receive [`Miss`] notifications.
    #[zenoh_macros::unstable]
    pub fn callback<F>(self, callback: F) -> SampleMissListenerBuilder<'a, Callback<Miss>>
    where
        F: Fn(Miss) + Send + Sync + 'static,
    {
        self.with(Callback::from(callback))
    }

    #[inline]
    /// Uses a mutable `callback` to receive [`Miss`] notifications.
    ///
    /// The closure is wrapped with `zenoh::handlers::locked` before being
    /// passed to [`callback`](Self::callback).
    #[zenoh_macros::unstable]
    pub fn callback_mut<F>(self, callback: F) -> SampleMissListenerBuilder<'a, Callback<Miss>>
    where
        F: FnMut(Miss) + Send + Sync + 'static,
    {
        self.callback(zenoh::handlers::locked(callback))
    }

    #[inline]
    /// Replaces the default handler with `handler`, keeping the same shared state.
    #[zenoh_macros::unstable]
    pub fn with<Handler>(self, handler: Handler) -> SampleMissListenerBuilder<'a, Handler>
    where
        Handler: IntoHandler<Miss>,
    {
        SampleMissListenerBuilder {
            statesref: self.statesref,
            handler,
        }
    }
}
1647
#[zenoh_macros::unstable]
impl<'a> SampleMissListenerBuilder<'a, Callback<Miss>> {
    #[zenoh_macros::unstable]
    /// Switches the builder to its background flavour (`BACKGROUND = true`),
    /// whose resolution registers the callback and returns `()` instead of a
    /// listener handle.
    pub fn background(self) -> SampleMissListenerBuilder<'a, Callback<Miss>, true> {
        SampleMissListenerBuilder {
            statesref: self.statesref,
            handler: self.handler,
        }
    }
}
1661
#[zenoh_macros::unstable]
/// Resolving a non-background builder yields a [`SampleMissListener`] wrapping
/// the handler half produced by `IntoHandler`.
impl<Handler> Resolvable for SampleMissListenerBuilder<'_, Handler>
where
    Handler: IntoHandler<Miss> + Send,
    Handler::Handler: Send,
{
    type To = ZResult<SampleMissListener<Handler::Handler>>;
}
1670
1671#[zenoh_macros::unstable]
1672impl<Handler> Wait for SampleMissListenerBuilder<'_, Handler>
1673where
1674 Handler: IntoHandler<Miss> + Send,
1675 Handler::Handler: Send,
1676{
1677 #[zenoh_macros::unstable]
1678 fn wait(self) -> <Self as Resolvable>::To {
1679 let (callback, handler) = self.handler.into_handler();
1680 let id = zlock!(self.statesref).register_miss_callback(callback);
1681 Ok(SampleMissListener {
1682 id,
1683 statesref: self.statesref.clone(),
1684 handler,
1685 undeclare_on_drop: true,
1686 })
1687 }
1688}
1689
#[zenoh_macros::unstable]
/// Asynchronous resolution of a non-background builder.
impl<Handler> IntoFuture for SampleMissListenerBuilder<'_, Handler>
where
    Handler: IntoHandler<Miss> + Send,
    Handler::Handler: Send,
{
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<<Self as Resolvable>::To>;

    /// Resolves immediately through `wait` and wraps the result in an
    /// already-completed future.
    #[zenoh_macros::unstable]
    fn into_future(self) -> Self::IntoFuture {
        std::future::ready(self.wait())
    }
}
1704
#[zenoh_macros::unstable]
/// Resolving a background builder yields `ZResult<()>`: no listener handle is
/// returned.
impl Resolvable for SampleMissListenerBuilder<'_, Callback<Miss>, true> {
    type To = ZResult<()>;
}
1709
#[zenoh_macros::unstable]
/// Synchronous resolution of a background builder.
impl Wait for SampleMissListenerBuilder<'_, Callback<Miss>, true> {
    /// Registers the callback in the shared state and returns `Ok(())`.
    ///
    /// The value returned by `register_miss_callback` is discarded, so this
    /// path hands out nothing that could later unregister the callback.
    #[zenoh_macros::unstable]
    fn wait(self) -> <Self as Resolvable>::To {
        let (callback, _) = self.handler.into_handler();
        zlock!(self.statesref).register_miss_callback(callback);
        Ok(())
    }
}
1719
#[zenoh_macros::unstable]
/// Asynchronous resolution of a background builder.
impl IntoFuture for SampleMissListenerBuilder<'_, Callback<Miss>, true> {
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<<Self as Resolvable>::To>;

    /// Resolves immediately through `wait` and wraps the result in an
    /// already-completed future.
    #[zenoh_macros::unstable]
    fn into_future(self) -> Self::IntoFuture {
        std::future::ready(self.wait())
    }
}