1use std::{collections::BTreeMap, future::IntoFuture, str::FromStr};
15
16use zenoh::{
17 config::ZenohId,
18 handlers::{Callback, CallbackDrop, CallbackParameter, IntoHandler},
19 key_expr::KeyExpr,
20 liveliness::{LivelinessSubscriberBuilder, LivelinessToken},
21 pubsub::{SubscriberBuilder, SubscriberUndeclaration},
22 query::{
23 ConsolidationMode, Parameters, Selector, TimeBound, TimeExpr, TimeRange, ZenohParameters,
24 },
25 sample::{Locality, Sample, SampleKind},
26 session::{EntityGlobalId, EntityId, WeakSession},
27 Resolvable, Session, Wait, KE_ADV_PREFIX, KE_EMPTY, KE_PUB, KE_STAR, KE_STARSTAR, KE_SUB,
28};
29use zenoh_util::{Timed, TimedEvent, Timer};
30#[zenoh_macros::unstable]
31use {
32 async_trait::async_trait,
33 std::collections::hash_map::Entry,
34 std::collections::HashMap,
35 std::convert::TryFrom,
36 std::future::Ready,
37 std::sync::{Arc, Mutex},
38 std::time::Duration,
39 uhlc::ID,
40 zenoh::handlers::{locked, DefaultHandler},
41 zenoh::internal::{runtime::ZRuntime, zlock},
42 zenoh::pubsub::Subscriber,
43 zenoh::query::{QueryTarget, Reply, ReplyKeyExpr},
44 zenoh::time::Timestamp,
45 zenoh::Result as ZResult,
46};
47
48use crate::{
49 advanced_cache::{ke_liveliness, KE_UHLC},
50 utils::WrappingSn,
51 z_deserialize,
52};
53
54#[derive(Debug, Default, Clone)]
55#[zenoh_macros::unstable]
57pub struct HistoryConfig {
58 liveliness: bool,
59 sample_depth: Option<usize>,
60 age: Option<f64>,
61}
62
63#[zenoh_macros::unstable]
64impl HistoryConfig {
65 #[inline]
70 #[zenoh_macros::unstable]
71 pub fn detect_late_publishers(mut self) -> Self {
72 self.liveliness = true;
73 self
74 }
75
76 #[zenoh_macros::unstable]
78 pub fn max_samples(mut self, depth: usize) -> Self {
79 self.sample_depth = Some(depth);
80 self
81 }
82
83 #[zenoh_macros::unstable]
85 pub fn max_age(mut self, seconds: f64) -> Self {
86 self.age = Some(seconds);
87 self
88 }
89}
90
91#[derive(Debug, Default, Clone, Copy)]
92#[zenoh_macros::unstable]
94pub struct RecoveryConfig<const CONFIGURED: bool = true> {
95 periodic_queries: Option<Duration>,
96 heartbeat: bool,
97}
98
99#[zenoh_macros::unstable]
100impl RecoveryConfig<false> {
101 #[zenoh_macros::unstable]
110 #[inline]
111 pub fn periodic_queries(self, period: Duration) -> RecoveryConfig<true> {
112 RecoveryConfig {
113 periodic_queries: Some(period),
114 heartbeat: false,
115 }
116 }
117
118 #[zenoh_macros::unstable]
127 #[inline]
128 pub fn heartbeat(self) -> RecoveryConfig<true> {
129 RecoveryConfig {
130 periodic_queries: None,
131 heartbeat: true,
132 }
133 }
134}
135
136#[zenoh_macros::unstable]
138pub struct AdvancedSubscriberBuilder<'a, 'b, 'c, Handler, const BACKGROUND: bool = false> {
139 pub(crate) session: &'a Session,
140 pub(crate) key_expr: ZResult<KeyExpr<'b>>,
141 pub(crate) origin: Locality,
142 pub(crate) retransmission: Option<RecoveryConfig>,
143 pub(crate) query_target: QueryTarget,
144 pub(crate) query_timeout: Duration,
145 pub(crate) history: Option<HistoryConfig>,
146 pub(crate) liveliness: bool,
147 pub(crate) meta_key_expr: Option<ZResult<KeyExpr<'c>>>,
148 pub(crate) handler: Handler,
149}
150
151#[zenoh_macros::unstable]
152impl<'a, 'b, Handler> AdvancedSubscriberBuilder<'a, 'b, '_, Handler> {
153 #[zenoh_macros::unstable]
154 pub(crate) fn new(builder: SubscriberBuilder<'a, 'b, Handler>) -> Self {
155 AdvancedSubscriberBuilder {
156 session: builder.session,
157 key_expr: builder.key_expr,
158 origin: builder.origin,
159 handler: builder.handler,
160 retransmission: None,
161 query_target: QueryTarget::All,
162 query_timeout: Duration::from_secs(10),
163 history: None,
164 liveliness: false,
165 meta_key_expr: None,
166 }
167 }
168}
169
170#[zenoh_macros::unstable]
171impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, DefaultHandler> {
172 #[inline]
174 #[zenoh_macros::unstable]
175 pub fn callback<F>(self, callback: F) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>>
176 where
177 F: Fn(Sample) + Send + Sync + 'static,
178 {
179 self.with(Callback::from(callback))
180 }
181
182 #[inline]
187 #[zenoh_macros::unstable]
188 pub fn callback_mut<F>(
189 self,
190 callback: F,
191 ) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>>
192 where
193 F: FnMut(Sample) + Send + Sync + 'static,
194 {
195 self.callback(locked(callback))
196 }
197
198 #[inline]
200 #[zenoh_macros::unstable]
201 pub fn with<Handler>(self, handler: Handler) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>
202 where
203 Handler: IntoHandler<Sample>,
204 {
205 AdvancedSubscriberBuilder {
206 session: self.session,
207 key_expr: self.key_expr,
208 origin: self.origin,
209 retransmission: self.retransmission,
210 query_target: self.query_target,
211 query_timeout: self.query_timeout,
212 history: self.history,
213 liveliness: self.liveliness,
214 meta_key_expr: self.meta_key_expr,
215 handler,
216 }
217 }
218}
219
220#[zenoh_macros::unstable]
221impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>> {
222 pub fn background(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>, true> {
226 AdvancedSubscriberBuilder {
227 session: self.session,
228 key_expr: self.key_expr,
229 origin: self.origin,
230 retransmission: self.retransmission,
231 query_target: self.query_target,
232 query_timeout: self.query_timeout,
233 history: self.history,
234 liveliness: self.liveliness,
235 meta_key_expr: self.meta_key_expr,
236 handler: self.handler,
237 }
238 }
239}
240
241#[zenoh_macros::unstable]
242impl<'a, 'c, Handler, const BACKGROUND: bool>
243 AdvancedSubscriberBuilder<'a, '_, 'c, Handler, BACKGROUND>
244{
245 #[zenoh_macros::unstable]
247 #[inline]
248 pub fn allowed_origin(mut self, origin: Locality) -> Self {
249 self.origin = origin;
250 self
251 }
252
253 #[zenoh_macros::unstable]
259 #[inline]
260 pub fn recovery(mut self, conf: RecoveryConfig) -> Self {
261 self.retransmission = Some(conf);
262 self
263 }
264
265 #[zenoh_macros::unstable]
275 #[inline]
276 pub fn query_timeout(mut self, query_timeout: Duration) -> Self {
277 self.query_timeout = query_timeout;
278 self
279 }
280
281 #[zenoh_macros::unstable]
285 #[inline]
286 pub fn history(mut self, config: HistoryConfig) -> Self {
287 self.history = Some(config);
288 self
289 }
290
291 #[zenoh_macros::unstable]
293 pub fn subscriber_detection(mut self) -> Self {
294 self.liveliness = true;
295 self
296 }
297
298 #[zenoh_macros::unstable]
302 pub fn subscriber_detection_metadata<TryIntoKeyExpr>(mut self, meta: TryIntoKeyExpr) -> Self
303 where
304 TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
305 <TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<zenoh::Error>,
306 {
307 self.meta_key_expr = Some(meta.try_into().map_err(Into::into));
308 self
309 }
310
311 #[zenoh_macros::unstable]
312 fn with_static_keys(self) -> AdvancedSubscriberBuilder<'a, 'static, 'static, Handler> {
313 AdvancedSubscriberBuilder {
314 session: self.session,
315 key_expr: self.key_expr.map(|s| s.into_owned()),
316 origin: self.origin,
317 retransmission: self.retransmission,
318 query_target: self.query_target,
319 query_timeout: self.query_timeout,
320 history: self.history,
321 liveliness: self.liveliness,
322 meta_key_expr: self.meta_key_expr.map(|s| s.map(|s| s.into_owned())),
323 handler: self.handler,
324 }
325 }
326}
327
328#[zenoh_macros::unstable]
329impl<Handler> Resolvable for AdvancedSubscriberBuilder<'_, '_, '_, Handler>
330where
331 Handler: IntoHandler<Sample>,
332 Handler::Handler: Send,
333{
334 type To = ZResult<AdvancedSubscriber<Handler::Handler>>;
335}
336
337#[zenoh_macros::unstable]
338impl<Handler> Wait for AdvancedSubscriberBuilder<'_, '_, '_, Handler>
339where
340 Handler: IntoHandler<Sample> + Send,
341 Handler::Handler: Send,
342{
343 #[zenoh_macros::unstable]
344 fn wait(self) -> <Self as Resolvable>::To {
345 AdvancedSubscriber::new(self.with_static_keys())
346 }
347}
348
349#[zenoh_macros::unstable]
350impl<Handler> IntoFuture for AdvancedSubscriberBuilder<'_, '_, '_, Handler>
351where
352 Handler: IntoHandler<Sample> + Send,
353 Handler::Handler: Send,
354{
355 type Output = <Self as Resolvable>::To;
356 type IntoFuture = Ready<<Self as Resolvable>::To>;
357
358 #[zenoh_macros::unstable]
359 fn into_future(self) -> Self::IntoFuture {
360 std::future::ready(self.wait())
361 }
362}
363
364#[zenoh_macros::unstable]
365impl Resolvable for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
366 type To = ZResult<()>;
367}
368
369#[zenoh_macros::unstable]
370impl Wait for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
371 #[zenoh_macros::unstable]
372 fn wait(self) -> <Self as Resolvable>::To {
373 let mut sub = AdvancedSubscriber::new(self.with_static_keys())?;
374 sub.set_background_impl(true);
375 Ok(())
376 }
377}
378
379#[zenoh_macros::unstable]
380impl IntoFuture for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
381 type Output = <Self as Resolvable>::To;
382 type IntoFuture = Ready<<Self as Resolvable>::To>;
383
384 #[zenoh_macros::unstable]
385 fn into_future(self) -> Self::IntoFuture {
386 std::future::ready(self.wait())
387 }
388}
389
390#[zenoh_macros::unstable]
391struct Period {
392 timer: Timer,
393 period: Duration,
394}
395
396#[zenoh_macros::unstable]
397struct State {
398 next_id: usize,
399 global_pending_queries: u64,
400 sequenced_states: HashMap<EntityGlobalId, SourceState<WrappingSn>>,
401 timestamped_states: HashMap<ID, SourceState<Timestamp>>,
402 session: WeakSession,
403 key_expr: KeyExpr<'static>,
404 retransmission: bool,
405 period: Option<Period>,
406 history_depth: usize,
407 query_target: QueryTarget,
408 query_timeout: Duration,
409 callback: Option<Callback<Sample>>,
413 miss_handlers: HashMap<usize, Callback<Miss>>,
414 token: Option<LivelinessToken>,
415}
416
417#[zenoh_macros::unstable]
418impl State {
419 #[zenoh_macros::unstable]
420 fn register_miss_callback(&mut self, callback: Callback<Miss>) -> usize {
421 let id = self.next_id;
422 self.next_id += 1;
423 self.miss_handlers.insert(id, callback);
424 id
425 }
426 #[zenoh_macros::unstable]
427 fn unregister_miss_callback(&mut self, id: &usize) {
428 self.miss_handlers.remove(id);
429 }
430}
431
432macro_rules! spawn_periodic_queries {
433 ($p:expr,$s:expr,$r:expr) => {{
434 if let Some(period) = &$p.period {
435 period.timer.add(TimedEvent::periodic(
436 period.period,
437 PeriodicQuery {
438 source_id: $s,
439 statesref: $r,
440 },
441 ))
442 }
443 }};
444}
445
446#[zenoh_macros::unstable]
447struct SourceState<T> {
448 last_delivered: Option<T>,
449 pending_queries: u64,
450 pending_samples: BTreeMap<T, Sample>,
451}
452
453#[zenoh_macros::unstable]
524pub struct AdvancedSubscriber<Receiver> {
525 statesref: Arc<Mutex<State>>,
526 subscriber: Subscriber<()>,
527 receiver: Receiver,
528 liveliness_subscriber: Option<Subscriber<()>>,
529 heartbeat_subscriber: Option<Subscriber<()>>,
530}
531
532#[zenoh_macros::unstable]
533impl<Receiver> std::ops::Deref for AdvancedSubscriber<Receiver> {
534 type Target = Receiver;
535 fn deref(&self) -> &Self::Target {
536 &self.receiver
537 }
538}
539
540#[zenoh_macros::unstable]
541impl<Receiver> std::ops::DerefMut for AdvancedSubscriber<Receiver> {
542 fn deref_mut(&mut self) -> &mut Self::Target {
543 &mut self.receiver
544 }
545}
546
547#[zenoh_macros::unstable]
548fn handle_sample(states: &mut State, sample: Sample) -> bool {
549 let Some(callback) = states.callback.as_ref() else {
550 return false;
551 };
552 if let Some(source_info) = sample.source_info().cloned() {
553 #[inline]
554 fn deliver_and_flush(
555 sample: Sample,
556 source_sn: impl Into<WrappingSn>,
557 callback: &Callback<Sample>,
558 state: &mut SourceState<WrappingSn>,
559 ) {
560 let mut source_sn = source_sn.into();
561 callback.call(sample);
562 state.last_delivered = Some(source_sn);
563 while let Some(sample) = state.pending_samples.remove(&(source_sn + 1)) {
564 callback.call(sample);
565 source_sn += 1;
566 state.last_delivered = Some(source_sn);
567 }
568 }
569
570 let entry = states.sequenced_states.entry(*source_info.source_id());
571 let new = matches!(&entry, Entry::Vacant(_));
572 let state = entry.or_insert(SourceState::<WrappingSn> {
573 last_delivered: None,
574 pending_queries: 0,
575 pending_samples: BTreeMap::new(),
576 });
577 if state.last_delivered.is_none() && states.global_pending_queries != 0 {
578 if states.history_depth == 1 {
580 state.last_delivered = Some(source_info.source_sn().into());
581 callback.call(sample);
582 } else {
583 state
584 .pending_samples
585 .insert(source_info.source_sn().into(), sample);
586 if state.pending_samples.len() >= states.history_depth {
587 if let Some((sn, sample)) = state.pending_samples.pop_first() {
588 deliver_and_flush(sample, sn, callback, state);
589 }
590 }
591 }
592 } else if state.last_delivered.is_some()
593 && source_info.source_sn() != state.last_delivered.unwrap() + 1
594 {
595 if source_info.source_sn() > state.last_delivered.unwrap() {
596 if states.retransmission {
597 state
598 .pending_samples
599 .insert(source_info.source_sn().into(), sample);
600 } else {
601 tracing::info!(
602 "Sample missed: missed {} samples from {:?}.",
603 source_info.source_sn() - state.last_delivered.unwrap() - 1,
604 source_info.source_id(),
605 );
606 for miss_callback in states.miss_handlers.values() {
607 miss_callback.call(Miss {
608 source: *source_info.source_id(),
609 nb: source_info.source_sn() - state.last_delivered.unwrap() - 1,
610 });
611 }
612 callback.call(sample);
613 state.last_delivered = Some(source_info.source_sn().into());
614 }
615 }
616 } else {
617 deliver_and_flush(sample, source_info.source_sn(), callback, state);
618 }
619 new
620 } else if let Some(timestamp) = sample.timestamp() {
621 let entry = states.timestamped_states.entry(*timestamp.get_id());
622 let state = entry.or_insert(SourceState::<Timestamp> {
623 last_delivered: None,
624 pending_queries: 0,
625 pending_samples: BTreeMap::new(),
626 });
627 if state.last_delivered.map(|t| t < *timestamp).unwrap_or(true) {
628 if (states.global_pending_queries == 0 && state.pending_queries == 0)
629 || states.history_depth == 1
630 {
631 state.last_delivered = Some(*timestamp);
632 callback.call(sample);
633 } else {
634 state.pending_samples.entry(*timestamp).or_insert(sample);
635 if state.pending_samples.len() >= states.history_depth {
636 flush_timestamped_source(state, Some(callback));
637 }
638 }
639 }
640 false
641 } else {
642 callback.call(sample);
643 false
644 }
645}
646
647#[zenoh_macros::unstable]
648fn seq_num_range(start: Option<WrappingSn>, end: Option<WrappingSn>) -> String {
649 match (start, end) {
650 (Some(start), Some(end)) => format!("_sn={start}..{end}"),
651 (Some(start), None) => format!("_sn={start}.."),
652 (None, Some(end)) => format!("_sn=..{end}"),
653 (None, None) => "_sn=..".to_string(),
654 }
655}
656
657#[zenoh_macros::unstable]
658#[derive(Clone)]
659struct PeriodicQuery {
660 source_id: EntityGlobalId,
661 statesref: Arc<Mutex<State>>,
662}
663
664#[zenoh_macros::unstable]
665#[async_trait]
666impl Timed for PeriodicQuery {
667 async fn run(&mut self) {
668 let mut lock = zlock!(self.statesref);
669 let states = &mut *lock;
670 if let Some(state) = states.sequenced_states.get_mut(&self.source_id) {
671 state.pending_queries += 1;
672 let query_expr = &states.key_expr
673 / KE_ADV_PREFIX
674 / KE_STAR
675 / &self.source_id.zid().into_keyexpr()
676 / &KeyExpr::try_from(self.source_id.eid().to_string()).unwrap()
677 / KE_STARSTAR;
678 let seq_num_range = seq_num_range(state.last_delivered.map(|s| s + 1), None);
679
680 let session = states.session.clone();
681 let key_expr = states.key_expr.clone().into_owned();
682 let query_target = states.query_target;
683 let query_timeout = states.query_timeout;
684
685 tracing::trace!(
686 "AdvancedSubscriber{{key_expr: {}}}: Querying undelivered samples {}?{}",
687 states.key_expr,
688 query_expr,
689 seq_num_range
690 );
691 drop(lock);
692
693 let handler = SequencedRepliesHandler {
694 source_id: self.source_id,
695 statesref: self.statesref.clone(),
696 };
697 let _ = session
698 .get(Selector::from((query_expr, seq_num_range)))
699 .callback({
700 move |r: Reply| {
701 if let Ok(s) = r.into_result() {
702 if key_expr.intersects(s.key_expr()) {
703 let states = &mut *zlock!(handler.statesref);
704 tracing::trace!(
705 "AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}",
706 states.key_expr,
707 s.source_info(),
708 s.timestamp()
709 );
710 handle_sample(states, s);
711 }
712 }
713 }
714 })
715 .consolidation(ConsolidationMode::None)
716 .accept_replies(ReplyKeyExpr::Any)
717 .target(query_target)
718 .timeout(query_timeout)
719 .wait();
720 }
721 }
722}
723
724#[zenoh_macros::unstable]
725impl<Handler> AdvancedSubscriber<Handler> {
726 fn new<H>(conf: AdvancedSubscriberBuilder<'_, '_, '_, H>) -> ZResult<Self>
727 where
728 H: IntoHandler<Sample, Handler = Handler> + Send,
729 {
730 let (callback, receiver) = conf.handler.into_handler();
731 let key_expr = conf.key_expr?;
732 let meta = match conf.meta_key_expr {
733 Some(meta) => Some(meta?),
734 None => None,
735 };
736 let retransmission = conf.retransmission;
737 let query_target = conf.query_target;
738 let query_timeout = conf.query_timeout;
739 let statesref = Arc::new(Mutex::new(State {
740 next_id: 0,
741 sequenced_states: HashMap::new(),
742 timestamped_states: HashMap::new(),
743 global_pending_queries: if conf.history.is_some() { 1 } else { 0 },
744 session: conf.session.downgrade(),
745 period: retransmission.as_ref().and_then(|r| {
746 let _rt = ZRuntime::Application.enter();
747 r.periodic_queries.map(|p| Period {
748 timer: Timer::new(false),
749 period: p,
750 })
751 }),
752 key_expr: key_expr.clone().into_owned(),
753 retransmission: retransmission.is_some(),
754 history_depth: conf
755 .history
756 .as_ref()
757 .and_then(|h| h.sample_depth)
758 .unwrap_or_default(),
759 query_target: conf.query_target,
760 query_timeout: conf.query_timeout,
761 callback: Some(callback),
762 miss_handlers: HashMap::new(),
763 token: None,
764 }));
765
766 let sub_callback = {
767 let statesref = statesref.clone();
768 let session = conf.session.downgrade();
769 let key_expr = key_expr.clone().into_owned();
770
771 move |s: Sample| {
772 let mut lock = zlock!(statesref);
773 let states = &mut *lock;
774 let source_id = s.source_info().map(|si| *si.source_id());
775 let new = handle_sample(states, s);
776
777 if let Some(source_id) = source_id {
778 if new {
779 spawn_periodic_queries!(states, source_id, statesref.clone());
780 }
781 if let Some(state) = states.sequenced_states.get_mut(&source_id) {
782 if retransmission.is_some()
783 && state.pending_queries == 0
784 && !state.pending_samples.is_empty()
785 {
786 state.pending_queries += 1;
787 let query_expr = &key_expr
788 / KE_ADV_PREFIX
789 / KE_STAR
790 / &source_id.zid().into_keyexpr()
791 / &KeyExpr::try_from(source_id.eid().to_string()).unwrap()
792 / KE_STARSTAR;
793 let seq_num_range =
794 seq_num_range(state.last_delivered.map(|s| s + 1), None);
795 tracing::trace!(
796 "AdvancedSubscriber{{key_expr: {}}}: Querying missing samples {}?{}",
797 states.key_expr,
798 query_expr,
799 seq_num_range
800 );
801 drop(lock);
802 let handler = SequencedRepliesHandler {
803 source_id,
804 statesref: statesref.clone(),
805 };
806 let _ = session
807 .get(Selector::from((query_expr, seq_num_range)))
808 .callback({
809 let key_expr = key_expr.clone().into_owned();
810 move |r: Reply| {
811 if let Ok(s) = r.into_result() {
812 if key_expr.intersects(s.key_expr()) {
813 let states = &mut *zlock!(handler.statesref);
814 tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
815 handle_sample(states, s);
816 }
817 }
818 }
819 })
820 .consolidation(ConsolidationMode::None)
821 .accept_replies(ReplyKeyExpr::Any)
822 .target(query_target)
823 .timeout(query_timeout)
824 .wait();
825 }
826 }
827 }
828 }
829 };
830
831 let drop_callback = {
834 let statesref = statesref.clone();
835 move || {
836 let mut states = statesref.lock().unwrap();
837 states.callback.take();
838 states.miss_handlers.clear();
839 }
840 };
841
842 let subscriber = conf
843 .session
844 .declare_subscriber(&key_expr)
845 .with(CallbackDrop {
846 callback: sub_callback,
847 drop: drop_callback,
848 })
849 .allowed_origin(conf.origin)
850 .wait()?;
851
852 tracing::debug!("Create AdvancedSubscriber{{key_expr: {}}}", key_expr,);
853
854 if let Some(historyconf) = conf.history.as_ref() {
855 let handler = InitialRepliesHandler {
856 statesref: statesref.clone(),
857 };
858 let mut params = Parameters::empty();
859 if let Some(max) = historyconf.sample_depth {
860 params.insert("_max", max.to_string());
861 }
862 if let Some(age) = historyconf.age {
863 params.set_time_range(TimeRange {
864 start: TimeBound::Inclusive(TimeExpr::Now { offset_secs: -age }),
865 end: TimeBound::Unbounded,
866 });
867 }
868 tracing::trace!(
869 "AdvancedSubscriber{{key_expr: {}}} Querying historical samples {}?{}",
870 key_expr,
871 &key_expr / KE_ADV_PREFIX / KE_STARSTAR,
872 params
873 );
874 let _ = conf
875 .session
876 .get(Selector::from((
877 &key_expr / KE_ADV_PREFIX / KE_STARSTAR,
878 params,
879 )))
880 .callback({
881 let key_expr = key_expr.clone().into_owned();
882 move |r: Reply| {
883 if let Ok(s) = r.into_result() {
884 if key_expr.intersects(s.key_expr()) {
885 let states = &mut *zlock!(handler.statesref);
886 tracing::trace!(
887 "AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}",
888 states.key_expr,
889 s.source_info(),
890 s.timestamp()
891 );
892 handle_sample(states, s);
893 }
894 }
895 }
896 })
897 .consolidation(ConsolidationMode::None)
898 .accept_replies(ReplyKeyExpr::Any)
899 .target(query_target)
900 .timeout(query_timeout)
901 .wait();
902 }
903
904 let liveliness_subscriber = if let Some(historyconf) = conf.history.as_ref() {
905 if historyconf.liveliness {
906 let live_callback = {
907 let session = conf.session.downgrade();
908 let statesref = statesref.clone();
909 let key_expr = key_expr.clone().into_owned();
910 let historyconf = historyconf.clone();
911 move |s: Sample| {
912 if s.kind() == SampleKind::Put {
913 if let Ok(parsed) = ke_liveliness::parse(s.key_expr().as_keyexpr()) {
914 if let Ok(zid) = ZenohId::from_str(parsed.zid().as_str()) {
915 if parsed.eid() == KE_UHLC {
918 let mut lock = zlock!(statesref);
919 let states = &mut *lock;
920 tracing::trace!(
921 "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers with zid={}",
922 states.key_expr,
923 parsed.zid().as_str()
924 );
925 let entry = states.timestamped_states.entry(ID::from(zid));
926 let state = entry.or_insert(SourceState::<Timestamp> {
927 last_delivered: None,
928 pending_queries: 0,
929 pending_samples: BTreeMap::new(),
930 });
931 state.pending_queries += 1;
932
933 let mut params = Parameters::empty();
934 if let Some(max) = historyconf.sample_depth {
935 params.insert("_max", max.to_string());
936 }
937 if let Some(age) = historyconf.age {
938 params.set_time_range(TimeRange {
939 start: TimeBound::Inclusive(TimeExpr::Now {
940 offset_secs: -age,
941 }),
942 end: TimeBound::Unbounded,
943 });
944 }
945 tracing::trace!(
946 "AdvancedSubscriber{{key_expr: {}}}: Querying historical samples {}?{}",
947 states.key_expr,
948 s.key_expr(),
949 params
950 );
951 drop(lock);
952
953 let handler = TimestampedRepliesHandler {
954 id: ID::from(zid),
955 statesref: statesref.clone(),
956 };
957 let _ = session
958 .get(Selector::from((s.key_expr(), params)))
959 .callback({
960 let key_expr = key_expr.clone().into_owned();
961 move |r: Reply| {
962 if let Ok(s) = r.into_result() {
963 if key_expr.intersects(s.key_expr()) {
964 let states =
965 &mut *zlock!(handler.statesref);
966 tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
967 handle_sample(states, s);
968 }
969 }
970 }
971 })
972 .consolidation(ConsolidationMode::None)
973 .accept_replies(ReplyKeyExpr::Any)
974 .target(query_target)
975 .timeout(query_timeout)
976 .wait();
977 } else if let Ok(eid) =
978 EntityId::from_str(parsed.eid().as_str())
979 {
980 let source_id = EntityGlobalId::new(zid, eid);
981 let mut lock = zlock!(statesref);
982 let states = &mut *lock;
983 tracing::trace!(
984 "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers with zid={}",
985 states.key_expr,
986 parsed.zid().as_str()
987 );
988 let entry = states.sequenced_states.entry(source_id);
989 let new = matches!(&entry, Entry::Vacant(_));
990 let state = entry.or_insert(SourceState::<WrappingSn> {
991 last_delivered: None,
992 pending_queries: 0,
993 pending_samples: BTreeMap::new(),
994 });
995 state.pending_queries += 1;
996
997 let mut params = Parameters::empty();
998 if let Some(max) = historyconf.sample_depth {
999 params.insert("_max", max.to_string());
1000 }
1001 if let Some(age) = historyconf.age {
1002 params.set_time_range(TimeRange {
1003 start: TimeBound::Inclusive(TimeExpr::Now {
1004 offset_secs: -age,
1005 }),
1006 end: TimeBound::Unbounded,
1007 });
1008 }
1009 tracing::trace!(
1010 "AdvancedSubscriber{{key_expr: {}}}: Querying historical samples {}?{}",
1011 states.key_expr,
1012 s.key_expr(),
1013 params,
1014 );
1015 drop(lock);
1016
1017 let handler = SequencedRepliesHandler {
1018 source_id,
1019 statesref: statesref.clone(),
1020 };
1021 let _ = session
1022 .get(Selector::from((s.key_expr(), params)))
1023 .callback({
1024 let key_expr = key_expr.clone().into_owned();
1025 move |r: Reply| {
1026 if let Ok(s) = r.into_result() {
1027 if key_expr.intersects(s.key_expr()) {
1028 let states =
1029 &mut *zlock!(handler.statesref);
1030 tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
1031 handle_sample(states, s);
1032 }
1033 }
1034 }
1035 })
1036 .consolidation(ConsolidationMode::None)
1037 .accept_replies(ReplyKeyExpr::Any)
1038 .target(query_target)
1039 .timeout(query_timeout)
1040 .wait();
1041
1042 if new {
1043 spawn_periodic_queries!(
1044 zlock!(statesref),
1045 source_id,
1046 statesref.clone()
1047 );
1048 }
1049 }
1050 } else {
1051 let mut lock = zlock!(statesref);
1052 let states = &mut *lock;
1053 tracing::trace!(
1054 "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers with zid={}",
1055 states.key_expr,
1056 parsed.zid().as_str()
1057 );
1058 states.global_pending_queries += 1;
1059
1060 let mut params = Parameters::empty();
1061 if let Some(max) = historyconf.sample_depth {
1062 params.insert("_max", max.to_string());
1063 }
1064 if let Some(age) = historyconf.age {
1065 params.set_time_range(TimeRange {
1066 start: TimeBound::Inclusive(TimeExpr::Now {
1067 offset_secs: -age,
1068 }),
1069 end: TimeBound::Unbounded,
1070 });
1071 }
1072 tracing::trace!(
1073 "AdvancedSubscriber{{key_expr: {}}}: Querying historical samples {}?{}",
1074 states.key_expr,
1075 s.key_expr(),
1076 params,
1077 );
1078 drop(lock);
1079
1080 let handler = InitialRepliesHandler {
1081 statesref: statesref.clone(),
1082 };
1083 let _ = session
1084 .get(Selector::from((s.key_expr(), params)))
1085 .callback({
1086 let key_expr = key_expr.clone().into_owned();
1087 move |r: Reply| {
1088 if let Ok(s) = r.into_result() {
1089 if key_expr.intersects(s.key_expr()) {
1090 let states =
1091 &mut *zlock!(handler.statesref);
1092 tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
1093 handle_sample(states, s);
1094 }
1095 }
1096 }
1097 })
1098 .consolidation(ConsolidationMode::None)
1099 .accept_replies(ReplyKeyExpr::Any)
1100 .target(query_target)
1101 .timeout(query_timeout)
1102 .wait();
1103 }
1104 } else {
1105 tracing::warn!(
1106 "AdvancedSubscriber{{}}: Received malformed liveliness token key expression: {}",
1107 s.key_expr()
1108 );
1109 }
1110 }
1111 }
1112 };
1113
1114 tracing::debug!(
1115 "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers on {}",
1116 key_expr,
1117 &key_expr / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR
1118 );
1119 Some(
1120 conf.session
1121 .liveliness()
1122 .declare_subscriber(&key_expr / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR)
1123 .history(true)
1125 .callback(live_callback)
1126 .wait()?,
1127 )
1128 } else {
1129 None
1130 }
1131 } else {
1132 None
1133 };
1134
1135 let heartbeat_subscriber = if retransmission.is_some_and(|r| r.heartbeat) {
1136 let ke_heartbeat_sub = &key_expr / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR;
1137 let statesref = statesref.clone();
1138 tracing::debug!(
1139 "AdvancedSubscriber{{key_expr: {}}}: Enable heartbeat subscriber on {}",
1140 key_expr,
1141 ke_heartbeat_sub
1142 );
1143 let heartbeat_sub = conf
1144 .session
1145 .declare_subscriber(ke_heartbeat_sub)
1146 .callback(move |sample_hb| {
1147 if sample_hb.kind() != SampleKind::Put {
1148 return;
1149 }
1150
1151 let heartbeat_keyexpr = sample_hb.key_expr().as_keyexpr();
1152 let Ok(parsed_keyexpr) = ke_liveliness::parse(heartbeat_keyexpr) else {
1153 return;
1154 };
1155 let source_id = {
1156 let Ok(zid) = ZenohId::from_str(parsed_keyexpr.zid().as_str()) else {
1157 return;
1158 };
1159 let Ok(eid) = EntityId::from_str(parsed_keyexpr.eid().as_str()) else {
1160 return;
1161 };
1162 EntityGlobalId::new(zid, eid)
1163 };
1164
1165 let Ok(heartbeat_sn) = z_deserialize::<WrappingSn>(sample_hb.payload()) else {
1166 tracing::debug!(
1167 "AdvancedSubscriber{{}}: Skipping invalid heartbeat payload on '{}'",
1168 heartbeat_keyexpr
1169 );
1170 return;
1171 };
1172
1173 let mut lock = zlock!(statesref);
1174 let states = &mut *lock;
1175 let entry = states.sequenced_states.entry(source_id);
1176 if matches!(&entry, Entry::Vacant(_)) {
1177 spawn_periodic_queries!(states, source_id, statesref.clone());
1179 if states.global_pending_queries > 0 {
1180 tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Skipping heartbeat on '{}' from publisher that is currently being pulled by global query", states.key_expr, heartbeat_keyexpr);
1181 return;
1182 }
1183 }
1184
1185 let state = entry.or_insert(SourceState::<WrappingSn> {
1186 last_delivered: None,
1187 pending_queries: 0,
1188 pending_samples: BTreeMap::new(),
1189 });
1190
1191 if (state.last_delivered.is_none()
1193 || state.last_delivered.is_some_and(|sn| heartbeat_sn > sn))
1194 && state.pending_queries == 0
1195 {
1196 let seq_num_range = seq_num_range(
1197 state.last_delivered.map(|s| s + 1),
1198 Some(heartbeat_sn),
1199 );
1200
1201 let session = states.session.clone();
1202 let key_expr = states.key_expr.clone().into_owned();
1203 let query_target = states.query_target;
1204 let query_timeout = states.query_timeout;
1205 state.pending_queries += 1;
1206
1207 tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Querying missing samples {}?{}", states.key_expr, heartbeat_keyexpr, seq_num_range);
1208 drop(lock);
1209
1210 let handler = SequencedRepliesHandler {
1211 source_id,
1212 statesref: statesref.clone(),
1213 };
1214 let _ = session
1215 .get(Selector::from((heartbeat_keyexpr, seq_num_range)))
1216 .callback({
1217 move |r: Reply| {
1218 if let Ok(s) = r.into_result() {
1219 if key_expr.intersects(s.key_expr()) {
1220 let states = &mut *zlock!(handler.statesref);
1221 tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
1222 handle_sample(states, s);
1223 }
1224 }
1225 }
1226 })
1227 .consolidation(ConsolidationMode::None)
1228 .accept_replies(ReplyKeyExpr::Any)
1229 .target(query_target)
1230 .timeout(query_timeout)
1231 .wait();
1232 }
1233 })
1234 .allowed_origin(conf.origin)
1235 .wait()?;
1236 Some(heartbeat_sub)
1237 } else {
1238 None
1239 };
1240
1241 if conf.liveliness {
1242 let suffix = KE_ADV_PREFIX
1243 / KE_SUB
1244 / &subscriber.id().zid().into_keyexpr()
1245 / &KeyExpr::try_from(subscriber.id().eid().to_string()).unwrap();
1246 let suffix = match meta {
1247 Some(meta) => suffix / &meta,
1248 _ => suffix / KE_EMPTY,
1250 };
1251 tracing::debug!(
1252 "AdvancedSubscriber{{key_expr: {}}}: Declare liveliness token {}",
1253 key_expr,
1254 &key_expr / &suffix,
1255 );
1256 let token = conf
1257 .session
1258 .liveliness()
1259 .declare_token(&key_expr / &suffix)
1260 .wait()?;
1261 zlock!(statesref).token = Some(token)
1262 }
1263
1264 let reliable_subscriber = AdvancedSubscriber {
1265 statesref,
1266 subscriber,
1267 receiver,
1268 liveliness_subscriber,
1269 heartbeat_subscriber,
1270 };
1271
1272 Ok(reliable_subscriber)
1273 }
1274
1275 #[zenoh_macros::unstable]
1277 pub fn id(&self) -> EntityGlobalId {
1278 self.subscriber.id()
1279 }
1280
1281 #[zenoh_macros::unstable]
1283 pub fn key_expr(&self) -> &KeyExpr<'static> {
1284 self.subscriber.key_expr()
1285 }
1286
1287 #[zenoh_macros::unstable]
1292 pub fn handler(&self) -> &Handler {
1293 &self.receiver
1294 }
1295
1296 #[zenoh_macros::unstable]
1301 pub fn handler_mut(&mut self) -> &mut Handler {
1302 &mut self.receiver
1303 }
1304
1305 #[zenoh_macros::unstable]
1310 pub fn sample_miss_listener(&self) -> SampleMissListenerBuilder<'_, DefaultHandler> {
1311 SampleMissListenerBuilder {
1312 statesref: &self.statesref,
1313 handler: DefaultHandler::default(),
1314 }
1315 }
1316
1317 #[zenoh_macros::unstable]
1322 pub fn detect_publishers(&self) -> LivelinessSubscriberBuilder<'_, '_, DefaultHandler> {
1323 self.subscriber
1324 .session()
1325 .liveliness()
1326 .declare_subscriber(self.subscriber.key_expr() / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR)
1327 }
1328
1329 #[inline]
1331 #[zenoh_macros::unstable]
1332 pub fn undeclare(self) -> SubscriberUndeclaration<()> {
1333 tracing::debug!(
1334 "AdvancedSubscriber{{key_expr: {}}}: Undeclare",
1335 self.key_expr()
1336 );
1337 self.subscriber.undeclare()
1338 }
1339
1340 fn set_background_impl(&mut self, background: bool) {
1341 self.subscriber.set_background(background);
1342 if let Some(mut liveliness_sub) = self.liveliness_subscriber.take() {
1343 liveliness_sub.set_background(background);
1344 }
1345 if let Some(mut heartbeat_sub) = self.heartbeat_subscriber.take() {
1346 heartbeat_sub.set_background(background);
1347 }
1348 }
1349
1350 #[zenoh_macros::internal]
1351 pub fn set_background(&mut self, background: bool) {
1352 self.set_background_impl(background)
1353 }
1354}
1355
1356#[zenoh_macros::unstable]
1357#[inline]
1358fn flush_sequenced_source(
1359 state: &mut SourceState<WrappingSn>,
1360 callback: Option<&Callback<Sample>>,
1361 source_id: &EntityGlobalId,
1362 miss_handlers: &HashMap<usize, Callback<Miss>>,
1363) {
1364 let Some(callback) = callback else {
1365 return;
1366 };
1367 if state.pending_queries == 0 && !state.pending_samples.is_empty() {
1368 let mut pending_samples = BTreeMap::new();
1369 std::mem::swap(&mut state.pending_samples, &mut pending_samples);
1370 for (seq_num, sample) in pending_samples {
1371 match state.last_delivered {
1372 None => {
1373 state.last_delivered = Some(seq_num);
1374 callback.call(sample);
1375 }
1376 Some(last) if seq_num == last + 1 => {
1377 state.last_delivered = Some(seq_num);
1378 callback.call(sample);
1379 }
1380 Some(last) if seq_num > last + 1 => {
1381 tracing::warn!(
1382 "Sample missed: missed {} samples from {:?}.",
1383 seq_num - last - 1,
1384 source_id,
1385 );
1386 for miss_callback in miss_handlers.values() {
1387 miss_callback.call(Miss {
1388 source: *source_id,
1389 nb: seq_num - last - 1,
1390 })
1391 }
1392 state.last_delivered = Some(seq_num);
1393 callback.call(sample);
1394 }
1395 _ => {
1396 }
1398 }
1399 }
1400 }
1401}
1402
1403#[zenoh_macros::unstable]
1404#[inline]
1405fn flush_timestamped_source(
1406 state: &mut SourceState<Timestamp>,
1407 callback: Option<&Callback<Sample>>,
1408) {
1409 let Some(callback) = callback else {
1410 return;
1411 };
1412 if state.pending_queries == 0 && !state.pending_samples.is_empty() {
1413 let mut pending_samples = BTreeMap::new();
1414 std::mem::swap(&mut state.pending_samples, &mut pending_samples);
1415 for (timestamp, sample) in pending_samples {
1416 if state
1417 .last_delivered
1418 .map(|last| timestamp > last)
1419 .unwrap_or(true)
1420 {
1421 state.last_delivered = Some(timestamp);
1422 callback.call(sample);
1423 }
1424 }
1425 }
1426}
1427
1428#[zenoh_macros::unstable]
1429#[derive(Clone)]
1430struct InitialRepliesHandler {
1431 statesref: Arc<Mutex<State>>,
1432}
1433
1434#[zenoh_macros::unstable]
1435impl Drop for InitialRepliesHandler {
1436 fn drop(&mut self) {
1437 let states = &mut *zlock!(self.statesref);
1438 states.global_pending_queries = states.global_pending_queries.saturating_sub(1);
1439 tracing::trace!(
1440 "AdvancedSubscriber{{key_expr: {}}}: Flush initial replies",
1441 states.key_expr
1442 );
1443
1444 if states.global_pending_queries == 0 {
1445 for (source_id, state) in states.sequenced_states.iter_mut() {
1446 flush_sequenced_source(
1447 state,
1448 states.callback.as_ref(),
1449 source_id,
1450 &states.miss_handlers,
1451 );
1452 spawn_periodic_queries!(states, *source_id, self.statesref.clone());
1453 }
1454 for state in states.timestamped_states.values_mut() {
1455 flush_timestamped_source(state, states.callback.as_ref());
1456 }
1457 }
1458 }
1459}
1460
1461#[zenoh_macros::unstable]
1462#[derive(Clone)]
1463struct SequencedRepliesHandler {
1464 source_id: EntityGlobalId,
1465 statesref: Arc<Mutex<State>>,
1466}
1467
1468#[zenoh_macros::unstable]
1469impl Drop for SequencedRepliesHandler {
1470 fn drop(&mut self) {
1471 let states = &mut *zlock!(self.statesref);
1472 if let Some(state) = states.sequenced_states.get_mut(&self.source_id) {
1473 state.pending_queries = state.pending_queries.saturating_sub(1);
1474 if states.global_pending_queries == 0 {
1475 tracing::trace!(
1476 "AdvancedSubscriber{{key_expr: {}}}: Flush sequenced samples",
1477 states.key_expr
1478 );
1479 flush_sequenced_source(
1480 state,
1481 states.callback.as_ref(),
1482 &self.source_id,
1483 &states.miss_handlers,
1484 )
1485 }
1486 }
1487 }
1488}
1489
1490#[zenoh_macros::unstable]
1491#[derive(Clone)]
1492struct TimestampedRepliesHandler {
1493 id: ID,
1494 statesref: Arc<Mutex<State>>,
1495}
1496
1497#[zenoh_macros::unstable]
1498impl Drop for TimestampedRepliesHandler {
1499 fn drop(&mut self) {
1500 let states = &mut *zlock!(self.statesref);
1501 if let Some(state) = states.timestamped_states.get_mut(&self.id) {
1502 state.pending_queries = state.pending_queries.saturating_sub(1);
1503 if states.global_pending_queries == 0 {
1504 tracing::trace!(
1505 "AdvancedSubscriber{{key_expr: {}}}: Flush timestamped samples",
1506 states.key_expr
1507 );
1508 flush_timestamped_source(state, states.callback.as_ref());
1509 }
1510 }
1511 }
1512}
1513
1514#[zenoh_macros::unstable]
1516#[derive(Debug, Clone)]
1517pub struct Miss {
1518 source: EntityGlobalId,
1519 nb: u32,
1520}
1521
1522impl Miss {
1523 pub fn source(&self) -> EntityGlobalId {
1525 self.source
1526 }
1527
1528 pub fn nb(&self) -> u32 {
1530 self.nb
1531 }
1532}
1533
1534impl CallbackParameter for Miss {
1535 type Message<'a> = Self;
1536
1537 fn from_message(msg: Self::Message<'_>) -> Self {
1538 msg
1539 }
1540}
1541
1542#[zenoh_macros::unstable]
1547pub struct SampleMissListener<Handler> {
1548 id: usize,
1549 statesref: Arc<Mutex<State>>,
1550 handler: Handler,
1551 undeclare_on_drop: bool,
1552}
1553
1554#[zenoh_macros::unstable]
1555impl<Handler> SampleMissListener<Handler> {
1556 #[inline]
1557 pub fn undeclare(self) -> SampleMissHandlerUndeclaration<Handler>
1558 where
1559 Handler: Send,
1560 {
1561 SampleMissHandlerUndeclaration { listener: self }
1562 }
1563
1564 fn undeclare_impl(&mut self) -> ZResult<()> {
1565 self.undeclare_on_drop = false;
1567 zlock!(self.statesref).unregister_miss_callback(&self.id);
1568 Ok(())
1569 }
1570
1571 #[zenoh_macros::internal]
1572 pub fn set_background(&mut self, background: bool) {
1573 self.undeclare_on_drop = !background;
1574 }
1575}
1576
1577#[cfg(feature = "unstable")]
1578impl<Handler> Drop for SampleMissListener<Handler> {
1579 fn drop(&mut self) {
1580 if self.undeclare_on_drop {
1581 if let Err(error) = self.undeclare_impl() {
1582 tracing::error!(error);
1583 }
1584 }
1585 }
1586}
1587
1588#[zenoh_macros::unstable]
1598impl<Handler> std::ops::Deref for SampleMissListener<Handler> {
1599 type Target = Handler;
1600
1601 fn deref(&self) -> &Self::Target {
1602 &self.handler
1603 }
1604}
1605#[zenoh_macros::unstable]
1606impl<Handler> std::ops::DerefMut for SampleMissListener<Handler> {
1607 fn deref_mut(&mut self) -> &mut Self::Target {
1608 &mut self.handler
1609 }
1610}
1611
1612#[zenoh_macros::unstable]
1614pub struct SampleMissHandlerUndeclaration<Handler> {
1615 listener: SampleMissListener<Handler>,
1616}
1617
1618impl<Handler> SampleMissHandlerUndeclaration<Handler> {
1619 pub fn wait_callbacks(self) -> Self {
1621 self
1624 }
1625}
1626
1627#[zenoh_macros::unstable]
1628impl<Handler> Resolvable for SampleMissHandlerUndeclaration<Handler> {
1629 type To = ZResult<()>;
1630}
1631
1632#[zenoh_macros::unstable]
1633impl<Handler> Wait for SampleMissHandlerUndeclaration<Handler> {
1634 fn wait(mut self) -> <Self as Resolvable>::To {
1635 self.listener.undeclare_impl()
1636 }
1637}
1638
1639#[zenoh_macros::unstable]
1640impl<Handler> IntoFuture for SampleMissHandlerUndeclaration<Handler> {
1641 type Output = <Self as Resolvable>::To;
1642 type IntoFuture = Ready<<Self as Resolvable>::To>;
1643
1644 fn into_future(self) -> Self::IntoFuture {
1645 std::future::ready(self.wait())
1646 }
1647}
1648
1649#[zenoh_macros::unstable]
1651pub struct SampleMissListenerBuilder<'a, Handler, const BACKGROUND: bool = false> {
1652 statesref: &'a Arc<Mutex<State>>,
1653 handler: Handler,
1654}
1655
1656#[zenoh_macros::unstable]
1657impl<'a> SampleMissListenerBuilder<'a, DefaultHandler> {
1658 #[inline]
1660 #[zenoh_macros::unstable]
1661 pub fn callback<F>(self, callback: F) -> SampleMissListenerBuilder<'a, Callback<Miss>>
1662 where
1663 F: Fn(Miss) + Send + Sync + 'static,
1664 {
1665 self.with(Callback::from(callback))
1666 }
1667
1668 #[inline]
1670 #[zenoh_macros::unstable]
1671 pub fn callback_mut<F>(self, callback: F) -> SampleMissListenerBuilder<'a, Callback<Miss>>
1672 where
1673 F: FnMut(Miss) + Send + Sync + 'static,
1674 {
1675 self.callback(zenoh::handlers::locked(callback))
1676 }
1677
1678 #[inline]
1680 #[zenoh_macros::unstable]
1681 pub fn with<Handler>(self, handler: Handler) -> SampleMissListenerBuilder<'a, Handler>
1682 where
1683 Handler: IntoHandler<Miss>,
1684 {
1685 SampleMissListenerBuilder {
1686 statesref: self.statesref,
1687 handler,
1688 }
1689 }
1690}
1691
1692#[zenoh_macros::unstable]
1693impl<'a> SampleMissListenerBuilder<'a, Callback<Miss>> {
1694 #[zenoh_macros::unstable]
1698 pub fn background(self) -> SampleMissListenerBuilder<'a, Callback<Miss>, true> {
1699 SampleMissListenerBuilder {
1700 statesref: self.statesref,
1701 handler: self.handler,
1702 }
1703 }
1704}
1705
1706#[zenoh_macros::unstable]
1707impl<Handler> Resolvable for SampleMissListenerBuilder<'_, Handler>
1708where
1709 Handler: IntoHandler<Miss> + Send,
1710 Handler::Handler: Send,
1711{
1712 type To = ZResult<SampleMissListener<Handler::Handler>>;
1713}
1714
1715#[zenoh_macros::unstable]
1716impl<Handler> Wait for SampleMissListenerBuilder<'_, Handler>
1717where
1718 Handler: IntoHandler<Miss> + Send,
1719 Handler::Handler: Send,
1720{
1721 #[zenoh_macros::unstable]
1722 fn wait(self) -> <Self as Resolvable>::To {
1723 let (callback, handler) = self.handler.into_handler();
1724 let id = zlock!(self.statesref).register_miss_callback(callback);
1725 Ok(SampleMissListener {
1726 id,
1727 statesref: self.statesref.clone(),
1728 handler,
1729 undeclare_on_drop: true,
1730 })
1731 }
1732}
1733
1734#[zenoh_macros::unstable]
1735impl<Handler> IntoFuture for SampleMissListenerBuilder<'_, Handler>
1736where
1737 Handler: IntoHandler<Miss> + Send,
1738 Handler::Handler: Send,
1739{
1740 type Output = <Self as Resolvable>::To;
1741 type IntoFuture = Ready<<Self as Resolvable>::To>;
1742
1743 #[zenoh_macros::unstable]
1744 fn into_future(self) -> Self::IntoFuture {
1745 std::future::ready(self.wait())
1746 }
1747}
1748
1749#[zenoh_macros::unstable]
1750impl Resolvable for SampleMissListenerBuilder<'_, Callback<Miss>, true> {
1751 type To = ZResult<()>;
1752}
1753
1754#[zenoh_macros::unstable]
1755impl Wait for SampleMissListenerBuilder<'_, Callback<Miss>, true> {
1756 #[zenoh_macros::unstable]
1757 fn wait(self) -> <Self as Resolvable>::To {
1758 let (callback, _) = self.handler.into_handler();
1759 zlock!(self.statesref).register_miss_callback(callback);
1760 Ok(())
1761 }
1762}
1763
1764#[zenoh_macros::unstable]
1765impl IntoFuture for SampleMissListenerBuilder<'_, Callback<Miss>, true> {
1766 type Output = <Self as Resolvable>::To;
1767 type IntoFuture = Ready<<Self as Resolvable>::To>;
1768
1769 #[zenoh_macros::unstable]
1770 fn into_future(self) -> Self::IntoFuture {
1771 std::future::ready(self.wait())
1772 }
1773}