1use std::{collections::BTreeMap, future::IntoFuture, str::FromStr};
15
16use zenoh::{
17 config::ZenohId,
18 handlers::{Callback, CallbackParameter, IntoHandler},
19 key_expr::KeyExpr,
20 liveliness::{LivelinessSubscriberBuilder, LivelinessToken},
21 pubsub::SubscriberBuilder,
22 query::{
23 ConsolidationMode, Parameters, Selector, TimeBound, TimeExpr, TimeRange, ZenohParameters,
24 },
25 sample::{Locality, Sample, SampleKind},
26 session::{EntityGlobalId, EntityId},
27 Resolvable, Resolve, Session, Wait, KE_ADV_PREFIX, KE_EMPTY, KE_PUB, KE_STAR, KE_STARSTAR,
28 KE_SUB,
29};
30use zenoh_util::{Timed, TimedEvent, Timer};
31#[zenoh_macros::unstable]
32use {
33 async_trait::async_trait,
34 std::collections::hash_map::Entry,
35 std::collections::HashMap,
36 std::convert::TryFrom,
37 std::future::Ready,
38 std::sync::{Arc, Mutex},
39 std::time::Duration,
40 uhlc::ID,
41 zenoh::handlers::{locked, DefaultHandler},
42 zenoh::internal::{runtime::ZRuntime, zlock},
43 zenoh::pubsub::Subscriber,
44 zenoh::query::{QueryTarget, Reply, ReplyKeyExpr},
45 zenoh::time::Timestamp,
46 zenoh::Result as ZResult,
47};
48
49use crate::{
50 advanced_cache::{ke_liveliness, KE_UHLC},
51 utils::WrappingSn,
52 z_deserialize,
53};
54
55#[derive(Debug, Default, Clone)]
56#[zenoh_macros::unstable]
58pub struct HistoryConfig {
59 liveliness: bool,
60 sample_depth: Option<usize>,
61 age: Option<f64>,
62}
63
64#[zenoh_macros::unstable]
65impl HistoryConfig {
66 #[inline]
71 #[zenoh_macros::unstable]
72 pub fn detect_late_publishers(mut self) -> Self {
73 self.liveliness = true;
74 self
75 }
76
77 #[zenoh_macros::unstable]
79 pub fn max_samples(mut self, depth: usize) -> Self {
80 self.sample_depth = Some(depth);
81 self
82 }
83
84 #[zenoh_macros::unstable]
86 pub fn max_age(mut self, seconds: f64) -> Self {
87 self.age = Some(seconds);
88 self
89 }
90}
91
92#[derive(Debug, Default, Clone, Copy)]
93#[zenoh_macros::unstable]
95pub struct RecoveryConfig<const CONFIGURED: bool = true> {
96 periodic_queries: Option<Duration>,
97 heartbeat: bool,
98}
99
100#[zenoh_macros::unstable]
101impl RecoveryConfig<false> {
102 #[zenoh_macros::unstable]
111 #[inline]
112 pub fn periodic_queries(self, period: Duration) -> RecoveryConfig<true> {
113 RecoveryConfig {
114 periodic_queries: Some(period),
115 heartbeat: false,
116 }
117 }
118
119 #[zenoh_macros::unstable]
128 #[inline]
129 pub fn heartbeat(self) -> RecoveryConfig<true> {
130 RecoveryConfig {
131 periodic_queries: None,
132 heartbeat: true,
133 }
134 }
135}
136
/// Builder collecting the options of an [`AdvancedSubscriber`].
///
/// The `BACKGROUND` marker selects what resolving the builder yields: the
/// subscriber itself (`false`) or nothing, the subscriber being left running in
/// background (`true`).
#[zenoh_macros::unstable]
pub struct AdvancedSubscriberBuilder<'a, 'b, 'c, Handler, const BACKGROUND: bool = false> {
    /// Session used to declare the subscribers and tokens and to issue the queries.
    pub(crate) session: &'a Session,
    /// Key expression to subscribe to (or the error produced while building it).
    pub(crate) key_expr: ZResult<KeyExpr<'b>>,
    /// Locality restriction applied to the underlying subscribers.
    pub(crate) origin: Locality,
    /// Recovery strategy for missed samples; `None` disables recovery.
    pub(crate) retransmission: Option<RecoveryConfig>,
    /// Target of the queries issued by the subscriber.
    pub(crate) query_target: QueryTarget,
    /// Timeout of the queries issued by the subscriber.
    pub(crate) query_timeout: Duration,
    /// History retrieval settings; `None` disables the initial history query.
    pub(crate) history: Option<HistoryConfig>,
    /// Whether a liveliness token advertising this subscriber is declared.
    pub(crate) liveliness: bool,
    /// Optional key expression appended to the liveliness token of this subscriber.
    pub(crate) meta_key_expr: Option<ZResult<KeyExpr<'c>>>,
    /// Handler (or callback) that receives the samples.
    pub(crate) handler: Handler,
}
151
152#[zenoh_macros::unstable]
153impl<'a, 'b, Handler> AdvancedSubscriberBuilder<'a, 'b, '_, Handler> {
154 #[zenoh_macros::unstable]
155 pub(crate) fn new(builder: SubscriberBuilder<'a, 'b, Handler>) -> Self {
156 AdvancedSubscriberBuilder {
157 session: builder.session,
158 key_expr: builder.key_expr,
159 origin: builder.origin,
160 handler: builder.handler,
161 retransmission: None,
162 query_target: QueryTarget::All,
163 query_timeout: Duration::from_secs(10),
164 history: None,
165 liveliness: false,
166 meta_key_expr: None,
167 }
168 }
169}
170
171#[zenoh_macros::unstable]
172impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, DefaultHandler> {
173 #[inline]
175 #[zenoh_macros::unstable]
176 pub fn callback<F>(self, callback: F) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>>
177 where
178 F: Fn(Sample) + Send + Sync + 'static,
179 {
180 self.with(Callback::from(callback))
181 }
182
183 #[inline]
188 #[zenoh_macros::unstable]
189 pub fn callback_mut<F>(
190 self,
191 callback: F,
192 ) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>>
193 where
194 F: FnMut(Sample) + Send + Sync + 'static,
195 {
196 self.callback(locked(callback))
197 }
198
199 #[inline]
201 #[zenoh_macros::unstable]
202 pub fn with<Handler>(self, handler: Handler) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>
203 where
204 Handler: IntoHandler<Sample>,
205 {
206 AdvancedSubscriberBuilder {
207 session: self.session,
208 key_expr: self.key_expr,
209 origin: self.origin,
210 retransmission: self.retransmission,
211 query_target: self.query_target,
212 query_timeout: self.query_timeout,
213 history: self.history,
214 liveliness: self.liveliness,
215 meta_key_expr: self.meta_key_expr,
216 handler,
217 }
218 }
219}
220
221#[zenoh_macros::unstable]
222impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>> {
223 pub fn background(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>, true> {
227 AdvancedSubscriberBuilder {
228 session: self.session,
229 key_expr: self.key_expr,
230 origin: self.origin,
231 retransmission: self.retransmission,
232 query_target: self.query_target,
233 query_timeout: self.query_timeout,
234 history: self.history,
235 liveliness: self.liveliness,
236 meta_key_expr: self.meta_key_expr,
237 handler: self.handler,
238 }
239 }
240}
241
242#[zenoh_macros::unstable]
243impl<'a, 'c, Handler, const BACKGROUND: bool>
244 AdvancedSubscriberBuilder<'a, '_, 'c, Handler, BACKGROUND>
245{
246 #[zenoh_macros::unstable]
249 #[inline]
250 pub fn allowed_origin(mut self, origin: Locality) -> Self {
251 self.origin = origin;
252 self
253 }
254
255 #[zenoh_macros::unstable]
261 #[inline]
262 pub fn recovery(mut self, conf: RecoveryConfig) -> Self {
263 self.retransmission = Some(conf);
264 self
265 }
266
267 #[zenoh_macros::unstable]
277 #[inline]
278 pub fn query_timeout(mut self, query_timeout: Duration) -> Self {
279 self.query_timeout = query_timeout;
280 self
281 }
282
283 #[zenoh_macros::unstable]
287 #[inline]
288 pub fn history(mut self, config: HistoryConfig) -> Self {
289 self.history = Some(config);
290 self
291 }
292
293 #[zenoh_macros::unstable]
295 pub fn subscriber_detection(mut self) -> Self {
296 self.liveliness = true;
297 self
298 }
299
300 #[zenoh_macros::unstable]
303 pub fn subscriber_detection_metadata<TryIntoKeyExpr>(mut self, meta: TryIntoKeyExpr) -> Self
304 where
305 TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
306 <TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<zenoh::Error>,
307 {
308 self.meta_key_expr = Some(meta.try_into().map_err(Into::into));
309 self
310 }
311
312 #[zenoh_macros::unstable]
313 fn with_static_keys(self) -> AdvancedSubscriberBuilder<'a, 'static, 'static, Handler> {
314 AdvancedSubscriberBuilder {
315 session: self.session,
316 key_expr: self.key_expr.map(|s| s.into_owned()),
317 origin: self.origin,
318 retransmission: self.retransmission,
319 query_target: self.query_target,
320 query_timeout: self.query_timeout,
321 history: self.history,
322 liveliness: self.liveliness,
323 meta_key_expr: self.meta_key_expr.map(|s| s.map(|s| s.into_owned())),
324 handler: self.handler,
325 }
326 }
327}
328
// Resolving a (non-background) builder yields the subscriber wrapping the
// receiver half of the handler, or the error met while creating it.
#[zenoh_macros::unstable]
impl<Handler> Resolvable for AdvancedSubscriberBuilder<'_, '_, '_, Handler>
where
    Handler: IntoHandler<Sample>,
    Handler::Handler: Send,
{
    type To = ZResult<AdvancedSubscriber<Handler::Handler>>;
}
337
338#[zenoh_macros::unstable]
339impl<Handler> Wait for AdvancedSubscriberBuilder<'_, '_, '_, Handler>
340where
341 Handler: IntoHandler<Sample> + Send,
342 Handler::Handler: Send,
343{
344 #[zenoh_macros::unstable]
345 fn wait(self) -> <Self as Resolvable>::To {
346 AdvancedSubscriber::new(self.with_static_keys())
347 }
348}
349
350#[zenoh_macros::unstable]
351impl<Handler> IntoFuture for AdvancedSubscriberBuilder<'_, '_, '_, Handler>
352where
353 Handler: IntoHandler<Sample> + Send,
354 Handler::Handler: Send,
355{
356 type Output = <Self as Resolvable>::To;
357 type IntoFuture = Ready<<Self as Resolvable>::To>;
358
359 #[zenoh_macros::unstable]
360 fn into_future(self) -> Self::IntoFuture {
361 std::future::ready(self.wait())
362 }
363}
364
// A background builder resolves to `()`: the subscriber is not handed back to the caller.
#[zenoh_macros::unstable]
impl Resolvable for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
    type To = ZResult<()>;
}
369
370#[zenoh_macros::unstable]
371impl Wait for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
372 #[zenoh_macros::unstable]
373 fn wait(self) -> <Self as Resolvable>::To {
374 let mut sub = AdvancedSubscriber::new(self.with_static_keys())?;
375 sub.set_background_impl(true);
376 Ok(())
377 }
378}
379
380#[zenoh_macros::unstable]
381impl IntoFuture for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
382 type Output = <Self as Resolvable>::To;
383 type IntoFuture = Ready<<Self as Resolvable>::To>;
384
385 #[zenoh_macros::unstable]
386 fn into_future(self) -> Self::IntoFuture {
387 std::future::ready(self.wait())
388 }
389}
390
/// Timer and interval used to schedule the periodic queries.
#[zenoh_macros::unstable]
struct Period {
    /// Timer on which the periodic query events are registered.
    timer: Timer,
    /// Interval between two periodic queries for a given source.
    period: Duration,
}
396
397#[zenoh_macros::unstable]
398struct State {
399 next_id: usize,
400 global_pending_queries: u64,
401 sequenced_states: HashMap<EntityGlobalId, SourceState<WrappingSn>>,
402 timestamped_states: HashMap<ID, SourceState<Timestamp>>,
403 session: Session,
404 key_expr: KeyExpr<'static>,
405 retransmission: bool,
406 period: Option<Period>,
407 history_depth: usize,
408 query_target: QueryTarget,
409 query_timeout: Duration,
410 callback: Callback<Sample>,
411 miss_handlers: HashMap<usize, Callback<Miss>>,
412 token: Option<LivelinessToken>,
413}
414
415#[zenoh_macros::unstable]
416impl State {
417 #[zenoh_macros::unstable]
418 fn register_miss_callback(&mut self, callback: Callback<Miss>) -> usize {
419 let id = self.next_id;
420 self.next_id += 1;
421 self.miss_handlers.insert(id, callback);
422 id
423 }
424 #[zenoh_macros::unstable]
425 fn unregister_miss_callback(&mut self, id: &usize) {
426 self.miss_handlers.remove(id);
427 }
428}
429
// Registers a recurring `PeriodicQuery` for one source, if periodic queries are enabled.
//
// * `$p`: expression giving access to a `period: Option<Period>` field (a `State`
//   or a lock guard over it);
// * `$s`: id of the source to query;
// * `$r`: shared state reference handed to the `PeriodicQuery`.
//
// Does nothing when `period` is `None`.
macro_rules! spawn_periodic_queries {
    ($p:expr,$s:expr,$r:expr) => {{
        if let Some(period) = &$p.period {
            period.timer.add(TimedEvent::periodic(
                period.period,
                PeriodicQuery {
                    source_id: $s,
                    statesref: $r,
                },
            ))
        }
    }};
}
443
/// Delivery state kept for one source; `T` is the ordering key of its samples
/// (a sequence number or a timestamp).
#[zenoh_macros::unstable]
struct SourceState<T> {
    /// Key of the last sample handed to the user callback, if any.
    last_delivered: Option<T>,
    /// Number of in-flight queries targeting this source.
    pending_queries: u64,
    /// Samples received but not delivered yet, ordered by key.
    pending_samples: BTreeMap<T, Sample>,
}
450
/// Subscriber built by [`AdvancedSubscriberBuilder`], combining a regular
/// subscriber with optional history retrieval, sample recovery and detection.
#[zenoh_macros::unstable]
pub struct AdvancedSubscriber<Receiver> {
    /// State shared with the callbacks of the underlying subscribers and queries.
    statesref: Arc<Mutex<State>>,
    /// Subscriber on the user key expression.
    subscriber: Subscriber<()>,
    /// Receiver half of the user handler.
    receiver: Receiver,
    /// Liveliness subscriber detecting late-joining publishers, when enabled.
    liveliness_subscriber: Option<Subscriber<()>>,
    /// Subscriber receiving publisher heartbeats, when enabled.
    heartbeat_subscriber: Option<Subscriber<()>>,
}
529
530#[zenoh_macros::unstable]
531impl<Receiver> std::ops::Deref for AdvancedSubscriber<Receiver> {
532 type Target = Receiver;
533 fn deref(&self) -> &Self::Target {
534 &self.receiver
535 }
536}
537
538#[zenoh_macros::unstable]
539impl<Receiver> std::ops::DerefMut for AdvancedSubscriber<Receiver> {
540 fn deref_mut(&mut self) -> &mut Self::Target {
541 &mut self.receiver
542 }
543}
544
545#[zenoh_macros::unstable]
546fn handle_sample(states: &mut State, sample: Sample) -> bool {
547 if let Some(source_info) = sample.source_info().cloned() {
548 #[inline]
549 fn deliver_and_flush(
550 sample: Sample,
551 source_sn: impl Into<WrappingSn>,
552 callback: &Callback<Sample>,
553 state: &mut SourceState<WrappingSn>,
554 ) {
555 let mut source_sn = source_sn.into();
556 callback.call(sample);
557 state.last_delivered = Some(source_sn);
558 while let Some(sample) = state.pending_samples.remove(&(source_sn + 1)) {
559 callback.call(sample);
560 source_sn += 1;
561 state.last_delivered = Some(source_sn);
562 }
563 }
564
565 let entry = states.sequenced_states.entry(*source_info.source_id());
566 let new = matches!(&entry, Entry::Vacant(_));
567 let state = entry.or_insert(SourceState::<WrappingSn> {
568 last_delivered: None,
569 pending_queries: 0,
570 pending_samples: BTreeMap::new(),
571 });
572 if state.last_delivered.is_none() && states.global_pending_queries != 0 {
573 if states.history_depth == 1 {
575 state.last_delivered = Some(source_info.source_sn().into());
576 states.callback.call(sample);
577 } else {
578 state
579 .pending_samples
580 .insert(source_info.source_sn().into(), sample);
581 if state.pending_samples.len() >= states.history_depth {
582 if let Some((sn, sample)) = state.pending_samples.pop_first() {
583 deliver_and_flush(sample, sn, &states.callback, state);
584 }
585 }
586 }
587 } else if state.last_delivered.is_some()
588 && source_info.source_sn() != state.last_delivered.unwrap() + 1
589 {
590 if source_info.source_sn() > state.last_delivered.unwrap() {
591 if states.retransmission {
592 state
593 .pending_samples
594 .insert(source_info.source_sn().into(), sample);
595 } else {
596 tracing::info!(
597 "Sample missed: missed {} samples from {:?}.",
598 source_info.source_sn() - state.last_delivered.unwrap() - 1,
599 source_info.source_id(),
600 );
601 for miss_callback in states.miss_handlers.values() {
602 miss_callback.call(Miss {
603 source: *source_info.source_id(),
604 nb: source_info.source_sn() - state.last_delivered.unwrap() - 1,
605 });
606 }
607 states.callback.call(sample);
608 state.last_delivered = Some(source_info.source_sn().into());
609 }
610 }
611 } else {
612 deliver_and_flush(sample, source_info.source_sn(), &states.callback, state);
613 }
614 new
615 } else if let Some(timestamp) = sample.timestamp() {
616 let entry = states.timestamped_states.entry(*timestamp.get_id());
617 let state = entry.or_insert(SourceState::<Timestamp> {
618 last_delivered: None,
619 pending_queries: 0,
620 pending_samples: BTreeMap::new(),
621 });
622 if state.last_delivered.map(|t| t < *timestamp).unwrap_or(true) {
623 if (states.global_pending_queries == 0 && state.pending_queries == 0)
624 || states.history_depth == 1
625 {
626 state.last_delivered = Some(*timestamp);
627 states.callback.call(sample);
628 } else {
629 state.pending_samples.entry(*timestamp).or_insert(sample);
630 if state.pending_samples.len() >= states.history_depth {
631 flush_timestamped_source(state, &states.callback);
632 }
633 }
634 }
635 false
636 } else {
637 states.callback.call(sample);
638 false
639 }
640}
641
642#[zenoh_macros::unstable]
643fn seq_num_range(start: Option<WrappingSn>, end: Option<WrappingSn>) -> String {
644 match (start, end) {
645 (Some(start), Some(end)) => format!("_sn={start}..{end}"),
646 (Some(start), None) => format!("_sn={start}.."),
647 (None, Some(end)) => format!("_sn=..{end}"),
648 (None, None) => "_sn=..".to_string(),
649 }
650}
651
/// Timed event querying one source for the samples it published after the last
/// delivered one.
#[zenoh_macros::unstable]
#[derive(Clone)]
struct PeriodicQuery {
    /// Source the query is addressed to.
    source_id: EntityGlobalId,
    /// Shared subscriber state.
    statesref: Arc<Mutex<State>>,
}
658
659#[zenoh_macros::unstable]
660#[async_trait]
661impl Timed for PeriodicQuery {
662 async fn run(&mut self) {
663 let mut lock = zlock!(self.statesref);
664 let states = &mut *lock;
665 if let Some(state) = states.sequenced_states.get_mut(&self.source_id) {
666 state.pending_queries += 1;
667 let query_expr = &states.key_expr
668 / KE_ADV_PREFIX
669 / KE_STAR
670 / &self.source_id.zid().into_keyexpr()
671 / &KeyExpr::try_from(self.source_id.eid().to_string()).unwrap()
672 / KE_STARSTAR;
673 let seq_num_range = seq_num_range(state.last_delivered.map(|s| s + 1), None);
674
675 let session = states.session.clone();
676 let key_expr = states.key_expr.clone().into_owned();
677 let query_target = states.query_target;
678 let query_timeout = states.query_timeout;
679
680 tracing::trace!(
681 "AdvancedSubscriber{{key_expr: {}}}: Querying undelivered samples {}?{}",
682 states.key_expr,
683 query_expr,
684 seq_num_range
685 );
686 drop(lock);
687
688 let handler = SequencedRepliesHandler {
689 source_id: self.source_id,
690 statesref: self.statesref.clone(),
691 };
692 let _ = session
693 .get(Selector::from((query_expr, seq_num_range)))
694 .callback({
695 move |r: Reply| {
696 if let Ok(s) = r.into_result() {
697 if key_expr.intersects(s.key_expr()) {
698 let states = &mut *zlock!(handler.statesref);
699 tracing::trace!(
700 "AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}",
701 states.key_expr,
702 s.source_info(),
703 s.timestamp()
704 );
705 handle_sample(states, s);
706 }
707 }
708 }
709 })
710 .consolidation(ConsolidationMode::None)
711 .accept_replies(ReplyKeyExpr::Any)
712 .target(query_target)
713 .timeout(query_timeout)
714 .wait();
715 }
716 }
717}
718
719#[zenoh_macros::unstable]
720impl<Handler> AdvancedSubscriber<Handler> {
    /// Creates the advanced subscriber described by `conf`.
    ///
    /// In order, this:
    /// 1. builds the shared [`State`];
    /// 2. declares the subscriber on the user key expression;
    /// 3. issues the initial history query, when history is configured;
    /// 4. declares the liveliness subscriber detecting late publishers, when requested;
    /// 5. declares the heartbeat subscriber, when heartbeat recovery is selected;
    /// 6. declares the liveliness token advertising this subscriber, when requested.
    ///
    /// # Errors
    ///
    /// Returns the error stored in `conf.key_expr` or `conf.meta_key_expr`, or any
    /// error raised while declaring a subscriber or the liveliness token. Errors
    /// returned when issuing queries are ignored.
    fn new<H>(conf: AdvancedSubscriberBuilder<'_, '_, '_, H>) -> ZResult<Self>
    where
        H: IntoHandler<Sample, Handler = Handler> + Send,
    {
        let (callback, receiver) = conf.handler.into_handler();
        let key_expr = conf.key_expr?;
        let meta = match conf.meta_key_expr {
            Some(meta) => Some(meta?),
            None => None,
        };
        let retransmission = conf.retransmission;
        let query_target = conf.query_target;
        let query_timeout = conf.query_timeout;
        let session = conf.session.clone();
        let statesref = Arc::new(Mutex::new(State {
            next_id: 0,
            sequenced_states: HashMap::new(),
            timestamped_states: HashMap::new(),
            // The initial history query (issued below) counts as one pending global query.
            global_pending_queries: if conf.history.is_some() { 1 } else { 0 },
            session,
            // A timer is only created when periodic queries are configured.
            period: retransmission.as_ref().and_then(|r| {
                // NOTE(review): presumably the timer must be created from within the
                // application runtime context — confirm.
                let _rt = ZRuntime::Application.enter();
                r.periodic_queries.map(|p| Period {
                    timer: Timer::new(false),
                    period: p,
                })
            }),
            key_expr: key_expr.clone().into_owned(),
            retransmission: retransmission.is_some(),
            // Defaults to 0 when no history or no sample depth is configured.
            history_depth: conf
                .history
                .as_ref()
                .and_then(|h| h.sample_depth)
                .unwrap_or_default(),
            query_target: conf.query_target,
            query_timeout: conf.query_timeout,
            callback: callback.clone(),
            miss_handlers: HashMap::new(),
            token: None,
        }));

        // Callback of the main subscriber: process the sample, then, if recovery is
        // enabled and samples are buffered with no query in flight for that source,
        // query the source for what is missing.
        let sub_callback = {
            let statesref = statesref.clone();
            let session = conf.session.clone();
            let key_expr = key_expr.clone().into_owned();

            move |s: Sample| {
                let mut lock = zlock!(statesref);
                let states = &mut *lock;
                let source_id = s.source_info().map(|si| *si.source_id());
                let new = handle_sample(states, s);

                if let Some(source_id) = source_id {
                    if new {
                        // First sample seen from this source.
                        spawn_periodic_queries!(states, source_id, statesref.clone());
                    }
                    if let Some(state) = states.sequenced_states.get_mut(&source_id) {
                        if retransmission.is_some()
                            && state.pending_queries == 0
                            && !state.pending_samples.is_empty()
                        {
                            state.pending_queries += 1;
                            let query_expr = &key_expr
                                / KE_ADV_PREFIX
                                / KE_STAR
                                / &source_id.zid().into_keyexpr()
                                / &KeyExpr::try_from(source_id.eid().to_string()).unwrap()
                                / KE_STARSTAR;
                            // Everything after the last delivered sample.
                            let seq_num_range =
                                seq_num_range(state.last_delivered.map(|s| s + 1), None);
                            tracing::trace!(
                                "AdvancedSubscriber{{key_expr: {}}}: Querying missing samples {}?{}",
                                states.key_expr,
                                query_expr,
                                seq_num_range
                            );
                            // The reply callback locks the state itself: release it first.
                            drop(lock);
                            // Moved into the reply callback, hence alive as long as the query.
                            let handler = SequencedRepliesHandler {
                                source_id,
                                statesref: statesref.clone(),
                            };
                            let _ = session
                                .get(Selector::from((query_expr, seq_num_range)))
                                .callback({
                                    let key_expr = key_expr.clone().into_owned();
                                    move |r: Reply| {
                                        if let Ok(s) = r.into_result() {
                                            if key_expr.intersects(s.key_expr()) {
                                                let states = &mut *zlock!(handler.statesref);
                                                tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
                                                handle_sample(states, s);
                                            }
                                        }
                                    }
                                })
                                .consolidation(ConsolidationMode::None)
                                .accept_replies(ReplyKeyExpr::Any)
                                .target(query_target)
                                .timeout(query_timeout)
                                .wait();
                        }
                    }
                }
            }
        };

        let subscriber = conf
            .session
            .declare_subscriber(&key_expr)
            .callback(sub_callback)
            .allowed_origin(conf.origin)
            .wait()?;

        tracing::debug!("Create AdvancedSubscriber{{key_expr: {}}}", key_expr,);

        // Initial history query, matching the `global_pending_queries: 1` set above.
        if let Some(historyconf) = conf.history.as_ref() {
            // Moved into the reply callback, hence alive as long as the query.
            let handler = InitialRepliesHandler {
                statesref: statesref.clone(),
            };
            let mut params = Parameters::empty();
            if let Some(max) = historyconf.sample_depth {
                params.insert("_max", max.to_string());
            }
            if let Some(age) = historyconf.age {
                // From `age` seconds ago, with no upper bound.
                params.set_time_range(TimeRange {
                    start: TimeBound::Inclusive(TimeExpr::Now { offset_secs: -age }),
                    end: TimeBound::Unbounded,
                });
            }
            tracing::trace!(
                "AdvancedSubscriber{{key_expr: {}}} Querying historical samples {}?{}",
                key_expr,
                &key_expr / KE_ADV_PREFIX / KE_STARSTAR,
                params
            );
            let _ = conf
                .session
                .get(Selector::from((
                    &key_expr / KE_ADV_PREFIX / KE_STARSTAR,
                    params,
                )))
                .callback({
                    let key_expr = key_expr.clone().into_owned();
                    move |r: Reply| {
                        if let Ok(s) = r.into_result() {
                            if key_expr.intersects(s.key_expr()) {
                                let states = &mut *zlock!(handler.statesref);
                                tracing::trace!(
                                    "AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}",
                                    states.key_expr,
                                    s.source_info(),
                                    s.timestamp()
                                );
                                handle_sample(states, s);
                            }
                        }
                    }
                })
                .consolidation(ConsolidationMode::None)
                .accept_replies(ReplyKeyExpr::Any)
                .target(query_target)
                .timeout(query_timeout)
                .wait();
        }

        // Detection of late-joining publishers: each liveliness token appearing under
        // `<key_expr>/<adv prefix>/<pub>/**` triggers a history query on that token's key.
        let liveliness_subscriber = if let Some(historyconf) = conf.history.as_ref() {
            if historyconf.liveliness {
                let live_callback = {
                    let session = conf.session.clone();
                    let statesref = statesref.clone();
                    let key_expr = key_expr.clone().into_owned();
                    let historyconf = historyconf.clone();
                    move |s: Sample| {
                        // Only token appearances matter; other sample kinds are ignored.
                        if s.kind() == SampleKind::Put {
                            if let Ok(parsed) = ke_liveliness::parse(s.key_expr().as_keyexpr()) {
                                if let Ok(zid) = ZenohId::from_str(parsed.zid().as_str()) {
                                    if parsed.eid() == KE_UHLC {
                                        // The entity id chunk is `KE_UHLC`: the source is
                                        // tracked in `timestamped_states`, keyed by its zid.
                                        let mut lock = zlock!(statesref);
                                        let states = &mut *lock;
                                        tracing::trace!(
                                            "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers with zid={}",
                                            states.key_expr,
                                            parsed.zid().as_str()
                                        );
                                        let entry = states.timestamped_states.entry(ID::from(zid));
                                        let state = entry.or_insert(SourceState::<Timestamp> {
                                            last_delivered: None,
                                            pending_queries: 0,
                                            pending_samples: BTreeMap::new(),
                                        });
                                        state.pending_queries += 1;

                                        let mut params = Parameters::empty();
                                        if let Some(max) = historyconf.sample_depth {
                                            params.insert("_max", max.to_string());
                                        }
                                        if let Some(age) = historyconf.age {
                                            params.set_time_range(TimeRange {
                                                start: TimeBound::Inclusive(TimeExpr::Now {
                                                    offset_secs: -age,
                                                }),
                                                end: TimeBound::Unbounded,
                                            });
                                        }
                                        tracing::trace!(
                                            "AdvancedSubscriber{{key_expr: {}}}: Querying historical samples {}?{}",
                                            states.key_expr,
                                            s.key_expr(),
                                            params
                                        );
                                        // Release the state before querying.
                                        drop(lock);

                                        let handler = TimestampedRepliesHandler {
                                            id: ID::from(zid),
                                            statesref: statesref.clone(),
                                            callback: callback.clone(),
                                        };
                                        let _ = session
                                            .get(Selector::from((s.key_expr(), params)))
                                            .callback({
                                                let key_expr = key_expr.clone().into_owned();
                                                move |r: Reply| {
                                                    if let Ok(s) = r.into_result() {
                                                        if key_expr.intersects(s.key_expr()) {
                                                            let states =
                                                                &mut *zlock!(handler.statesref);
                                                            tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
                                                            handle_sample(states, s);
                                                        }
                                                    }
                                                }
                                            })
                                            .consolidation(ConsolidationMode::None)
                                            .accept_replies(ReplyKeyExpr::Any)
                                            .target(query_target)
                                            .timeout(query_timeout)
                                            .wait();
                                    } else if let Ok(eid) =
                                        EntityId::from_str(parsed.eid().as_str())
                                    {
                                        // The entity id parses: the source is tracked in
                                        // `sequenced_states`, keyed by its global id.
                                        let source_id = EntityGlobalId::new(zid, eid);
                                        let mut lock = zlock!(statesref);
                                        let states = &mut *lock;
                                        tracing::trace!(
                                            "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers with zid={}",
                                            states.key_expr,
                                            parsed.zid().as_str()
                                        );
                                        let entry = states.sequenced_states.entry(source_id);
                                        let new = matches!(&entry, Entry::Vacant(_));
                                        let state = entry.or_insert(SourceState::<WrappingSn> {
                                            last_delivered: None,
                                            pending_queries: 0,
                                            pending_samples: BTreeMap::new(),
                                        });
                                        state.pending_queries += 1;

                                        let mut params = Parameters::empty();
                                        if let Some(max) = historyconf.sample_depth {
                                            params.insert("_max", max.to_string());
                                        }
                                        if let Some(age) = historyconf.age {
                                            params.set_time_range(TimeRange {
                                                start: TimeBound::Inclusive(TimeExpr::Now {
                                                    offset_secs: -age,
                                                }),
                                                end: TimeBound::Unbounded,
                                            });
                                        }
                                        tracing::trace!(
                                            "AdvancedSubscriber{{key_expr: {}}}: Querying historical samples {}?{}",
                                            states.key_expr,
                                            s.key_expr(),
                                            params,
                                        );
                                        // Release the state before querying.
                                        drop(lock);

                                        let handler = SequencedRepliesHandler {
                                            source_id,
                                            statesref: statesref.clone(),
                                        };
                                        let _ = session
                                            .get(Selector::from((s.key_expr(), params)))
                                            .callback({
                                                let key_expr = key_expr.clone().into_owned();
                                                move |r: Reply| {
                                                    if let Ok(s) = r.into_result() {
                                                        if key_expr.intersects(s.key_expr()) {
                                                            let states =
                                                                &mut *zlock!(handler.statesref);
                                                            tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
                                                            handle_sample(states, s);
                                                        }
                                                    }
                                                }
                                            })
                                            .consolidation(ConsolidationMode::None)
                                            .accept_replies(ReplyKeyExpr::Any)
                                            .target(query_target)
                                            .timeout(query_timeout)
                                            .wait();

                                        if new {
                                            // The state lock was released above; take it
                                            // again just for the registration.
                                            spawn_periodic_queries!(
                                                zlock!(statesref),
                                                source_id,
                                                statesref.clone()
                                            );
                                        }
                                    }
                                } else {
                                    // The zid chunk does not parse as a `ZenohId`: the
                                    // query is accounted for in the global counter.
                                    let mut lock = zlock!(statesref);
                                    let states = &mut *lock;
                                    tracing::trace!(
                                        "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers with zid={}",
                                        states.key_expr,
                                        parsed.zid().as_str()
                                    );
                                    states.global_pending_queries += 1;

                                    let mut params = Parameters::empty();
                                    if let Some(max) = historyconf.sample_depth {
                                        params.insert("_max", max.to_string());
                                    }
                                    if let Some(age) = historyconf.age {
                                        params.set_time_range(TimeRange {
                                            start: TimeBound::Inclusive(TimeExpr::Now {
                                                offset_secs: -age,
                                            }),
                                            end: TimeBound::Unbounded,
                                        });
                                    }
                                    tracing::trace!(
                                        "AdvancedSubscriber{{key_expr: {}}}: Querying historical samples {}?{}",
                                        states.key_expr,
                                        s.key_expr(),
                                        params,
                                    );
                                    // Release the state before querying.
                                    drop(lock);

                                    let handler = InitialRepliesHandler {
                                        statesref: statesref.clone(),
                                    };
                                    let _ = session
                                        .get(Selector::from((s.key_expr(), params)))
                                        .callback({
                                            let key_expr = key_expr.clone().into_owned();
                                            move |r: Reply| {
                                                if let Ok(s) = r.into_result() {
                                                    if key_expr.intersects(s.key_expr()) {
                                                        let states =
                                                            &mut *zlock!(handler.statesref);
                                                        tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
                                                        handle_sample(states, s);
                                                    }
                                                }
                                            }
                                        })
                                        .consolidation(ConsolidationMode::None)
                                        .accept_replies(ReplyKeyExpr::Any)
                                        .target(query_target)
                                        .timeout(query_timeout)
                                        .wait();
                                }
                            } else {
                                tracing::warn!(
                                    "AdvancedSubscriber{{}}: Received malformed liveliness token key expression: {}",
                                    s.key_expr()
                                );
                            }
                        }
                    }
                };

                tracing::debug!(
                    "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers on {}",
                    key_expr,
                    &key_expr / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR
                );
                Some(
                    conf.session
                        .liveliness()
                        .declare_subscriber(&key_expr / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR)
                        .history(true)
                        .callback(live_callback)
                        .wait()?,
                )
            } else {
                None
            }
        } else {
            None
        };

        // Heartbeat-based recovery: each heartbeat carries a sequence number; when it
        // is ahead of what was delivered, the missing range is queried.
        let heartbeat_subscriber = if retransmission.is_some_and(|r| r.heartbeat) {
            let ke_heartbeat_sub = &key_expr / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR;
            let statesref = statesref.clone();
            tracing::debug!(
                "AdvancedSubscriber{{key_expr: {}}}: Enable heartbeat subscriber on {}",
                key_expr,
                ke_heartbeat_sub
            );
            let heartbeat_sub = conf
                .session
                .declare_subscriber(ke_heartbeat_sub)
                .callback(move |sample_hb| {
                    if sample_hb.kind() != SampleKind::Put {
                        return;
                    }

                    // The heartbeat key uses the same layout as the liveliness tokens.
                    let heartbeat_keyexpr = sample_hb.key_expr().as_keyexpr();
                    let Ok(parsed_keyexpr) = ke_liveliness::parse(heartbeat_keyexpr) else {
                        return;
                    };
                    let source_id = {
                        let Ok(zid) = ZenohId::from_str(parsed_keyexpr.zid().as_str()) else {
                            return;
                        };
                        let Ok(eid) = EntityId::from_str(parsed_keyexpr.eid().as_str()) else {
                            return;
                        };
                        EntityGlobalId::new(zid, eid)
                    };

                    // The payload is the sequence number announced by the heartbeat.
                    let Ok(heartbeat_sn) = z_deserialize::<WrappingSn>(sample_hb.payload()) else {
                        tracing::debug!(
                            "AdvancedSubscriber{{}}: Skipping invalid heartbeat payload on '{}'",
                            heartbeat_keyexpr
                        );
                        return;
                    };

                    let mut lock = zlock!(statesref);
                    let states = &mut *lock;
                    let entry = states.sequenced_states.entry(source_id);
                    if matches!(&entry, Entry::Vacant(_)) {
                        // Unknown source so far.
                        spawn_periodic_queries!(states, source_id, statesref.clone());
                        if states.global_pending_queries > 0 {
                            tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Skipping heartbeat on '{}' from publisher that is currently being pulled by global query", states.key_expr, heartbeat_keyexpr);
                            return;
                        }
                    }

                    let state = entry.or_insert(SourceState::<WrappingSn> {
                        last_delivered: None,
                        pending_queries: 0,
                        pending_samples: BTreeMap::new(),
                    });

                    // Query only when the heartbeat is ahead of what was delivered (or
                    // nothing was delivered yet) and no query is in flight for the source.
                    if (state.last_delivered.is_none()
                        || state.last_delivered.is_some_and(|sn| heartbeat_sn > sn))
                        && state.pending_queries == 0
                    {
                        let seq_num_range = seq_num_range(
                            state.last_delivered.map(|s| s + 1),
                            Some(heartbeat_sn),
                        );

                        let session = states.session.clone();
                        let key_expr = states.key_expr.clone().into_owned();
                        let query_target = states.query_target;
                        let query_timeout = states.query_timeout;
                        state.pending_queries += 1;

                        tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Querying missing samples {}?{}", states.key_expr, heartbeat_keyexpr, seq_num_range);
                        // Release the state before querying.
                        drop(lock);

                        let handler = SequencedRepliesHandler {
                            source_id,
                            statesref: statesref.clone(),
                        };
                        let _ = session
                            .get(Selector::from((heartbeat_keyexpr, seq_num_range)))
                            .callback({
                                move |r: Reply| {
                                    if let Ok(s) = r.into_result() {
                                        if key_expr.intersects(s.key_expr()) {
                                            let states = &mut *zlock!(handler.statesref);
                                            tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
                                            handle_sample(states, s);
                                        }
                                    }
                                }
                            })
                            .consolidation(ConsolidationMode::None)
                            .accept_replies(ReplyKeyExpr::Any)
                            .target(query_target)
                            .timeout(query_timeout)
                            .wait();
                    }
                })
                .allowed_origin(conf.origin)
                .wait()?;
            Some(heartbeat_sub)
        } else {
            None
        };

        // Liveliness token advertising this subscriber:
        // `<key_expr>/<adv prefix>/<sub>/<zid>/<eid>/<meta or empty>`.
        if conf.liveliness {
            let suffix = KE_ADV_PREFIX
                / KE_SUB
                / &subscriber.id().zid().into_keyexpr()
                / &KeyExpr::try_from(subscriber.id().eid().to_string()).unwrap();
            let suffix = match meta {
                Some(meta) => suffix / &meta,
                _ => suffix / KE_EMPTY,
            };
            tracing::debug!(
                "AdvancedSubscriber{{key_expr: {}}}: Declare liveliness token {}",
                key_expr,
                &key_expr / &suffix,
            );
            let token = conf
                .session
                .liveliness()
                .declare_token(&key_expr / &suffix)
                .wait()?;
            // Stored in the shared state so that it lives as long as that state.
            zlock!(statesref).token = Some(token)
        }

        let reliable_subscriber = AdvancedSubscriber {
            statesref,
            subscriber,
            receiver,
            liveliness_subscriber,
            heartbeat_subscriber,
        };

        Ok(reliable_subscriber)
    }
1257
    #[zenoh_macros::unstable]
    /// Returns the [`EntityGlobalId`] reported by the underlying subscriber.
    pub fn id(&self) -> EntityGlobalId {
        self.subscriber.id()
    }
1263
    #[zenoh_macros::unstable]
    /// Returns the [`KeyExpr`] of the underlying subscriber.
    pub fn key_expr(&self) -> &KeyExpr<'static> {
        self.subscriber.key_expr()
    }
1269
    #[zenoh_macros::unstable]
    /// Returns a shared reference to the handler stored in this subscriber.
    pub fn handler(&self) -> &Handler {
        &self.receiver
    }
1277
    #[zenoh_macros::unstable]
    /// Returns a mutable reference to the handler stored in this subscriber.
    pub fn handler_mut(&mut self) -> &mut Handler {
        &mut self.receiver
    }
1285
1286 #[zenoh_macros::unstable]
1291 pub fn sample_miss_listener(&self) -> SampleMissListenerBuilder<'_, DefaultHandler> {
1292 SampleMissListenerBuilder {
1293 statesref: &self.statesref,
1294 handler: DefaultHandler::default(),
1295 }
1296 }
1297
1298 #[zenoh_macros::unstable]
1303 pub fn detect_publishers(&self) -> LivelinessSubscriberBuilder<'_, '_, DefaultHandler> {
1304 self.subscriber
1305 .session()
1306 .liveliness()
1307 .declare_subscriber(self.subscriber.key_expr() / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR)
1308 }
1309
    #[inline]
    /// Consumes this subscriber, logs the undeclaration at debug level and returns the
    /// resolvable obtained from undeclaring the underlying subscriber.
    #[zenoh_macros::unstable]
    pub fn undeclare(self) -> impl Resolve<ZResult<()>> {
        tracing::debug!(
            "AdvancedSubscriber{{key_expr: {}}}: Undeclare",
            self.key_expr()
        );
        self.subscriber.undeclare()
    }
