zenoh_ext/
advanced_subscriber.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{collections::BTreeMap, future::IntoFuture, str::FromStr};
15
16use zenoh::{
17    config::ZenohId,
18    handlers::{Callback, CallbackParameter, IntoHandler},
19    key_expr::KeyExpr,
20    liveliness::{LivelinessSubscriberBuilder, LivelinessToken},
21    pubsub::SubscriberBuilder,
22    query::{
23        ConsolidationMode, Parameters, Selector, TimeBound, TimeExpr, TimeRange, ZenohParameters,
24    },
25    sample::{Locality, Sample, SampleKind, SourceSn},
26    session::{EntityGlobalId, EntityId},
27    Resolvable, Resolve, Session, Wait, KE_ADV_PREFIX, KE_EMPTY, KE_PUB, KE_STAR, KE_STARSTAR,
28    KE_SUB,
29};
30use zenoh_util::{Timed, TimedEvent, Timer};
31#[zenoh_macros::unstable]
32use {
33    async_trait::async_trait,
34    std::collections::hash_map::Entry,
35    std::collections::HashMap,
36    std::convert::TryFrom,
37    std::future::Ready,
38    std::sync::{Arc, Mutex},
39    std::time::Duration,
40    uhlc::ID,
41    zenoh::handlers::{locked, DefaultHandler},
42    zenoh::internal::{runtime::ZRuntime, zlock},
43    zenoh::pubsub::Subscriber,
44    zenoh::query::{QueryTarget, Reply, ReplyKeyExpr},
45    zenoh::time::Timestamp,
46    zenoh::Result as ZResult,
47};
48
49use crate::{
50    advanced_cache::{ke_liveliness, KE_UHLC},
51    z_deserialize,
52};
53
54#[derive(Debug, Default, Clone)]
55/// Configure query for historical data for [`history`](crate::AdvancedSubscriberBuilder::history) method.
56#[zenoh_macros::unstable]
57pub struct HistoryConfig {
58    liveliness: bool,
59    sample_depth: Option<usize>,
60    age: Option<f64>,
61}
62
63#[zenoh_macros::unstable]
64impl HistoryConfig {
65    /// Enable detection of late joiner publishers and query for their historical data.
66    ///
67    /// Late joiner detection can only be achieved for [`AdvancedPublishers`](crate::AdvancedPublisher) that enable publisher_detection.
68    /// History can only be retransmitted by [`AdvancedPublishers`](crate::AdvancedPublisher) that enable [`cache`](crate::AdvancedPublisherBuilder::cache).
69    #[inline]
70    #[zenoh_macros::unstable]
71    pub fn detect_late_publishers(mut self) -> Self {
72        self.liveliness = true;
73        self
74    }
75
76    /// Specify how many samples to query for each resource.
77    #[zenoh_macros::unstable]
78    pub fn max_samples(mut self, depth: usize) -> Self {
79        self.sample_depth = Some(depth);
80        self
81    }
82
83    /// Specify the maximum age of samples to query.
84    #[zenoh_macros::unstable]
85    pub fn max_age(mut self, seconds: f64) -> Self {
86        self.age = Some(seconds);
87        self
88    }
89}
90
91#[derive(Debug, Default, Clone, Copy)]
92/// Configure retransmission.
93#[zenoh_macros::unstable]
94pub struct RecoveryConfig<const CONFIGURED: bool = true> {
95    periodic_queries: Option<Duration>,
96    heartbeat: bool,
97}
98
99#[zenoh_macros::unstable]
100impl RecoveryConfig<false> {
101    /// Enable periodic queries for not yet received Samples and specify their period.
102    ///
103    /// This allows retrieving the last Sample(s) if the last Sample(s) is/are lost.
104    /// So it is useful for sporadic publications but useless for periodic publications
105    /// with a period smaller or equal to this period.
106    /// Retransmission can only be achieved by [`AdvancedPublishers`](crate::AdvancedPublisher)
107    /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
108    /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
109    #[zenoh_macros::unstable]
110    #[inline]
111    pub fn periodic_queries(self, period: Duration) -> RecoveryConfig<true> {
112        RecoveryConfig {
113            periodic_queries: Some(period),
114            heartbeat: false,
115        }
116    }
117
118    /// Subscribe to heartbeats of [`AdvancedPublishers`](crate::AdvancedPublisher).
119    ///
120    /// This allows receiving the last published Sample's sequence number and check for misses.
121    /// Heartbeat subscriber must be paired with [`AdvancedPublishers`](crate::AdvancedPublisher)
122    /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
123    /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection) with
124    /// [`heartbeat`](crate::advanced_publisher::MissDetectionConfig::heartbeat) or
125    /// [`sporadic_heartbeat`](crate::advanced_publisher::MissDetectionConfig::sporadic_heartbeat).
126    #[zenoh_macros::unstable]
127    #[inline]
128    pub fn heartbeat(self) -> RecoveryConfig<true> {
129        RecoveryConfig {
130            periodic_queries: None,
131            heartbeat: true,
132        }
133    }
134}
135
136/// The builder of an [`AdvancedSubscriber`], allowing to configure it.
137#[zenoh_macros::unstable]
138pub struct AdvancedSubscriberBuilder<'a, 'b, 'c, Handler, const BACKGROUND: bool = false> {
139    pub(crate) session: &'a Session,
140    pub(crate) key_expr: ZResult<KeyExpr<'b>>,
141    pub(crate) origin: Locality,
142    pub(crate) retransmission: Option<RecoveryConfig>,
143    pub(crate) query_target: QueryTarget,
144    pub(crate) query_timeout: Duration,
145    pub(crate) history: Option<HistoryConfig>,
146    pub(crate) liveliness: bool,
147    pub(crate) meta_key_expr: Option<ZResult<KeyExpr<'c>>>,
148    pub(crate) handler: Handler,
149}
150
151#[zenoh_macros::unstable]
152impl<'a, 'b, Handler> AdvancedSubscriberBuilder<'a, 'b, '_, Handler> {
153    #[zenoh_macros::unstable]
154    pub(crate) fn new(builder: SubscriberBuilder<'a, 'b, Handler>) -> Self {
155        AdvancedSubscriberBuilder {
156            session: builder.session,
157            key_expr: builder.key_expr,
158            origin: builder.origin,
159            handler: builder.handler,
160            retransmission: None,
161            query_target: QueryTarget::All,
162            query_timeout: Duration::from_secs(10),
163            history: None,
164            liveliness: false,
165            meta_key_expr: None,
166        }
167    }
168}
169
170#[zenoh_macros::unstable]
171impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, DefaultHandler> {
172    /// Add callback to AdvancedSubscriber.
173    #[inline]
174    #[zenoh_macros::unstable]
175    pub fn callback<F>(self, callback: F) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>>
176    where
177        F: Fn(Sample) + Send + Sync + 'static,
178    {
179        self.with(Callback::from(callback))
180    }
181
182    /// Add callback to `AdvancedSubscriber`.
183    ///
184    /// Using this guarantees that your callback will never be called concurrently.
185    /// If your callback is also accepted by the [`callback`](AdvancedSubscriberBuilder::callback) method, we suggest you use it instead of `callback_mut`
186    #[inline]
187    #[zenoh_macros::unstable]
188    pub fn callback_mut<F>(
189        self,
190        callback: F,
191    ) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>>
192    where
193        F: FnMut(Sample) + Send + Sync + 'static,
194    {
195        self.callback(locked(callback))
196    }
197
198    /// Make the built AdvancedSubscriber an [`AdvancedSubscriber`](AdvancedSubscriber).
199    #[inline]
200    #[zenoh_macros::unstable]
201    pub fn with<Handler>(self, handler: Handler) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>
202    where
203        Handler: IntoHandler<Sample>,
204    {
205        AdvancedSubscriberBuilder {
206            session: self.session,
207            key_expr: self.key_expr,
208            origin: self.origin,
209            retransmission: self.retransmission,
210            query_target: self.query_target,
211            query_timeout: self.query_timeout,
212            history: self.history,
213            liveliness: self.liveliness,
214            meta_key_expr: self.meta_key_expr,
215            handler,
216        }
217    }
218}
219
220#[zenoh_macros::unstable]
221impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>> {
222    /// Make the subscriber run in background until the session is closed.
223    ///
224    /// Background builder doesn't return a `AdvancedSubscriber` object anymore.
225    pub fn background(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>, true> {
226        AdvancedSubscriberBuilder {
227            session: self.session,
228            key_expr: self.key_expr,
229            origin: self.origin,
230            retransmission: self.retransmission,
231            query_target: self.query_target,
232            query_timeout: self.query_timeout,
233            history: self.history,
234            liveliness: self.liveliness,
235            meta_key_expr: self.meta_key_expr,
236            handler: self.handler,
237        }
238    }
239}
240
241#[zenoh_macros::unstable]
242impl<'a, 'c, Handler, const BACKGROUND: bool>
243    AdvancedSubscriberBuilder<'a, '_, 'c, Handler, BACKGROUND>
244{
245    /// Restrict the matching publications that will be received by this [`Subscriber`]
246    /// to the ones that have the given [`Locality`](crate::prelude::Locality).
247    #[zenoh_macros::unstable]
248    #[inline]
249    pub fn allowed_origin(mut self, origin: Locality) -> Self {
250        self.origin = origin;
251        self
252    }
253
254    /// Ask for retransmission of detected lost Samples.
255    ///
256    /// Retransmission can only be achieved by [`AdvancedPublishers`](crate::AdvancedPublisher)
257    /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
258    /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
259    #[zenoh_macros::unstable]
260    #[inline]
261    pub fn recovery(mut self, conf: RecoveryConfig) -> Self {
262        self.retransmission = Some(conf);
263        self
264    }
265
266    // /// Change the target to be used for queries.
267
268    // #[inline]
269    // pub fn query_target(mut self, query_target: QueryTarget) -> Self {
270    //     self.query_target = query_target;
271    //     self
272    // }
273
274    /// Change the timeout to be used for queries (history, retransmission).
275    #[zenoh_macros::unstable]
276    #[inline]
277    pub fn query_timeout(mut self, query_timeout: Duration) -> Self {
278        self.query_timeout = query_timeout;
279        self
280    }
281
282    /// Enable query for historical data.
283    ///
284    /// History can only be retransmitted by [`AdvancedPublishers`](crate::AdvancedPublisher) that enable [`cache`](crate::AdvancedPublisherBuilder::cache).
285    #[zenoh_macros::unstable]
286    #[inline]
287    pub fn history(mut self, config: HistoryConfig) -> Self {
288        self.history = Some(config);
289        self
290    }
291
292    /// Allow this subscriber to be detected through liveliness.
293    #[zenoh_macros::unstable]
294    pub fn subscriber_detection(mut self) -> Self {
295        self.liveliness = true;
296        self
297    }
298
299    /// A key expression added to the liveliness token key expression.
300    /// It can be used to convey metadata.
301    #[zenoh_macros::unstable]
302    pub fn subscriber_detection_metadata<TryIntoKeyExpr>(mut self, meta: TryIntoKeyExpr) -> Self
303    where
304        TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
305        <TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<zenoh::Error>,
306    {
307        self.meta_key_expr = Some(meta.try_into().map_err(Into::into));
308        self
309    }
310
311    #[zenoh_macros::unstable]
312    fn with_static_keys(self) -> AdvancedSubscriberBuilder<'a, 'static, 'static, Handler> {
313        AdvancedSubscriberBuilder {
314            session: self.session,
315            key_expr: self.key_expr.map(|s| s.into_owned()),
316            origin: self.origin,
317            retransmission: self.retransmission,
318            query_target: self.query_target,
319            query_timeout: self.query_timeout,
320            history: self.history,
321            liveliness: self.liveliness,
322            meta_key_expr: self.meta_key_expr.map(|s| s.map(|s| s.into_owned())),
323            handler: self.handler,
324        }
325    }
326}
327
328#[zenoh_macros::unstable]
329impl<Handler> Resolvable for AdvancedSubscriberBuilder<'_, '_, '_, Handler>
330where
331    Handler: IntoHandler<Sample>,
332    Handler::Handler: Send,
333{
334    type To = ZResult<AdvancedSubscriber<Handler::Handler>>;
335}
336
337#[zenoh_macros::unstable]
338impl<Handler> Wait for AdvancedSubscriberBuilder<'_, '_, '_, Handler>
339where
340    Handler: IntoHandler<Sample> + Send,
341    Handler::Handler: Send,
342{
343    #[zenoh_macros::unstable]
344    fn wait(self) -> <Self as Resolvable>::To {
345        AdvancedSubscriber::new(self.with_static_keys())
346    }
347}
348
349#[zenoh_macros::unstable]
350impl<Handler> IntoFuture for AdvancedSubscriberBuilder<'_, '_, '_, Handler>
351where
352    Handler: IntoHandler<Sample> + Send,
353    Handler::Handler: Send,
354{
355    type Output = <Self as Resolvable>::To;
356    type IntoFuture = Ready<<Self as Resolvable>::To>;
357
358    #[zenoh_macros::unstable]
359    fn into_future(self) -> Self::IntoFuture {
360        std::future::ready(self.wait())
361    }
362}
363
364#[zenoh_macros::unstable]
365impl Resolvable for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
366    type To = ZResult<()>;
367}
368
369#[zenoh_macros::unstable]
370impl Wait for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
371    #[zenoh_macros::unstable]
372    fn wait(self) -> <Self as Resolvable>::To {
373        let mut sub = AdvancedSubscriber::new(self.with_static_keys())?;
374        sub.set_background_impl(true);
375        Ok(())
376    }
377}
378
379#[zenoh_macros::unstable]
380impl IntoFuture for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
381    type Output = <Self as Resolvable>::To;
382    type IntoFuture = Ready<<Self as Resolvable>::To>;
383
384    #[zenoh_macros::unstable]
385    fn into_future(self) -> Self::IntoFuture {
386        std::future::ready(self.wait())
387    }
388}
389
390#[zenoh_macros::unstable]
391struct Period {
392    timer: Timer,
393    period: Duration,
394}
395
396#[zenoh_macros::unstable]
397struct State {
398    next_id: usize,
399    global_pending_queries: u64,
400    sequenced_states: HashMap<EntityGlobalId, SourceState<u32>>,
401    timestamped_states: HashMap<ID, SourceState<Timestamp>>,
402    session: Session,
403    key_expr: KeyExpr<'static>,
404    retransmission: bool,
405    period: Option<Period>,
406    history_depth: usize,
407    query_target: QueryTarget,
408    query_timeout: Duration,
409    callback: Callback<Sample>,
410    miss_handlers: HashMap<usize, Callback<Miss>>,
411    token: Option<LivelinessToken>,
412}
413
414#[zenoh_macros::unstable]
415impl State {
416    #[zenoh_macros::unstable]
417    fn register_miss_callback(&mut self, callback: Callback<Miss>) -> usize {
418        let id = self.next_id;
419        self.next_id += 1;
420        self.miss_handlers.insert(id, callback);
421        id
422    }
423    #[zenoh_macros::unstable]
424    fn unregister_miss_callback(&mut self, id: &usize) {
425        self.miss_handlers.remove(id);
426    }
427}
428
// Schedules a `PeriodicQuery` for the given source on the state's timer.
// Expands to nothing effective when periodic queries are not configured (`period` is `None`).
// Arguments: the state holding `period`, the source id, the shared state reference.
macro_rules! spawn_periodic_queries {
    ($states:expr,$source_id:expr,$statesref:expr) => {{
        if let Some(Period { timer, period }) = &$states.period {
            timer.add(TimedEvent::periodic(
                *period,
                PeriodicQuery {
                    source_id: $source_id,
                    statesref: $statesref,
                },
            ))
        }
    }};
}
442
/// Delivery state kept for a single source of samples.
///
/// `T` is the ordering key of the samples: `u32` sequence numbers or `Timestamp`s in this file.
#[zenoh_macros::unstable]
struct SourceState<T> {
    // Key of the last sample handed to the callback, `None` until a first sample is delivered.
    last_delivered: Option<T>,
    // Incremented when a query is issued for this source.
    // NOTE(review): presumably decremented when the query completes — confirm in the reply handlers.
    pending_queries: u64,
    // Samples received but not yet delivered, ordered by key.
    pending_samples: BTreeMap<T, Sample>,
}
449
450/*
451use zenoh_ext::{AdvancedSubscriberBuilderExt, HistoryConfig, RecoveryConfig};
452
453let session = zenoh::open(zenoh::Config::default()).await.unwrap();
454let subscriber = session
455    .declare_subscriber("key/expression")
456    .history(HistoryConfig::default().detect_late_publishers())
457    .recovery(RecoveryConfig::default())
458    .await
459    .unwrap();
460
461let miss_listener = subscriber.sample_miss_listener().await.unwrap();
462loop {
463    tokio::select! {
464        sample = subscriber.recv_async() => {
465            if let Ok(sample) = sample {
466                // ...
467            }
468        },
469        miss = miss_listener.recv_async() => {
470            if let Ok(miss) = miss {
471                // ...
472            }
473        },
474    }
475}
476*/
477
478/// The extension to [`Subscriber`](zenoh::pubsub::Subscriber) that provides advanced functionalities
479///
480/// The `AdvancedSubscriber` is constructed over a regular [`Subscriber`](zenoh::pubsub::Subscriber)
481/// through [`advanced`](crate::AdvancedSubscriberBuilderExt::advanced) method or by using
482/// any other method of [`AdvancedSubscriberBuilder`](crate::AdvancedSubscriberBuilder).
483///
484/// The `AdvancedSubscriber` works with [`AdvancedPublisher`](crate::AdvancedPublisher) to provide additional functionalities such as:
485/// * missing samples detection using periodic queries or heartbeat subscription configurable with [`recovery`](crate::AdvancedSubscriberBuilder::recovery) method
486/// * recovering missing samples, configured with [`history`](crate::AdvancedSubscriberBuilder::history) method
487///   (max age and sample count, late joiner detection and requesting)
488/// * liveliness-based subscriber detection with [`subscriber_detection`](crate::AdvancedSubscriberBuilder::subscriber_detection) method
489///
490/// # Examples
491/// ```no_run
492/// # #[tokio::main]
493/// # async fn main() {
494/// use zenoh_ext::{AdvancedSubscriberBuilderExt, HistoryConfig, RecoveryConfig};
495/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
496/// let subscriber = session
497///     .declare_subscriber("key/expression")
498///     .history(HistoryConfig::default().detect_late_publishers())
499///     .recovery(RecoveryConfig::default().heartbeat())
500///     .subscriber_detection()
501///     .await
502///     .unwrap();
503/// let miss_listener = subscriber.sample_miss_listener().await.unwrap();
504/// loop {
505///     tokio::select! {
506///         sample = subscriber.recv_async() => {
507///             if let Ok(sample) = sample {
508///                 // ...
509///             }
510///         },
511///         miss = miss_listener.recv_async() => {
512///             if let Ok(miss) = miss {
513///                 // ...
514///             }
515///         },
516///     }
517/// }
518/// # }
519/// ```
520#[zenoh_macros::unstable]
521pub struct AdvancedSubscriber<Receiver> {
522    statesref: Arc<Mutex<State>>,
523    subscriber: Subscriber<()>,
524    receiver: Receiver,
525    liveliness_subscriber: Option<Subscriber<()>>,
526    heartbeat_subscriber: Option<Subscriber<()>>,
527}
528
529#[zenoh_macros::unstable]
530impl<Receiver> std::ops::Deref for AdvancedSubscriber<Receiver> {
531    type Target = Receiver;
532    fn deref(&self) -> &Self::Target {
533        &self.receiver
534    }
535}
536
537#[zenoh_macros::unstable]
538impl<Receiver> std::ops::DerefMut for AdvancedSubscriber<Receiver> {
539    fn deref_mut(&mut self) -> &mut Self::Target {
540        &mut self.receiver
541    }
542}
543
544#[zenoh_macros::unstable]
545fn handle_sample(states: &mut State, sample: Sample) -> bool {
546    if let Some(source_info) = sample.source_info().cloned() {
547        #[inline]
548        fn deliver_and_flush(
549            sample: Sample,
550            mut source_sn: SourceSn,
551            callback: &Callback<Sample>,
552            state: &mut SourceState<u32>,
553        ) {
554            callback.call(sample);
555            state.last_delivered = Some(source_sn);
556            while let Some(sample) = state.pending_samples.remove(&(source_sn + 1)) {
557                callback.call(sample);
558                source_sn += 1;
559                state.last_delivered = Some(source_sn);
560            }
561        }
562
563        let entry = states.sequenced_states.entry(*source_info.source_id());
564        let new = matches!(&entry, Entry::Vacant(_));
565        let state = entry.or_insert(SourceState::<u32> {
566            last_delivered: None,
567            pending_queries: 0,
568            pending_samples: BTreeMap::new(),
569        });
570        if state.last_delivered.is_none() && states.global_pending_queries != 0 {
571            // Avoid going through the Map if history_depth == 1
572            if states.history_depth == 1 {
573                state.last_delivered = Some(source_info.source_sn());
574                states.callback.call(sample);
575            } else {
576                state
577                    .pending_samples
578                    .insert(source_info.source_sn(), sample);
579                if state.pending_samples.len() >= states.history_depth {
580                    if let Some((sn, sample)) = state.pending_samples.pop_first() {
581                        deliver_and_flush(sample, sn, &states.callback, state);
582                    }
583                }
584            }
585        } else if state.last_delivered.is_some()
586            && source_info.source_sn() != state.last_delivered.unwrap() + 1
587        {
588            if source_info.source_sn() > state.last_delivered.unwrap() {
589                if states.retransmission {
590                    state
591                        .pending_samples
592                        .insert(source_info.source_sn(), sample);
593                } else {
594                    tracing::info!(
595                        "Sample missed: missed {} samples from {:?}.",
596                        source_info.source_sn() - state.last_delivered.unwrap() - 1,
597                        source_info.source_id(),
598                    );
599                    for miss_callback in states.miss_handlers.values() {
600                        miss_callback.call(Miss {
601                            source: *source_info.source_id(),
602                            nb: source_info.source_sn() - state.last_delivered.unwrap() - 1,
603                        });
604                    }
605                    states.callback.call(sample);
606                    state.last_delivered = Some(source_info.source_sn());
607                }
608            }
609        } else {
610            deliver_and_flush(sample, source_info.source_sn(), &states.callback, state);
611        }
612        new
613    } else if let Some(timestamp) = sample.timestamp() {
614        let entry = states.timestamped_states.entry(*timestamp.get_id());
615        let state = entry.or_insert(SourceState::<Timestamp> {
616            last_delivered: None,
617            pending_queries: 0,
618            pending_samples: BTreeMap::new(),
619        });
620        if state.last_delivered.map(|t| t < *timestamp).unwrap_or(true) {
621            if (states.global_pending_queries == 0 && state.pending_queries == 0)
622                || states.history_depth == 1
623            {
624                state.last_delivered = Some(*timestamp);
625                states.callback.call(sample);
626            } else {
627                state.pending_samples.entry(*timestamp).or_insert(sample);
628                if state.pending_samples.len() >= states.history_depth {
629                    flush_timestamped_source(state, &states.callback);
630                }
631            }
632        }
633        false
634    } else {
635        states.callback.call(sample);
636        false
637    }
638}
639
640#[zenoh_macros::unstable]
/// Builds the `_sn=<start>..<end>` selector parameter, leaving out each bound that is `None`.
fn seq_num_range(start: Option<u32>, end: Option<u32>) -> String {
    let lower = start.map(|sn| sn.to_string()).unwrap_or_default();
    let upper = end.map(|sn| sn.to_string()).unwrap_or_default();
    format!("_sn={lower}..{upper}")
}
649
650#[zenoh_macros::unstable]
651#[derive(Clone)]
652struct PeriodicQuery {
653    source_id: EntityGlobalId,
654    statesref: Arc<Mutex<State>>,
655}
656
657#[zenoh_macros::unstable]
658#[async_trait]
659impl Timed for PeriodicQuery {
660    async fn run(&mut self) {
661        let mut lock = zlock!(self.statesref);
662        let states = &mut *lock;
663        if let Some(state) = states.sequenced_states.get_mut(&self.source_id) {
664            state.pending_queries += 1;
665            let query_expr = &states.key_expr
666                / KE_ADV_PREFIX
667                / KE_STAR
668                / &self.source_id.zid().into_keyexpr()
669                / &KeyExpr::try_from(self.source_id.eid().to_string()).unwrap()
670                / KE_STARSTAR;
671            let seq_num_range = seq_num_range(state.last_delivered.map(|s| s + 1), None);
672
673            let session = states.session.clone();
674            let key_expr = states.key_expr.clone().into_owned();
675            let query_target = states.query_target;
676            let query_timeout = states.query_timeout;
677
678            tracing::trace!(
679                "AdvancedSubscriber{{key_expr: {}}}: Querying undelivered samples {}?{}",
680                states.key_expr,
681                query_expr,
682                seq_num_range
683            );
684            drop(lock);
685
686            let handler = SequencedRepliesHandler {
687                source_id: self.source_id,
688                statesref: self.statesref.clone(),
689            };
690            let _ = session
691                .get(Selector::from((query_expr, seq_num_range)))
692                .callback({
693                    move |r: Reply| {
694                        if let Ok(s) = r.into_result() {
695                            if key_expr.intersects(s.key_expr()) {
696                                let states = &mut *zlock!(handler.statesref);
697                                tracing::trace!(
698                                    "AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}",
699                                    states.key_expr,
700                                    s.source_info(),
701                                    s.timestamp()
702                                );
703                                handle_sample(states, s);
704                            }
705                        }
706                    }
707                })
708                .consolidation(ConsolidationMode::None)
709                .accept_replies(ReplyKeyExpr::Any)
710                .target(query_target)
711                .timeout(query_timeout)
712                .wait();
713        }
714    }
715}
716
717#[zenoh_macros::unstable]
718impl<Handler> AdvancedSubscriber<Handler> {
719    fn new<H>(conf: AdvancedSubscriberBuilder<'_, '_, '_, H>) -> ZResult<Self>
720    where
721        H: IntoHandler<Sample, Handler = Handler> + Send,
722    {
723        let (callback, receiver) = conf.handler.into_handler();
724        let key_expr = conf.key_expr?;
725        let meta = match conf.meta_key_expr {
726            Some(meta) => Some(meta?),
727            None => None,
728        };
729        let retransmission = conf.retransmission;
730        let query_target = conf.query_target;
731        let query_timeout = conf.query_timeout;
732        let session = conf.session.clone();
733        let statesref = Arc::new(Mutex::new(State {
734            next_id: 0,
735            sequenced_states: HashMap::new(),
736            timestamped_states: HashMap::new(),
737            global_pending_queries: if conf.history.is_some() { 1 } else { 0 },
738            session,
739            period: retransmission.as_ref().and_then(|r| {
740                let _rt = ZRuntime::Application.enter();
741                r.periodic_queries.map(|p| Period {
742                    timer: Timer::new(false),
743                    period: p,
744                })
745            }),
746            key_expr: key_expr.clone().into_owned(),
747            retransmission: retransmission.is_some(),
748            history_depth: conf
749                .history
750                .as_ref()
751                .and_then(|h| h.sample_depth)
752                .unwrap_or_default(),
753            query_target: conf.query_target,
754            query_timeout: conf.query_timeout,
755            callback: callback.clone(),
756            miss_handlers: HashMap::new(),
757            token: None,
758        }));
759
760        let sub_callback = {
761            let statesref = statesref.clone();
762            let session = conf.session.clone();
763            let key_expr = key_expr.clone().into_owned();
764
765            move |s: Sample| {
766                let mut lock = zlock!(statesref);
767                let states = &mut *lock;
768                let source_id = s.source_info().map(|si| *si.source_id());
769                let new = handle_sample(states, s);
770
771                if let Some(source_id) = source_id {
772                    if new {
773                        spawn_periodic_queries!(states, source_id, statesref.clone());
774                    }
775                    if let Some(state) = states.sequenced_states.get_mut(&source_id) {
776                        if retransmission.is_some()
777                            && state.pending_queries == 0
778                            && !state.pending_samples.is_empty()
779                        {
780                            state.pending_queries += 1;
781                            let query_expr = &key_expr
782                                / KE_ADV_PREFIX
783                                / KE_STAR
784                                / &source_id.zid().into_keyexpr()
785                                / &KeyExpr::try_from(source_id.eid().to_string()).unwrap()
786                                / KE_STARSTAR;
787                            let seq_num_range =
788                                seq_num_range(state.last_delivered.map(|s| s + 1), None);
789                            tracing::trace!(
790                                "AdvancedSubscriber{{key_expr: {}}}: Querying missing samples {}?{}",
791                                states.key_expr,
792                                query_expr,
793                                seq_num_range
794                            );
795                            drop(lock);
796                            let handler = SequencedRepliesHandler {
797                                source_id,
798                                statesref: statesref.clone(),
799                            };
800                            let _ = session
801                                .get(Selector::from((query_expr, seq_num_range)))
802                                .callback({
803                                    let key_expr = key_expr.clone().into_owned();
804                                    move |r: Reply| {
805                                        if let Ok(s) = r.into_result() {
806                                            if key_expr.intersects(s.key_expr()) {
807                                                let states = &mut *zlock!(handler.statesref);
808                                                tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
809                                                handle_sample(states, s);
810                                            }
811                                        }
812                                    }
813                                })
814                                .consolidation(ConsolidationMode::None)
815                                .accept_replies(ReplyKeyExpr::Any)
816                                .target(query_target)
817                                .timeout(query_timeout)
818                                .wait();
819                        }
820                    }
821                }
822            }
823        };
824
825        let subscriber = conf
826            .session
827            .declare_subscriber(&key_expr)
828            .callback(sub_callback)
829            .allowed_origin(conf.origin)
830            .wait()?;
831
832        tracing::debug!("Create AdvancedSubscriber{{key_expr: {}}}", key_expr,);
833
834        if let Some(historyconf) = conf.history.as_ref() {
835            let handler = InitialRepliesHandler {
836                statesref: statesref.clone(),
837            };
838            let mut params = Parameters::empty();
839            if let Some(max) = historyconf.sample_depth {
840                params.insert("_max", max.to_string());
841            }
842            if let Some(age) = historyconf.age {
843                params.set_time_range(TimeRange {
844                    start: TimeBound::Inclusive(TimeExpr::Now { offset_secs: -age }),
845                    end: TimeBound::Unbounded,
846                });
847            }
848            tracing::trace!(
849                "AdvancedSubscriber{{key_expr: {}}} Querying historical samples {}?{}",
850                key_expr,
851                &key_expr / KE_ADV_PREFIX / KE_STARSTAR,
852                params
853            );
854            let _ = conf
855                .session
856                .get(Selector::from((
857                    &key_expr / KE_ADV_PREFIX / KE_STARSTAR,
858                    params,
859                )))
860                .callback({
861                    let key_expr = key_expr.clone().into_owned();
862                    move |r: Reply| {
863                        if let Ok(s) = r.into_result() {
864                            if key_expr.intersects(s.key_expr()) {
865                                let states = &mut *zlock!(handler.statesref);
866                                tracing::trace!(
867                                    "AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}",
868                                    states.key_expr,
869                                    s.source_info(),
870                                    s.timestamp()
871                                );
872                                handle_sample(states, s);
873                            }
874                        }
875                    }
876                })
877                .consolidation(ConsolidationMode::None)
878                .accept_replies(ReplyKeyExpr::Any)
879                .target(query_target)
880                .timeout(query_timeout)
881                .wait();
882        }
883
884        let liveliness_subscriber = if let Some(historyconf) = conf.history.as_ref() {
885            if historyconf.liveliness {
886                let live_callback = {
887                    let session = conf.session.clone();
888                    let statesref = statesref.clone();
889                    let key_expr = key_expr.clone().into_owned();
890                    let historyconf = historyconf.clone();
891                    move |s: Sample| {
892                        if s.kind() == SampleKind::Put {
893                            if let Ok(parsed) = ke_liveliness::parse(s.key_expr().as_keyexpr()) {
894                                if let Ok(zid) = ZenohId::from_str(parsed.zid().as_str()) {
895                                    // TODO : If we already have a state associated to this discovered source
896                                    // we should query with the appropriate range to avoid unnecessary retransmissions
897                                    if parsed.eid() == KE_UHLC {
898                                        let mut lock = zlock!(statesref);
899                                        let states = &mut *lock;
900                                        tracing::trace!(
901                                            "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers with zid={}",
902                                            states.key_expr,
903                                            parsed.zid().as_str()
904                                        );
905                                        let entry = states.timestamped_states.entry(ID::from(zid));
906                                        let state = entry.or_insert(SourceState::<Timestamp> {
907                                            last_delivered: None,
908                                            pending_queries: 0,
909                                            pending_samples: BTreeMap::new(),
910                                        });
911                                        state.pending_queries += 1;
912
913                                        let mut params = Parameters::empty();
914                                        if let Some(max) = historyconf.sample_depth {
915                                            params.insert("_max", max.to_string());
916                                        }
917                                        if let Some(age) = historyconf.age {
918                                            params.set_time_range(TimeRange {
919                                                start: TimeBound::Inclusive(TimeExpr::Now {
920                                                    offset_secs: -age,
921                                                }),
922                                                end: TimeBound::Unbounded,
923                                            });
924                                        }
925                                        tracing::trace!(
926                                            "AdvancedSubscriber{{key_expr: {}}}: Querying historical samples {}?{}",
927                                            states.key_expr,
928                                            s.key_expr(),
929                                            params
930                                        );
931                                        drop(lock);
932
933                                        let handler = TimestampedRepliesHandler {
934                                            id: ID::from(zid),
935                                            statesref: statesref.clone(),
936                                            callback: callback.clone(),
937                                        };
938                                        let _ = session
939                                            .get(Selector::from((s.key_expr(), params)))
940                                            .callback({
941                                                let key_expr = key_expr.clone().into_owned();
942                                                move |r: Reply| {
943                                                    if let Ok(s) = r.into_result() {
944                                                        if key_expr.intersects(s.key_expr()) {
945                                                            let states =
946                                                                &mut *zlock!(handler.statesref);
947                                                            tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
948                                                            handle_sample(states, s);
949                                                        }
950                                                    }
951                                                }
952                                            })
953                                            .consolidation(ConsolidationMode::None)
954                                            .accept_replies(ReplyKeyExpr::Any)
955                                            .target(query_target)
956                                            .timeout(query_timeout)
957                                            .wait();
958                                    } else if let Ok(eid) =
959                                        EntityId::from_str(parsed.eid().as_str())
960                                    {
961                                        let source_id = EntityGlobalId::new(zid, eid);
962                                        let mut lock = zlock!(statesref);
963                                        let states = &mut *lock;
964                                        tracing::trace!(
965                                            "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers with zid={}",
966                                            states.key_expr,
967                                            parsed.zid().as_str()
968                                        );
969                                        let entry = states.sequenced_states.entry(source_id);
970                                        let new = matches!(&entry, Entry::Vacant(_));
971                                        let state = entry.or_insert(SourceState::<u32> {
972                                            last_delivered: None,
973                                            pending_queries: 0,
974                                            pending_samples: BTreeMap::new(),
975                                        });
976                                        state.pending_queries += 1;
977
978                                        let mut params = Parameters::empty();
979                                        if let Some(max) = historyconf.sample_depth {
980                                            params.insert("_max", max.to_string());
981                                        }
982                                        if let Some(age) = historyconf.age {
983                                            params.set_time_range(TimeRange {
984                                                start: TimeBound::Inclusive(TimeExpr::Now {
985                                                    offset_secs: -age,
986                                                }),
987                                                end: TimeBound::Unbounded,
988                                            });
989                                        }
990                                        tracing::trace!(
991                                            "AdvancedSubscriber{{key_expr: {}}}: Querying historical samples {}?{}",
992                                            states.key_expr,
993                                            s.key_expr(),
994                                            params,
995                                        );
996                                        drop(lock);
997
998                                        let handler = SequencedRepliesHandler {
999                                            source_id,
1000                                            statesref: statesref.clone(),
1001                                        };
1002                                        let _ = session
1003                                            .get(Selector::from((s.key_expr(), params)))
1004                                            .callback({
1005                                                let key_expr = key_expr.clone().into_owned();
1006                                                move |r: Reply| {
1007                                                    if let Ok(s) = r.into_result() {
1008                                                        if key_expr.intersects(s.key_expr()) {
1009                                                            let states =
1010                                                                &mut *zlock!(handler.statesref);
1011                                                            tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
1012                                                            handle_sample(states, s);
1013                                                        }
1014                                                    }
1015                                                }
1016                                            })
1017                                            .consolidation(ConsolidationMode::None)
1018                                            .accept_replies(ReplyKeyExpr::Any)
1019                                            .target(query_target)
1020                                            .timeout(query_timeout)
1021                                            .wait();
1022
1023                                        if new {
1024                                            spawn_periodic_queries!(
1025                                                zlock!(statesref),
1026                                                source_id,
1027                                                statesref.clone()
1028                                            );
1029                                        }
1030                                    }
1031                                } else {
1032                                    let mut lock = zlock!(statesref);
1033                                    let states = &mut *lock;
1034                                    tracing::trace!(
1035                                        "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers with zid={}",
1036                                        states.key_expr,
1037                                        parsed.zid().as_str()
1038                                    );
1039                                    states.global_pending_queries += 1;
1040
1041                                    let mut params = Parameters::empty();
1042                                    if let Some(max) = historyconf.sample_depth {
1043                                        params.insert("_max", max.to_string());
1044                                    }
1045                                    if let Some(age) = historyconf.age {
1046                                        params.set_time_range(TimeRange {
1047                                            start: TimeBound::Inclusive(TimeExpr::Now {
1048                                                offset_secs: -age,
1049                                            }),
1050                                            end: TimeBound::Unbounded,
1051                                        });
1052                                    }
1053                                    tracing::trace!(
1054                                        "AdvancedSubscriber{{key_expr: {}}}: Querying historical samples {}?{}",
1055                                        states.key_expr,
1056                                        s.key_expr(),
1057                                        params,
1058                                    );
1059                                    drop(lock);
1060
1061                                    let handler = InitialRepliesHandler {
1062                                        statesref: statesref.clone(),
1063                                    };
1064                                    let _ = session
1065                                        .get(Selector::from((s.key_expr(), params)))
1066                                        .callback({
1067                                            let key_expr = key_expr.clone().into_owned();
1068                                            move |r: Reply| {
1069                                                if let Ok(s) = r.into_result() {
1070                                                    if key_expr.intersects(s.key_expr()) {
1071                                                        let states =
1072                                                            &mut *zlock!(handler.statesref);
1073                                                        tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
1074                                                        handle_sample(states, s);
1075                                                    }
1076                                                }
1077                                            }
1078                                        })
1079                                        .consolidation(ConsolidationMode::None)
1080                                        .accept_replies(ReplyKeyExpr::Any)
1081                                        .target(query_target)
1082                                        .timeout(query_timeout)
1083                                        .wait();
1084                                }
1085                            } else {
1086                                tracing::warn!(
1087                                    "AdvancedSubscriber{{}}: Received malformed liveliness token key expression: {}",
1088                                    s.key_expr()
1089                                );
1090                            }
1091                        }
1092                    }
1093                };
1094
1095                tracing::debug!(
1096                    "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers on {}",
1097                    key_expr,
1098                    &key_expr / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR
1099                );
1100                Some(
1101                    conf.session
1102                        .liveliness()
1103                        .declare_subscriber(&key_expr / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR)
1104                        // .declare_subscriber(keformat!(ke_liveliness_all::formatter(), zid = 0, eid = 0, remaining = key_expr).unwrap())
1105                        .history(true)
1106                        .callback(live_callback)
1107                        .wait()?,
1108                )
1109            } else {
1110                None
1111            }
1112        } else {
1113            None
1114        };
1115
1116        let heartbeat_subscriber = if retransmission.is_some_and(|r| r.heartbeat) {
1117            let ke_heartbeat_sub = &key_expr / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR;
1118            let statesref = statesref.clone();
1119            tracing::debug!(
1120                "AdvancedSubscriber{{key_expr: {}}}: Enable heartbeat subscriber on {}",
1121                key_expr,
1122                ke_heartbeat_sub
1123            );
1124            let heartbeat_sub = conf
1125                .session
1126                .declare_subscriber(ke_heartbeat_sub)
1127                .callback(move |sample_hb| {
1128                    if sample_hb.kind() != SampleKind::Put {
1129                        return;
1130                    }
1131
1132                    let heartbeat_keyexpr = sample_hb.key_expr().as_keyexpr();
1133                    let Ok(parsed_keyexpr) = ke_liveliness::parse(heartbeat_keyexpr) else {
1134                        return;
1135                    };
1136                    let source_id = {
1137                        let Ok(zid) = ZenohId::from_str(parsed_keyexpr.zid().as_str()) else {
1138                            return;
1139                        };
1140                        let Ok(eid) = EntityId::from_str(parsed_keyexpr.eid().as_str()) else {
1141                            return;
1142                        };
1143                        EntityGlobalId::new(zid, eid)
1144                    };
1145
1146                    let Ok(heartbeat_sn) = z_deserialize::<u32>(sample_hb.payload()) else {
1147                        tracing::debug!(
1148                            "AdvancedSubscriber{{}}: Skipping invalid heartbeat payload on '{}'",
1149                            heartbeat_keyexpr
1150                        );
1151                        return;
1152                    };
1153
1154                    let mut lock = zlock!(statesref);
1155                    let states = &mut *lock;
1156                    let entry = states.sequenced_states.entry(source_id);
1157                    if matches!(&entry, Entry::Vacant(_)) {
1158                        // NOTE: API does not allow both heartbeat and periodic_queries
1159                        spawn_periodic_queries!(states, source_id, statesref.clone());
1160                        if states.global_pending_queries > 0 {
1161                            tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Skipping heartbeat on '{}' from publisher that is currently being pulled by global query", states.key_expr, heartbeat_keyexpr);
1162                            return;
1163                        }
1164                    }
1165
1166                    let state = entry.or_insert(SourceState::<u32> {
1167                        last_delivered: None,
1168                        pending_queries: 0,
1169                        pending_samples: BTreeMap::new(),
1170                    });
1171
1172                    // check that it's not an old sn, and that there are no pending queries
1173                    if (state.last_delivered.is_none()
1174                        || state.last_delivered.is_some_and(|sn| heartbeat_sn > sn))
1175                        && state.pending_queries == 0
1176                    {
1177                        let seq_num_range = seq_num_range(
1178                            state.last_delivered.map(|s| s + 1),
1179                            Some(heartbeat_sn),
1180                        );
1181
1182                        let session = states.session.clone();
1183                        let key_expr = states.key_expr.clone().into_owned();
1184                        let query_target = states.query_target;
1185                        let query_timeout = states.query_timeout;
1186                        state.pending_queries += 1;
1187
1188                        tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Querying missing samples {}?{}", states.key_expr, heartbeat_keyexpr, seq_num_range);
1189                        drop(lock);
1190
1191                        let handler = SequencedRepliesHandler {
1192                            source_id,
1193                            statesref: statesref.clone(),
1194                        };
1195                        let _ = session
1196                            .get(Selector::from((heartbeat_keyexpr, seq_num_range)))
1197                            .callback({
1198                                move |r: Reply| {
1199                                    if let Ok(s) = r.into_result() {
1200                                        if key_expr.intersects(s.key_expr()) {
1201                                            let states = &mut *zlock!(handler.statesref);
1202                                            tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
1203                                            handle_sample(states, s);
1204                                        }
1205                                    }
1206                                }
1207                            })
1208                            .consolidation(ConsolidationMode::None)
1209                            .accept_replies(ReplyKeyExpr::Any)
1210                            .target(query_target)
1211                            .timeout(query_timeout)
1212                            .wait();
1213                    }
1214                })
1215                .allowed_origin(conf.origin)
1216                .wait()?;
1217            Some(heartbeat_sub)
1218        } else {
1219            None
1220        };
1221
1222        if conf.liveliness {
1223            let suffix = KE_ADV_PREFIX
1224                / KE_SUB
1225                / &subscriber.id().zid().into_keyexpr()
1226                / &KeyExpr::try_from(subscriber.id().eid().to_string()).unwrap();
1227            let suffix = match meta {
1228                Some(meta) => suffix / &meta,
1229                // We need this empty chunk because of a routing matching bug
1230                _ => suffix / KE_EMPTY,
1231            };
1232            tracing::debug!(
1233                "AdvancedSubscriber{{key_expr: {}}}: Declare liveliness token {}",
1234                key_expr,
1235                &key_expr / &suffix,
1236            );
1237            let token = conf
1238                .session
1239                .liveliness()
1240                .declare_token(&key_expr / &suffix)
1241                .wait()?;
1242            zlock!(statesref).token = Some(token)
1243        }
1244
1245        let reliable_subscriber = AdvancedSubscriber {
1246            statesref,
1247            subscriber,
1248            receiver,
1249            liveliness_subscriber,
1250            heartbeat_subscriber,
1251        };
1252
1253        Ok(reliable_subscriber)
1254    }
1255
1256    /// Returns the [`EntityGlobalId`] of this AdvancedSubscriber.
1257    #[zenoh_macros::unstable]
1258    pub fn id(&self) -> EntityGlobalId {
1259        self.subscriber.id()
1260    }
1261
1262    /// Returns the [`KeyExpr`] this subscriber subscribes to.
1263    #[zenoh_macros::unstable]
1264    pub fn key_expr(&self) -> &KeyExpr<'static> {
1265        self.subscriber.key_expr()
1266    }
1267
1268    /// Returns a reference to this subscriber's handler.
1269    /// An handler is anything that implements [`zenoh::handlers::IntoHandler`].
1270    /// The default handler is [`zenoh::handlers::DefaultHandler`].
1271    #[zenoh_macros::unstable]
1272    pub fn handler(&self) -> &Handler {
1273        &self.receiver
1274    }
1275
1276    /// Returns a mutable reference to this subscriber's handler.
1277    /// An handler is anything that implements [`zenoh::handlers::IntoHandler`].
1278    /// The default handler is [`zenoh::handlers::DefaultHandler`].
1279    #[zenoh_macros::unstable]
1280    pub fn handler_mut(&mut self) -> &mut Handler {
1281        &mut self.receiver
1282    }
1283
1284    /// Declares a listener to detect missed samples.
1285    ///
1286    /// Missed samples can only be detected from [`AdvancedPublisher`](crate::AdvancedPublisher) that
1287    /// enable [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
1288    #[zenoh_macros::unstable]
1289    pub fn sample_miss_listener(&self) -> SampleMissListenerBuilder<'_, DefaultHandler> {
1290        SampleMissListenerBuilder {
1291            statesref: &self.statesref,
1292            handler: DefaultHandler::default(),
1293        }
1294    }
1295
1296    /// Declares a listener to detect matching publishers.
1297    ///
1298    /// Only [`AdvancedPublisher`](crate::AdvancedPublisher) that enable
1299    /// [`publisher_detection`](crate::AdvancedPublisherBuilder::publisher_detection) can be detected.
1300    #[zenoh_macros::unstable]
1301    pub fn detect_publishers(&self) -> LivelinessSubscriberBuilder<'_, '_, DefaultHandler> {
1302        self.subscriber
1303            .session()
1304            .liveliness()
1305            .declare_subscriber(self.subscriber.key_expr() / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR)
1306    }
1307
1308    /// Undeclares this AdvancedSubscriber
1309    #[inline]
1310    #[zenoh_macros::unstable]
1311    pub fn undeclare(self) -> impl Resolve<ZResult<()>> {
1312        tracing::debug!(
1313            "AdvancedSubscriber{{key_expr: {}}}: Undeclare",
1314            self.key_expr()
1315        );
1316        self.subscriber.undeclare()
1317    }
1318
1319    fn set_background_impl(&mut self, background: bool) {
1320        self.subscriber.set_background(background);
1321        if let Some(mut liveliness_sub) = self.liveliness_subscriber.take() {
1322            liveliness_sub.set_background(background);
1323        }
1324        if let Some(mut heartbeat_sub) = self.heartbeat_subscriber.take() {
1325            heartbeat_sub.set_background(background);
1326        }
1327    }
1328
1329    #[zenoh_macros::internal]
1330    pub fn set_background(&mut self, background: bool) {
1331        self.set_background_impl(background)
1332    }
1333}
1334
1335#[zenoh_macros::unstable]
1336#[inline]
1337fn flush_sequenced_source(
1338    state: &mut SourceState<u32>,
1339    callback: &Callback<Sample>,
1340    source_id: &EntityGlobalId,
1341    miss_handlers: &HashMap<usize, Callback<Miss>>,
1342) {
1343    if state.pending_queries == 0 && !state.pending_samples.is_empty() {
1344        let mut pending_samples = BTreeMap::new();
1345        std::mem::swap(&mut state.pending_samples, &mut pending_samples);
1346        for (seq_num, sample) in pending_samples {
1347            match state.last_delivered {
1348                None => {
1349                    state.last_delivered = Some(seq_num);
1350                    callback.call(sample);
1351                }
1352                Some(last) if seq_num == last + 1 => {
1353                    state.last_delivered = Some(seq_num);
1354                    callback.call(sample);
1355                }
1356                Some(last) if seq_num > last + 1 => {
1357                    tracing::warn!(
1358                        "Sample missed: missed {} samples from {:?}.",
1359                        seq_num - last - 1,
1360                        source_id,
1361                    );
1362                    for miss_callback in miss_handlers.values() {
1363                        miss_callback.call(Miss {
1364                            source: *source_id,
1365                            nb: seq_num - last - 1,
1366                        })
1367                    }
1368                    state.last_delivered = Some(seq_num);
1369                    callback.call(sample);
1370                }
1371                _ => {
1372                    // duplicate
1373                }
1374            }
1375        }
1376    }
1377}
1378
1379#[zenoh_macros::unstable]
1380#[inline]
1381fn flush_timestamped_source(state: &mut SourceState<Timestamp>, callback: &Callback<Sample>) {
1382    if state.pending_queries == 0 && !state.pending_samples.is_empty() {
1383        let mut pending_samples = BTreeMap::new();
1384        std::mem::swap(&mut state.pending_samples, &mut pending_samples);
1385        for (timestamp, sample) in pending_samples {
1386            if state
1387                .last_delivered
1388                .map(|last| timestamp > last)
1389                .unwrap_or(true)
1390            {
1391                state.last_delivered = Some(timestamp);
1392                callback.call(sample);
1393            }
1394        }
1395    }
1396}
1397
1398#[zenoh_macros::unstable]
1399#[derive(Clone)]
1400struct InitialRepliesHandler {
1401    statesref: Arc<Mutex<State>>,
1402}
1403
1404#[zenoh_macros::unstable]
1405impl Drop for InitialRepliesHandler {
1406    fn drop(&mut self) {
1407        let states = &mut *zlock!(self.statesref);
1408        states.global_pending_queries = states.global_pending_queries.saturating_sub(1);
1409        tracing::trace!(
1410            "AdvancedSubscriber{{key_expr: {}}}: Flush initial replies",
1411            states.key_expr
1412        );
1413
1414        if states.global_pending_queries == 0 {
1415            for (source_id, state) in states.sequenced_states.iter_mut() {
1416                flush_sequenced_source(state, &states.callback, source_id, &states.miss_handlers);
1417                spawn_periodic_queries!(states, *source_id, self.statesref.clone());
1418            }
1419            for state in states.timestamped_states.values_mut() {
1420                flush_timestamped_source(state, &states.callback);
1421            }
1422        }
1423    }
1424}
1425
1426#[zenoh_macros::unstable]
1427#[derive(Clone)]
1428struct SequencedRepliesHandler {
1429    source_id: EntityGlobalId,
1430    statesref: Arc<Mutex<State>>,
1431}
1432
1433#[zenoh_macros::unstable]
1434impl Drop for SequencedRepliesHandler {
1435    fn drop(&mut self) {
1436        let states = &mut *zlock!(self.statesref);
1437        if let Some(state) = states.sequenced_states.get_mut(&self.source_id) {
1438            state.pending_queries = state.pending_queries.saturating_sub(1);
1439            if states.global_pending_queries == 0 {
1440                tracing::trace!(
1441                    "AdvancedSubscriber{{key_expr: {}}}: Flush sequenced samples",
1442                    states.key_expr
1443                );
1444                flush_sequenced_source(
1445                    state,
1446                    &states.callback,
1447                    &self.source_id,
1448                    &states.miss_handlers,
1449                )
1450            }
1451        }
1452    }
1453}
1454
1455#[zenoh_macros::unstable]
1456#[derive(Clone)]
1457struct TimestampedRepliesHandler {
1458    id: ID,
1459    statesref: Arc<Mutex<State>>,
1460    callback: Callback<Sample>,
1461}
1462
1463#[zenoh_macros::unstable]
1464impl Drop for TimestampedRepliesHandler {
1465    fn drop(&mut self) {
1466        let states = &mut *zlock!(self.statesref);
1467        if let Some(state) = states.timestamped_states.get_mut(&self.id) {
1468            state.pending_queries = state.pending_queries.saturating_sub(1);
1469            if states.global_pending_queries == 0 {
1470                tracing::trace!(
1471                    "AdvancedSubscriber{{key_expr: {}}}: Flush timestamped samples",
1472                    states.key_expr
1473                );
1474                flush_timestamped_source(state, &self.callback);
1475            }
1476        }
1477    }
1478}
1479
1480/// A struct that represent missed samples.
1481#[zenoh_macros::unstable]
1482#[derive(Debug, Clone)]
1483pub struct Miss {
1484    source: EntityGlobalId,
1485    nb: u32,
1486}
1487
1488impl Miss {
1489    /// The source of missed samples.
1490    pub fn source(&self) -> EntityGlobalId {
1491        self.source
1492    }
1493
1494    /// The number of missed samples.
1495    pub fn nb(&self) -> u32 {
1496        self.nb
1497    }
1498}
1499
1500impl CallbackParameter for Miss {
1501    type Message<'a> = Self;
1502
1503    fn from_message(msg: Self::Message<'_>) -> Self {
1504        msg
1505    }
1506}
1507
/// A listener to detect missed samples.
///
/// Missed samples can only be detected from an [`AdvancedPublisher`](crate::AdvancedPublisher) that
/// enables [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
///
/// Unless it has been put in background mode, dropping the listener unregisters
/// its miss callback from the shared subscriber state.
#[zenoh_macros::unstable]
pub struct SampleMissListener<Handler> {
    /// Identifier obtained from `register_miss_callback`; used to unregister the callback.
    id: usize,
    /// Shared subscriber state holding the registered miss callbacks.
    statesref: Arc<Mutex<State>>,
    /// Handler half produced by `IntoHandler::into_handler`.
    handler: Handler,
    /// Whether `Drop` must unregister the callback.
    undeclare_on_drop: bool,
}

