1use std::{collections::BTreeMap, future::IntoFuture, str::FromStr};
15
16use zenoh::{
17 config::ZenohId,
18 handlers::{Callback, CallbackParameter, IntoHandler},
19 key_expr::KeyExpr,
20 liveliness::{LivelinessSubscriberBuilder, LivelinessToken},
21 pubsub::SubscriberBuilder,
22 query::{
23 ConsolidationMode, Parameters, Selector, TimeBound, TimeExpr, TimeRange, ZenohParameters,
24 },
25 sample::{Locality, Sample, SampleKind, SourceSn},
26 session::{EntityGlobalId, EntityId},
27 Resolvable, Resolve, Session, Wait, KE_ADV_PREFIX, KE_EMPTY, KE_PUB, KE_STAR, KE_STARSTAR,
28 KE_SUB,
29};
30use zenoh_util::{Timed, TimedEvent, Timer};
31#[zenoh_macros::unstable]
32use {
33 async_trait::async_trait,
34 std::collections::hash_map::Entry,
35 std::collections::HashMap,
36 std::convert::TryFrom,
37 std::future::Ready,
38 std::sync::{Arc, Mutex},
39 std::time::Duration,
40 uhlc::ID,
41 zenoh::handlers::{locked, DefaultHandler},
42 zenoh::internal::{runtime::ZRuntime, zlock},
43 zenoh::pubsub::Subscriber,
44 zenoh::query::{QueryTarget, Reply, ReplyKeyExpr},
45 zenoh::time::Timestamp,
46 zenoh::Result as ZResult,
47};
48
49use crate::{
50 advanced_cache::{ke_liveliness, KE_UHLC},
51 z_deserialize,
52};
53
54#[derive(Debug, Default, Clone)]
55#[zenoh_macros::unstable]
57pub struct HistoryConfig {
58 liveliness: bool,
59 sample_depth: Option<usize>,
60 age: Option<f64>,
61}
62
63#[zenoh_macros::unstable]
64impl HistoryConfig {
65 #[inline]
70 #[zenoh_macros::unstable]
71 pub fn detect_late_publishers(mut self) -> Self {
72 self.liveliness = true;
73 self
74 }
75
76 #[zenoh_macros::unstable]
78 pub fn max_samples(mut self, depth: usize) -> Self {
79 self.sample_depth = Some(depth);
80 self
81 }
82
83 #[zenoh_macros::unstable]
85 pub fn max_age(mut self, seconds: f64) -> Self {
86 self.age = Some(seconds);
87 self
88 }
89}
90
91#[derive(Debug, Default, Clone, Copy)]
92#[zenoh_macros::unstable]
94pub struct RecoveryConfig<const CONFIGURED: bool = true> {
95 periodic_queries: Option<Duration>,
96 heartbeat: bool,
97}
98
99#[zenoh_macros::unstable]
100impl RecoveryConfig<false> {
101 #[zenoh_macros::unstable]
110 #[inline]
111 pub fn periodic_queries(self, period: Duration) -> RecoveryConfig<true> {
112 RecoveryConfig {
113 periodic_queries: Some(period),
114 heartbeat: false,
115 }
116 }
117
118 #[zenoh_macros::unstable]
127 #[inline]
128 pub fn heartbeat(self) -> RecoveryConfig<true> {
129 RecoveryConfig {
130 periodic_queries: None,
131 heartbeat: true,
132 }
133 }
134}
135
136#[zenoh_macros::unstable]
138pub struct AdvancedSubscriberBuilder<'a, 'b, 'c, Handler, const BACKGROUND: bool = false> {
139 pub(crate) session: &'a Session,
140 pub(crate) key_expr: ZResult<KeyExpr<'b>>,
141 pub(crate) origin: Locality,
142 pub(crate) retransmission: Option<RecoveryConfig>,
143 pub(crate) query_target: QueryTarget,
144 pub(crate) query_timeout: Duration,
145 pub(crate) history: Option<HistoryConfig>,
146 pub(crate) liveliness: bool,
147 pub(crate) meta_key_expr: Option<ZResult<KeyExpr<'c>>>,
148 pub(crate) handler: Handler,
149}
150
151#[zenoh_macros::unstable]
152impl<'a, 'b, Handler> AdvancedSubscriberBuilder<'a, 'b, '_, Handler> {
153 #[zenoh_macros::unstable]
154 pub(crate) fn new(builder: SubscriberBuilder<'a, 'b, Handler>) -> Self {
155 AdvancedSubscriberBuilder {
156 session: builder.session,
157 key_expr: builder.key_expr,
158 origin: builder.origin,
159 handler: builder.handler,
160 retransmission: None,
161 query_target: QueryTarget::All,
162 query_timeout: Duration::from_secs(10),
163 history: None,
164 liveliness: false,
165 meta_key_expr: None,
166 }
167 }
168}
169
170#[zenoh_macros::unstable]
171impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, DefaultHandler> {
172 #[inline]
174 #[zenoh_macros::unstable]
175 pub fn callback<F>(self, callback: F) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>>
176 where
177 F: Fn(Sample) + Send + Sync + 'static,
178 {
179 self.with(Callback::from(callback))
180 }
181
182 #[inline]
187 #[zenoh_macros::unstable]
188 pub fn callback_mut<F>(
189 self,
190 callback: F,
191 ) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>>
192 where
193 F: FnMut(Sample) + Send + Sync + 'static,
194 {
195 self.callback(locked(callback))
196 }
197
198 #[inline]
200 #[zenoh_macros::unstable]
201 pub fn with<Handler>(self, handler: Handler) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>
202 where
203 Handler: IntoHandler<Sample>,
204 {
205 AdvancedSubscriberBuilder {
206 session: self.session,
207 key_expr: self.key_expr,
208 origin: self.origin,
209 retransmission: self.retransmission,
210 query_target: self.query_target,
211 query_timeout: self.query_timeout,
212 history: self.history,
213 liveliness: self.liveliness,
214 meta_key_expr: self.meta_key_expr,
215 handler,
216 }
217 }
218}
219
220#[zenoh_macros::unstable]
221impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>> {
222 pub fn background(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>, true> {
226 AdvancedSubscriberBuilder {
227 session: self.session,
228 key_expr: self.key_expr,
229 origin: self.origin,
230 retransmission: self.retransmission,
231 query_target: self.query_target,
232 query_timeout: self.query_timeout,
233 history: self.history,
234 liveliness: self.liveliness,
235 meta_key_expr: self.meta_key_expr,
236 handler: self.handler,
237 }
238 }
239}
240
241#[zenoh_macros::unstable]
242impl<'a, 'c, Handler, const BACKGROUND: bool>
243 AdvancedSubscriberBuilder<'a, '_, 'c, Handler, BACKGROUND>
244{
245 #[zenoh_macros::unstable]
248 #[inline]
249 pub fn allowed_origin(mut self, origin: Locality) -> Self {
250 self.origin = origin;
251 self
252 }
253
254 #[zenoh_macros::unstable]
260 #[inline]
261 pub fn recovery(mut self, conf: RecoveryConfig) -> Self {
262 self.retransmission = Some(conf);
263 self
264 }
265
266 #[zenoh_macros::unstable]
276 #[inline]
277 pub fn query_timeout(mut self, query_timeout: Duration) -> Self {
278 self.query_timeout = query_timeout;
279 self
280 }
281
282 #[zenoh_macros::unstable]
286 #[inline]
287 pub fn history(mut self, config: HistoryConfig) -> Self {
288 self.history = Some(config);
289 self
290 }
291
292 #[zenoh_macros::unstable]
294 pub fn subscriber_detection(mut self) -> Self {
295 self.liveliness = true;
296 self
297 }
298
299 #[zenoh_macros::unstable]
302 pub fn subscriber_detection_metadata<TryIntoKeyExpr>(mut self, meta: TryIntoKeyExpr) -> Self
303 where
304 TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
305 <TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<zenoh::Error>,
306 {
307 self.meta_key_expr = Some(meta.try_into().map_err(Into::into));
308 self
309 }
310
311 #[zenoh_macros::unstable]
312 fn with_static_keys(self) -> AdvancedSubscriberBuilder<'a, 'static, 'static, Handler> {
313 AdvancedSubscriberBuilder {
314 session: self.session,
315 key_expr: self.key_expr.map(|s| s.into_owned()),
316 origin: self.origin,
317 retransmission: self.retransmission,
318 query_target: self.query_target,
319 query_timeout: self.query_timeout,
320 history: self.history,
321 liveliness: self.liveliness,
322 meta_key_expr: self.meta_key_expr.map(|s| s.map(|s| s.into_owned())),
323 handler: self.handler,
324 }
325 }
326}
327
328#[zenoh_macros::unstable]
329impl<Handler> Resolvable for AdvancedSubscriberBuilder<'_, '_, '_, Handler>
330where
331 Handler: IntoHandler<Sample>,
332 Handler::Handler: Send,
333{
334 type To = ZResult<AdvancedSubscriber<Handler::Handler>>;
335}
336
337#[zenoh_macros::unstable]
338impl<Handler> Wait for AdvancedSubscriberBuilder<'_, '_, '_, Handler>
339where
340 Handler: IntoHandler<Sample> + Send,
341 Handler::Handler: Send,
342{
343 #[zenoh_macros::unstable]
344 fn wait(self) -> <Self as Resolvable>::To {
345 AdvancedSubscriber::new(self.with_static_keys())
346 }
347}
348
349#[zenoh_macros::unstable]
350impl<Handler> IntoFuture for AdvancedSubscriberBuilder<'_, '_, '_, Handler>
351where
352 Handler: IntoHandler<Sample> + Send,
353 Handler::Handler: Send,
354{
355 type Output = <Self as Resolvable>::To;
356 type IntoFuture = Ready<<Self as Resolvable>::To>;
357
358 #[zenoh_macros::unstable]
359 fn into_future(self) -> Self::IntoFuture {
360 std::future::ready(self.wait())
361 }
362}
363
364#[zenoh_macros::unstable]
365impl Resolvable for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
366 type To = ZResult<()>;
367}
368
369#[zenoh_macros::unstable]
370impl Wait for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
371 #[zenoh_macros::unstable]
372 fn wait(self) -> <Self as Resolvable>::To {
373 let mut sub = AdvancedSubscriber::new(self.with_static_keys())?;
374 sub.set_background_impl(true);
375 Ok(())
376 }
377}
378
379#[zenoh_macros::unstable]
380impl IntoFuture for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
381 type Output = <Self as Resolvable>::To;
382 type IntoFuture = Ready<<Self as Resolvable>::To>;
383
384 #[zenoh_macros::unstable]
385 fn into_future(self) -> Self::IntoFuture {
386 std::future::ready(self.wait())
387 }
388}
389
390#[zenoh_macros::unstable]
391struct Period {
392 timer: Timer,
393 period: Duration,
394}
395
396#[zenoh_macros::unstable]
397struct State {
398 next_id: usize,
399 global_pending_queries: u64,
400 sequenced_states: HashMap<EntityGlobalId, SourceState<u32>>,
401 timestamped_states: HashMap<ID, SourceState<Timestamp>>,
402 session: Session,
403 key_expr: KeyExpr<'static>,
404 retransmission: bool,
405 period: Option<Period>,
406 history_depth: usize,
407 query_target: QueryTarget,
408 query_timeout: Duration,
409 callback: Callback<Sample>,
410 miss_handlers: HashMap<usize, Callback<Miss>>,
411 token: Option<LivelinessToken>,
412}
413
414#[zenoh_macros::unstable]
415impl State {
416 #[zenoh_macros::unstable]
417 fn register_miss_callback(&mut self, callback: Callback<Miss>) -> usize {
418 let id = self.next_id;
419 self.next_id += 1;
420 self.miss_handlers.insert(id, callback);
421 id
422 }
423 #[zenoh_macros::unstable]
424 fn unregister_miss_callback(&mut self, id: &usize) {
425 self.miss_handlers.remove(id);
426 }
427}
428
429macro_rules! spawn_periodoic_queries {
430 ($p:expr,$s:expr,$r:expr) => {{
431 if let Some(period) = &$p.period {
432 period.timer.add(TimedEvent::periodic(
433 period.period,
434 PeriodicQuery {
435 source_id: $s,
436 statesref: $r,
437 },
438 ))
439 }
440 }};
441}
442
443#[zenoh_macros::unstable]
444struct SourceState<T> {
445 last_delivered: Option<T>,
446 pending_queries: u64,
447 pending_samples: BTreeMap<T, Sample>,
448}
449
450#[zenoh_macros::unstable]
452pub struct AdvancedSubscriber<Receiver> {
453 statesref: Arc<Mutex<State>>,
454 subscriber: Subscriber<()>,
455 receiver: Receiver,
456 liveliness_subscriber: Option<Subscriber<()>>,
457 heartbeat_subscriber: Option<Subscriber<()>>,
458}
459
460#[zenoh_macros::unstable]
461impl<Receiver> std::ops::Deref for AdvancedSubscriber<Receiver> {
462 type Target = Receiver;
463 fn deref(&self) -> &Self::Target {
464 &self.receiver
465 }
466}
467
468#[zenoh_macros::unstable]
469impl<Receiver> std::ops::DerefMut for AdvancedSubscriber<Receiver> {
470 fn deref_mut(&mut self) -> &mut Self::Target {
471 &mut self.receiver
472 }
473}
474
475#[zenoh_macros::unstable]
476fn handle_sample(states: &mut State, sample: Sample) -> bool {
477 if let (Some(source_id), Some(source_sn)) = (
478 sample.source_info().source_id(),
479 sample.source_info().source_sn(),
480 ) {
481 #[inline]
482 fn deliver_and_flush(
483 sample: Sample,
484 mut source_sn: SourceSn,
485 callback: &Callback<Sample>,
486 state: &mut SourceState<u32>,
487 ) {
488 callback.call(sample);
489 state.last_delivered = Some(source_sn);
490 while let Some(sample) = state.pending_samples.remove(&(source_sn + 1)) {
491 callback.call(sample);
492 source_sn += 1;
493 state.last_delivered = Some(source_sn);
494 }
495 }
496
497 let entry = states.sequenced_states.entry(*source_id);
498 let new = matches!(&entry, Entry::Vacant(_));
499 let state = entry.or_insert(SourceState::<u32> {
500 last_delivered: None,
501 pending_queries: 0,
502 pending_samples: BTreeMap::new(),
503 });
504 if state.last_delivered.is_none() && states.global_pending_queries != 0 {
505 if states.history_depth == 1 {
507 state.last_delivered = Some(source_sn);
508 states.callback.call(sample);
509 } else {
510 state.pending_samples.insert(source_sn, sample);
511 if state.pending_samples.len() >= states.history_depth {
512 if let Some((sn, sample)) = state.pending_samples.pop_first() {
513 deliver_and_flush(sample, sn, &states.callback, state);
514 }
515 }
516 }
517 } else if state.last_delivered.is_some() && source_sn != state.last_delivered.unwrap() + 1 {
518 if source_sn > state.last_delivered.unwrap() {
519 if states.retransmission {
520 state.pending_samples.insert(source_sn, sample);
521 } else {
522 tracing::info!(
523 "Sample missed: missed {} samples from {:?}.",
524 source_sn - state.last_delivered.unwrap() - 1,
525 source_id,
526 );
527 for miss_callback in states.miss_handlers.values() {
528 miss_callback.call(Miss {
529 source: *source_id,
530 nb: source_sn - state.last_delivered.unwrap() - 1,
531 });
532 }
533 states.callback.call(sample);
534 state.last_delivered = Some(source_sn);
535 }
536 }
537 } else {
538 deliver_and_flush(sample, source_sn, &states.callback, state);
539 }
540 new
541 } else if let Some(timestamp) = sample.timestamp() {
542 let entry = states.timestamped_states.entry(*timestamp.get_id());
543 let state = entry.or_insert(SourceState::<Timestamp> {
544 last_delivered: None,
545 pending_queries: 0,
546 pending_samples: BTreeMap::new(),
547 });
548 if state.last_delivered.map(|t| t < *timestamp).unwrap_or(true) {
549 if (states.global_pending_queries == 0 && state.pending_queries == 0)
550 || states.history_depth == 1
551 {
552 state.last_delivered = Some(*timestamp);
553 states.callback.call(sample);
554 } else {
555 state.pending_samples.entry(*timestamp).or_insert(sample);
556 if state.pending_samples.len() >= states.history_depth {
557 flush_timestamped_source(state, &states.callback);
558 }
559 }
560 }
561 false
562 } else {
563 states.callback.call(sample);
564 false
565 }
566}
567
568#[zenoh_macros::unstable]
569fn seq_num_range(start: Option<u32>, end: Option<u32>) -> String {
570 match (start, end) {
571 (Some(start), Some(end)) => format!("_sn={start}..{end}"),
572 (Some(start), None) => format!("_sn={start}.."),
573 (None, Some(end)) => format!("_sn=..{end}"),
574 (None, None) => "_sn=..".to_string(),
575 }
576}
577
578#[zenoh_macros::unstable]
579#[derive(Clone)]
580struct PeriodicQuery {
581 source_id: EntityGlobalId,
582 statesref: Arc<Mutex<State>>,
583}
584
585#[zenoh_macros::unstable]
586#[async_trait]
587impl Timed for PeriodicQuery {
588 async fn run(&mut self) {
589 let mut lock = zlock!(self.statesref);
590 let states = &mut *lock;
591 if let Some(state) = states.sequenced_states.get_mut(&self.source_id) {
592 state.pending_queries += 1;
593 let query_expr = &states.key_expr
594 / KE_ADV_PREFIX
595 / KE_STAR
596 / &self.source_id.zid().into_keyexpr()
597 / &KeyExpr::try_from(self.source_id.eid().to_string()).unwrap()
598 / KE_STARSTAR;
599 let seq_num_range = seq_num_range(state.last_delivered.map(|s| s + 1), None);
600
601 let session = states.session.clone();
602 let key_expr = states.key_expr.clone().into_owned();
603 let query_target = states.query_target;
604 let query_timeout = states.query_timeout;
605
606 tracing::trace!(
607 "AdvancedSubscriber{{key_expr: {}}}: Querying undelivered samples {}?{}",
608 states.key_expr,
609 query_expr,
610 seq_num_range
611 );
612 drop(lock);
613
614 let handler = SequencedRepliesHandler {
615 source_id: self.source_id,
616 statesref: self.statesref.clone(),
617 };
618 let _ = session
619 .get(Selector::from((query_expr, seq_num_range)))
620 .callback({
621 move |r: Reply| {
622 if let Ok(s) = r.into_result() {
623 if key_expr.intersects(s.key_expr()) {
624 let states = &mut *zlock!(handler.statesref);
625 tracing::trace!(
626 "AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}",
627 states.key_expr,
628 s.source_info(),
629 s.timestamp()
630 );
631 handle_sample(states, s);
632 }
633 }
634 }
635 })
636 .consolidation(ConsolidationMode::None)
637 .accept_replies(ReplyKeyExpr::Any)
638 .target(query_target)
639 .timeout(query_timeout)
640 .wait();
641 }
642 }
643}
644
645#[zenoh_macros::unstable]
646impl<Handler> AdvancedSubscriber<Handler> {
647 fn new<H>(conf: AdvancedSubscriberBuilder<'_, '_, '_, H>) -> ZResult<Self>
648 where
649 H: IntoHandler<Sample, Handler = Handler> + Send,
650 {
651 let (callback, receiver) = conf.handler.into_handler();
652 let key_expr = conf.key_expr?;
653 let meta = match conf.meta_key_expr {
654 Some(meta) => Some(meta?),
655 None => None,
656 };
657 let retransmission = conf.retransmission;
658 let query_target = conf.query_target;
659 let query_timeout = conf.query_timeout;
660 let session = conf.session.clone();
661 let statesref = Arc::new(Mutex::new(State {
662 next_id: 0,
663 sequenced_states: HashMap::new(),
664 timestamped_states: HashMap::new(),
665 global_pending_queries: if conf.history.is_some() { 1 } else { 0 },
666 session,
667 period: retransmission.as_ref().and_then(|r| {
668 let _rt = ZRuntime::Application.enter();
669 r.periodic_queries.map(|p| Period {
670 timer: Timer::new(false),
671 period: p,
672 })
673 }),
674 key_expr: key_expr.clone().into_owned(),
675 retransmission: retransmission.is_some(),
676 history_depth: conf
677 .history
678 .as_ref()
679 .and_then(|h| h.sample_depth)
680 .unwrap_or_default(),
681 query_target: conf.query_target,
682 query_timeout: conf.query_timeout,
683 callback: callback.clone(),
684 miss_handlers: HashMap::new(),
685 token: None,
686 }));
687
688 let sub_callback = {
689 let statesref = statesref.clone();
690 let session = conf.session.clone();
691 let key_expr = key_expr.clone().into_owned();
692
693 move |s: Sample| {
694 let mut lock = zlock!(statesref);
695 let states = &mut *lock;
696 let source_id = s.source_info().source_id().cloned();
697 let new = handle_sample(states, s);
698
699 if let Some(source_id) = source_id {
700 if new {
701 spawn_periodoic_queries!(states, source_id, statesref.clone());
702 }
703
704 if let Some(state) = states.sequenced_states.get_mut(&source_id) {
705 if retransmission.is_some()
706 && state.pending_queries == 0
707 && !state.pending_samples.is_empty()
708 {
709 state.pending_queries += 1;
710 let query_expr = &key_expr
711 / KE_ADV_PREFIX
712 / KE_STAR
713 / &source_id.zid().into_keyexpr()
714 / &KeyExpr::try_from(source_id.eid().to_string()).unwrap()
715 / KE_STARSTAR;
716 let seq_num_range =
717 seq_num_range(state.last_delivered.map(|s| s + 1), None);
718 tracing::trace!(
719 "AdvancedSubscriber{{key_expr: {}}}: Querying missing samples {}?{}",
720 states.key_expr,
721 query_expr,
722 seq_num_range
723 );
724 drop(lock);
725 let handler = SequencedRepliesHandler {
726 source_id,
727 statesref: statesref.clone(),
728 };
729 let _ = session
730 .get(Selector::from((query_expr, seq_num_range)))
731 .callback({
732 let key_expr = key_expr.clone().into_owned();
733 move |r: Reply| {
734 if let Ok(s) = r.into_result() {
735 if key_expr.intersects(s.key_expr()) {
736 let states = &mut *zlock!(handler.statesref);
737 tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
738 handle_sample(states, s);
739 }
740 }
741 }
742 })
743 .consolidation(ConsolidationMode::None)
744 .accept_replies(ReplyKeyExpr::Any)
745 .target(query_target)
746 .timeout(query_timeout)
747 .wait();
748 }
749 }
750 }
751 }
752 };
753
754 let subscriber = conf
755 .session
756 .declare_subscriber(&key_expr)
757 .callback(sub_callback)
758 .allowed_origin(conf.origin)
759 .wait()?;
760
761 tracing::debug!("Create AdvancedSubscriber{{key_expr: {}}}", key_expr,);
762
763 if let Some(historyconf) = conf.history.as_ref() {
764 let handler = InitialRepliesHandler {
765 statesref: statesref.clone(),
766 };
767 let mut params = Parameters::empty();
768 if let Some(max) = historyconf.sample_depth {
769 params.insert("_max", max.to_string());
770 }
771 if let Some(age) = historyconf.age {
772 params.set_time_range(TimeRange {
773 start: TimeBound::Inclusive(TimeExpr::Now { offset_secs: -age }),
774 end: TimeBound::Unbounded,
775 });
776 }
777 tracing::trace!(
778 "AdvancedSubscriber{{key_expr: {}}} Querying historical samples {}?{}",
779 key_expr,
780 &key_expr / KE_ADV_PREFIX / KE_STARSTAR,
781 params
782 );
783 let _ = conf
784 .session
785 .get(Selector::from((
786 &key_expr / KE_ADV_PREFIX / KE_STARSTAR,
787 params,
788 )))
789 .callback({
790 let key_expr = key_expr.clone().into_owned();
791 move |r: Reply| {
792 if let Ok(s) = r.into_result() {
793 if key_expr.intersects(s.key_expr()) {
794 let states = &mut *zlock!(handler.statesref);
795 tracing::trace!(
796 "AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}",
797 states.key_expr,
798 s.source_info(),
799 s.timestamp()
800 );
801 handle_sample(states, s);
802 }
803 }
804 }
805 })
806 .consolidation(ConsolidationMode::None)
807 .accept_replies(ReplyKeyExpr::Any)
808 .target(query_target)
809 .timeout(query_timeout)
810 .wait();
811 }
812
813 let liveliness_subscriber = if let Some(historyconf) = conf.history.as_ref() {
814 if historyconf.liveliness {
815 let live_callback = {
816 let session = conf.session.clone();
817 let statesref = statesref.clone();
818 let key_expr = key_expr.clone().into_owned();
819 let historyconf = historyconf.clone();
820 move |s: Sample| {
821 if s.kind() == SampleKind::Put {
822 if let Ok(parsed) = ke_liveliness::parse(s.key_expr().as_keyexpr()) {
823 if let Ok(zid) = ZenohId::from_str(parsed.zid().as_str()) {
824 if parsed.eid() == KE_UHLC {
827 let mut lock = zlock!(statesref);
828 let states = &mut *lock;
829 tracing::trace!(
830 "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers with zid={}",
831 states.key_expr,
832 parsed.zid().as_str()
833 );
834 let entry = states.timestamped_states.entry(ID::from(zid));
835 let state = entry.or_insert(SourceState::<Timestamp> {
836 last_delivered: None,
837 pending_queries: 0,
838 pending_samples: BTreeMap::new(),
839 });
840 state.pending_queries += 1;
841
842 let mut params = Parameters::empty();
843 if let Some(max) = historyconf.sample_depth {
844 params.insert("_max", max.to_string());
845 }
846 if let Some(age) = historyconf.age {
847 params.set_time_range(TimeRange {
848 start: TimeBound::Inclusive(TimeExpr::Now {
849 offset_secs: -age,
850 }),
851 end: TimeBound::Unbounded,
852 });
853 }
854 tracing::trace!(
855 "AdvancedSubscriber{{key_expr: {}}}: Querying historical samples {}?{}",
856 states.key_expr,
857 s.key_expr(),
858 params
859 );
860 drop(lock);
861
862 let handler = TimestampedRepliesHandler {
863 id: ID::from(zid),
864 statesref: statesref.clone(),
865 callback: callback.clone(),
866 };
867 let _ = session
868 .get(Selector::from((s.key_expr(), params)))
869 .callback({
870 let key_expr = key_expr.clone().into_owned();
871 move |r: Reply| {
872 if let Ok(s) = r.into_result() {
873 if key_expr.intersects(s.key_expr()) {
874 let states =
875 &mut *zlock!(handler.statesref);
876 tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
877 handle_sample(states, s);
878 }
879 }
880 }
881 })
882 .consolidation(ConsolidationMode::None)
883 .accept_replies(ReplyKeyExpr::Any)
884 .target(query_target)
885 .timeout(query_timeout)
886 .wait();
887 } else if let Ok(eid) =
888 EntityId::from_str(parsed.eid().as_str())
889 {
890 let source_id = EntityGlobalId::new(zid, eid);
891 let mut lock = zlock!(statesref);
892 let states = &mut *lock;
893 tracing::trace!(
894 "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers with zid={}",
895 states.key_expr,
896 parsed.zid().as_str()
897 );
898 let entry = states.sequenced_states.entry(source_id);
899 let new = matches!(&entry, Entry::Vacant(_));
900 let state = entry.or_insert(SourceState::<u32> {
901 last_delivered: None,
902 pending_queries: 0,
903 pending_samples: BTreeMap::new(),
904 });
905 state.pending_queries += 1;
906
907 let mut params = Parameters::empty();
908 if let Some(max) = historyconf.sample_depth {
909 params.insert("_max", max.to_string());
910 }
911 if let Some(age) = historyconf.age {
912 params.set_time_range(TimeRange {
913 start: TimeBound::Inclusive(TimeExpr::Now {
914 offset_secs: -age,
915 }),
916 end: TimeBound::Unbounded,
917 });
918 }
919 tracing::trace!(
920 "AdvancedSubscriber{{key_expr: {}}}: Querying historical samples {}?{}",
921 states.key_expr,
922 s.key_expr(),
923 params,
924 );
925 drop(lock);
926
927 let handler = SequencedRepliesHandler {
928 source_id,
929 statesref: statesref.clone(),
930 };
931 let _ = session
932 .get(Selector::from((s.key_expr(), params)))
933 .callback({
934 let key_expr = key_expr.clone().into_owned();
935 move |r: Reply| {
936 if let Ok(s) = r.into_result() {
937 if key_expr.intersects(s.key_expr()) {
938 let states =
939 &mut *zlock!(handler.statesref);
940 tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
941 handle_sample(states, s);
942 }
943 }
944 }
945 })
946 .consolidation(ConsolidationMode::None)
947 .accept_replies(ReplyKeyExpr::Any)
948 .target(query_target)
949 .timeout(query_timeout)
950 .wait();
951
952 if new {
953 spawn_periodoic_queries!(
954 zlock!(statesref),
955 source_id,
956 statesref.clone()
957 );
958 }
959 }
960 } else {
961 let mut lock = zlock!(statesref);
962 let states = &mut *lock;
963 tracing::trace!(
964 "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers with zid={}",
965 states.key_expr,
966 parsed.zid().as_str()
967 );
968 states.global_pending_queries += 1;
969
970 let mut params = Parameters::empty();
971 if let Some(max) = historyconf.sample_depth {
972 params.insert("_max", max.to_string());
973 }
974 if let Some(age) = historyconf.age {
975 params.set_time_range(TimeRange {
976 start: TimeBound::Inclusive(TimeExpr::Now {
977 offset_secs: -age,
978 }),
979 end: TimeBound::Unbounded,
980 });
981 }
982 tracing::trace!(
983 "AdvancedSubscriber{{key_expr: {}}}: Querying historical samples {}?{}",
984 states.key_expr,
985 s.key_expr(),
986 params,
987 );
988 drop(lock);
989
990 let handler = InitialRepliesHandler {
991 statesref: statesref.clone(),
992 };
993 let _ = session
994 .get(Selector::from((s.key_expr(), params)))
995 .callback({
996 let key_expr = key_expr.clone().into_owned();
997 move |r: Reply| {
998 if let Ok(s) = r.into_result() {
999 if key_expr.intersects(s.key_expr()) {
1000 let states =
1001 &mut *zlock!(handler.statesref);
1002 tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
1003 handle_sample(states, s);
1004 }
1005 }
1006 }
1007 })
1008 .consolidation(ConsolidationMode::None)
1009 .accept_replies(ReplyKeyExpr::Any)
1010 .target(query_target)
1011 .timeout(query_timeout)
1012 .wait();
1013 }
1014 } else {
1015 tracing::warn!(
1016 "AdvancedSubscriber{{}}: Received malformed liveliness token key expression: {}",
1017 s.key_expr()
1018 );
1019 }
1020 }
1021 }
1022 };
1023
1024 tracing::debug!(
1025 "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers on {}",
1026 key_expr,
1027 &key_expr / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR
1028 );
1029 Some(
1030 conf.session
1031 .liveliness()
1032 .declare_subscriber(&key_expr / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR)
1033 .history(true)
1035 .callback(live_callback)
1036 .wait()?,
1037 )
1038 } else {
1039 None
1040 }
1041 } else {
1042 None
1043 };
1044
1045 let heartbeat_subscriber = if retransmission.is_some_and(|r| r.heartbeat) {
1046 let ke_heartbeat_sub = &key_expr / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR;
1047 let statesref = statesref.clone();
1048 tracing::debug!(
1049 "AdvancedSubscriber{{key_expr: {}}}: Enable heartbeat subscriber on {}",
1050 key_expr,
1051 ke_heartbeat_sub
1052 );
1053 let heartbeat_sub = conf
1054 .session
1055 .declare_subscriber(ke_heartbeat_sub)
1056 .callback(move |sample_hb| {
1057 if sample_hb.kind() != SampleKind::Put {
1058 return;
1059 }
1060
1061 let heartbeat_keyexpr = sample_hb.key_expr().as_keyexpr();
1062 let Ok(parsed_keyexpr) = ke_liveliness::parse(heartbeat_keyexpr) else {
1063 return;
1064 };
1065 let source_id = {
1066 let Ok(zid) = ZenohId::from_str(parsed_keyexpr.zid().as_str()) else {
1067 return;
1068 };
1069 let Ok(eid) = EntityId::from_str(parsed_keyexpr.eid().as_str()) else {
1070 return;
1071 };
1072 EntityGlobalId::new(zid, eid)
1073 };
1074
1075 let Ok(heartbeat_sn) = z_deserialize::<u32>(sample_hb.payload()) else {
1076 tracing::debug!(
1077 "AdvancedSubscriber{{}}: Skipping invalid heartbeat payload on '{}'",
1078 heartbeat_keyexpr
1079 );
1080 return;
1081 };
1082
1083 let mut lock = zlock!(statesref);
1084 let states = &mut *lock;
1085 let entry = states.sequenced_states.entry(source_id);
1086 if matches!(&entry, Entry::Vacant(_)) {
1087 spawn_periodoic_queries!(states, source_id, statesref.clone());
1089 if states.global_pending_queries > 0 {
1090 tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Skipping heartbeat on '{}' from publisher that is currently being pulled by global query", states.key_expr, heartbeat_keyexpr);
1091 return;
1092 }
1093 }
1094
1095 let state = entry.or_insert(SourceState::<u32> {
1096 last_delivered: None,
1097 pending_queries: 0,
1098 pending_samples: BTreeMap::new(),
1099 });
1100
1101 if (state.last_delivered.is_none()
1103 || state.last_delivered.is_some_and(|sn| heartbeat_sn > sn))
1104 && state.pending_queries == 0
1105 {
1106 let seq_num_range = seq_num_range(
1107 state.last_delivered.map(|s| s + 1),
1108 Some(heartbeat_sn),
1109 );
1110
1111 let session = states.session.clone();
1112 let key_expr = states.key_expr.clone().into_owned();
1113 let query_target = states.query_target;
1114 let query_timeout = states.query_timeout;
1115 state.pending_queries += 1;
1116
1117 tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Querying missing samples {}?{}", states.key_expr, heartbeat_keyexpr, seq_num_range);
1118 drop(lock);
1119
1120 let handler = SequencedRepliesHandler {
1121 source_id,
1122 statesref: statesref.clone(),
1123 };
1124 let _ = session
1125 .get(Selector::from((heartbeat_keyexpr, seq_num_range)))
1126 .callback({
1127 move |r: Reply| {
1128 if let Ok(s) = r.into_result() {
1129 if key_expr.intersects(s.key_expr()) {
1130 let states = &mut *zlock!(handler.statesref);
1131 tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
1132 handle_sample(states, s);
1133 }
1134 }
1135 }
1136 })
1137 .consolidation(ConsolidationMode::None)
1138 .accept_replies(ReplyKeyExpr::Any)
1139 .target(query_target)
1140 .timeout(query_timeout)
1141 .wait();
1142 }
1143 })
1144 .allowed_origin(conf.origin)
1145 .wait()?;
1146 Some(heartbeat_sub)
1147 } else {
1148 None
1149 };
1150
1151 if conf.liveliness {
1152 let suffix = KE_ADV_PREFIX
1153 / KE_SUB
1154 / &subscriber.id().zid().into_keyexpr()
1155 / &KeyExpr::try_from(subscriber.id().eid().to_string()).unwrap();
1156 let suffix = match meta {
1157 Some(meta) => suffix / &meta,
1158 _ => suffix / KE_EMPTY,
1160 };
1161 tracing::debug!(
1162 "AdvancedSubscriber{{key_expr: {}}}: Declare liveliness token {}",
1163 key_expr,
1164 &key_expr / &suffix,
1165 );
1166 let token = conf
1167 .session
1168 .liveliness()
1169 .declare_token(&key_expr / &suffix)
1170 .wait()?;
1171 zlock!(statesref).token = Some(token)
1172 }
1173
1174 let reliable_subscriber = AdvancedSubscriber {
1175 statesref,
1176 subscriber,
1177 receiver,
1178 liveliness_subscriber,
1179 heartbeat_subscriber,
1180 };
1181
1182 Ok(reliable_subscriber)
1183 }
1184
1185 #[zenoh_macros::unstable]
1187 pub fn id(&self) -> EntityGlobalId {
1188 self.subscriber.id()
1189 }
1190
1191 #[zenoh_macros::unstable]
1193 pub fn key_expr(&self) -> &KeyExpr<'static> {
1194 self.subscriber.key_expr()
1195 }
1196
1197 #[zenoh_macros::unstable]
1201 pub fn handler(&self) -> &Handler {
1202 &self.receiver
1203 }
1204
1205 #[zenoh_macros::unstable]
1209 pub fn handler_mut(&mut self) -> &mut Handler {
1210 &mut self.receiver
1211 }
1212
1213 #[zenoh_macros::unstable]
1218 pub fn sample_miss_listener(&self) -> SampleMissListenerBuilder<'_, DefaultHandler> {
1219 SampleMissListenerBuilder {
1220 statesref: &self.statesref,
1221 handler: DefaultHandler::default(),
1222 }
1223 }
1224
1225 #[zenoh_macros::unstable]
1230 pub fn detect_publishers(&self) -> LivelinessSubscriberBuilder<'_, '_, DefaultHandler> {
1231 self.subscriber
1232 .session()
1233 .liveliness()
1234 .declare_subscriber(self.subscriber.key_expr() / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR)
1235 }
1236
1237 #[inline]
1239 #[zenoh_macros::unstable]
1240 pub fn undeclare(self) -> impl Resolve<ZResult<()>> {
1241 tracing::debug!(
1242 "AdvancedSubscriber{{key_expr: {}}}: Undeclare",
1243 self.key_expr()
1244 );
1245 self.subscriber.undeclare()
1246 }
1247
1248 fn set_background_impl(&mut self, background: bool) {
1249 self.subscriber.set_background(background);
1250 if let Some(mut liveliness_sub) = self.liveliness_subscriber.take() {
1251 liveliness_sub.set_background(background);
1252 }
1253 if let Some(mut heartbeat_sub) = self.heartbeat_subscriber.take() {
1254 heartbeat_sub.set_background(background);
1255 }
1256 }
1257
1258 #[zenoh_macros::internal]
1259 pub fn set_background(&mut self, background: bool) {
1260 self.set_background_impl(background)
1261 }
1262}
1263
1264#[zenoh_macros::unstable]
1265#[inline]
1266fn flush_sequenced_source(
1267 state: &mut SourceState<u32>,
1268 callback: &Callback<Sample>,
1269 source_id: &EntityGlobalId,
1270 miss_handlers: &HashMap<usize, Callback<Miss>>,
1271) {
1272 if state.pending_queries == 0 && !state.pending_samples.is_empty() {
1273 let mut pending_samples = BTreeMap::new();
1274 std::mem::swap(&mut state.pending_samples, &mut pending_samples);
1275 for (seq_num, sample) in pending_samples {
1276 match state.last_delivered {
1277 None => {
1278 state.last_delivered = Some(seq_num);
1279 callback.call(sample);
1280 }
1281 Some(last) if seq_num == last + 1 => {
1282 state.last_delivered = Some(seq_num);
1283 callback.call(sample);
1284 }
1285 Some(last) if seq_num > last + 1 => {
1286 tracing::warn!(
1287 "Sample missed: missed {} samples from {:?}.",
1288 seq_num - last - 1,
1289 source_id,
1290 );
1291 for miss_callback in miss_handlers.values() {
1292 miss_callback.call(Miss {
1293 source: *source_id,
1294 nb: seq_num - last - 1,
1295 })
1296 }
1297 state.last_delivered = Some(seq_num);
1298 callback.call(sample);
1299 }
1300 _ => {
1301 }
1303 }
1304 }
1305 }
1306}
1307
1308#[zenoh_macros::unstable]
1309#[inline]
1310fn flush_timestamped_source(state: &mut SourceState<Timestamp>, callback: &Callback<Sample>) {
1311 if state.pending_queries == 0 && !state.pending_samples.is_empty() {
1312 let mut pending_samples = BTreeMap::new();
1313 std::mem::swap(&mut state.pending_samples, &mut pending_samples);
1314 for (timestamp, sample) in pending_samples {
1315 if state
1316 .last_delivered
1317 .map(|last| timestamp > last)
1318 .unwrap_or(true)
1319 {
1320 state.last_delivered = Some(timestamp);
1321 callback.call(sample);
1322 }
1323 }
1324 }
1325}
1326
1327#[zenoh_macros::unstable]
1328#[derive(Clone)]
1329struct InitialRepliesHandler {
1330 statesref: Arc<Mutex<State>>,
1331}
1332
1333#[zenoh_macros::unstable]
1334impl Drop for InitialRepliesHandler {
1335 fn drop(&mut self) {
1336 let states = &mut *zlock!(self.statesref);
1337 states.global_pending_queries = states.global_pending_queries.saturating_sub(1);
1338 tracing::trace!(
1339 "AdvancedSubscriber{{key_expr: {}}}: Flush initial replies",
1340 states.key_expr
1341 );
1342
1343 if states.global_pending_queries == 0 {
1344 for (source_id, state) in states.sequenced_states.iter_mut() {
1345 flush_sequenced_source(state, &states.callback, source_id, &states.miss_handlers);
1346 spawn_periodoic_queries!(states, *source_id, self.statesref.clone());
1347 }
1348 for state in states.timestamped_states.values_mut() {
1349 flush_timestamped_source(state, &states.callback);
1350 }
1351 }
1352 }
1353}
1354
1355#[zenoh_macros::unstable]
1356#[derive(Clone)]
1357struct SequencedRepliesHandler {
1358 source_id: EntityGlobalId,
1359 statesref: Arc<Mutex<State>>,
1360}
1361
1362#[zenoh_macros::unstable]
1363impl Drop for SequencedRepliesHandler {
1364 fn drop(&mut self) {
1365 let states = &mut *zlock!(self.statesref);
1366 if let Some(state) = states.sequenced_states.get_mut(&self.source_id) {
1367 state.pending_queries = state.pending_queries.saturating_sub(1);
1368 if states.global_pending_queries == 0 {
1369 tracing::trace!(
1370 "AdvancedSubscriber{{key_expr: {}}}: Flush sequenced samples",
1371 states.key_expr
1372 );
1373 flush_sequenced_source(
1374 state,
1375 &states.callback,
1376 &self.source_id,
1377 &states.miss_handlers,
1378 )
1379 }
1380 }
1381 }
1382}
1383
1384#[zenoh_macros::unstable]
1385#[derive(Clone)]
1386struct TimestampedRepliesHandler {
1387 id: ID,
1388 statesref: Arc<Mutex<State>>,
1389 callback: Callback<Sample>,
1390}
1391
1392#[zenoh_macros::unstable]
1393impl Drop for TimestampedRepliesHandler {
1394 fn drop(&mut self) {
1395 let states = &mut *zlock!(self.statesref);
1396 if let Some(state) = states.timestamped_states.get_mut(&self.id) {
1397 state.pending_queries = state.pending_queries.saturating_sub(1);
1398 if states.global_pending_queries == 0 {
1399 tracing::trace!(
1400 "AdvancedSubscriber{{key_expr: {}}}: Flush timestamped samples",
1401 states.key_expr
1402 );
1403 flush_timestamped_source(state, &self.callback);
1404 }
1405 }
1406 }
1407}
1408
1409#[zenoh_macros::unstable]
1411#[derive(Debug, Clone)]
1412pub struct Miss {
1413 source: EntityGlobalId,
1414 nb: u32,
1415}
1416
1417impl Miss {
1418 pub fn source(&self) -> EntityGlobalId {
1420 self.source
1421 }
1422
1423 pub fn nb(&self) -> u32 {
1425 self.nb
1426 }
1427}
1428
1429impl CallbackParameter for Miss {
1430 type Message<'a> = Self;
1431
1432 fn from_message(msg: Self::Message<'_>) -> Self {
1433 msg
1434 }
1435}
1436
1437#[zenoh_macros::unstable]
1442pub struct SampleMissListener<Handler> {
1443 id: usize,
1444 statesref: Arc<Mutex<State>>,
1445 handler: Handler,
1446 undeclare_on_drop: bool,
1447}
1448
1449#[zenoh_macros::unstable]
1450impl<Handler> SampleMissListener<Handler> {
1451 #[inline]
1452 pub fn undeclare(self) -> SampleMissHandlerUndeclaration<Handler>
1453 where
1454 Handler: Send,
1455 {
1456 SampleMissHandlerUndeclaration(self)
1458 }
1459
1460 fn undeclare_impl(&mut self) -> ZResult<()> {
1461 self.undeclare_on_drop = false;
1463 zlock!(self.statesref).unregister_miss_callback(&self.id);
1464 Ok(())
1465 }
1466
1467 #[zenoh_macros::internal]
1468 pub fn set_background(&mut self, background: bool) {
1469 self.undeclare_on_drop = !background;
1470 }
1471}
1472
1473#[cfg(feature = "unstable")]
1474impl<Handler> Drop for SampleMissListener<Handler> {
1475 fn drop(&mut self) {
1476 if self.undeclare_on_drop {
1477 if let Err(error) = self.undeclare_impl() {
1478 tracing::error!(error);
1479 }
1480 }
1481 }
1482}
1483
1484#[zenoh_macros::unstable]
1494impl<Handler> std::ops::Deref for SampleMissListener<Handler> {
1495 type Target = Handler;
1496
1497 fn deref(&self) -> &Self::Target {
1498 &self.handler
1499 }
1500}
1501#[zenoh_macros::unstable]
1502impl<Handler> std::ops::DerefMut for SampleMissListener<Handler> {
1503 fn deref_mut(&mut self) -> &mut Self::Target {
1504 &mut self.handler
1505 }
1506}
1507
1508#[zenoh_macros::unstable]
1510pub struct SampleMissHandlerUndeclaration<Handler>(SampleMissListener<Handler>);
1511
1512#[zenoh_macros::unstable]
1513impl<Handler> Resolvable for SampleMissHandlerUndeclaration<Handler> {
1514 type To = ZResult<()>;
1515}
1516
1517#[zenoh_macros::unstable]
1518impl<Handler> Wait for SampleMissHandlerUndeclaration<Handler> {
1519 fn wait(mut self) -> <Self as Resolvable>::To {
1520 self.0.undeclare_impl()
1521 }
1522}
1523
1524#[zenoh_macros::unstable]
1525impl<Handler> IntoFuture for SampleMissHandlerUndeclaration<Handler> {
1526 type Output = <Self as Resolvable>::To;
1527 type IntoFuture = Ready<<Self as Resolvable>::To>;
1528
1529 fn into_future(self) -> Self::IntoFuture {
1530 std::future::ready(self.wait())
1531 }
1532}
1533
1534#[zenoh_macros::unstable]
1536pub struct SampleMissListenerBuilder<'a, Handler, const BACKGROUND: bool = false> {
1537 statesref: &'a Arc<Mutex<State>>,
1538 handler: Handler,
1539}
1540
1541#[zenoh_macros::unstable]
1542impl<'a> SampleMissListenerBuilder<'a, DefaultHandler> {
1543 #[inline]
1545 #[zenoh_macros::unstable]
1546 pub fn callback<F>(self, callback: F) -> SampleMissListenerBuilder<'a, Callback<Miss>>
1547 where
1548 F: Fn(Miss) + Send + Sync + 'static,
1549 {
1550 self.with(Callback::from(callback))
1551 }
1552
1553 #[inline]
1555 #[zenoh_macros::unstable]
1556 pub fn callback_mut<F>(self, callback: F) -> SampleMissListenerBuilder<'a, Callback<Miss>>
1557 where
1558 F: FnMut(Miss) + Send + Sync + 'static,
1559 {
1560 self.callback(zenoh::handlers::locked(callback))
1561 }
1562
1563 #[inline]
1565 #[zenoh_macros::unstable]
1566 pub fn with<Handler>(self, handler: Handler) -> SampleMissListenerBuilder<'a, Handler>
1567 where
1568 Handler: IntoHandler<Miss>,
1569 {
1570 SampleMissListenerBuilder {
1571 statesref: self.statesref,
1572 handler,
1573 }
1574 }
1575}
1576
1577#[zenoh_macros::unstable]
1578impl<'a> SampleMissListenerBuilder<'a, Callback<Miss>> {
1579 #[zenoh_macros::unstable]
1583 pub fn background(self) -> SampleMissListenerBuilder<'a, Callback<Miss>, true> {
1584 SampleMissListenerBuilder {
1585 statesref: self.statesref,
1586 handler: self.handler,
1587 }
1588 }
1589}
1590
1591#[zenoh_macros::unstable]
1592impl<Handler> Resolvable for SampleMissListenerBuilder<'_, Handler>
1593where
1594 Handler: IntoHandler<Miss> + Send,
1595 Handler::Handler: Send,
1596{
1597 type To = ZResult<SampleMissListener<Handler::Handler>>;
1598}
1599
1600#[zenoh_macros::unstable]
1601impl<Handler> Wait for SampleMissListenerBuilder<'_, Handler>
1602where
1603 Handler: IntoHandler<Miss> + Send,
1604 Handler::Handler: Send,
1605{
1606 #[zenoh_macros::unstable]
1607 fn wait(self) -> <Self as Resolvable>::To {
1608 let (callback, handler) = self.handler.into_handler();
1609 let id = zlock!(self.statesref).register_miss_callback(callback);
1610 Ok(SampleMissListener {
1611 id,
1612 statesref: self.statesref.clone(),
1613 handler,
1614 undeclare_on_drop: true,
1615 })
1616 }
1617}
1618
1619#[zenoh_macros::unstable]
1620impl<Handler> IntoFuture for SampleMissListenerBuilder<'_, Handler>
1621where
1622 Handler: IntoHandler<Miss> + Send,
1623 Handler::Handler: Send,
1624{
1625 type Output = <Self as Resolvable>::To;
1626 type IntoFuture = Ready<<Self as Resolvable>::To>;
1627
1628 #[zenoh_macros::unstable]
1629 fn into_future(self) -> Self::IntoFuture {
1630 std::future::ready(self.wait())
1631 }
1632}
1633
1634#[zenoh_macros::unstable]
1635impl Resolvable for SampleMissListenerBuilder<'_, Callback<Miss>, true> {
1636 type To = ZResult<()>;
1637}
1638
1639#[zenoh_macros::unstable]
1640impl Wait for SampleMissListenerBuilder<'_, Callback<Miss>, true> {
1641 #[zenoh_macros::unstable]
1642 fn wait(self) -> <Self as Resolvable>::To {
1643 let (callback, _) = self.handler.into_handler();
1644 zlock!(self.statesref).register_miss_callback(callback);
1645 Ok(())
1646 }
1647}
1648
1649#[zenoh_macros::unstable]
1650impl IntoFuture for SampleMissListenerBuilder<'_, Callback<Miss>, true> {
1651 type Output = <Self as Resolvable>::To;
1652 type IntoFuture = Ready<<Self as Resolvable>::To>;
1653
1654 #[zenoh_macros::unstable]
1655 fn into_future(self) -> Self::IntoFuture {
1656 std::future::ready(self.wait())
1657 }
1658}