1320
1321 fn set_background_impl(&mut self, background: bool) {
1322 self.subscriber.set_background(background);
1323 if let Some(mut liveliness_sub) = self.liveliness_subscriber.take() {
1324 liveliness_sub.set_background(background);
1325 }
1326 if let Some(mut heartbeat_sub) = self.heartbeat_subscriber.take() {
1327 heartbeat_sub.set_background(background);
1328 }
1329 }
1330
    #[zenoh_macros::internal]
    /// Internal entry point that forwards `background` to `set_background_impl`.
    pub fn set_background(&mut self, background: bool) {
        self.set_background_impl(background)
    }
1335}
1336
1337#[zenoh_macros::unstable]
1338#[inline]
1339fn flush_sequenced_source(
1340 state: &mut SourceState<WrappingSn>,
1341 callback: &Callback<Sample>,
1342 source_id: &EntityGlobalId,
1343 miss_handlers: &HashMap<usize, Callback<Miss>>,
1344) {
1345 if state.pending_queries == 0 && !state.pending_samples.is_empty() {
1346 let mut pending_samples = BTreeMap::new();
1347 std::mem::swap(&mut state.pending_samples, &mut pending_samples);
1348 for (seq_num, sample) in pending_samples {
1349 match state.last_delivered {
1350 None => {
1351 state.last_delivered = Some(seq_num);
1352 callback.call(sample);
1353 }
1354 Some(last) if seq_num == last + 1 => {
1355 state.last_delivered = Some(seq_num);
1356 callback.call(sample);
1357 }
1358 Some(last) if seq_num > last + 1 => {
1359 tracing::warn!(
1360 "Sample missed: missed {} samples from {:?}.",
1361 seq_num - last - 1,
1362 source_id,
1363 );
1364 for miss_callback in miss_handlers.values() {
1365 miss_callback.call(Miss {
1366 source: *source_id,
1367 nb: seq_num - last - 1,
1368 })
1369 }
1370 state.last_delivered = Some(seq_num);
1371 callback.call(sample);
1372 }
1373 _ => {
1374 }
1376 }
1377 }
1378 }
1379}
1380
1381#[zenoh_macros::unstable]
1382#[inline]
1383fn flush_timestamped_source(state: &mut SourceState<Timestamp>, callback: &Callback<Sample>) {
1384 if state.pending_queries == 0 && !state.pending_samples.is_empty() {
1385 let mut pending_samples = BTreeMap::new();
1386 std::mem::swap(&mut state.pending_samples, &mut pending_samples);
1387 for (timestamp, sample) in pending_samples {
1388 if state
1389 .last_delivered
1390 .map(|last| timestamp > last)
1391 .unwrap_or(true)
1392 {
1393 state.last_delivered = Some(timestamp);
1394 callback.call(sample);
1395 }
1396 }
1397 }
1398}
1399
#[zenoh_macros::unstable]
#[derive(Clone)]
/// Drop guard over the shared subscriber state: dropping it decrements
/// `global_pending_queries` and, once that counter reaches zero, flushes every source.
struct InitialRepliesHandler {
    // Shared subscriber state, locked in `Drop`.
    statesref: Arc<Mutex<State>>,
}
1405
#[zenoh_macros::unstable]
impl Drop for InitialRepliesHandler {
    /// Marks one global query as finished and, once none remain, flushes all sources.
    fn drop(&mut self) {
        let states = &mut *zlock!(self.statesref);
        // Saturating: the counter stays at zero instead of underflowing.
        states.global_pending_queries = states.global_pending_queries.saturating_sub(1);
        // Emitted on every drop, even when the flush below is skipped.
        tracing::trace!(
            "AdvancedSubscriber{{key_expr: {}}}: Flush initial replies",
            states.key_expr
        );

        if states.global_pending_queries == 0 {
            // Sequenced sources: deliver what is buffered, then run the periodic-queries
            // macro for that source.
            for (source_id, state) in states.sequenced_states.iter_mut() {
                flush_sequenced_source(state, &states.callback, source_id, &states.miss_handlers);
                spawn_periodic_queries!(states, *source_id, self.statesref.clone());
            }
            // Timestamped sources only need their buffer delivered.
            for state in states.timestamped_states.values_mut() {
                flush_timestamped_source(state, &states.callback);
            }
        }
    }
}
1427
#[zenoh_macros::unstable]
#[derive(Clone)]
/// Drop guard for a query targeting one sequenced source: dropping it decrements that
/// source's `pending_queries` and flushes the source when no global query is pending.
struct SequencedRepliesHandler {
    // Source whose entry in `sequenced_states` is updated on drop.
    source_id: EntityGlobalId,
    // Shared subscriber state, locked in `Drop`.
    statesref: Arc<Mutex<State>>,
}
1434
1435#[zenoh_macros::unstable]
1436impl Drop for SequencedRepliesHandler {
1437 fn drop(&mut self) {
1438 let states = &mut *zlock!(self.statesref);
1439 if let Some(state) = states.sequenced_states.get_mut(&self.source_id) {
1440 state.pending_queries = state.pending_queries.saturating_sub(1);
1441 if states.global_pending_queries == 0 {
1442 tracing::trace!(
1443 "AdvancedSubscriber{{key_expr: {}}}: Flush sequenced samples",
1444 states.key_expr
1445 );
1446 flush_sequenced_source(
1447 state,
1448 &states.callback,
1449 &self.source_id,
1450 &states.miss_handlers,
1451 )
1452 }
1453 }
1454 }
1455}
1456
#[zenoh_macros::unstable]
#[derive(Clone)]
/// Drop guard for a query targeting one timestamped source: dropping it decrements that
/// source's `pending_queries` and flushes the source when no global query is pending.
struct TimestampedRepliesHandler {
    // Key of the source's entry in `timestamped_states`.
    id: ID,
    // Shared subscriber state, locked in `Drop`.
    statesref: Arc<Mutex<State>>,
    // Callback that receives the flushed samples.
    callback: Callback<Sample>,
}
1464
1465#[zenoh_macros::unstable]
1466impl Drop for TimestampedRepliesHandler {
1467 fn drop(&mut self) {
1468 let states = &mut *zlock!(self.statesref);
1469 if let Some(state) = states.timestamped_states.get_mut(&self.id) {
1470 state.pending_queries = state.pending_queries.saturating_sub(1);
1471 if states.global_pending_queries == 0 {
1472 tracing::trace!(
1473 "AdvancedSubscriber{{key_expr: {}}}: Flush timestamped samples",
1474 states.key_expr
1475 );
1476 flush_timestamped_source(state, &self.callback);
1477 }
1478 }
1479 }
1480}
1481
#[zenoh_macros::unstable]
/// Notification describing samples that were missed from one source.
#[derive(Debug, Clone)]
pub struct Miss {
    // Source the samples were missed from.
    source: EntityGlobalId,
    // Number of missed samples.
    nb: u32,
}
1489
impl Miss {
    /// Returns the source the samples were missed from.
    pub fn source(&self) -> EntityGlobalId {
        self.source
    }

