zenoh_ext/
advanced_subscriber.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{collections::BTreeMap, future::IntoFuture, str::FromStr};
15
16use zenoh::{
17    config::ZenohId,
18    handlers::{Callback, CallbackParameter, IntoHandler},
19    key_expr::KeyExpr,
20    liveliness::{LivelinessSubscriberBuilder, LivelinessToken},
21    pubsub::SubscriberBuilder,
22    query::{
23        ConsolidationMode, Parameters, Selector, TimeBound, TimeExpr, TimeRange, ZenohParameters,
24    },
25    sample::{Locality, Sample, SampleKind},
26    session::{EntityGlobalId, EntityId},
27    Resolvable, Resolve, Session, Wait, KE_ADV_PREFIX, KE_EMPTY, KE_PUB, KE_STAR, KE_STARSTAR,
28    KE_SUB,
29};
30use zenoh_util::{Timed, TimedEvent, Timer};
31#[zenoh_macros::unstable]
32use {
33    async_trait::async_trait,
34    std::collections::hash_map::Entry,
35    std::collections::HashMap,
36    std::convert::TryFrom,
37    std::future::Ready,
38    std::sync::{Arc, Mutex},
39    std::time::Duration,
40    uhlc::ID,
41    zenoh::handlers::{locked, DefaultHandler},
42    zenoh::internal::{runtime::ZRuntime, zlock},
43    zenoh::pubsub::Subscriber,
44    zenoh::query::{QueryTarget, Reply, ReplyKeyExpr},
45    zenoh::time::Timestamp,
46    zenoh::Result as ZResult,
47};
48
49use crate::{
50    advanced_cache::{ke_liveliness, KE_UHLC},
51    utils::WrappingSn,
52    z_deserialize,
53};
54
55#[derive(Debug, Default, Clone)]
56/// Configure query for historical data for [`history`](crate::AdvancedSubscriberBuilder::history) method.
57#[zenoh_macros::unstable]
58pub struct HistoryConfig {
59    liveliness: bool,
60    sample_depth: Option<usize>,
61    age: Option<f64>,
62}
63
64#[zenoh_macros::unstable]
65impl HistoryConfig {
66    /// Enable detection of late joiner publishers and query for their historical data.
67    ///
68    /// Late joiner detection can only be achieved for [`AdvancedPublishers`](crate::AdvancedPublisher) that enable publisher_detection.
69    /// History can only be retransmitted by [`AdvancedPublishers`](crate::AdvancedPublisher) that enable [`cache`](crate::AdvancedPublisherBuilder::cache).
70    #[inline]
71    #[zenoh_macros::unstable]
72    pub fn detect_late_publishers(mut self) -> Self {
73        self.liveliness = true;
74        self
75    }
76
77    /// Specify how many samples to query for each resource.
78    #[zenoh_macros::unstable]
79    pub fn max_samples(mut self, depth: usize) -> Self {
80        self.sample_depth = Some(depth);
81        self
82    }
83
84    /// Specify the maximum age of samples to query.
85    #[zenoh_macros::unstable]
86    pub fn max_age(mut self, seconds: f64) -> Self {
87        self.age = Some(seconds);
88        self
89    }
90}
91
92#[derive(Debug, Default, Clone, Copy)]
93/// Configure retransmission.
94#[zenoh_macros::unstable]
95pub struct RecoveryConfig<const CONFIGURED: bool = true> {
96    periodic_queries: Option<Duration>,
97    heartbeat: bool,
98}
99
100#[zenoh_macros::unstable]
101impl RecoveryConfig<false> {
102    /// Enable periodic queries for not yet received Samples and specify their period.
103    ///
104    /// This allows retrieving the last Sample(s) if the last Sample(s) is/are lost.
105    /// So it is useful for sporadic publications but useless for periodic publications
106    /// with a period smaller or equal to this period.
107    /// Retransmission can only be achieved by [`AdvancedPublishers`](crate::AdvancedPublisher)
108    /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
109    /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
110    #[zenoh_macros::unstable]
111    #[inline]
112    pub fn periodic_queries(self, period: Duration) -> RecoveryConfig<true> {
113        RecoveryConfig {
114            periodic_queries: Some(period),
115            heartbeat: false,
116        }
117    }
118
119    /// Subscribe to heartbeats of [`AdvancedPublishers`](crate::AdvancedPublisher).
120    ///
121    /// This allows receiving the last published Sample's sequence number and check for misses.
122    /// Heartbeat subscriber must be paired with [`AdvancedPublishers`](crate::AdvancedPublisher)
123    /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
124    /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection) with
125    /// [`heartbeat`](crate::advanced_publisher::MissDetectionConfig::heartbeat) or
126    /// [`sporadic_heartbeat`](crate::advanced_publisher::MissDetectionConfig::sporadic_heartbeat).
127    #[zenoh_macros::unstable]
128    #[inline]
129    pub fn heartbeat(self) -> RecoveryConfig<true> {
130        RecoveryConfig {
131            periodic_queries: None,
132            heartbeat: true,
133        }
134    }
135}
136
137/// The builder of an [`AdvancedSubscriber`], allowing to configure it.
138#[zenoh_macros::unstable]
139pub struct AdvancedSubscriberBuilder<'a, 'b, 'c, Handler, const BACKGROUND: bool = false> {
140    pub(crate) session: &'a Session,
141    pub(crate) key_expr: ZResult<KeyExpr<'b>>,
142    pub(crate) origin: Locality,
143    pub(crate) retransmission: Option<RecoveryConfig>,
144    pub(crate) query_target: QueryTarget,
145    pub(crate) query_timeout: Duration,
146    pub(crate) history: Option<HistoryConfig>,
147    pub(crate) liveliness: bool,
148    pub(crate) meta_key_expr: Option<ZResult<KeyExpr<'c>>>,
149    pub(crate) handler: Handler,
150}
151
152#[zenoh_macros::unstable]
153impl<'a, 'b, Handler> AdvancedSubscriberBuilder<'a, 'b, '_, Handler> {
154    #[zenoh_macros::unstable]
155    pub(crate) fn new(builder: SubscriberBuilder<'a, 'b, Handler>) -> Self {
156        AdvancedSubscriberBuilder {
157            session: builder.session,
158            key_expr: builder.key_expr,
159            origin: builder.origin,
160            handler: builder.handler,
161            retransmission: None,
162            query_target: QueryTarget::All,
163            query_timeout: Duration::from_secs(10),
164            history: None,
165            liveliness: false,
166            meta_key_expr: None,
167        }
168    }
169}
170
171#[zenoh_macros::unstable]
172impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, DefaultHandler> {
173    /// Add callback to AdvancedSubscriber.
174    #[inline]
175    #[zenoh_macros::unstable]
176    pub fn callback<F>(self, callback: F) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>>
177    where
178        F: Fn(Sample) + Send + Sync + 'static,
179    {
180        self.with(Callback::from(callback))
181    }
182
183    /// Add callback to `AdvancedSubscriber`.
184    ///
185    /// Using this guarantees that your callback will never be called concurrently.
186    /// If your callback is also accepted by the [`callback`](AdvancedSubscriberBuilder::callback) method, we suggest you use it instead of `callback_mut`
187    #[inline]
188    #[zenoh_macros::unstable]
189    pub fn callback_mut<F>(
190        self,
191        callback: F,
192    ) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>>
193    where
194        F: FnMut(Sample) + Send + Sync + 'static,
195    {
196        self.callback(locked(callback))
197    }
198
199    /// Make the built AdvancedSubscriber an [`AdvancedSubscriber`](AdvancedSubscriber).
200    #[inline]
201    #[zenoh_macros::unstable]
202    pub fn with<Handler>(self, handler: Handler) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>
203    where
204        Handler: IntoHandler<Sample>,
205    {
206        AdvancedSubscriberBuilder {
207            session: self.session,
208            key_expr: self.key_expr,
209            origin: self.origin,
210            retransmission: self.retransmission,
211            query_target: self.query_target,
212            query_timeout: self.query_timeout,
213            history: self.history,
214            liveliness: self.liveliness,
215            meta_key_expr: self.meta_key_expr,
216            handler,
217        }
218    }
219}
220
221#[zenoh_macros::unstable]
222impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>> {
223    /// Make the subscriber run in background until the session is closed.
224    ///
225    /// Background builder doesn't return a `AdvancedSubscriber` object anymore.
226    pub fn background(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>, true> {
227        AdvancedSubscriberBuilder {
228            session: self.session,
229            key_expr: self.key_expr,
230            origin: self.origin,
231            retransmission: self.retransmission,
232            query_target: self.query_target,
233            query_timeout: self.query_timeout,
234            history: self.history,
235            liveliness: self.liveliness,
236            meta_key_expr: self.meta_key_expr,
237            handler: self.handler,
238        }
239    }
240}
241
242#[zenoh_macros::unstable]
243impl<'a, 'c, Handler, const BACKGROUND: bool>
244    AdvancedSubscriberBuilder<'a, '_, 'c, Handler, BACKGROUND>
245{
246    /// Restrict the matching publications that will be received by this [`Subscriber`]
247    /// to the ones that have the given [`Locality`](crate::prelude::Locality).
248    #[zenoh_macros::unstable]
249    #[inline]
250    pub fn allowed_origin(mut self, origin: Locality) -> Self {
251        self.origin = origin;
252        self
253    }
254
255    /// Ask for retransmission of detected lost Samples.
256    ///
257    /// Retransmission can only be achieved by [`AdvancedPublishers`](crate::AdvancedPublisher)
258    /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
259    /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
260    #[zenoh_macros::unstable]
261    #[inline]
262    pub fn recovery(mut self, conf: RecoveryConfig) -> Self {
263        self.retransmission = Some(conf);
264        self
265    }
266
267    // /// Change the target to be used for queries.
268
269    // #[inline]
270    // pub fn query_target(mut self, query_target: QueryTarget) -> Self {
271    //     self.query_target = query_target;
272    //     self
273    // }
274
275    /// Change the timeout to be used for queries (history, retransmission).
276    #[zenoh_macros::unstable]
277    #[inline]
278    pub fn query_timeout(mut self, query_timeout: Duration) -> Self {
279        self.query_timeout = query_timeout;
280        self
281    }
282
283    /// Enable query for historical data.
284    ///
285    /// History can only be retransmitted by [`AdvancedPublishers`](crate::AdvancedPublisher) that enable [`cache`](crate::AdvancedPublisherBuilder::cache).
286    #[zenoh_macros::unstable]
287    #[inline]
288    pub fn history(mut self, config: HistoryConfig) -> Self {
289        self.history = Some(config);
290        self
291    }
292
293    /// Allow this subscriber to be detected through liveliness.
294    #[zenoh_macros::unstable]
295    pub fn subscriber_detection(mut self) -> Self {
296        self.liveliness = true;
297        self
298    }
299
300    /// A key expression added to the liveliness token key expression.
301    /// It can be used to convey metadata.
302    #[zenoh_macros::unstable]
303    pub fn subscriber_detection_metadata<TryIntoKeyExpr>(mut self, meta: TryIntoKeyExpr) -> Self
304    where
305        TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
306        <TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<zenoh::Error>,
307    {
308        self.meta_key_expr = Some(meta.try_into().map_err(Into::into));
309        self
310    }
311
312    #[zenoh_macros::unstable]
313    fn with_static_keys(self) -> AdvancedSubscriberBuilder<'a, 'static, 'static, Handler> {
314        AdvancedSubscriberBuilder {
315            session: self.session,
316            key_expr: self.key_expr.map(|s| s.into_owned()),
317            origin: self.origin,
318            retransmission: self.retransmission,
319            query_target: self.query_target,
320            query_timeout: self.query_timeout,
321            history: self.history,
322            liveliness: self.liveliness,
323            meta_key_expr: self.meta_key_expr.map(|s| s.map(|s| s.into_owned())),
324            handler: self.handler,
325        }
326    }
327}
328
329#[zenoh_macros::unstable]
330impl<Handler> Resolvable for AdvancedSubscriberBuilder<'_, '_, '_, Handler>
331where
332    Handler: IntoHandler<Sample>,
333    Handler::Handler: Send,
334{
335    type To = ZResult<AdvancedSubscriber<Handler::Handler>>;
336}
337
338#[zenoh_macros::unstable]
339impl<Handler> Wait for AdvancedSubscriberBuilder<'_, '_, '_, Handler>
340where
341    Handler: IntoHandler<Sample> + Send,
342    Handler::Handler: Send,
343{
344    #[zenoh_macros::unstable]
345    fn wait(self) -> <Self as Resolvable>::To {
346        AdvancedSubscriber::new(self.with_static_keys())
347    }
348}
349
350#[zenoh_macros::unstable]
351impl<Handler> IntoFuture for AdvancedSubscriberBuilder<'_, '_, '_, Handler>
352where
353    Handler: IntoHandler<Sample> + Send,
354    Handler::Handler: Send,
355{
356    type Output = <Self as Resolvable>::To;
357    type IntoFuture = Ready<<Self as Resolvable>::To>;
358
359    #[zenoh_macros::unstable]
360    fn into_future(self) -> Self::IntoFuture {
361        std::future::ready(self.wait())
362    }
363}
364
365#[zenoh_macros::unstable]
366impl Resolvable for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
367    type To = ZResult<()>;
368}
369
370#[zenoh_macros::unstable]
371impl Wait for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
372    #[zenoh_macros::unstable]
373    fn wait(self) -> <Self as Resolvable>::To {
374        let mut sub = AdvancedSubscriber::new(self.with_static_keys())?;
375        sub.set_background_impl(true);
376        Ok(())
377    }
378}
379
380#[zenoh_macros::unstable]
381impl IntoFuture for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
382    type Output = <Self as Resolvable>::To;
383    type IntoFuture = Ready<<Self as Resolvable>::To>;
384
385    #[zenoh_macros::unstable]
386    fn into_future(self) -> Self::IntoFuture {
387        std::future::ready(self.wait())
388    }
389}
390
391#[zenoh_macros::unstable]
392struct Period {
393    timer: Timer,
394    period: Duration,
395}
396
397#[zenoh_macros::unstable]
398struct State {
399    next_id: usize,
400    global_pending_queries: u64,
401    sequenced_states: HashMap<EntityGlobalId, SourceState<WrappingSn>>,
402    timestamped_states: HashMap<ID, SourceState<Timestamp>>,
403    session: Session,
404    key_expr: KeyExpr<'static>,
405    retransmission: bool,
406    period: Option<Period>,
407    history_depth: usize,
408    query_target: QueryTarget,
409    query_timeout: Duration,
410    callback: Callback<Sample>,
411    miss_handlers: HashMap<usize, Callback<Miss>>,
412    token: Option<LivelinessToken>,
413}
414
415#[zenoh_macros::unstable]
416impl State {
417    #[zenoh_macros::unstable]
418    fn register_miss_callback(&mut self, callback: Callback<Miss>) -> usize {
419        let id = self.next_id;
420        self.next_id += 1;
421        self.miss_handlers.insert(id, callback);
422        id
423    }
424    #[zenoh_macros::unstable]
425    fn unregister_miss_callback(&mut self, id: &usize) {
426        self.miss_handlers.remove(id);
427    }
428}
429
// Schedule a periodic query for a source, when periodic queries are configured.
//
// Arguments, in order: the state (anything exposing a `period` field), the
// source id, and the shared state reference handed to the `PeriodicQuery`.
// Does nothing when `period` is `None`.
macro_rules! spawn_periodic_queries {
    ($states:expr,$source_id:expr,$statesref:expr) => {{
        if let Some(schedule) = $states.period.as_ref() {
            schedule.timer.add(TimedEvent::periodic(
                schedule.period,
                PeriodicQuery {
                    source_id: $source_id,
                    statesref: $statesref,
                },
            ))
        }
    }};
}
443
444#[zenoh_macros::unstable]
445struct SourceState<T> {
446    last_delivered: Option<T>,
447    pending_queries: u64,
448    pending_samples: BTreeMap<T, Sample>,
449}
450
451/*
452use zenoh_ext::{AdvancedSubscriberBuilderExt, HistoryConfig, RecoveryConfig};
453
454let session = zenoh::open(zenoh::Config::default()).await.unwrap();
455let subscriber = session
456    .declare_subscriber("key/expression")
457    .history(HistoryConfig::default().detect_late_publishers())
458    .recovery(RecoveryConfig::default())
459    .await
460    .unwrap();
461
462let miss_listener = subscriber.sample_miss_listener().await.unwrap();
463loop {
464    tokio::select! {
465        sample = subscriber.recv_async() => {
466            if let Ok(sample) = sample {
467                // ...
468            }
469        },
470        miss = miss_listener.recv_async() => {
471            if let Ok(miss) = miss {
472                // ...
473            }
474        },
475    }
476}
477*/
478
479/// The extension to [`Subscriber`](zenoh::pubsub::Subscriber) that provides advanced functionalities
480///
481/// The `AdvancedSubscriber` is constructed over a regular [`Subscriber`](zenoh::pubsub::Subscriber)
482/// through [`advanced`](crate::AdvancedSubscriberBuilderExt::advanced) method or by using
483/// any other method of [`AdvancedSubscriberBuilder`](crate::AdvancedSubscriberBuilder).
484///
485/// The `AdvancedSubscriber` works with [`AdvancedPublisher`](crate::AdvancedPublisher) to provide additional functionalities such as:
486/// * missing samples detection using periodic queries or heartbeat subscription configurable with [`recovery`](crate::AdvancedSubscriberBuilder::recovery) method
487/// * recovering missing samples, configured with [`history`](crate::AdvancedSubscriberBuilder::history) method
488///   (max age and sample count, late joiner detection and requesting)
489/// * liveliness-based subscriber detection with [`subscriber_detection`](crate::AdvancedSubscriberBuilder::subscriber_detection) method
490///
491/// # Examples
492/// ```no_run
493/// # #[tokio::main]
494/// # async fn main() {
495/// use zenoh_ext::{AdvancedSubscriberBuilderExt, HistoryConfig, RecoveryConfig};
496/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
497/// let subscriber = session
498///     .declare_subscriber("key/expression")
499///     .history(HistoryConfig::default().detect_late_publishers())
500///     .recovery(RecoveryConfig::default().heartbeat())
501///     .subscriber_detection()
502///     .await
503///     .unwrap();
504/// let miss_listener = subscriber.sample_miss_listener().await.unwrap();
505/// loop {
506///     tokio::select! {
507///         sample = subscriber.recv_async() => {
508///             if let Ok(sample) = sample {
509///                 // ...
510///             }
511///         },
512///         miss = miss_listener.recv_async() => {
513///             if let Ok(miss) = miss {
514///                 // ...
515///             }
516///         },
517///     }
518/// }
519/// # }
520/// ```
521#[zenoh_macros::unstable]
522pub struct AdvancedSubscriber<Receiver> {
523    statesref: Arc<Mutex<State>>,
524    subscriber: Subscriber<()>,
525    receiver: Receiver,
526    liveliness_subscriber: Option<Subscriber<()>>,
527    heartbeat_subscriber: Option<Subscriber<()>>,
528}
529
530#[zenoh_macros::unstable]
531impl<Receiver> std::ops::Deref for AdvancedSubscriber<Receiver> {
532    type Target = Receiver;
533    fn deref(&self) -> &Self::Target {
534        &self.receiver
535    }
536}
537
538#[zenoh_macros::unstable]
539impl<Receiver> std::ops::DerefMut for AdvancedSubscriber<Receiver> {
540    fn deref_mut(&mut self) -> &mut Self::Target {
541        &mut self.receiver
542    }
543}
544
545#[zenoh_macros::unstable]
546fn handle_sample(states: &mut State, sample: Sample) -> bool {
547    if let Some(source_info) = sample.source_info().cloned() {
548        #[inline]
549        fn deliver_and_flush(
550            sample: Sample,
551            source_sn: impl Into<WrappingSn>,
552            callback: &Callback<Sample>,
553            state: &mut SourceState<WrappingSn>,
554        ) {
555            let mut source_sn = source_sn.into();
556            callback.call(sample);
557            state.last_delivered = Some(source_sn);
558            while let Some(sample) = state.pending_samples.remove(&(source_sn + 1)) {
559                callback.call(sample);
560                source_sn += 1;
561                state.last_delivered = Some(source_sn);
562            }
563        }
564
565        let entry = states.sequenced_states.entry(*source_info.source_id());
566        let new = matches!(&entry, Entry::Vacant(_));
567        let state = entry.or_insert(SourceState::<WrappingSn> {
568            last_delivered: None,
569            pending_queries: 0,
570            pending_samples: BTreeMap::new(),
571        });
572        if state.last_delivered.is_none() && states.global_pending_queries != 0 {
573            // Avoid going through the Map if history_depth == 1
574            if states.history_depth == 1 {
575                state.last_delivered = Some(source_info.source_sn().into());
576                states.callback.call(sample);
577            } else {
578                state
579                    .pending_samples
580                    .insert(source_info.source_sn().into(), sample);
581                if state.pending_samples.len() >= states.history_depth {
582                    if let Some((sn, sample)) = state.pending_samples.pop_first() {
583                        deliver_and_flush(sample, sn, &states.callback, state);
584                    }
585                }
586            }
587        } else if state.last_delivered.is_some()
588            && source_info.source_sn() != state.last_delivered.unwrap() + 1
589        {
590            if source_info.source_sn() > state.last_delivered.unwrap() {
591                if states.retransmission {
592                    state
593                        .pending_samples
594                        .insert(source_info.source_sn().into(), sample);
595                } else {
596                    tracing::info!(
597                        "Sample missed: missed {} samples from {:?}.",
598                        source_info.source_sn() - state.last_delivered.unwrap() - 1,
599                        source_info.source_id(),
600                    );
601                    for miss_callback in states.miss_handlers.values() {
602                        miss_callback.call(Miss {
603                            source: *source_info.source_id(),
604                            nb: source_info.source_sn() - state.last_delivered.unwrap() - 1,
605                        });
606                    }
607                    states.callback.call(sample);
608                    state.last_delivered = Some(source_info.source_sn().into());
609                }
610            }
611        } else {
612            deliver_and_flush(sample, source_info.source_sn(), &states.callback, state);
613        }
614        new
615    } else if let Some(timestamp) = sample.timestamp() {
616        let entry = states.timestamped_states.entry(*timestamp.get_id());
617        let state = entry.or_insert(SourceState::<Timestamp> {
618            last_delivered: None,
619            pending_queries: 0,
620            pending_samples: BTreeMap::new(),
621        });
622        if state.last_delivered.map(|t| t < *timestamp).unwrap_or(true) {
623            if (states.global_pending_queries == 0 && state.pending_queries == 0)
624                || states.history_depth == 1
625            {
626                state.last_delivered = Some(*timestamp);
627                states.callback.call(sample);
628            } else {
629                state.pending_samples.entry(*timestamp).or_insert(sample);
630                if state.pending_samples.len() >= states.history_depth {
631                    flush_timestamped_source(state, &states.callback);
632                }
633            }
634        }
635        false
636    } else {
637        states.callback.call(sample);
638        false
639    }
640}
641
642#[zenoh_macros::unstable]
643fn seq_num_range(start: Option<WrappingSn>, end: Option<WrappingSn>) -> String {
644    match (start, end) {
645        (Some(start), Some(end)) => format!("_sn={start}..{end}"),
646        (Some(start), None) => format!("_sn={start}.."),
647        (None, Some(end)) => format!("_sn=..{end}"),
648        (None, None) => "_sn=..".to_string(),
649    }
650}
651
652#[zenoh_macros::unstable]
653#[derive(Clone)]
654struct PeriodicQuery {
655    source_id: EntityGlobalId,
656    statesref: Arc<Mutex<State>>,
657}
658
659#[zenoh_macros::unstable]
660#[async_trait]
661impl Timed for PeriodicQuery {
662    async fn run(&mut self) {
663        let mut lock = zlock!(self.statesref);
664        let states = &mut *lock;
665        if let Some(state) = states.sequenced_states.get_mut(&self.source_id) {
666            state.pending_queries += 1;
667            let query_expr = &states.key_expr
668                / KE_ADV_PREFIX
669                / KE_STAR
670                / &self.source_id.zid().into_keyexpr()
671                / &KeyExpr::try_from(self.source_id.eid().to_string()).unwrap()
672                / KE_STARSTAR;
673            let seq_num_range = seq_num_range(state.last_delivered.map(|s| s + 1), None);
674
675            let session = states.session.clone();
676            let key_expr = states.key_expr.clone().into_owned();
677            let query_target = states.query_target;
678            let query_timeout = states.query_timeout;
679
680            tracing::trace!(
681                "AdvancedSubscriber{{key_expr: {}}}: Querying undelivered samples {}?{}",
682                states.key_expr,
683                query_expr,
684                seq_num_range
685            );
686            drop(lock);
687
688            let handler = SequencedRepliesHandler {
689                source_id: self.source_id,
690                statesref: self.statesref.clone(),
691            };
692            let _ = session
693                .get(Selector::from((query_expr, seq_num_range)))
694                .callback({
695                    move |r: Reply| {
696                        if let Ok(s) = r.into_result() {
697                            if key_expr.intersects(s.key_expr()) {
698                                let states = &mut *zlock!(handler.statesref);
699                                tracing::trace!(
700                                    "AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}",
701                                    states.key_expr,
702                                    s.source_info(),
703                                    s.timestamp()
704                                );
705                                handle_sample(states, s);
706                            }
707                        }
708                    }
709                })
710                .consolidation(ConsolidationMode::None)
711                .accept_replies(ReplyKeyExpr::Any)
712                .target(query_target)
713                .timeout(query_timeout)
714                .wait();
715        }
716    }
717}
718
719#[zenoh_macros::unstable]
720impl<Handler> AdvancedSubscriber<Handler> {
721    fn new<H>(conf: AdvancedSubscriberBuilder<'_, '_, '_, H>) -> ZResult<Self>
722    where
723        H: IntoHandler<Sample, Handler = Handler> + Send,
724    {
725        let (callback, receiver) = conf.handler.into_handler();
726        let key_expr = conf.key_expr?;
727        let meta = match conf.meta_key_expr {
728            Some(meta) => Some(meta?),
729            None => None,
730        };
731        let retransmission = conf.retransmission;
732        let query_target = conf.query_target;
733        let query_timeout = conf.query_timeout;
734        let session = conf.session.clone();
735        let statesref = Arc::new(Mutex::new(State {
736            next_id: 0,
737            sequenced_states: HashMap::new(),
738            timestamped_states: HashMap::new(),
739            global_pending_queries: if conf.history.is_some() { 1 } else { 0 },
740            session,
741            period: retransmission.as_ref().and_then(|r| {
742                let _rt = ZRuntime::Application.enter();
743                r.periodic_queries.map(|p| Period {
744                    timer: Timer::new(false),
745                    period: p,
746                })
747            }),
748            key_expr: key_expr.clone().into_owned(),
749            retransmission: retransmission.is_some(),
750            history_depth: conf
751                .history
752                .as_ref()
753                .and_then(|h| h.sample_depth)
754                .unwrap_or_default(),
755            query_target: conf.query_target,
756            query_timeout: conf.query_timeout,
757            callback: callback.clone(),
758            miss_handlers: HashMap::new(),
759            token: None,
760        }));
761
762        let sub_callback = {
763            let statesref = statesref.clone();
764            let session = conf.session.clone();
765            let key_expr = key_expr.clone().into_owned();
766
767            move |s: Sample| {
768                let mut lock = zlock!(statesref);
769                let states = &mut *lock;
770                let source_id = s.source_info().map(|si| *si.source_id());
771                let new = handle_sample(states, s);
772
773                if let Some(source_id) = source_id {
774                    if new {
775                        spawn_periodic_queries!(states, source_id, statesref.clone());
776                    }
777                    if let Some(state) = states.sequenced_states.get_mut(&source_id) {
778                        if retransmission.is_some()
779                            && state.pending_queries == 0
780                            && !state.pending_samples.is_empty()
781                        {
782                            state.pending_queries += 1;
783                            let query_expr = &key_expr
784                                / KE_ADV_PREFIX
785                                / KE_STAR
786                                / &source_id.zid().into_keyexpr()
787                                / &KeyExpr::try_from(source_id.eid().to_string()).unwrap()
788                                / KE_STARSTAR;
789                            let seq_num_range =
790                                seq_num_range(state.last_delivered.map(|s| s + 1), None);
791                            tracing::trace!(
792                                "AdvancedSubscriber{{key_expr: {}}}: Querying missing samples {}?{}",
793                                states.key_expr,
794                                query_expr,
795                                seq_num_range
796                            );
797                            drop(lock);
798                            let handler = SequencedRepliesHandler {
799                                source_id,
800                                statesref: statesref.clone(),
801                            };
802                            let _ = session
803                                .get(Selector::from((query_expr, seq_num_range)))
804                                .callback({
805                                    let key_expr = key_expr.clone().into_owned();
806                                    move |r: Reply| {
807                                        if let Ok(s) = r.into_result() {
808                                            if key_expr.intersects(s.key_expr()) {
809                                                let states = &mut *zlock!(handler.statesref);
810                                                tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
811                                                handle_sample(states, s);
812                                            }
813                                        }
814                                    }
815                                })
816                                .consolidation(ConsolidationMode::None)
817                                .accept_replies(ReplyKeyExpr::Any)
818                                .target(query_target)
819                                .timeout(query_timeout)
820                                .wait();
821                        }
822                    }
823                }
824            }
825        };
826
827        let subscriber = conf
828            .session
829            .declare_subscriber(&key_expr)
830            .callback(sub_callback)
831            .allowed_origin(conf.origin)
832            .wait()?;
833
834        tracing::debug!("Create AdvancedSubscriber{{key_expr: {}}}", key_expr,);
835
836        if let Some(historyconf) = conf.history.as_ref() {
837            let handler = InitialRepliesHandler {
838                statesref: statesref.clone(),
839            };
840            let mut params = Parameters::empty();
841            if let Some(max) = historyconf.sample_depth {
842                params.insert("_max", max.to_string());
843            }
844            if let Some(age) = historyconf.age {
845                params.set_time_range(TimeRange {
846                    start: TimeBound::Inclusive(TimeExpr::Now { offset_secs: -age }),
847                    end: TimeBound::Unbounded,
848                });
849            }
850            tracing::trace!(
851                "AdvancedSubscriber{{key_expr: {}}} Querying historical samples {}?{}",
852                key_expr,
853                &key_expr / KE_ADV_PREFIX / KE_STARSTAR,
854                params
855            );
856            let _ = conf
857                .session
858                .get(Selector::from((
859                    &key_expr / KE_ADV_PREFIX / KE_STARSTAR,
860                    params,
861                )))
862                .callback({
863                    let key_expr = key_expr.clone().into_owned();
864                    move |r: Reply| {
865                        if let Ok(s) = r.into_result() {
866                            if key_expr.intersects(s.key_expr()) {
867                                let states = &mut *zlock!(handler.statesref);
868                                tracing::trace!(
869                                    "AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}",
870                                    states.key_expr,
871                                    s.source_info(),
872                                    s.timestamp()
873                                );
874                                handle_sample(states, s);
875                            }
876                        }
877                    }
878                })
879                .consolidation(ConsolidationMode::None)
880                .accept_replies(ReplyKeyExpr::Any)
881                .target(query_target)
882                .timeout(query_timeout)
883                .wait();
884        }
885
886        let liveliness_subscriber = if let Some(historyconf) = conf.history.as_ref() {
887            if historyconf.liveliness {
888                let live_callback = {
889                    let session = conf.session.clone();
890                    let statesref = statesref.clone();
891                    let key_expr = key_expr.clone().into_owned();
892                    let historyconf = historyconf.clone();
893                    move |s: Sample| {
894                        if s.kind() == SampleKind::Put {
895                            if let Ok(parsed) = ke_liveliness::parse(s.key_expr().as_keyexpr()) {
896                                if let Ok(zid) = ZenohId::from_str(parsed.zid().as_str()) {
897                                    // TODO : If we already have a state associated to this discovered source
898                                    // we should query with the appropriate range to avoid unnecessary retransmissions
899                                    if parsed.eid() == KE_UHLC {
900                                        let mut lock = zlock!(statesref);
901                                        let states = &mut *lock;
902                                        tracing::trace!(
903                                            "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers with zid={}",
904                                            states.key_expr,
905                                            parsed.zid().as_str()
906                                        );
907                                        let entry = states.timestamped_states.entry(ID::from(zid));
908                                        let state = entry.or_insert(SourceState::<Timestamp> {
909                                            last_delivered: None,
910                                            pending_queries: 0,
911                                            pending_samples: BTreeMap::new(),
912                                        });
913                                        state.pending_queries += 1;
914
915                                        let mut params = Parameters::empty();
916                                        if let Some(max) = historyconf.sample_depth {
917                                            params.insert("_max", max.to_string());
918                                        }
919                                        if let Some(age) = historyconf.age {
920                                            params.set_time_range(TimeRange {
921                                                start: TimeBound::Inclusive(TimeExpr::Now {
922                                                    offset_secs: -age,
923                                                }),
924                                                end: TimeBound::Unbounded,
925                                            });
926                                        }
927                                        tracing::trace!(
928                                            "AdvancedSubscriber{{key_expr: {}}}: Querying historical samples {}?{}",
929                                            states.key_expr,
930                                            s.key_expr(),
931                                            params
932                                        );
933                                        drop(lock);
934
935                                        let handler = TimestampedRepliesHandler {
936                                            id: ID::from(zid),
937                                            statesref: statesref.clone(),
938                                            callback: callback.clone(),
939                                        };
940                                        let _ = session
941                                            .get(Selector::from((s.key_expr(), params)))
942                                            .callback({
943                                                let key_expr = key_expr.clone().into_owned();
944                                                move |r: Reply| {
945                                                    if let Ok(s) = r.into_result() {
946                                                        if key_expr.intersects(s.key_expr()) {
947                                                            let states =
948                                                                &mut *zlock!(handler.statesref);
949                                                            tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
950                                                            handle_sample(states, s);
951                                                        }
952                                                    }
953                                                }
954                                            })
955                                            .consolidation(ConsolidationMode::None)
956                                            .accept_replies(ReplyKeyExpr::Any)
957                                            .target(query_target)
958                                            .timeout(query_timeout)
959                                            .wait();
960                                    } else if let Ok(eid) =
961                                        EntityId::from_str(parsed.eid().as_str())
962                                    {
963                                        let source_id = EntityGlobalId::new(zid, eid);
964                                        let mut lock = zlock!(statesref);
965                                        let states = &mut *lock;
966                                        tracing::trace!(
967                                            "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers with zid={}",
968                                            states.key_expr,
969                                            parsed.zid().as_str()
970                                        );
971                                        let entry = states.sequenced_states.entry(source_id);
972                                        let new = matches!(&entry, Entry::Vacant(_));
973                                        let state = entry.or_insert(SourceState::<WrappingSn> {
974                                            last_delivered: None,
975                                            pending_queries: 0,
976                                            pending_samples: BTreeMap::new(),
977                                        });
978                                        state.pending_queries += 1;
979
980                                        let mut params = Parameters::empty();
981                                        if let Some(max) = historyconf.sample_depth {
982                                            params.insert("_max", max.to_string());
983                                        }
984                                        if let Some(age) = historyconf.age {
985                                            params.set_time_range(TimeRange {
986                                                start: TimeBound::Inclusive(TimeExpr::Now {
987                                                    offset_secs: -age,
988                                                }),
989                                                end: TimeBound::Unbounded,
990                                            });
991                                        }
992                                        tracing::trace!(
993                                            "AdvancedSubscriber{{key_expr: {}}}: Querying historical samples {}?{}",
994                                            states.key_expr,
995                                            s.key_expr(),
996                                            params,
997                                        );
998                                        drop(lock);
999
1000                                        let handler = SequencedRepliesHandler {
1001                                            source_id,
1002                                            statesref: statesref.clone(),
1003                                        };
1004                                        let _ = session
1005                                            .get(Selector::from((s.key_expr(), params)))
1006                                            .callback({
1007                                                let key_expr = key_expr.clone().into_owned();
1008                                                move |r: Reply| {
1009                                                    if let Ok(s) = r.into_result() {
1010                                                        if key_expr.intersects(s.key_expr()) {
1011                                                            let states =
1012                                                                &mut *zlock!(handler.statesref);
1013                                                            tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
1014                                                            handle_sample(states, s);
1015                                                        }
1016                                                    }
1017                                                }
1018                                            })
1019                                            .consolidation(ConsolidationMode::None)
1020                                            .accept_replies(ReplyKeyExpr::Any)
1021                                            .target(query_target)
1022                                            .timeout(query_timeout)
1023                                            .wait();
1024
1025                                        if new {
1026                                            spawn_periodic_queries!(
1027                                                zlock!(statesref),
1028                                                source_id,
1029                                                statesref.clone()
1030                                            );
1031                                        }
1032                                    }
1033                                } else {
1034                                    let mut lock = zlock!(statesref);
1035                                    let states = &mut *lock;
1036                                    tracing::trace!(
1037                                        "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers with zid={}",
1038                                        states.key_expr,
1039                                        parsed.zid().as_str()
1040                                    );
1041                                    states.global_pending_queries += 1;
1042
1043                                    let mut params = Parameters::empty();
1044                                    if let Some(max) = historyconf.sample_depth {
1045                                        params.insert("_max", max.to_string());
1046                                    }
1047                                    if let Some(age) = historyconf.age {
1048                                        params.set_time_range(TimeRange {
1049                                            start: TimeBound::Inclusive(TimeExpr::Now {
1050                                                offset_secs: -age,
1051                                            }),
1052                                            end: TimeBound::Unbounded,
1053                                        });
1054                                    }
1055                                    tracing::trace!(
1056                                        "AdvancedSubscriber{{key_expr: {}}}: Querying historical samples {}?{}",
1057                                        states.key_expr,
1058                                        s.key_expr(),
1059                                        params,
1060                                    );
1061                                    drop(lock);
1062
1063                                    let handler = InitialRepliesHandler {
1064                                        statesref: statesref.clone(),
1065                                    };
1066                                    let _ = session
1067                                        .get(Selector::from((s.key_expr(), params)))
1068                                        .callback({
1069                                            let key_expr = key_expr.clone().into_owned();
1070                                            move |r: Reply| {
1071                                                if let Ok(s) = r.into_result() {
1072                                                    if key_expr.intersects(s.key_expr()) {
1073                                                        let states =
1074                                                            &mut *zlock!(handler.statesref);
1075                                                        tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
1076                                                        handle_sample(states, s);
1077                                                    }
1078                                                }
1079                                            }
1080                                        })
1081                                        .consolidation(ConsolidationMode::None)
1082                                        .accept_replies(ReplyKeyExpr::Any)
1083                                        .target(query_target)
1084                                        .timeout(query_timeout)
1085                                        .wait();
1086                                }
1087                            } else {
1088                                tracing::warn!(
1089                                    "AdvancedSubscriber{{}}: Received malformed liveliness token key expression: {}",
1090                                    s.key_expr()
1091                                );
1092                            }
1093                        }
1094                    }
1095                };
1096
1097                tracing::debug!(
1098                    "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers on {}",
1099                    key_expr,
1100                    &key_expr / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR
1101                );
1102                Some(
1103                    conf.session
1104                        .liveliness()
1105                        .declare_subscriber(&key_expr / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR)
1106                        // .declare_subscriber(keformat!(ke_liveliness_all::formatter(), zid = 0, eid = 0, remaining = key_expr).unwrap())
1107                        .history(true)
1108                        .callback(live_callback)
1109                        .wait()?,
1110                )
1111            } else {
1112                None
1113            }
1114        } else {
1115            None
1116        };
1117
1118        let heartbeat_subscriber = if retransmission.is_some_and(|r| r.heartbeat) {
1119            let ke_heartbeat_sub = &key_expr / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR;
1120            let statesref = statesref.clone();
1121            tracing::debug!(
1122                "AdvancedSubscriber{{key_expr: {}}}: Enable heartbeat subscriber on {}",
1123                key_expr,
1124                ke_heartbeat_sub
1125            );
1126            let heartbeat_sub = conf
1127                .session
1128                .declare_subscriber(ke_heartbeat_sub)
1129                .callback(move |sample_hb| {
1130                    if sample_hb.kind() != SampleKind::Put {
1131                        return;
1132                    }
1133
1134                    let heartbeat_keyexpr = sample_hb.key_expr().as_keyexpr();
1135                    let Ok(parsed_keyexpr) = ke_liveliness::parse(heartbeat_keyexpr) else {
1136                        return;
1137                    };
1138                    let source_id = {
1139                        let Ok(zid) = ZenohId::from_str(parsed_keyexpr.zid().as_str()) else {
1140                            return;
1141                        };
1142                        let Ok(eid) = EntityId::from_str(parsed_keyexpr.eid().as_str()) else {
1143                            return;
1144                        };
1145                        EntityGlobalId::new(zid, eid)
1146                    };
1147
1148                    let Ok(heartbeat_sn) = z_deserialize::<WrappingSn>(sample_hb.payload()) else {
1149                        tracing::debug!(
1150                            "AdvancedSubscriber{{}}: Skipping invalid heartbeat payload on '{}'",
1151                            heartbeat_keyexpr
1152                        );
1153                        return;
1154                    };
1155
1156                    let mut lock = zlock!(statesref);
1157                    let states = &mut *lock;
1158                    let entry = states.sequenced_states.entry(source_id);
1159                    if matches!(&entry, Entry::Vacant(_)) {
1160                        // NOTE: API does not allow both heartbeat and periodic_queries
1161                        spawn_periodic_queries!(states, source_id, statesref.clone());
1162                        if states.global_pending_queries > 0 {
1163                            tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Skipping heartbeat on '{}' from publisher that is currently being pulled by global query", states.key_expr, heartbeat_keyexpr);
1164                            return;
1165                        }
1166                    }
1167
1168                    let state = entry.or_insert(SourceState::<WrappingSn> {
1169                        last_delivered: None,
1170                        pending_queries: 0,
1171                        pending_samples: BTreeMap::new(),
1172                    });
1173
1174                    // check that it's not an old sn, and that there are no pending queries
1175                    if (state.last_delivered.is_none()
1176                        || state.last_delivered.is_some_and(|sn| heartbeat_sn > sn))
1177                        && state.pending_queries == 0
1178                    {
1179                        let seq_num_range = seq_num_range(
1180                            state.last_delivered.map(|s| s + 1),
1181                            Some(heartbeat_sn),
1182                        );
1183
1184                        let session = states.session.clone();
1185                        let key_expr = states.key_expr.clone().into_owned();
1186                        let query_target = states.query_target;
1187                        let query_timeout = states.query_timeout;
1188                        state.pending_queries += 1;
1189
1190                        tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Querying missing samples {}?{}", states.key_expr, heartbeat_keyexpr, seq_num_range);
1191                        drop(lock);
1192
1193                        let handler = SequencedRepliesHandler {
1194                            source_id,
1195                            statesref: statesref.clone(),
1196                        };
1197                        let _ = session
1198                            .get(Selector::from((heartbeat_keyexpr, seq_num_range)))
1199                            .callback({
1200                                move |r: Reply| {
1201                                    if let Ok(s) = r.into_result() {
1202                                        if key_expr.intersects(s.key_expr()) {
1203                                            let states = &mut *zlock!(handler.statesref);
1204                                            tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
1205                                            handle_sample(states, s);
1206                                        }
1207                                    }
1208                                }
1209                            })
1210                            .consolidation(ConsolidationMode::None)
1211                            .accept_replies(ReplyKeyExpr::Any)
1212                            .target(query_target)
1213                            .timeout(query_timeout)
1214                            .wait();
1215                    }
1216                })
1217                .allowed_origin(conf.origin)
1218                .wait()?;
1219            Some(heartbeat_sub)
1220        } else {
1221            None
1222        };
1223
1224        if conf.liveliness {
1225            let suffix = KE_ADV_PREFIX
1226                / KE_SUB
1227                / &subscriber.id().zid().into_keyexpr()
1228                / &KeyExpr::try_from(subscriber.id().eid().to_string()).unwrap();
1229            let suffix = match meta {
1230                Some(meta) => suffix / &meta,
1231                // We need this empty chunk because of a routing matching bug
1232                _ => suffix / KE_EMPTY,
1233            };
1234            tracing::debug!(
1235                "AdvancedSubscriber{{key_expr: {}}}: Declare liveliness token {}",
1236                key_expr,
1237                &key_expr / &suffix,
1238            );
1239            let token = conf
1240                .session
1241                .liveliness()
1242                .declare_token(&key_expr / &suffix)
1243                .wait()?;
1244            zlock!(statesref).token = Some(token)
1245        }
1246
1247        let reliable_subscriber = AdvancedSubscriber {
1248            statesref,
1249            subscriber,
1250            receiver,
1251            liveliness_subscriber,
1252            heartbeat_subscriber,
1253        };
1254
1255        Ok(reliable_subscriber)
1256    }
1257
1258    /// Returns the [`EntityGlobalId`] of this AdvancedSubscriber.
1259    #[zenoh_macros::unstable]
1260    pub fn id(&self) -> EntityGlobalId {
1261        self.subscriber.id()
1262    }
1263
1264    /// Returns the [`KeyExpr`] this subscriber subscribes to.
1265    #[zenoh_macros::unstable]
1266    pub fn key_expr(&self) -> &KeyExpr<'static> {
1267        self.subscriber.key_expr()
1268    }
1269
1270    /// Returns a reference to this subscriber's handler.
1271    /// An handler is anything that implements [`zenoh::handlers::IntoHandler`].
1272    /// The default handler is [`zenoh::handlers::DefaultHandler`].
1273    #[zenoh_macros::unstable]
1274    pub fn handler(&self) -> &Handler {
1275        &self.receiver
1276    }
1277
1278    /// Returns a mutable reference to this subscriber's handler.
1279    /// An handler is anything that implements [`zenoh::handlers::IntoHandler`].
1280    /// The default handler is [`zenoh::handlers::DefaultHandler`].
1281    #[zenoh_macros::unstable]
1282    pub fn handler_mut(&mut self) -> &mut Handler {
1283        &mut self.receiver
1284    }
1285
1286    /// Declares a listener to detect missed samples.
1287    ///
1288    /// Missed samples can only be detected from [`AdvancedPublisher`](crate::AdvancedPublisher) that
1289    /// enable [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
1290    #[zenoh_macros::unstable]
1291    pub fn sample_miss_listener(&self) -> SampleMissListenerBuilder<'_, DefaultHandler> {
1292        SampleMissListenerBuilder {
1293            statesref: &self.statesref,
1294            handler: DefaultHandler::default(),
1295        }
1296    }
1297
1298    /// Declares a listener to detect matching publishers.
1299    ///
1300    /// Only [`AdvancedPublisher`](crate::AdvancedPublisher) that enable
1301    /// [`publisher_detection`](crate::AdvancedPublisherBuilder::publisher_detection) can be detected.
1302    #[zenoh_macros::unstable]
1303    pub fn detect_publishers(&self) -> LivelinessSubscriberBuilder<'_, '_, DefaultHandler> {
1304        self.subscriber
1305            .session()
1306            .liveliness()
1307            .declare_subscriber(self.subscriber.key_expr() / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR)
1308    }
1309
1310    /// Undeclares this AdvancedSubscriber
1311    #[inline]
1312    #[zenoh_macros::unstable]
1313    pub fn undeclare(self) -> impl Resolve<ZResult<()>> {
1314        tracing::debug!(
1315            "AdvancedSubscriber{{key_expr: {}}}: Undeclare",
1316            self.key_expr()
1317        );
1318        self.subscriber.undeclare()
1319    }
1320
1321    fn set_background_impl(&mut self, background: bool) {
1322        self.subscriber.set_background(background);
1323        if let Some(mut liveliness_sub) = self.liveliness_subscriber.take() {
1324            liveliness_sub.set_background(background);
1325        }
1326        if let Some(mut heartbeat_sub) = self.heartbeat_subscriber.take() {
1327            heartbeat_sub.set_background(background);
1328        }
1329    }
1330
1331    #[zenoh_macros::internal]
1332    pub fn set_background(&mut self, background: bool) {
1333        self.set_background_impl(background)
1334    }
1335}
1336
1337#[zenoh_macros::unstable]
1338#[inline]
1339fn flush_sequenced_source(
1340    state: &mut SourceState<WrappingSn>,
1341    callback: &Callback<Sample>,
1342    source_id: &EntityGlobalId,
1343    miss_handlers: &HashMap<usize, Callback<Miss>>,
1344) {
1345    if state.pending_queries == 0 && !state.pending_samples.is_empty() {
1346        let mut pending_samples = BTreeMap::new();
1347        std::mem::swap(&mut state.pending_samples, &mut pending_samples);
1348        for (seq_num, sample) in pending_samples {
1349            match state.last_delivered {
1350                None => {
1351                    state.last_delivered = Some(seq_num);
1352                    callback.call(sample);
1353                }
1354                Some(last) if seq_num == last + 1 => {
1355                    state.last_delivered = Some(seq_num);
1356                    callback.call(sample);
1357                }
1358                Some(last) if seq_num > last + 1 => {
1359                    tracing::warn!(
1360                        "Sample missed: missed {} samples from {:?}.",
1361                        seq_num - last - 1,
1362                        source_id,
1363                    );
1364                    for miss_callback in miss_handlers.values() {
1365                        miss_callback.call(Miss {
1366                            source: *source_id,
1367                            nb: seq_num - last - 1,
1368                        })
1369                    }
1370                    state.last_delivered = Some(seq_num);
1371                    callback.call(sample);
1372                }
1373                _ => {
1374                    // duplicate
1375                }
1376            }
1377        }
1378    }
1379}
1380
1381#[zenoh_macros::unstable]
1382#[inline]
1383fn flush_timestamped_source(state: &mut SourceState<Timestamp>, callback: &Callback<Sample>) {
1384    if state.pending_queries == 0 && !state.pending_samples.is_empty() {
1385        let mut pending_samples = BTreeMap::new();
1386        std::mem::swap(&mut state.pending_samples, &mut pending_samples);
1387        for (timestamp, sample) in pending_samples {
1388            if state
1389                .last_delivered
1390                .map(|last| timestamp > last)
1391                .unwrap_or(true)
1392            {
1393                state.last_delivered = Some(timestamp);
1394                callback.call(sample);
1395            }
1396        }
1397    }
1398}
1399
1400#[zenoh_macros::unstable]
1401#[derive(Clone)]
1402struct InitialRepliesHandler {
1403    statesref: Arc<Mutex<State>>,
1404}
1405
1406#[zenoh_macros::unstable]
1407impl Drop for InitialRepliesHandler {
1408    fn drop(&mut self) {
1409        let states = &mut *zlock!(self.statesref);
1410        states.global_pending_queries = states.global_pending_queries.saturating_sub(1);
1411        tracing::trace!(
1412            "AdvancedSubscriber{{key_expr: {}}}: Flush initial replies",
1413            states.key_expr
1414        );
1415
1416        if states.global_pending_queries == 0 {
1417            for (source_id, state) in states.sequenced_states.iter_mut() {
1418                flush_sequenced_source(state, &states.callback, source_id, &states.miss_handlers);
1419                spawn_periodic_queries!(states, *source_id, self.statesref.clone());
1420            }
1421            for state in states.timestamped_states.values_mut() {
1422                flush_timestamped_source(state, &states.callback);
1423            }
1424        }
1425    }
1426}
1427
1428#[zenoh_macros::unstable]
1429#[derive(Clone)]
1430struct SequencedRepliesHandler {
1431    source_id: EntityGlobalId,
1432    statesref: Arc<Mutex<State>>,
1433}
1434
1435#[zenoh_macros::unstable]
1436impl Drop for SequencedRepliesHandler {
1437    fn drop(&mut self) {
1438        let states = &mut *zlock!(self.statesref);
1439        if let Some(state) = states.sequenced_states.get_mut(&self.source_id) {
1440            state.pending_queries = state.pending_queries.saturating_sub(1);
1441            if states.global_pending_queries == 0 {
1442                tracing::trace!(
1443                    "AdvancedSubscriber{{key_expr: {}}}: Flush sequenced samples",
1444                    states.key_expr
1445                );
1446                flush_sequenced_source(
1447                    state,
1448                    &states.callback,
1449                    &self.source_id,
1450                    &states.miss_handlers,
1451                )
1452            }
1453        }
1454    }
1455}
1456
1457#[zenoh_macros::unstable]
1458#[derive(Clone)]
1459struct TimestampedRepliesHandler {
1460    id: ID,
1461    statesref: Arc<Mutex<State>>,
1462    callback: Callback<Sample>,
1463}
1464
1465#[zenoh_macros::unstable]
1466impl Drop for TimestampedRepliesHandler {
1467    fn drop(&mut self) {
1468        let states = &mut *zlock!(self.statesref);
1469        if let Some(state) = states.timestamped_states.get_mut(&self.id) {
1470            state.pending_queries = state.pending_queries.saturating_sub(1);
1471            if states.global_pending_queries == 0 {
1472                tracing::trace!(
1473                    "AdvancedSubscriber{{key_expr: {}}}: Flush timestamped samples",
1474                    states.key_expr
1475                );
1476                flush_timestamped_source(state, &self.callback);
1477            }
1478        }
1479    }
1480}
1481
1482/// A struct that represent missed samples.
1483#[zenoh_macros::unstable]
1484#[derive(Debug, Clone)]
1485pub struct Miss {
1486    source: EntityGlobalId,
1487    nb: u32,
1488}
1489
1490impl Miss {
1491    /// The source of missed samples.
1492    pub fn source(&self) -> EntityGlobalId {
1493        self.source
1494    }
1495
1496    /// The number of missed samples.
1497    pub fn nb(&self) -> u32 {
1498        self.nb
1499    }
1500}
1501
1502impl CallbackParameter for Miss {
1503    type Message<'a> = Self;
1504
1505    fn from_message(msg: Self::Message<'_>) -> Self {
1506        msg
1507    }
1508}
1509
1510/// A listener to detect missed samples.
1511///
1512/// Missed samples can only be detected from [`AdvancedPublisher`](crate::AdvancedPublisher) that
1513/// enable [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
1514#[zenoh_macros::unstable]
1515pub struct SampleMissListener<Handler> {
1516    id: usize,
1517    statesref: Arc<Mutex<State>>,
1518    handler: Handler,
1519    undeclare_on_drop: bool,
1520}
1521
1522#[zenoh_macros::unstable]
1523impl<Handler> SampleMissListener<Handler> {
1524    #[inline]
1525    pub fn undeclare(self) -> SampleMissHandlerUndeclaration<Handler>
1526    where
1527        Handler: Send,
1528    {
1529        // self.undeclare_inner(())
1530        SampleMissHandlerUndeclaration(self)
1531    }
1532
1533    fn undeclare_impl(&mut self) -> ZResult<()> {
1534        // set the flag first to avoid double panic if this function panic
1535        self.undeclare_on_drop = false;
1536        zlock!(self.statesref).unregister_miss_callback(&self.id);
1537        Ok(())
1538    }
1539
1540    #[zenoh_macros::internal]
1541    pub fn set_background(&mut self, background: bool) {
1542        self.undeclare_on_drop = !background;
1543    }
1544}
1545
#[cfg(feature = "unstable")]
impl<Handler> Drop for SampleMissListener<Handler> {
    /// Unregisters the miss callback unless the listener was already undeclared or
    /// was put in background mode.
    fn drop(&mut self) {
        if !self.undeclare_on_drop {
            return;
        }
        if let Err(error) = self.undeclare_impl() {
            tracing::error!(error);
        }
    }
}
1556
1557// #[zenoh_macros::unstable]
1558// impl<Handler: Send> UndeclarableSealed<()> for SampleMissHandler<Handler> {
1559//     type Undeclaration = SampleMissHandlerUndeclaration<Handler>;
1560
1561//     fn undeclare_inner(self, _: ()) -> Self::Undeclaration {
1562//         SampleMissHandlerUndeclaration(self)
1563//     }
1564// }
1565
1566#[zenoh_macros::unstable]
1567impl<Handler> std::ops::Deref for SampleMissListener<Handler> {
1568    type Target = Handler;
1569
1570    fn deref(&self) -> &Self::Target {
1571        &self.handler
1572    }
1573}
1574#[zenoh_macros::unstable]
1575impl<Handler> std::ops::DerefMut for SampleMissListener<Handler> {
1576    fn deref_mut(&mut self) -> &mut Self::Target {
1577        &mut self.handler
1578    }
1579}
1580
1581/// A [`Resolvable`] returned by [`SampleMissListener::undeclare`]
1582#[zenoh_macros::unstable]
1583pub struct SampleMissHandlerUndeclaration<Handler>(SampleMissListener<Handler>);
1584
1585#[zenoh_macros::unstable]
1586impl<Handler> Resolvable for SampleMissHandlerUndeclaration<Handler> {
1587    type To = ZResult<()>;
1588}
1589
1590#[zenoh_macros::unstable]
1591impl<Handler> Wait for SampleMissHandlerUndeclaration<Handler> {
1592    fn wait(mut self) -> <Self as Resolvable>::To {
1593        self.0.undeclare_impl()
1594    }
1595}
1596
1597#[zenoh_macros::unstable]
1598impl<Handler> IntoFuture for SampleMissHandlerUndeclaration<Handler> {
1599    type Output = <Self as Resolvable>::To;
1600    type IntoFuture = Ready<<Self as Resolvable>::To>;
1601
1602    fn into_future(self) -> Self::IntoFuture {
1603        std::future::ready(self.wait())
1604    }
1605}
1606
1607/// A builder for initializing a [`SampleMissListener`].
1608#[zenoh_macros::unstable]
1609pub struct SampleMissListenerBuilder<'a, Handler, const BACKGROUND: bool = false> {
1610    statesref: &'a Arc<Mutex<State>>,
1611    handler: Handler,
1612}
1613
1614#[zenoh_macros::unstable]
1615impl<'a> SampleMissListenerBuilder<'a, DefaultHandler> {
1616    /// Receive the sample miss notification with a callback.
1617    #[inline]
1618    #[zenoh_macros::unstable]
1619    pub fn callback<F>(self, callback: F) -> SampleMissListenerBuilder<'a, Callback<Miss>>
1620    where
1621        F: Fn(Miss) + Send + Sync + 'static,
1622    {
1623        self.with(Callback::from(callback))
1624    }
1625
1626    /// Receive the sample miss notification with a mutable callback.
1627    #[inline]
1628    #[zenoh_macros::unstable]
1629    pub fn callback_mut<F>(self, callback: F) -> SampleMissListenerBuilder<'a, Callback<Miss>>
1630    where
1631        F: FnMut(Miss) + Send + Sync + 'static,
1632    {
1633        self.callback(zenoh::handlers::locked(callback))
1634    }
1635
1636    /// Receive the sample miss notification with a [`Handler`](IntoHandler).
1637    #[inline]
1638    #[zenoh_macros::unstable]
1639    pub fn with<Handler>(self, handler: Handler) -> SampleMissListenerBuilder<'a, Handler>
1640    where
1641        Handler: IntoHandler<Miss>,
1642    {
1643        SampleMissListenerBuilder {
1644            statesref: self.statesref,
1645            handler,
1646        }
1647    }
1648}
1649
1650#[zenoh_macros::unstable]
1651impl<'a> SampleMissListenerBuilder<'a, Callback<Miss>> {
1652    /// Make the sample miss notification run in the background until the advanced subscriber is undeclared.
1653    ///
1654    /// Background builder doesn't return a `SampleMissHandler` object anymore.
1655    #[zenoh_macros::unstable]
1656    pub fn background(self) -> SampleMissListenerBuilder<'a, Callback<Miss>, true> {
1657        SampleMissListenerBuilder {
1658            statesref: self.statesref,
1659            handler: self.handler,
1660        }
1661    }
1662}
1663
1664#[zenoh_macros::unstable]
1665impl<Handler> Resolvable for SampleMissListenerBuilder<'_, Handler>
1666where
1667    Handler: IntoHandler<Miss> + Send,
1668    Handler::Handler: Send,
1669{
1670    type To = ZResult<SampleMissListener<Handler::Handler>>;
1671}
1672
1673#[zenoh_macros::unstable]
1674impl<Handler> Wait for SampleMissListenerBuilder<'_, Handler>
1675where
1676    Handler: IntoHandler<Miss> + Send,
1677    Handler::Handler: Send,
1678{
1679    #[zenoh_macros::unstable]
1680    fn wait(self) -> <Self as Resolvable>::To {
1681        let (callback, handler) = self.handler.into_handler();
1682        let id = zlock!(self.statesref).register_miss_callback(callback);
1683        Ok(SampleMissListener {
1684            id,
1685            statesref: self.statesref.clone(),
1686            handler,
1687            undeclare_on_drop: true,
1688        })
1689    }
1690}
1691
1692#[zenoh_macros::unstable]
1693impl<Handler> IntoFuture for SampleMissListenerBuilder<'_, Handler>
1694where
1695    Handler: IntoHandler<Miss> + Send,
1696    Handler::Handler: Send,
1697{
1698    type Output = <Self as Resolvable>::To;
1699    type IntoFuture = Ready<<Self as Resolvable>::To>;
1700
1701    #[zenoh_macros::unstable]
1702    fn into_future(self) -> Self::IntoFuture {
1703        std::future::ready(self.wait())
1704    }
1705}
1706
1707#[zenoh_macros::unstable]
1708impl Resolvable for SampleMissListenerBuilder<'_, Callback<Miss>, true> {
1709    type To = ZResult<()>;
1710}
1711
1712#[zenoh_macros::unstable]
1713impl Wait for SampleMissListenerBuilder<'_, Callback<Miss>, true> {
1714    #[zenoh_macros::unstable]
1715    fn wait(self) -> <Self as Resolvable>::To {
1716        let (callback, _) = self.handler.into_handler();
1717        zlock!(self.statesref).register_miss_callback(callback);
1718        Ok(())
1719    }
1720}
1721
1722#[zenoh_macros::unstable]
1723impl IntoFuture for SampleMissListenerBuilder<'_, Callback<Miss>, true> {
1724    type Output = <Self as Resolvable>::To;
1725    type IntoFuture = Ready<<Self as Resolvable>::To>;
1726
1727    #[zenoh_macros::unstable]
1728    fn into_future(self) -> Self::IntoFuture {
1729        std::future::ready(self.wait())
1730    }
1731}