Skip to main content

zenoh_ext/
advanced_subscriber.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{collections::BTreeMap, future::IntoFuture, str::FromStr};
15
16use zenoh::{
17    config::ZenohId,
18    handlers::{Callback, CallbackDrop, CallbackParameter, IntoHandler},
19    key_expr::KeyExpr,
20    liveliness::{LivelinessSubscriberBuilder, LivelinessToken},
21    pubsub::{SubscriberBuilder, SubscriberUndeclaration},
22    query::{
23        ConsolidationMode, Parameters, Selector, TimeBound, TimeExpr, TimeRange, ZenohParameters,
24    },
25    sample::{Locality, Sample, SampleKind},
26    session::{EntityGlobalId, EntityId, WeakSession},
27    Resolvable, Session, Wait, KE_ADV_PREFIX, KE_EMPTY, KE_PUB, KE_STAR, KE_STARSTAR, KE_SUB,
28};
29use zenoh_util::{Timed, TimedEvent, Timer};
30#[zenoh_macros::unstable]
31use {
32    async_trait::async_trait,
33    std::collections::hash_map::Entry,
34    std::collections::HashMap,
35    std::convert::TryFrom,
36    std::future::Ready,
37    std::sync::{Arc, Mutex},
38    std::time::Duration,
39    uhlc::ID,
40    zenoh::handlers::{locked, DefaultHandler},
41    zenoh::internal::{runtime::ZRuntime, zlock},
42    zenoh::pubsub::Subscriber,
43    zenoh::query::{QueryTarget, Reply, ReplyKeyExpr},
44    zenoh::time::Timestamp,
45    zenoh::Result as ZResult,
46};
47
48use crate::{
49    advanced_cache::{ke_liveliness, KE_UHLC},
50    utils::WrappingSn,
51    z_deserialize,
52};
53
#[derive(Debug, Default, Clone)]
/// Configure query for historical data for [`history`](crate::AdvancedSubscriberBuilder::history) method.
///
/// The derived `Default` leaves every option unset: no late publisher detection,
/// no sample limit and no age limit.
#[zenoh_macros::unstable]
pub struct HistoryConfig {
    /// Set to `true` by [`detect_late_publishers`](HistoryConfig::detect_late_publishers).
    liveliness: bool,
    /// Maximum number of samples to query for each resource,
    /// set by [`max_samples`](HistoryConfig::max_samples).
    sample_depth: Option<usize>,
    /// Maximum age of the queried samples, in seconds,
    /// set by [`max_age`](HistoryConfig::max_age).
    age: Option<f64>,
}
62
63#[zenoh_macros::unstable]
64impl HistoryConfig {
65    /// Enable detection of late joiner publishers and query for their historical data.
66    ///
67    /// Late joiner detection can only be achieved for [`AdvancedPublishers`](crate::AdvancedPublisher) that enable publisher_detection.
68    /// History can only be retransmitted by [`AdvancedPublishers`](crate::AdvancedPublisher) that enable [`cache`](crate::AdvancedPublisherBuilder::cache).
69    #[inline]
70    #[zenoh_macros::unstable]
71    pub fn detect_late_publishers(mut self) -> Self {
72        self.liveliness = true;
73        self
74    }
75
76    /// Specify how many samples to query for each resource.
77    #[zenoh_macros::unstable]
78    pub fn max_samples(mut self, depth: usize) -> Self {
79        self.sample_depth = Some(depth);
80        self
81    }
82
83    /// Specify the maximum age of samples to query.
84    #[zenoh_macros::unstable]
85    pub fn max_age(mut self, seconds: f64) -> Self {
86        self.age = Some(seconds);
87        self
88    }
89}
90
#[derive(Debug, Default, Clone, Copy)]
/// Configure retransmission.
///
/// The `CONFIGURED` const parameter records whether a recovery strategy has been chosen:
/// the strategy-selecting methods are only defined on `RecoveryConfig<false>` and
/// return a `RecoveryConfig<true>`.
#[zenoh_macros::unstable]
pub struct RecoveryConfig<const CONFIGURED: bool = true> {
    /// Period of the periodic queries for not yet received samples, if enabled.
    periodic_queries: Option<Duration>,
    /// Whether heartbeat subscription was selected.
    heartbeat: bool,
}
98
99#[zenoh_macros::unstable]
100impl RecoveryConfig<false> {
101    /// Enable periodic queries for not yet received Samples and specify their period.
102    ///
103    /// This allows retrieving the last Sample(s) if the last Sample(s) is/are lost.
104    /// So it is useful for sporadic publications but useless for periodic publications
105    /// with a period smaller or equal to this period.
106    /// Retransmission can only be achieved by [`AdvancedPublishers`](crate::AdvancedPublisher)
107    /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
108    /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
109    #[zenoh_macros::unstable]
110    #[inline]
111    pub fn periodic_queries(self, period: Duration) -> RecoveryConfig<true> {
112        RecoveryConfig {
113            periodic_queries: Some(period),
114            heartbeat: false,
115        }
116    }
117
118    /// Subscribe to heartbeats of [`AdvancedPublishers`](crate::AdvancedPublisher).
119    ///
120    /// This allows receiving the last published Sample's sequence number and check for misses.
121    /// Heartbeat subscriber must be paired with [`AdvancedPublishers`](crate::AdvancedPublisher)
122    /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
123    /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection) with
124    /// [`heartbeat`](crate::advanced_publisher::MissDetectionConfig::heartbeat) or
125    /// [`sporadic_heartbeat`](crate::advanced_publisher::MissDetectionConfig::sporadic_heartbeat).
126    #[zenoh_macros::unstable]
127    #[inline]
128    pub fn heartbeat(self) -> RecoveryConfig<true> {
129        RecoveryConfig {
130            periodic_queries: None,
131            heartbeat: true,
132        }
133    }
134}
135
/// The builder of an [`AdvancedSubscriber`], allowing to configure it.
///
/// The `BACKGROUND` const parameter is `true` for builders produced by `background()`,
/// which resolve to `()` instead of an [`AdvancedSubscriber`].
#[zenoh_macros::unstable]
pub struct AdvancedSubscriberBuilder<'a, 'b, 'c, Handler, const BACKGROUND: bool = false> {
    /// Session on which the subscriber is declared.
    pub(crate) session: &'a Session,
    /// Subscribed key expression; a conversion error is kept here and only
    /// reported when the builder is resolved.
    pub(crate) key_expr: ZResult<KeyExpr<'b>>,
    /// Locality filter for the received publications (see `allowed_origin`).
    pub(crate) origin: Locality,
    /// Retransmission configuration, `None` unless `recovery` was called.
    pub(crate) retransmission: Option<RecoveryConfig>,
    /// Target used for queries; initialized to `QueryTarget::All`.
    pub(crate) query_target: QueryTarget,
    /// Timeout used for queries; initialized to 10 seconds.
    pub(crate) query_timeout: Duration,
    /// Historical data configuration, `None` unless `history` was called.
    pub(crate) history: Option<HistoryConfig>,
    /// Set to `true` by `subscriber_detection`.
    pub(crate) liveliness: bool,
    /// Optional metadata key expression set by `subscriber_detection_metadata`;
    /// a conversion error is kept here like for `key_expr`.
    pub(crate) meta_key_expr: Option<ZResult<KeyExpr<'c>>>,
    /// Handler (callback or channel) that will receive the samples.
    pub(crate) handler: Handler,
}
150
151#[zenoh_macros::unstable]
152impl<'a, 'b, Handler> AdvancedSubscriberBuilder<'a, 'b, '_, Handler> {
153    #[zenoh_macros::unstable]
154    pub(crate) fn new(builder: SubscriberBuilder<'a, 'b, Handler>) -> Self {
155        AdvancedSubscriberBuilder {
156            session: builder.session,
157            key_expr: builder.key_expr,
158            origin: builder.origin,
159            handler: builder.handler,
160            retransmission: None,
161            query_target: QueryTarget::All,
162            query_timeout: Duration::from_secs(10),
163            history: None,
164            liveliness: false,
165            meta_key_expr: None,
166        }
167    }
168}
169
170#[zenoh_macros::unstable]
171impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, DefaultHandler> {
172    /// Add callback to AdvancedSubscriber.
173    #[inline]
174    #[zenoh_macros::unstable]
175    pub fn callback<F>(self, callback: F) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>>
176    where
177        F: Fn(Sample) + Send + Sync + 'static,
178    {
179        self.with(Callback::from(callback))
180    }
181
182    /// Add callback to `AdvancedSubscriber`.
183    ///
184    /// Using this guarantees that your callback will never be called concurrently.
185    /// If your callback is also accepted by the [`callback`](AdvancedSubscriberBuilder::callback) method, we suggest you use it instead of `callback_mut`
186    #[inline]
187    #[zenoh_macros::unstable]
188    pub fn callback_mut<F>(
189        self,
190        callback: F,
191    ) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>>
192    where
193        F: FnMut(Sample) + Send + Sync + 'static,
194    {
195        self.callback(locked(callback))
196    }
197
198    /// Make the built AdvancedSubscriber an [`AdvancedSubscriber`](AdvancedSubscriber).
199    #[inline]
200    #[zenoh_macros::unstable]
201    pub fn with<Handler>(self, handler: Handler) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>
202    where
203        Handler: IntoHandler<Sample>,
204    {
205        AdvancedSubscriberBuilder {
206            session: self.session,
207            key_expr: self.key_expr,
208            origin: self.origin,
209            retransmission: self.retransmission,
210            query_target: self.query_target,
211            query_timeout: self.query_timeout,
212            history: self.history,
213            liveliness: self.liveliness,
214            meta_key_expr: self.meta_key_expr,
215            handler,
216        }
217    }
218}
219
220#[zenoh_macros::unstable]
221impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>> {
222    /// Make the subscriber run in background until the session is closed.
223    ///
224    /// Background builder doesn't return a `AdvancedSubscriber` object anymore.
225    pub fn background(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>, true> {
226        AdvancedSubscriberBuilder {
227            session: self.session,
228            key_expr: self.key_expr,
229            origin: self.origin,
230            retransmission: self.retransmission,
231            query_target: self.query_target,
232            query_timeout: self.query_timeout,
233            history: self.history,
234            liveliness: self.liveliness,
235            meta_key_expr: self.meta_key_expr,
236            handler: self.handler,
237        }
238    }
239}
240
241#[zenoh_macros::unstable]
242impl<'a, 'c, Handler, const BACKGROUND: bool>
243    AdvancedSubscriberBuilder<'a, '_, 'c, Handler, BACKGROUND>
244{
245    /// Restrict the matching publications that will be received by this [`Subscriber`] to the ones that have the given [`Locality`](crate::prelude::Locality).
246    #[zenoh_macros::unstable]
247    #[inline]
248    pub fn allowed_origin(mut self, origin: Locality) -> Self {
249        self.origin = origin;
250        self
251    }
252
253    /// Ask for retransmission of detected lost Samples.
254    ///
255    /// Retransmission can only be achieved by [`AdvancedPublishers`](crate::AdvancedPublisher)
256    /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
257    /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
258    #[zenoh_macros::unstable]
259    #[inline]
260    pub fn recovery(mut self, conf: RecoveryConfig) -> Self {
261        self.retransmission = Some(conf);
262        self
263    }
264
265    // /// Change the target to be used for queries.
266
267    // #[inline]
268    // pub fn query_target(mut self, query_target: QueryTarget) -> Self {
269    //     self.query_target = query_target;
270    //     self
271    // }
272
273    /// Change the timeout to be used for queries (history, retransmission).
274    #[zenoh_macros::unstable]
275    #[inline]
276    pub fn query_timeout(mut self, query_timeout: Duration) -> Self {
277        self.query_timeout = query_timeout;
278        self
279    }
280
281    /// Enable query for historical data.
282    ///
283    /// History can only be retransmitted by [`AdvancedPublishers`](crate::AdvancedPublisher) that enable [`cache`](crate::AdvancedPublisherBuilder::cache).
284    #[zenoh_macros::unstable]
285    #[inline]
286    pub fn history(mut self, config: HistoryConfig) -> Self {
287        self.history = Some(config);
288        self
289    }
290
291    /// Allow this subscriber to be detected through liveliness.
292    #[zenoh_macros::unstable]
293    pub fn subscriber_detection(mut self) -> Self {
294        self.liveliness = true;
295        self
296    }
297
298    /// A key expression added to the liveliness token key expression.
299    ///
300    /// It can be used to convey metadata.
301    #[zenoh_macros::unstable]
302    pub fn subscriber_detection_metadata<TryIntoKeyExpr>(mut self, meta: TryIntoKeyExpr) -> Self
303    where
304        TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
305        <TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<zenoh::Error>,
306    {
307        self.meta_key_expr = Some(meta.try_into().map_err(Into::into));
308        self
309    }
310
311    #[zenoh_macros::unstable]
312    fn with_static_keys(self) -> AdvancedSubscriberBuilder<'a, 'static, 'static, Handler> {
313        AdvancedSubscriberBuilder {
314            session: self.session,
315            key_expr: self.key_expr.map(|s| s.into_owned()),
316            origin: self.origin,
317            retransmission: self.retransmission,
318            query_target: self.query_target,
319            query_timeout: self.query_timeout,
320            history: self.history,
321            liveliness: self.liveliness,
322            meta_key_expr: self.meta_key_expr.map(|s| s.map(|s| s.into_owned())),
323            handler: self.handler,
324        }
325    }
326}
327
// Resolving a regular (non-background) builder yields the declared
// `AdvancedSubscriber`, parameterized by the handler's receiver type, or an error.
#[zenoh_macros::unstable]
impl<Handler> Resolvable for AdvancedSubscriberBuilder<'_, '_, '_, Handler>
where
    Handler: IntoHandler<Sample>,
    Handler::Handler: Send,
{
    type To = ZResult<AdvancedSubscriber<Handler::Handler>>;
}
336
337#[zenoh_macros::unstable]
338impl<Handler> Wait for AdvancedSubscriberBuilder<'_, '_, '_, Handler>
339where
340    Handler: IntoHandler<Sample> + Send,
341    Handler::Handler: Send,
342{
343    #[zenoh_macros::unstable]
344    fn wait(self) -> <Self as Resolvable>::To {
345        AdvancedSubscriber::new(self.with_static_keys())
346    }
347}
348
349#[zenoh_macros::unstable]
350impl<Handler> IntoFuture for AdvancedSubscriberBuilder<'_, '_, '_, Handler>
351where
352    Handler: IntoHandler<Sample> + Send,
353    Handler::Handler: Send,
354{
355    type Output = <Self as Resolvable>::To;
356    type IntoFuture = Ready<<Self as Resolvable>::To>;
357
358    #[zenoh_macros::unstable]
359    fn into_future(self) -> Self::IntoFuture {
360        std::future::ready(self.wait())
361    }
362}
363
// A background builder resolves to `()`: no `AdvancedSubscriber` object is
// handed back to the caller.
#[zenoh_macros::unstable]
impl Resolvable for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
    type To = ZResult<()>;
}
368
369#[zenoh_macros::unstable]
370impl Wait for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
371    #[zenoh_macros::unstable]
372    fn wait(self) -> <Self as Resolvable>::To {
373        let mut sub = AdvancedSubscriber::new(self.with_static_keys())?;
374        sub.set_background_impl(true);
375        Ok(())
376    }
377}
378
379#[zenoh_macros::unstable]
380impl IntoFuture for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
381    type Output = <Self as Resolvable>::To;
382    type IntoFuture = Ready<<Self as Resolvable>::To>;
383
384    #[zenoh_macros::unstable]
385    fn into_future(self) -> Self::IntoFuture {
386        std::future::ready(self.wait())
387    }
388}
389
/// Timer and period used to schedule the periodic queries of a subscriber.
#[zenoh_macros::unstable]
struct Period {
    /// Timer on which the periodic query events are registered.
    timer: Timer,
    /// Interval between two periodic queries.
    period: Duration,
}
395
/// Mutable state of an advanced subscriber, shared (behind `Arc<Mutex<_>>`)
/// between the sample callback, the query reply callbacks and the periodic queries.
#[zenoh_macros::unstable]
struct State {
    /// Next identifier handed out by `register_miss_callback`.
    next_id: usize,
    /// Number of pending queries that are not tied to a single source;
    /// starts at 1 when a history configuration is present.
    global_pending_queries: u64,
    /// Per-source state for samples carrying source info, ordered by sequence number.
    sequenced_states: HashMap<EntityGlobalId, SourceState<WrappingSn>>,
    /// Per-source state for samples without source info but with a timestamp,
    /// keyed by the timestamp's id.
    timestamped_states: HashMap<ID, SourceState<Timestamp>>,
    /// Weak handle to the session, used to issue queries.
    session: WeakSession,
    /// Key expression the subscriber is declared on.
    key_expr: KeyExpr<'static>,
    /// Whether a recovery configuration was provided.
    retransmission: bool,
    /// Timer and period of the periodic queries, if enabled.
    period: Option<Period>,
    /// Configured history `max_samples`, or 0 when not configured.
    history_depth: usize,
    /// Target used for queries.
    query_target: QueryTarget,
    /// Timeout used for queries.
    query_timeout: Duration,
    // Callback must be dropped when the underlying subscriber is undeclared
    // (for example when session is closed), in order to "close" the advanced
    // subscriber receiver, hence the `Option`.
    callback: Option<Callback<Sample>>,
    /// Registered sample-miss callbacks, keyed by the id returned at registration.
    miss_handlers: HashMap<usize, Callback<Miss>>,
    /// Liveliness token — NOTE(review): presumably the one declared for
    /// `subscriber_detection`; its use is not visible in this part of the file.
    token: Option<LivelinessToken>,
}
416
417#[zenoh_macros::unstable]
418impl State {
419    #[zenoh_macros::unstable]
420    fn register_miss_callback(&mut self, callback: Callback<Miss>) -> usize {
421        let id = self.next_id;
422        self.next_id += 1;
423        self.miss_handlers.insert(id, callback);
424        id
425    }
426    #[zenoh_macros::unstable]
427    fn unregister_miss_callback(&mut self, id: &usize) {
428        self.miss_handlers.remove(id);
429    }
430}
431
// Registers a periodic `PeriodicQuery` event for one source.
//
// * `$p`: expression with a `period: Option<Period>` field (the subscriber state);
// * `$s`: id of the source to query;
// * `$r`: shared state reference handed to the `PeriodicQuery`.
//
// Does nothing when no period is configured.
macro_rules! spawn_periodic_queries {
    ($p:expr,$s:expr,$r:expr) => {{
        if let Some(period) = &$p.period {
            // The event fires every `period.period` on the configured timer.
            period.timer.add(TimedEvent::periodic(
                period.period,
                PeriodicQuery {
                    source_id: $s,
                    statesref: $r,
                },
            ))
        }
    }};
}
445
/// Delivery state tracked for one source of samples.
///
/// `T` is the ordering key of the samples: a sequence number (`WrappingSn`)
/// or a `Timestamp`.
#[zenoh_macros::unstable]
struct SourceState<T> {
    /// Key of the last sample handed to the user callback, if any.
    last_delivered: Option<T>,
    /// Number of queries currently pending for this source.
    pending_queries: u64,
    /// Samples received but not yet delivered, ordered by key.
    pending_samples: BTreeMap<T, Sample>,
}
452
453/*
454use zenoh_ext::{AdvancedSubscriberBuilderExt, HistoryConfig, RecoveryConfig};
455
456let session = zenoh::open(zenoh::Config::default()).await.unwrap();
457let subscriber = session
458    .declare_subscriber("key/expression")
459    .history(HistoryConfig::default().detect_late_publishers())
460    .recovery(RecoveryConfig::default())
461    .await
462    .unwrap();
463
464let miss_listener = subscriber.sample_miss_listener().await.unwrap();
465loop {
466    tokio::select! {
467        sample = subscriber.recv_async() => {
468            if let Ok(sample) = sample {
469                // ...
470            }
471        },
472        miss = miss_listener.recv_async() => {
473            if let Ok(miss) = miss {
474                // ...
475            }
476        },
477    }
478}
479*/
480
/// The extension to [`Subscriber`](zenoh::pubsub::Subscriber) that provides advanced functionalities
///
/// The `AdvancedSubscriber` is constructed over a regular [`Subscriber`](zenoh::pubsub::Subscriber)
/// through [`advanced`](crate::AdvancedSubscriberBuilderExt::advanced) method or by using
/// any other method of [`AdvancedSubscriberBuilder`](crate::AdvancedSubscriberBuilder).
///
/// The `AdvancedSubscriber` works with [`AdvancedPublisher`](crate::AdvancedPublisher) to provide additional functionalities such as:
/// * missing samples detection using periodic queries or heartbeat subscription configurable with [`recovery`](crate::AdvancedSubscriberBuilder::recovery) method
/// * recovering missing samples, configured with [`history`](crate::AdvancedSubscriberBuilder::history) method
///   (max age and sample count, late joiner detection and requesting)
/// * liveliness-based subscriber detection with [`subscriber_detection`](crate::AdvancedSubscriberBuilder::subscriber_detection) method
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() {
/// use zenoh_ext::{AdvancedSubscriberBuilderExt, HistoryConfig, RecoveryConfig};
/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
/// let subscriber = session
///     .declare_subscriber("key/expression")
///     .history(HistoryConfig::default().detect_late_publishers())
///     .recovery(RecoveryConfig::default().heartbeat())
///     .subscriber_detection()
///     .await
///     .unwrap();
/// let miss_listener = subscriber.sample_miss_listener().await.unwrap();
/// loop {
///     tokio::select! {
///         sample = subscriber.recv_async() => {
///             if let Ok(sample) = sample {
///                 // ...
///             }
///         },
///         miss = miss_listener.recv_async() => {
///             if let Ok(miss) = miss {
///                 // ...
///             }
///         },
///     }
/// }
/// # }
/// ```
#[zenoh_macros::unstable]
pub struct AdvancedSubscriber<Receiver> {
    // State shared with the sample, reply and periodic query callbacks.
    statesref: Arc<Mutex<State>>,
    // Underlying regular subscriber declared on the user's key expression.
    subscriber: Subscriber<()>,
    // Receiver half of the user handler; exposed through `Deref`/`DerefMut`.
    receiver: Receiver,
    // NOTE(review): presumably the liveliness subscriber used for late publisher
    // detection; its creation is not visible in this part of the file.
    liveliness_subscriber: Option<Subscriber<()>>,
    // NOTE(review): presumably the subscriber to publisher heartbeats used for
    // recovery; its creation is not visible in this part of the file.
    heartbeat_subscriber: Option<Subscriber<()>>,
}
531
// Dereferencing an `AdvancedSubscriber` gives access to its receiver, so the
// receiver's methods can be called directly on the subscriber.
#[zenoh_macros::unstable]
impl<Receiver> std::ops::Deref for AdvancedSubscriber<Receiver> {
    type Target = Receiver;
    fn deref(&self) -> &Self::Target {
        &self.receiver
    }
}
539
// Mutable counterpart of the `Deref` impl: gives mutable access to the receiver.
#[zenoh_macros::unstable]
impl<Receiver> std::ops::DerefMut for AdvancedSubscriber<Receiver> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.receiver
    }
}
546
547#[zenoh_macros::unstable]
548fn handle_sample(states: &mut State, sample: Sample) -> bool {
549    let Some(callback) = states.callback.as_ref() else {
550        return false;
551    };
552    if let Some(source_info) = sample.source_info().cloned() {
553        #[inline]
554        fn deliver_and_flush(
555            sample: Sample,
556            source_sn: impl Into<WrappingSn>,
557            callback: &Callback<Sample>,
558            state: &mut SourceState<WrappingSn>,
559        ) {
560            let mut source_sn = source_sn.into();
561            callback.call(sample);
562            state.last_delivered = Some(source_sn);
563            while let Some(sample) = state.pending_samples.remove(&(source_sn + 1)) {
564                callback.call(sample);
565                source_sn += 1;
566                state.last_delivered = Some(source_sn);
567            }
568        }
569
570        let entry = states.sequenced_states.entry(*source_info.source_id());
571        let new = matches!(&entry, Entry::Vacant(_));
572        let state = entry.or_insert(SourceState::<WrappingSn> {
573            last_delivered: None,
574            pending_queries: 0,
575            pending_samples: BTreeMap::new(),
576        });
577        if state.last_delivered.is_none() && states.global_pending_queries != 0 {
578            // Avoid going through the Map if history_depth == 1
579            if states.history_depth == 1 {
580                state.last_delivered = Some(source_info.source_sn().into());
581                callback.call(sample);
582            } else {
583                state
584                    .pending_samples
585                    .insert(source_info.source_sn().into(), sample);
586                if state.pending_samples.len() >= states.history_depth {
587                    if let Some((sn, sample)) = state.pending_samples.pop_first() {
588                        deliver_and_flush(sample, sn, callback, state);
589                    }
590                }
591            }
592        } else if state.last_delivered.is_some()
593            && source_info.source_sn() != state.last_delivered.unwrap() + 1
594        {
595            if source_info.source_sn() > state.last_delivered.unwrap() {
596                if states.retransmission {
597                    state
598                        .pending_samples
599                        .insert(source_info.source_sn().into(), sample);
600                } else {
601                    tracing::info!(
602                        "Sample missed: missed {} samples from {:?}.",
603                        source_info.source_sn() - state.last_delivered.unwrap() - 1,
604                        source_info.source_id(),
605                    );
606                    for miss_callback in states.miss_handlers.values() {
607                        miss_callback.call(Miss {
608                            source: *source_info.source_id(),
609                            nb: source_info.source_sn() - state.last_delivered.unwrap() - 1,
610                        });
611                    }
612                    callback.call(sample);
613                    state.last_delivered = Some(source_info.source_sn().into());
614                }
615            }
616        } else {
617            deliver_and_flush(sample, source_info.source_sn(), callback, state);
618        }
619        new
620    } else if let Some(timestamp) = sample.timestamp() {
621        let entry = states.timestamped_states.entry(*timestamp.get_id());
622        let state = entry.or_insert(SourceState::<Timestamp> {
623            last_delivered: None,
624            pending_queries: 0,
625            pending_samples: BTreeMap::new(),
626        });
627        if state.last_delivered.map(|t| t < *timestamp).unwrap_or(true) {
628            if (states.global_pending_queries == 0 && state.pending_queries == 0)
629                || states.history_depth == 1
630            {
631                state.last_delivered = Some(*timestamp);
632                callback.call(sample);
633            } else {
634                state.pending_samples.entry(*timestamp).or_insert(sample);
635                if state.pending_samples.len() >= states.history_depth {
636                    flush_timestamped_source(state, Some(callback));
637                }
638            }
639        }
640        false
641    } else {
642        callback.call(sample);
643        false
644    }
645}
646
647#[zenoh_macros::unstable]
648fn seq_num_range(start: Option<WrappingSn>, end: Option<WrappingSn>) -> String {
649    match (start, end) {
650        (Some(start), Some(end)) => format!("_sn={start}..{end}"),
651        (Some(start), None) => format!("_sn={start}.."),
652        (None, Some(end)) => format!("_sn=..{end}"),
653        (None, None) => "_sn=..".to_string(),
654    }
655}
656
/// Timed event that queries one source for the samples not yet delivered.
#[zenoh_macros::unstable]
#[derive(Clone)]
struct PeriodicQuery {
    /// Source whose undelivered samples are queried.
    source_id: EntityGlobalId,
    /// Shared subscriber state, updated with the replies.
    statesref: Arc<Mutex<State>>,
}
663
664#[zenoh_macros::unstable]
665#[async_trait]
666impl Timed for PeriodicQuery {
667    async fn run(&mut self) {
668        let mut lock = zlock!(self.statesref);
669        let states = &mut *lock;
670        if let Some(state) = states.sequenced_states.get_mut(&self.source_id) {
671            state.pending_queries += 1;
672            let query_expr = &states.key_expr
673                / KE_ADV_PREFIX
674                / KE_STAR
675                / &self.source_id.zid().into_keyexpr()
676                / &KeyExpr::try_from(self.source_id.eid().to_string()).unwrap()
677                / KE_STARSTAR;
678            let seq_num_range = seq_num_range(state.last_delivered.map(|s| s + 1), None);
679
680            let session = states.session.clone();
681            let key_expr = states.key_expr.clone().into_owned();
682            let query_target = states.query_target;
683            let query_timeout = states.query_timeout;
684
685            tracing::trace!(
686                "AdvancedSubscriber{{key_expr: {}}}: Querying undelivered samples {}?{}",
687                states.key_expr,
688                query_expr,
689                seq_num_range
690            );
691            drop(lock);
692
693            let handler = SequencedRepliesHandler {
694                source_id: self.source_id,
695                statesref: self.statesref.clone(),
696            };
697            let _ = session
698                .get(Selector::from((query_expr, seq_num_range)))
699                .callback({
700                    move |r: Reply| {
701                        if let Ok(s) = r.into_result() {
702                            if key_expr.intersects(s.key_expr()) {
703                                let states = &mut *zlock!(handler.statesref);
704                                tracing::trace!(
705                                    "AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}",
706                                    states.key_expr,
707                                    s.source_info(),
708                                    s.timestamp()
709                                );
710                                handle_sample(states, s);
711                            }
712                        }
713                    }
714                })
715                .consolidation(ConsolidationMode::None)
716                .accept_replies(ReplyKeyExpr::Any)
717                .target(query_target)
718                .timeout(query_timeout)
719                .wait();
720        }
721    }
722}
723
724#[zenoh_macros::unstable]
725impl<Handler> AdvancedSubscriber<Handler> {
726    fn new<H>(conf: AdvancedSubscriberBuilder<'_, '_, '_, H>) -> ZResult<Self>
727    where
728        H: IntoHandler<Sample, Handler = Handler> + Send,
729    {
730        let (callback, receiver) = conf.handler.into_handler();
731        let key_expr = conf.key_expr?;
732        let meta = match conf.meta_key_expr {
733            Some(meta) => Some(meta?),
734            None => None,
735        };
736        let retransmission = conf.retransmission;
737        let query_target = conf.query_target;
738        let query_timeout = conf.query_timeout;
739        let statesref = Arc::new(Mutex::new(State {
740            next_id: 0,
741            sequenced_states: HashMap::new(),
742            timestamped_states: HashMap::new(),
743            global_pending_queries: if conf.history.is_some() { 1 } else { 0 },
744            session: conf.session.downgrade(),
745            period: retransmission.as_ref().and_then(|r| {
746                let _rt = ZRuntime::Application.enter();
747                r.periodic_queries.map(|p| Period {
748                    timer: Timer::new(false),
749                    period: p,
750                })
751            }),
752            key_expr: key_expr.clone().into_owned(),
753            retransmission: retransmission.is_some(),
754            history_depth: conf
755                .history
756                .as_ref()
757                .and_then(|h| h.sample_depth)
758                .unwrap_or_default(),
759            query_target: conf.query_target,
760            query_timeout: conf.query_timeout,
761            callback: Some(callback),
762            miss_handlers: HashMap::new(),
763            token: None,
764        }));
765
766        let sub_callback = {
767            let statesref = statesref.clone();
768            let session = conf.session.downgrade();
769            let key_expr = key_expr.clone().into_owned();
770
771            move |s: Sample| {
772                let mut lock = zlock!(statesref);
773                let states = &mut *lock;
774                let source_id = s.source_info().map(|si| *si.source_id());
775                let new = handle_sample(states, s);
776
777                if let Some(source_id) = source_id {
778                    if new {
779                        spawn_periodic_queries!(states, source_id, statesref.clone());
780                    }
781                    if let Some(state) = states.sequenced_states.get_mut(&source_id) {
782                        if retransmission.is_some()
783                            && state.pending_queries == 0
784                            && !state.pending_samples.is_empty()
785                        {
786                            state.pending_queries += 1;
787                            let query_expr = &key_expr
788                                / KE_ADV_PREFIX
789                                / KE_STAR
790                                / &source_id.zid().into_keyexpr()
791                                / &KeyExpr::try_from(source_id.eid().to_string()).unwrap()
792                                / KE_STARSTAR;
793                            let seq_num_range =
794                                seq_num_range(state.last_delivered.map(|s| s + 1), None);
795                            tracing::trace!(
796                                "AdvancedSubscriber{{key_expr: {}}}: Querying missing samples {}?{}",
797                                states.key_expr,
798                                query_expr,
799                                seq_num_range
800                            );
801                            drop(lock);
802                            let handler = SequencedRepliesHandler {
803                                source_id,
804                                statesref: statesref.clone(),
805                            };
806                            let _ = session
807                                .get(Selector::from((query_expr, seq_num_range)))
808                                .callback({
809                                    let key_expr = key_expr.clone().into_owned();
810                                    move |r: Reply| {
811                                        if let Ok(s) = r.into_result() {
812                                            if key_expr.intersects(s.key_expr()) {
813                                                let states = &mut *zlock!(handler.statesref);
814                                                tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
815                                                handle_sample(states, s);
816                                            }
817                                        }
818                                    }
819                                })
820                                .consolidation(ConsolidationMode::None)
821                                .accept_replies(ReplyKeyExpr::Any)
822                                .target(query_target)
823                                .timeout(query_timeout)
824                                .wait();
825                        }
826                    }
827                }
828            }
829        };
830
831        // When the underlying subscriber is undeclared (for example when the session is closed)
832        // the advanced subscriber callback must be dropped to "close" the receiver.
833        let drop_callback = {
834            let statesref = statesref.clone();
835            move || {
836                let mut states = statesref.lock().unwrap();
837                states.callback.take();
838                states.miss_handlers.clear();
839            }
840        };
841
842        let subscriber = conf
843            .session
844            .declare_subscriber(&key_expr)
845            .with(CallbackDrop {
846                callback: sub_callback,
847                drop: drop_callback,
848            })
849            .allowed_origin(conf.origin)
850            .wait()?;
851
852        tracing::debug!("Create AdvancedSubscriber{{key_expr: {}}}", key_expr,);
853
854        if let Some(historyconf) = conf.history.as_ref() {
855            let handler = InitialRepliesHandler {
856                statesref: statesref.clone(),
857            };
858            let mut params = Parameters::empty();
859            if let Some(max) = historyconf.sample_depth {
860                params.insert("_max", max.to_string());
861            }
862            if let Some(age) = historyconf.age {
863                params.set_time_range(TimeRange {
864                    start: TimeBound::Inclusive(TimeExpr::Now { offset_secs: -age }),
865                    end: TimeBound::Unbounded,
866                });
867            }
868            tracing::trace!(
869                "AdvancedSubscriber{{key_expr: {}}} Querying historical samples {}?{}",
870                key_expr,
871                &key_expr / KE_ADV_PREFIX / KE_STARSTAR,
872                params
873            );
874            let _ = conf
875                .session
876                .get(Selector::from((
877                    &key_expr / KE_ADV_PREFIX / KE_STARSTAR,
878                    params,
879                )))
880                .callback({
881                    let key_expr = key_expr.clone().into_owned();
882                    move |r: Reply| {
883                        if let Ok(s) = r.into_result() {
884                            if key_expr.intersects(s.key_expr()) {
885                                let states = &mut *zlock!(handler.statesref);
886                                tracing::trace!(
887                                    "AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}",
888                                    states.key_expr,
889                                    s.source_info(),
890                                    s.timestamp()
891                                );
892                                handle_sample(states, s);
893                            }
894                        }
895                    }
896                })
897                .consolidation(ConsolidationMode::None)
898                .accept_replies(ReplyKeyExpr::Any)
899                .target(query_target)
900                .timeout(query_timeout)
901                .wait();
902        }
903
904        let liveliness_subscriber = if let Some(historyconf) = conf.history.as_ref() {
905            if historyconf.liveliness {
906                let live_callback = {
907                    let session = conf.session.downgrade();
908                    let statesref = statesref.clone();
909                    let key_expr = key_expr.clone().into_owned();
910                    let historyconf = historyconf.clone();
911                    move |s: Sample| {
912                        if s.kind() == SampleKind::Put {
913                            if let Ok(parsed) = ke_liveliness::parse(s.key_expr().as_keyexpr()) {
914                                if let Ok(zid) = ZenohId::from_str(parsed.zid().as_str()) {
915                                    // TODO : If we already have a state associated to this discovered source
916                                    // we should query with the appropriate range to avoid unnecessary retransmissions
917                                    if parsed.eid() == KE_UHLC {
918                                        let mut lock = zlock!(statesref);
919                                        let states = &mut *lock;
920                                        tracing::trace!(
921                                            "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers with zid={}",
922                                            states.key_expr,
923                                            parsed.zid().as_str()
924                                        );
925                                        let entry = states.timestamped_states.entry(ID::from(zid));
926                                        let state = entry.or_insert(SourceState::<Timestamp> {
927                                            last_delivered: None,
928                                            pending_queries: 0,
929                                            pending_samples: BTreeMap::new(),
930                                        });
931                                        state.pending_queries += 1;
932
933                                        let mut params = Parameters::empty();
934                                        if let Some(max) = historyconf.sample_depth {
935                                            params.insert("_max", max.to_string());
936                                        }
937                                        if let Some(age) = historyconf.age {
938                                            params.set_time_range(TimeRange {
939                                                start: TimeBound::Inclusive(TimeExpr::Now {
940                                                    offset_secs: -age,
941                                                }),
942                                                end: TimeBound::Unbounded,
943                                            });
944                                        }
945                                        tracing::trace!(
946                                            "AdvancedSubscriber{{key_expr: {}}}: Querying historical samples {}?{}",
947                                            states.key_expr,
948                                            s.key_expr(),
949                                            params
950                                        );
951                                        drop(lock);
952
953                                        let handler = TimestampedRepliesHandler {
954                                            id: ID::from(zid),
955                                            statesref: statesref.clone(),
956                                        };
957                                        let _ = session
958                                            .get(Selector::from((s.key_expr(), params)))
959                                            .callback({
960                                                let key_expr = key_expr.clone().into_owned();
961                                                move |r: Reply| {
962                                                    if let Ok(s) = r.into_result() {
963                                                        if key_expr.intersects(s.key_expr()) {
964                                                            let states =
965                                                                &mut *zlock!(handler.statesref);
966                                                            tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
967                                                            handle_sample(states, s);
968                                                        }
969                                                    }
970                                                }
971                                            })
972                                            .consolidation(ConsolidationMode::None)
973                                            .accept_replies(ReplyKeyExpr::Any)
974                                            .target(query_target)
975                                            .timeout(query_timeout)
976                                            .wait();
977                                    } else if let Ok(eid) =
978                                        EntityId::from_str(parsed.eid().as_str())
979                                    {
980                                        let source_id = EntityGlobalId::new(zid, eid);
981                                        let mut lock = zlock!(statesref);
982                                        let states = &mut *lock;
983                                        tracing::trace!(
984                                            "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers with zid={}",
985                                            states.key_expr,
986                                            parsed.zid().as_str()
987                                        );
988                                        let entry = states.sequenced_states.entry(source_id);
989                                        let new = matches!(&entry, Entry::Vacant(_));
990                                        let state = entry.or_insert(SourceState::<WrappingSn> {
991                                            last_delivered: None,
992                                            pending_queries: 0,
993                                            pending_samples: BTreeMap::new(),
994                                        });
995                                        state.pending_queries += 1;
996
997                                        let mut params = Parameters::empty();
998                                        if let Some(max) = historyconf.sample_depth {
999                                            params.insert("_max", max.to_string());
1000                                        }
1001                                        if let Some(age) = historyconf.age {
1002                                            params.set_time_range(TimeRange {
1003                                                start: TimeBound::Inclusive(TimeExpr::Now {
1004                                                    offset_secs: -age,
1005                                                }),
1006                                                end: TimeBound::Unbounded,
1007                                            });
1008                                        }
1009                                        tracing::trace!(
1010                                            "AdvancedSubscriber{{key_expr: {}}}: Querying historical samples {}?{}",
1011                                            states.key_expr,
1012                                            s.key_expr(),
1013                                            params,
1014                                        );
1015                                        drop(lock);
1016
1017                                        let handler = SequencedRepliesHandler {
1018                                            source_id,
1019                                            statesref: statesref.clone(),
1020                                        };
1021                                        let _ = session
1022                                            .get(Selector::from((s.key_expr(), params)))
1023                                            .callback({
1024                                                let key_expr = key_expr.clone().into_owned();
1025                                                move |r: Reply| {
1026                                                    if let Ok(s) = r.into_result() {
1027                                                        if key_expr.intersects(s.key_expr()) {
1028                                                            let states =
1029                                                                &mut *zlock!(handler.statesref);
1030                                                            tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
1031                                                            handle_sample(states, s);
1032                                                        }
1033                                                    }
1034                                                }
1035                                            })
1036                                            .consolidation(ConsolidationMode::None)
1037                                            .accept_replies(ReplyKeyExpr::Any)
1038                                            .target(query_target)
1039                                            .timeout(query_timeout)
1040                                            .wait();
1041
1042                                        if new {
1043                                            spawn_periodic_queries!(
1044                                                zlock!(statesref),
1045                                                source_id,
1046                                                statesref.clone()
1047                                            );
1048                                        }
1049                                    }
1050                                } else {
1051                                    let mut lock = zlock!(statesref);
1052                                    let states = &mut *lock;
1053                                    tracing::trace!(
1054                                        "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers with zid={}",
1055                                        states.key_expr,
1056                                        parsed.zid().as_str()
1057                                    );
1058                                    states.global_pending_queries += 1;
1059
1060                                    let mut params = Parameters::empty();
1061                                    if let Some(max) = historyconf.sample_depth {
1062                                        params.insert("_max", max.to_string());
1063                                    }
1064                                    if let Some(age) = historyconf.age {
1065                                        params.set_time_range(TimeRange {
1066                                            start: TimeBound::Inclusive(TimeExpr::Now {
1067                                                offset_secs: -age,
1068                                            }),
1069                                            end: TimeBound::Unbounded,
1070                                        });
1071                                    }
1072                                    tracing::trace!(
1073                                        "AdvancedSubscriber{{key_expr: {}}}: Querying historical samples {}?{}",
1074                                        states.key_expr,
1075                                        s.key_expr(),
1076                                        params,
1077                                    );
1078                                    drop(lock);
1079
1080                                    let handler = InitialRepliesHandler {
1081                                        statesref: statesref.clone(),
1082                                    };
1083                                    let _ = session
1084                                        .get(Selector::from((s.key_expr(), params)))
1085                                        .callback({
1086                                            let key_expr = key_expr.clone().into_owned();
1087                                            move |r: Reply| {
1088                                                if let Ok(s) = r.into_result() {
1089                                                    if key_expr.intersects(s.key_expr()) {
1090                                                        let states =
1091                                                            &mut *zlock!(handler.statesref);
1092                                                        tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
1093                                                        handle_sample(states, s);
1094                                                    }
1095                                                }
1096                                            }
1097                                        })
1098                                        .consolidation(ConsolidationMode::None)
1099                                        .accept_replies(ReplyKeyExpr::Any)
1100                                        .target(query_target)
1101                                        .timeout(query_timeout)
1102                                        .wait();
1103                                }
1104                            } else {
1105                                tracing::warn!(
1106                                    "AdvancedSubscriber{{}}: Received malformed liveliness token key expression: {}",
1107                                    s.key_expr()
1108                                );
1109                            }
1110                        }
1111                    }
1112                };
1113
1114                tracing::debug!(
1115                    "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers on {}",
1116                    key_expr,
1117                    &key_expr / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR
1118                );
1119                Some(
1120                    conf.session
1121                        .liveliness()
1122                        .declare_subscriber(&key_expr / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR)
1123                        // .declare_subscriber(keformat!(ke_liveliness_all::formatter(), zid = 0, eid = 0, remaining = key_expr).unwrap())
1124                        .history(true)
1125                        .callback(live_callback)
1126                        .wait()?,
1127                )
1128            } else {
1129                None
1130            }
1131        } else {
1132            None
1133        };
1134
1135        let heartbeat_subscriber = if retransmission.is_some_and(|r| r.heartbeat) {
1136            let ke_heartbeat_sub = &key_expr / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR;
1137            let statesref = statesref.clone();
1138            tracing::debug!(
1139                "AdvancedSubscriber{{key_expr: {}}}: Enable heartbeat subscriber on {}",
1140                key_expr,
1141                ke_heartbeat_sub
1142            );
1143            let heartbeat_sub = conf
1144                .session
1145                .declare_subscriber(ke_heartbeat_sub)
1146                .callback(move |sample_hb| {
1147                    if sample_hb.kind() != SampleKind::Put {
1148                        return;
1149                    }
1150
1151                    let heartbeat_keyexpr = sample_hb.key_expr().as_keyexpr();
1152                    let Ok(parsed_keyexpr) = ke_liveliness::parse(heartbeat_keyexpr) else {
1153                        return;
1154                    };
1155                    let source_id = {
1156                        let Ok(zid) = ZenohId::from_str(parsed_keyexpr.zid().as_str()) else {
1157                            return;
1158                        };
1159                        let Ok(eid) = EntityId::from_str(parsed_keyexpr.eid().as_str()) else {
1160                            return;
1161                        };
1162                        EntityGlobalId::new(zid, eid)
1163                    };
1164
1165                    let Ok(heartbeat_sn) = z_deserialize::<WrappingSn>(sample_hb.payload()) else {
1166                        tracing::debug!(
1167                            "AdvancedSubscriber{{}}: Skipping invalid heartbeat payload on '{}'",
1168                            heartbeat_keyexpr
1169                        );
1170                        return;
1171                    };
1172
1173                    let mut lock = zlock!(statesref);
1174                    let states = &mut *lock;
1175                    let entry = states.sequenced_states.entry(source_id);
1176                    if matches!(&entry, Entry::Vacant(_)) {
1177                        // NOTE: API does not allow both heartbeat and periodic_queries
1178                        spawn_periodic_queries!(states, source_id, statesref.clone());
1179                        if states.global_pending_queries > 0 {
1180                            tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Skipping heartbeat on '{}' from publisher that is currently being pulled by global query", states.key_expr, heartbeat_keyexpr);
1181                            return;
1182                        }
1183                    }
1184
1185                    let state = entry.or_insert(SourceState::<WrappingSn> {
1186                        last_delivered: None,
1187                        pending_queries: 0,
1188                        pending_samples: BTreeMap::new(),
1189                    });
1190
1191                    // check that it's not an old sn, and that there are no pending queries
1192                    if (state.last_delivered.is_none()
1193                        || state.last_delivered.is_some_and(|sn| heartbeat_sn > sn))
1194                        && state.pending_queries == 0
1195                    {
1196                        let seq_num_range = seq_num_range(
1197                            state.last_delivered.map(|s| s + 1),
1198                            Some(heartbeat_sn),
1199                        );
1200
1201                        let session = states.session.clone();
1202                        let key_expr = states.key_expr.clone().into_owned();
1203                        let query_target = states.query_target;
1204                        let query_timeout = states.query_timeout;
1205                        state.pending_queries += 1;
1206
1207                        tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Querying missing samples {}?{}", states.key_expr, heartbeat_keyexpr, seq_num_range);
1208                        drop(lock);
1209
1210                        let handler = SequencedRepliesHandler {
1211                            source_id,
1212                            statesref: statesref.clone(),
1213                        };
1214                        let _ = session
1215                            .get(Selector::from((heartbeat_keyexpr, seq_num_range)))
1216                            .callback({
1217                                move |r: Reply| {
1218                                    if let Ok(s) = r.into_result() {
1219                                        if key_expr.intersects(s.key_expr()) {
1220                                            let states = &mut *zlock!(handler.statesref);
1221                                            tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
1222                                            handle_sample(states, s);
1223                                        }
1224                                    }
1225                                }
1226                            })
1227                            .consolidation(ConsolidationMode::None)
1228                            .accept_replies(ReplyKeyExpr::Any)
1229                            .target(query_target)
1230                            .timeout(query_timeout)
1231                            .wait();
1232                    }
1233                })
1234                .allowed_origin(conf.origin)
1235                .wait()?;
1236            Some(heartbeat_sub)
1237        } else {
1238            None
1239        };
1240
1241        if conf.liveliness {
1242            let suffix = KE_ADV_PREFIX
1243                / KE_SUB
1244                / &subscriber.id().zid().into_keyexpr()
1245                / &KeyExpr::try_from(subscriber.id().eid().to_string()).unwrap();
1246            let suffix = match meta {
1247                Some(meta) => suffix / &meta,
1248                // We need this empty chunk because of a routing matching bug
1249                _ => suffix / KE_EMPTY,
1250            };
1251            tracing::debug!(
1252                "AdvancedSubscriber{{key_expr: {}}}: Declare liveliness token {}",
1253                key_expr,
1254                &key_expr / &suffix,
1255            );
1256            let token = conf
1257                .session
1258                .liveliness()
1259                .declare_token(&key_expr / &suffix)
1260                .wait()?;
1261            zlock!(statesref).token = Some(token)
1262        }
1263
1264        let reliable_subscriber = AdvancedSubscriber {
1265            statesref,
1266            subscriber,
1267            receiver,
1268            liveliness_subscriber,
1269            heartbeat_subscriber,
1270        };
1271
1272        Ok(reliable_subscriber)
1273    }
1274
    /// Returns the [`EntityGlobalId`] of this AdvancedSubscriber.
    ///
    /// The value is the id reported by the wrapped subscriber.
    #[zenoh_macros::unstable]
    pub fn id(&self) -> EntityGlobalId {
        self.subscriber.id()
    }
1280
    /// Returns the [`KeyExpr`] this subscriber subscribes to.
    ///
    /// The key expression is borrowed from the wrapped subscriber.
    #[zenoh_macros::unstable]
    pub fn key_expr(&self) -> &KeyExpr<'static> {
        self.subscriber.key_expr()
    }
1286
    /// Returns a reference to this subscriber's handler.
    ///
    /// A handler is anything that implements [`zenoh::handlers::IntoHandler`].
    /// The default handler is [`zenoh::handlers::DefaultHandler`].
    #[zenoh_macros::unstable]
    pub fn handler(&self) -> &Handler {
        // The handler is kept in the `receiver` field.
        &self.receiver
    }
1295
    /// Returns a mutable reference to this subscriber's handler.
    ///
    /// A handler is anything that implements [`zenoh::handlers::IntoHandler`].
    /// The default handler is [`zenoh::handlers::DefaultHandler`].
    #[zenoh_macros::unstable]
    pub fn handler_mut(&mut self) -> &mut Handler {
        // The handler is kept in the `receiver` field.
        &mut self.receiver
    }
1304
1305    /// Declares a listener to detect missed samples.
1306    ///
1307    /// Missed samples can only be detected from [`AdvancedPublisher`](crate::AdvancedPublisher) that
1308    /// enable [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
1309    #[zenoh_macros::unstable]
1310    pub fn sample_miss_listener(&self) -> SampleMissListenerBuilder<'_, DefaultHandler> {
1311        SampleMissListenerBuilder {
1312            statesref: &self.statesref,
1313            handler: DefaultHandler::default(),
1314        }
1315    }
1316
1317    /// Declares a listener to detect matching publishers.
1318    ///
1319    /// Only [`AdvancedPublisher`](crate::AdvancedPublisher) that enable
1320    /// [`publisher_detection`](crate::AdvancedPublisherBuilder::publisher_detection) can be detected.
1321    #[zenoh_macros::unstable]
1322    pub fn detect_publishers(&self) -> LivelinessSubscriberBuilder<'_, '_, DefaultHandler> {
1323        self.subscriber
1324            .session()
1325            .liveliness()
1326            .declare_subscriber(self.subscriber.key_expr() / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR)
1327    }
1328
1329    /// Undeclares this AdvancedSubscriber
1330    #[inline]
1331    #[zenoh_macros::unstable]
1332    pub fn undeclare(self) -> SubscriberUndeclaration<()> {
1333        tracing::debug!(
1334            "AdvancedSubscriber{{key_expr: {}}}: Undeclare",
1335            self.key_expr()
1336        );
1337        self.subscriber.undeclare()
1338    }
1339
1340    fn set_background_impl(&mut self, background: bool) {
1341        self.subscriber.set_background(background);
1342        if let Some(mut liveliness_sub) = self.liveliness_subscriber.take() {
1343            liveliness_sub.set_background(background);
1344        }
1345        if let Some(mut heartbeat_sub) = self.heartbeat_subscriber.take() {
1346            heartbeat_sub.set_background(background);
1347        }
1348    }
1349
    /// Internal entry point that forwards `background` to `set_background_impl`,
    /// which applies it to the subscribers owned by this `AdvancedSubscriber`.
    #[zenoh_macros::internal]
    pub fn set_background(&mut self, background: bool) {
        self.set_background_impl(background)
    }