    /// Returns the number of missed samples.
    pub fn nb(&self) -> u32 {
        self.nb
    }
}
1501
// `Miss` is its own callback message type, so the conversion is the identity.
impl CallbackParameter for Miss {
    type Message<'a> = Self;

    fn from_message(msg: Self::Message<'_>) -> Self {
        msg
    }
}
1509
#[zenoh_macros::unstable]
/// Handle on a registered sample-miss callback.
///
/// Unless `undeclare_on_drop` has been cleared, dropping the listener unregisters its
/// callback from the shared subscriber state.
pub struct SampleMissListener<Handler> {
    // Id returned by `register_miss_callback`; used to unregister the callback.
    id: usize,
    // Shared subscriber state holding the registered callbacks.
    statesref: Arc<Mutex<State>>,
    // Handler half produced by `IntoHandler::into_handler`.
    handler: Handler,
    // When true, `Drop` unregisters the callback.
    undeclare_on_drop: bool,
}
1521
#[zenoh_macros::unstable]
impl<Handler> SampleMissListener<Handler> {
    #[inline]
    /// Consumes the listener and returns a resolvable that unregisters its callback when
    /// resolved.
    pub fn undeclare(self) -> SampleMissHandlerUndeclaration<Handler>
    where
        Handler: Send,
    {
        SampleMissHandlerUndeclaration(self)
    }