#[zenoh_macros::unstable]
impl<Handler> SampleMissListener<Handler> {
    /// Undeclares the listener.
    ///
    /// Returns a [`SampleMissHandlerUndeclaration`] which, once resolved, unregisters
    /// the miss callback from the shared subscriber state.
    #[inline]
    pub fn undeclare(self) -> SampleMissHandlerUndeclaration<Handler>
    where
        Handler: Send,
    {
        // The listener is only wrapped here; unregistration happens when the
        // returned value is resolved (or dropped, through the listener's `Drop`).
        SampleMissHandlerUndeclaration(self)
    }

    /// Unregisters the miss callback and disables the unregistration on drop.
    ///
    /// As written, this always returns `Ok(())`.
    fn undeclare_impl(&mut self) -> ZResult<()> {
        // set the flag first to avoid double panic if this function panic
        self.undeclare_on_drop = false;
        zlock!(self.statesref).unregister_miss_callback(&self.id);
        Ok(())
    }

    /// Selects whether the listener keeps running in background once dropped:
    /// with `background == true`, dropping it no longer unregisters the callback.
    #[zenoh_macros::internal]
    pub fn set_background(&mut self, background: bool) {
        self.undeclare_on_drop = !background;
    }
}
1543
/// Unregisters the miss callback when the listener goes out of scope, unless it
/// was already undeclared or switched to background mode.
#[cfg(feature = "unstable")]
impl<Handler> Drop for SampleMissListener<Handler> {
    fn drop(&mut self) {
        if !self.undeclare_on_drop {
            return;
        }
        // `drop` cannot report a failure to the caller, so it is only logged.
        if let Err(error) = self.undeclare_impl() {
            tracing::error!(error);
        }
    }
}
1554
1555// #[zenoh_macros::unstable]
1556// impl<Handler: Send> UndeclarableSealed<()> for SampleMissHandler<Handler> {
1557//     type Undeclaration = SampleMissHandlerUndeclaration<Handler>;
1558
1559//     fn undeclare_inner(self, _: ()) -> Self::Undeclaration {
1560//         SampleMissHandlerUndeclaration(self)
1561//     }
1562// }
1563
/// Gives shared access to the wrapped handler, so that the listener can be used
/// directly as its `Handler`.
#[zenoh_macros::unstable]
impl<Handler> std::ops::Deref for SampleMissListener<Handler> {
    type Target = Handler;