1354}
1355
1356#[zenoh_macros::unstable]
1357#[inline]
1358fn flush_sequenced_source(
1359    state: &mut SourceState<WrappingSn>,
1360    callback: Option<&Callback<Sample>>,
1361    source_id: &EntityGlobalId,
1362    miss_handlers: &HashMap<usize, Callback<Miss>>,
1363) {
1364    let Some(callback) = callback else {
1365        return;
1366    };
1367    if state.pending_queries == 0 && !state.pending_samples.is_empty() {
1368        let mut pending_samples = BTreeMap::new();
1369        std::mem::swap(&mut state.pending_samples, &mut pending_samples);
1370        for (seq_num, sample) in pending_samples {
1371            match state.last_delivered {
1372                None => {
1373                    state.last_delivered = Some(seq_num);
1374                    callback.call(sample);
1375                }
1376                Some(last) if seq_num == last + 1 => {
1377                    state.last_delivered = Some(seq_num);
1378                    callback.call(sample);
1379                }
1380                Some(last) if seq_num > last + 1 => {
1381                    tracing::warn!(
1382                        "Sample missed: missed {} samples from {:?}.",
1383                        seq_num - last - 1,
1384                        source_id,
1385                    );
1386                    for miss_callback in miss_handlers.values() {
1387                        miss_callback.call(Miss {
1388                            source: *source_id,
1389                            nb: seq_num - last - 1,
1390                        })
1391                    }
1392                    state.last_delivered = Some(seq_num);
1393                    callback.call(sample);
1394                }
1395                _ => {
1396                    // duplicate
1397                }
1398            }
1399        }
1400    }
1401}
1402
1403#[zenoh_macros::unstable]
1404#[inline]
1405fn flush_timestamped_source(
1406    state: &mut SourceState<Timestamp>,
1407    callback: Option<&Callback<Sample>>,
1408) {
1409    let Some(callback) = callback else {
1410        return;
1411    };
1412    if state.pending_queries == 0 && !state.pending_samples.is_empty() {
1413        let mut pending_samples = BTreeMap::new();
1414        std::mem::swap(&mut state.pending_samples, &mut pending_samples);
1415        for (timestamp, sample) in pending_samples {
1416            if state
1417                .last_delivered
1418                .map(|last| timestamp > last)
1419                .unwrap_or(true)
1420            {
1421                state.last_delivered = Some(timestamp);
1422                callback.call(sample);
1423            }
1424        }
1425    }
1426}
1427
1428#[zenoh_macros::unstable]
1429#[derive(Clone)]
1430struct InitialRepliesHandler {
1431    statesref: Arc<Mutex<State>>,
1432}
1433
1434#[zenoh_macros::unstable]
1435impl Drop for InitialRepliesHandler {
1436    fn drop(&mut self) {
1437        let states = &mut *zlock!(self.statesref);
1438        states.global_pending_queries = states.global_pending_queries.saturating_sub(1);
1439        tracing::trace!(
1440            "AdvancedSubscriber{{key_expr: {}}}: Flush initial replies",
1441            states.key_expr
1442        );
1443
1444        if states.global_pending_queries == 0 {
1445            for (source_id, state) in states.sequenced_states.iter_mut() {
1446                flush_sequenced_source(
1447                    state,
1448                    states.callback.as_ref(),
1449                    source_id,
1450                    &states.miss_handlers,
1451                );
1452                spawn_periodic_queries!(states, *source_id, self.statesref.clone());
1453            }
1454            for state in states.timestamped_states.values_mut() {
1455                flush_timestamped_source(state, states.callback.as_ref());
1456            }
1457        }
1458    }
1459}
1460
1461#[zenoh_macros::unstable]
1462#[derive(Clone)]
1463struct SequencedRepliesHandler {
1464    source_id: EntityGlobalId,
1465    statesref: Arc<Mutex<State>>,
1466}
1467
1468#[zenoh_macros::unstable]
1469impl Drop for SequencedRepliesHandler {
1470    fn drop(&mut self) {
1471        let states = &mut *zlock!(self.statesref);
1472        if let Some(state) = states.sequenced_states.get_mut(&self.source_id) {
1473            state.pending_queries = state.pending_queries.saturating_sub(1);
1474            if states.global_pending_queries == 0 {
1475                tracing::trace!(
1476                    "AdvancedSubscriber{{key_expr: {}}}: Flush sequenced samples",
1477                    states.key_expr
1478                );
1479                flush_sequenced_source(
1480                    state,
1481                    states.callback.as_ref(),
1482                    &self.source_id,
1483                    &states.miss_handlers,
1484                )
1485            }
1486        }
1487    }
1488}
1489
1490#[zenoh_macros::unstable]
1491#[derive(Clone)]
1492struct TimestampedRepliesHandler {
1493    id: ID,
1494    statesref: Arc<Mutex<State>>,
1495}
1496
1497#[zenoh_macros::unstable]
1498impl Drop for TimestampedRepliesHandler {
1499    fn drop(&mut self) {
1500        let states = &mut *zlock!(self.statesref);
1501        if let Some(state) = states.timestamped_states.get_mut(&self.id) {
1502            state.pending_queries = state.pending_queries.saturating_sub(1);
1503            if states.global_pending_queries == 0 {
1504                tracing::trace!(
1505                    "AdvancedSubscriber{{key_expr: {}}}: Flush timestamped samples",
1506                    states.key_expr
1507                );
1508                flush_timestamped_source(state, states.callback.as_ref());
1509            }
1510        }
1511    }
1512}
1513
1514/// A struct that represent missed samples.
1515#[zenoh_macros::unstable]
1516#[derive(Debug, Clone)]
1517pub struct Miss {
1518    source: EntityGlobalId,
1519    nb: u32,
1520}
1521
1522impl Miss {
1523    /// The source of missed samples.
1524    pub fn source(&self) -> EntityGlobalId {
1525        self.source
1526    }
1527
1528    /// The number of missed samples.
1529    pub fn nb(&self) -> u32 {
1530        self.nb
1531    }
1532}
1533
1534impl CallbackParameter for Miss {
1535    type Message<'a> = Self;
1536
1537    fn from_message(msg: Self::Message<'_>) -> Self {
1538        msg
1539    }
1540}
1541
1542/// A listener to detect missed samples.
1543///
1544/// Missed samples can only be detected from [`AdvancedPublisher`](crate::AdvancedPublisher) that
1545/// enable [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
1546#[zenoh_macros::unstable]
1547pub struct SampleMissListener<Handler> {
1548    id: usize,
1549    statesref: Arc<Mutex<State>>,
1550    handler: Handler,
1551    undeclare_on_drop: bool,
1552}
1553
1554#[zenoh_macros::unstable]
1555impl<Handler> SampleMissListener<Handler> {
1556    #[inline]
1557    pub fn undeclare(self) -> SampleMissHandlerUndeclaration<Handler>
1558    where
1559        Handler: Send,
1560    {
1561        SampleMissHandlerUndeclaration { listener: self }
1562    }
1563
1564    fn undeclare_impl(&mut self) -> ZResult<()> {
1565        // set the flag first to avoid double panic if this function panic
1566        self.undeclare_on_drop = false;
1567        zlock!(self.statesref).unregister_miss_callback(&self.id);
1568        Ok(())
1569    }
1570
1571    #[zenoh_macros::internal]
1572    pub fn set_background(&mut self, background: bool) {
1573        self.undeclare_on_drop = !background;
1574    }
1575}
1576
#[cfg(feature = "unstable")]
impl<Handler> Drop for SampleMissListener<Handler> {
    /// Unregisters the miss callback unless the listener was already undeclared or put in
    /// background; a failure is only logged.
    fn drop(&mut self) {
        if !self.undeclare_on_drop {
            return;
        }
        if let Err(error) = self.undeclare_impl() {
            tracing::error!(error);
        }
    }
}
1587
1588// #[zenoh_macros::unstable]
1589// impl<Handler: Send> UndeclarableSealed<()> for SampleMissHandler<Handler> {
1590//     type Undeclaration = SampleMissHandlerUndeclaration<Handler>;
1591
1592//     fn undeclare_inner(self, _: ()) -> Self::Undeclaration {
1593//         SampleMissHandlerUndeclaration(self)
1594//     }
1595// }
1596
1597#[zenoh_macros::unstable]
1598impl<Handler> std::ops::Deref for SampleMissListener<Handler> {
1599    type Target = Handler;
1600
1601    fn deref(&self) -> &Self::Target {
1602        &self.handler
1603    }
1604}
1605#[zenoh_macros::unstable]
1606impl<Handler> std::ops::DerefMut for SampleMissListener<Handler> {
1607    fn deref_mut(&mut self) -> &mut Self::Target {
1608        &mut self.handler
1609    }
1610}
1611
1612/// A [`Resolvable`] returned by [`SampleMissListener::undeclare`]
1613#[zenoh_macros::unstable]
1614pub struct SampleMissHandlerUndeclaration<Handler> {
1615    listener: SampleMissListener<Handler>,
1616}
1617
1618impl<Handler> SampleMissHandlerUndeclaration<Handler> {
1619    /// Block in undeclare operation until all currently running instances of sample miss listener callback (if any) return.
1620    pub fn wait_callbacks(self) -> Self {
1621        // Note: no particular synchronization is required as of now since miss listener callbacks are always executed
1622        // under state lock
1623        self
1624    }
1625}
1626
1627#[zenoh_macros::unstable]
1628impl<Handler> Resolvable for SampleMissHandlerUndeclaration<Handler> {
1629    type To = ZResult<()>;
1630}
1631
1632#[zenoh_macros::unstable]
1633impl<Handler> Wait for SampleMissHandlerUndeclaration<Handler> {
1634    fn wait(mut self) -> <Self as Resolvable>::To {
1635        self.listener.undeclare_impl()
1636    }
1637}
1638
1639#[zenoh_macros::unstable]
1640impl<Handler> IntoFuture for SampleMissHandlerUndeclaration<Handler> {
1641    type Output = <Self as Resolvable>::To;
1642    type IntoFuture = Ready<<Self as Resolvable>::To>;
1643
1644    fn into_future(self) -> Self::IntoFuture {
1645        std::future::ready(self.wait())
1646    }
1647}
1648
1649/// A builder for initializing a [`SampleMissListener`].
1650#[zenoh_macros::unstable]
1651pub struct SampleMissListenerBuilder<'a, Handler, const BACKGROUND: bool = false> {
1652    statesref: &'a Arc<Mutex<State>>,
1653    handler: Handler,
1654}
1655
1656#[zenoh_macros::unstable]
1657impl<'a> SampleMissListenerBuilder<'a, DefaultHandler> {
1658    /// Receive the sample miss notification with a callback.
1659    #[inline]
1660    #[zenoh_macros::unstable]
1661    pub fn callback<F>(self, callback: F) -> SampleMissListenerBuilder<'a, Callback<Miss>>
1662    where
1663        F: Fn(Miss) + Send + Sync + 'static,
1664    {
1665        self.with(Callback::from(callback))
1666    }
1667
1668    /// Receive the sample miss notification with a mutable callback.
1669    #[inline]
1670    #[zenoh_macros::unstable]
1671    pub fn callback_mut<F>(self, callback: F) -> SampleMissListenerBuilder<'a, Callback<Miss>>
1672    where
1673        F: FnMut(Miss) + Send + Sync + 'static,
1674    {
1675        self.callback(zenoh::handlers::locked(callback))
1676    }
1677
1678    /// Receive the sample miss notification with a [`Handler`](IntoHandler).
1679    #[inline]
1680    #[zenoh_macros::unstable]
1681    pub fn with<Handler>(self, handler: Handler) -> SampleMissListenerBuilder<'a, Handler>
1682    where
1683        Handler: IntoHandler<Miss>,
1684    {
1685        SampleMissListenerBuilder {
1686            statesref: self.statesref,
1687            handler,
1688        }
1689    }
1690}
1691
1692#[zenoh_macros::unstable]
1693impl<'a> SampleMissListenerBuilder<'a, Callback<Miss>> {
1694    /// Make the sample miss notification run in the background until the advanced subscriber is undeclared.
1695    ///
1696    /// Background builder doesn't return a `SampleMissHandler` object anymore.
1697    #[zenoh_macros::unstable]
1698    pub fn background(self) -> SampleMissListenerBuilder<'a, Callback<Miss>, true> {
1699        SampleMissListenerBuilder {
1700            statesref: self.statesref,
1701            handler: self.handler,
1702        }
1703    }
1704}
1705
1706#[zenoh_macros::unstable]
1707impl<Handler> Resolvable for SampleMissListenerBuilder<'_, Handler>
1708where
1709    Handler: IntoHandler<Miss> + Send,
1710    Handler::Handler: Send,
1711{
1712    type To = ZResult<SampleMissListener<Handler::Handler>>;
1713}
1714
1715#[zenoh_macros::unstable]
1716impl<Handler> Wait for SampleMissListenerBuilder<'_, Handler>
1717where
1718    Handler: IntoHandler<Miss> + Send,
1719    Handler::Handler: Send,
1720{
1721    #[zenoh_macros::unstable]
1722    fn wait(self) -> <Self as Resolvable>::To {
1723        let (callback, handler) = self.handler.into_handler();
1724        let id = zlock!(self.statesref).register_miss_callback(callback);
1725        Ok(SampleMissListener {
1726            id,
1727            statesref: self.statesref.clone(),
1728            handler,
1729            undeclare_on_drop: true,
1730        })
1731    }
1732}
1733
1734#[zenoh_macros::unstable]
1735impl<Handler> IntoFuture for SampleMissListenerBuilder<'_, Handler>
1736where
1737    Handler: IntoHandler<Miss> + Send,
1738    Handler::Handler: Send,
1739{
1740    type Output = <Self as Resolvable>::To;
1741    type IntoFuture = Ready<<Self as Resolvable>::To>;
1742
1743    #[zenoh_macros::unstable]
1744    fn into_future(self) -> Self::IntoFuture {
1745        std::future::ready(self.wait())
1746    }
1747}
1748
1749#[zenoh_macros::unstable]
1750impl Resolvable for SampleMissListenerBuilder<'_, Callback<Miss>, true> {
1751    type To = ZResult<()>;
1752}
1753
1754#[zenoh_macros::unstable]
1755impl Wait for SampleMissListenerBuilder<'_, Callback<Miss>, true> {
1756    #[zenoh_macros::unstable]
1757    fn wait(self) -> <Self as Resolvable>::To {
1758        let (callback, _) = self.handler.into_handler();
1759        zlock!(self.statesref).register_miss_callback(callback);
1760        Ok(())
1761    }
1762}
1763
1764#[zenoh_macros::unstable]
1765impl IntoFuture for SampleMissListenerBuilder<'_, Callback<Miss>, true> {
1766    type Output = <Self as Resolvable>::To;
1767    type IntoFuture = Ready<<Self as Resolvable>::To>;
1768
1769    #[zenoh_macros::unstable]
1770    fn into_future(self) -> Self::IntoFuture {
1771        std::future::ready(self.wait())
1772    }
1773}