    /// Unregisters this listener's callback from the shared state; always returns `Ok(())`.
    fn undeclare_impl(&mut self) -> ZResult<()> {
        // Clear the flag so that `Drop` does not unregister a second time.
        self.undeclare_on_drop = false;
        zlock!(self.statesref).unregister_miss_callback(&self.id);
        Ok(())
    }

    #[zenoh_macros::internal]
    /// Chooses whether `Drop` unregisters the callback: a background listener
    /// (`background == true`) is left registered when dropped.
    pub fn set_background(&mut self, background: bool) {
        self.undeclare_on_drop = !background;
    }
}
1545
#[cfg(feature = "unstable")]
impl<Handler> Drop for SampleMissListener<Handler> {
    /// Unregisters the callback unless the listener was undeclared already or runs in
    /// background; a failure is logged because `drop` cannot return it.
    fn drop(&mut self) {
        if !self.undeclare_on_drop {
            return;
        }
        if let Err(error) = self.undeclare_impl() {
            tracing::error!(error);
        }
    }
}
1556
#[zenoh_macros::unstable]
// Dereferences to the wrapped handler so its API is usable directly on the listener.
impl<Handler> std::ops::Deref for SampleMissListener<Handler> {
    type Target = Handler;

    fn deref(&self) -> &Self::Target {
        &self.handler
    }
}
#[zenoh_macros::unstable]
// Mutable counterpart of `Deref`: gives mutable access to the wrapped handler.
impl<Handler> std::ops::DerefMut for SampleMissListener<Handler> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.handler
    }
}
1580
#[zenoh_macros::unstable]
/// Resolvable returned by [`SampleMissListener::undeclare`]; it owns the listener until
/// it is resolved or dropped.
pub struct SampleMissHandlerUndeclaration<Handler>(SampleMissListener<Handler>);
1584
#[zenoh_macros::unstable]
// Resolving the undeclaration yields `ZResult<()>`.
impl<Handler> Resolvable for SampleMissHandlerUndeclaration<Handler> {
    type To = ZResult<()>;
}
1589
1590#[zenoh_macros::unstable]
1591impl<Handler> Wait for SampleMissHandlerUndeclaration<Handler> {
1592 fn wait(mut self) -> <Self as Resolvable>::To {
1593 self.0.undeclare_impl()
1594 }
1595}
1596
1597#[zenoh_macros::unstable]
1598impl<Handler> IntoFuture for SampleMissHandlerUndeclaration<Handler> {
1599 type Output = <Self as Resolvable>::To;
1600 type IntoFuture = Ready<<Self as Resolvable>::To>;
1601
1602 fn into_future(self) -> Self::IntoFuture {
1603 std::future::ready(self.wait())
1604 }
1605}
1606
#[zenoh_macros::unstable]
/// Builder for a [`SampleMissListener`].
///
/// `BACKGROUND` selects the resolution flavour: `false` (the default) resolves to a
/// listener handle, `true` registers the callback and resolves to `()`.
pub struct SampleMissListenerBuilder<'a, Handler, const BACKGROUND: bool = false> {
    // Shared subscriber state in which the callback will be registered.
    statesref: &'a Arc<Mutex<State>>,
    // Handler that will receive the `Miss` notifications.
    handler: Handler,
}
1613
1614#[zenoh_macros::unstable]
1615impl<'a> SampleMissListenerBuilder<'a, DefaultHandler> {
1616 #[inline]
1618 #[zenoh_macros::unstable]
1619 pub fn callback<F>(self, callback: F) -> SampleMissListenerBuilder<'a, Callback<Miss>>
1620 where
1621 F: Fn(Miss) + Send + Sync + 'static,
1622 {
1623 self.with(Callback::from(callback))
1624 }
1625
1626 #[inline]
1628 #[zenoh_macros::unstable]
1629 pub fn callback_mut<F>(self, callback: F) -> SampleMissListenerBuilder<'a, Callback<Miss>>
1630 where
1631 F: FnMut(Miss) + Send + Sync + 'static,
1632 {
1633 self.callback(zenoh::handlers::locked(callback))
1634 }
1635
1636 #[inline]
1638 #[zenoh_macros::unstable]
1639 pub fn with<Handler>(self, handler: Handler) -> SampleMissListenerBuilder<'a, Handler>
1640 where
1641 Handler: IntoHandler<Miss>,
1642 {
1643 SampleMissListenerBuilder {
1644 statesref: self.statesref,
1645 handler,
1646 }
1647 }
1648}
1649
1650#[zenoh_macros::unstable]
1651impl<'a> SampleMissListenerBuilder<'a, Callback<Miss>> {
1652 #[zenoh_macros::unstable]
1656 pub fn background(self) -> SampleMissListenerBuilder<'a, Callback<Miss>, true> {
1657 SampleMissListenerBuilder {
1658 statesref: self.statesref,
1659 handler: self.handler,
1660 }
1661 }
1662}
1663
#[zenoh_macros::unstable]
// The default (non-background) builder resolves to a listener wrapping the handler half
// produced by `IntoHandler`.
impl<Handler> Resolvable for SampleMissListenerBuilder<'_, Handler>
where
    Handler: IntoHandler<Miss> + Send,
    Handler::Handler: Send,
{
    type To = ZResult<SampleMissListener<Handler::Handler>>;
}
1672
1673#[zenoh_macros::unstable]
1674impl<Handler> Wait for SampleMissListenerBuilder<'_, Handler>
1675where
1676 Handler: IntoHandler<Miss> + Send,
1677 Handler::Handler: Send,
1678{
1679 #[zenoh_macros::unstable]
1680 fn wait(self) -> <Self as Resolvable>::To {
1681 let (callback, handler) = self.handler.into_handler();
1682 let id = zlock!(self.statesref).register_miss_callback(callback);
1683 Ok(SampleMissListener {
1684 id,
1685 statesref: self.statesref.clone(),
1686 handler,
1687 undeclare_on_drop: true,
1688 })
1689 }
1690}
1691
1692#[zenoh_macros::unstable]
1693impl<Handler> IntoFuture for SampleMissListenerBuilder<'_, Handler>
1694where
1695 Handler: IntoHandler<Miss> + Send,
1696 Handler::Handler: Send,
1697{
1698 type Output = <Self as Resolvable>::To;
1699 type IntoFuture = Ready<<Self as Resolvable>::To>;
1700
1701 #[zenoh_macros::unstable]
1702 fn into_future(self) -> Self::IntoFuture {
1703 std::future::ready(self.wait())
1704 }
1705}
1706
#[zenoh_macros::unstable]
// The background builder hands back no listener: it resolves to `ZResult<()>`.
impl Resolvable for SampleMissListenerBuilder<'_, Callback<Miss>, true> {
    type To = ZResult<()>;
}
1711
1712#[zenoh_macros::unstable]
1713impl Wait for SampleMissListenerBuilder<'_, Callback<Miss>, true> {
1714 #[zenoh_macros::unstable]
1715 fn wait(self) -> <Self as Resolvable>::To {
1716 let (callback, _) = self.handler.into_handler();
1717 zlock!(self.statesref).register_miss_callback(callback);
1718 Ok(())
1719 }
1720}
1721
1722#[zenoh_macros::unstable]
1723impl IntoFuture for SampleMissListenerBuilder<'_, Callback<Miss>, true> {
1724 type Output = <Self as Resolvable>::To;
1725 type IntoFuture = Ready<<Self as Resolvable>::To>;
1726
1727 #[zenoh_macros::unstable]
1728 fn into_future(self) -> Self::IntoFuture {
1729 std::future::ready(self.wait())
1730 }
1731}