    fn deref(&self) -> &Self::Target {
        &self.handler
    }
}
/// Gives exclusive access to the wrapped handler.
#[zenoh_macros::unstable]
impl<Handler> std::ops::DerefMut for SampleMissListener<Handler> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.handler
    }
}
1578
1579/// A [`Resolvable`] returned by [`SampleMissListener::undeclare`]
1580#[zenoh_macros::unstable]
1581pub struct SampleMissHandlerUndeclaration<Handler>(SampleMissListener<Handler>);
1582
1583#[zenoh_macros::unstable]
1584impl<Handler> Resolvable for SampleMissHandlerUndeclaration<Handler> {
1585    type To = ZResult<()>;
1586}
1587
1588#[zenoh_macros::unstable]
1589impl<Handler> Wait for SampleMissHandlerUndeclaration<Handler> {
1590    fn wait(mut self) -> <Self as Resolvable>::To {
1591        self.0.undeclare_impl()
1592    }
1593}
1594
1595#[zenoh_macros::unstable]
1596impl<Handler> IntoFuture for SampleMissHandlerUndeclaration<Handler> {
1597    type Output = <Self as Resolvable>::To;
1598    type IntoFuture = Ready<<Self as Resolvable>::To>;
1599
1600    fn into_future(self) -> Self::IntoFuture {
1601        std::future::ready(self.wait())
1602    }
1603}
1604
1605/// A builder for initializing a [`SampleMissListener`].
1606#[zenoh_macros::unstable]
1607pub struct SampleMissListenerBuilder<'a, Handler, const BACKGROUND: bool = false> {
1608    statesref: &'a Arc<Mutex<State>>,
1609    handler: Handler,
1610}
1611
1612#[zenoh_macros::unstable]
1613impl<'a> SampleMissListenerBuilder<'a, DefaultHandler> {
1614    /// Receive the sample miss notification with a callback.
1615    #[inline]
1616    #[zenoh_macros::unstable]
1617    pub fn callback<F>(self, callback: F) -> SampleMissListenerBuilder<'a, Callback<Miss>>
1618    where
1619        F: Fn(Miss) + Send + Sync + 'static,
1620    {
1621        self.with(Callback::from(callback))
1622    }
1623
1624    /// Receive the sample miss notification with a mutable callback.
1625    #[inline]
1626    #[zenoh_macros::unstable]
1627    pub fn callback_mut<F>(self, callback: F) -> SampleMissListenerBuilder<'a, Callback<Miss>>
1628    where
1629        F: FnMut(Miss) + Send + Sync + 'static,
1630    {
1631        self.callback(zenoh::handlers::locked(callback))
1632    }
1633
1634    /// Receive the sample miss notification with a [`Handler`](IntoHandler).
1635    #[inline]
1636    #[zenoh_macros::unstable]
1637    pub fn with<Handler>(self, handler: Handler) -> SampleMissListenerBuilder<'a, Handler>
1638    where
1639        Handler: IntoHandler<Miss>,
1640    {
1641        SampleMissListenerBuilder {
1642            statesref: self.statesref,
1643            handler,
1644        }
1645    }
1646}
1647
1648#[zenoh_macros::unstable]
1649impl<'a> SampleMissListenerBuilder<'a, Callback<Miss>> {
1650    /// Make the sample miss notification run in the background until the advanced subscriber is undeclared.
1651    ///
1652    /// Background builder doesn't return a `SampleMissHandler` object anymore.
1653    #[zenoh_macros::unstable]
1654    pub fn background(self) -> SampleMissListenerBuilder<'a, Callback<Miss>, true> {
1655        SampleMissListenerBuilder {
1656            statesref: self.statesref,
1657            handler: self.handler,
1658        }
1659    }
1660}
1661
1662#[zenoh_macros::unstable]
1663impl<Handler> Resolvable for SampleMissListenerBuilder<'_, Handler>
1664where
1665    Handler: IntoHandler<Miss> + Send,
1666    Handler::Handler: Send,
1667{
1668    type To = ZResult<SampleMissListener<Handler::Handler>>;
1669}
1670
1671#[zenoh_macros::unstable]
1672impl<Handler> Wait for SampleMissListenerBuilder<'_, Handler>
1673where
1674    Handler: IntoHandler<Miss> + Send,
1675    Handler::Handler: Send,
1676{
1677    #[zenoh_macros::unstable]
1678    fn wait(self) -> <Self as Resolvable>::To {
1679        let (callback, handler) = self.handler.into_handler();
1680        let id = zlock!(self.statesref).register_miss_callback(callback);
1681        Ok(SampleMissListener {
1682            id,
1683            statesref: self.statesref.clone(),
1684            handler,
1685            undeclare_on_drop: true,
1686        })
1687    }
1688}
1689
1690#[zenoh_macros::unstable]
1691impl<Handler> IntoFuture for SampleMissListenerBuilder<'_, Handler>
1692where
1693    Handler: IntoHandler<Miss> + Send,
1694    Handler::Handler: Send,
1695{
1696    type Output = <Self as Resolvable>::To;
1697    type IntoFuture = Ready<<Self as Resolvable>::To>;
1698
1699    #[zenoh_macros::unstable]
1700    fn into_future(self) -> Self::IntoFuture {
1701        std::future::ready(self.wait())
1702    }
1703}
1704
1705#[zenoh_macros::unstable]
1706impl Resolvable for SampleMissListenerBuilder<'_, Callback<Miss>, true> {
1707    type To = ZResult<()>;
1708}
1709
1710#[zenoh_macros::unstable]
1711impl Wait for SampleMissListenerBuilder<'_, Callback<Miss>, true> {
1712    #[zenoh_macros::unstable]
1713    fn wait(self) -> <Self as Resolvable>::To {
1714        let (callback, _) = self.handler.into_handler();
1715        zlock!(self.statesref).register_miss_callback(callback);
1716        Ok(())
1717    }
1718}
1719
1720#[zenoh_macros::unstable]
1721impl IntoFuture for SampleMissListenerBuilder<'_, Callback<Miss>, true> {
1722    type Output = <Self as Resolvable>::To;
1723    type IntoFuture = Ready<<Self as Resolvable>::To>;
1724
1725    #[zenoh_macros::unstable]
1726    fn into_future(self) -> Self::IntoFuture {
1727        std::future::ready(self.wait())
1728    }
1729}