zenoh_ext/
advanced_subscriber.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{collections::BTreeMap, future::IntoFuture, str::FromStr};
15
16use zenoh::{
17    config::ZenohId,
18    handlers::{Callback, CallbackParameter, IntoHandler},
19    key_expr::KeyExpr,
20    liveliness::{LivelinessSubscriberBuilder, LivelinessToken},
21    pubsub::SubscriberBuilder,
22    query::{
23        ConsolidationMode, Parameters, Selector, TimeBound, TimeExpr, TimeRange, ZenohParameters,
24    },
25    sample::{Locality, Sample, SampleKind, SourceSn},
26    session::{EntityGlobalId, EntityId},
27    Resolvable, Resolve, Session, Wait, KE_ADV_PREFIX, KE_EMPTY, KE_PUB, KE_STAR, KE_STARSTAR,
28    KE_SUB,
29};
30use zenoh_util::{Timed, TimedEvent, Timer};
31#[zenoh_macros::unstable]
32use {
33    async_trait::async_trait,
34    std::collections::hash_map::Entry,
35    std::collections::HashMap,
36    std::convert::TryFrom,
37    std::future::Ready,
38    std::sync::{Arc, Mutex},
39    std::time::Duration,
40    uhlc::ID,
41    zenoh::handlers::{locked, DefaultHandler},
42    zenoh::internal::{runtime::ZRuntime, zlock},
43    zenoh::pubsub::Subscriber,
44    zenoh::query::{QueryTarget, Reply, ReplyKeyExpr},
45    zenoh::time::Timestamp,
46    zenoh::Result as ZResult,
47};
48
49use crate::{
50    advanced_cache::{ke_liveliness, KE_UHLC},
51    z_deserialize,
52};
53
54#[derive(Debug, Default, Clone)]
55/// Configure query for historical data.
56#[zenoh_macros::unstable]
57pub struct HistoryConfig {
58    liveliness: bool,
59    sample_depth: Option<usize>,
60    age: Option<f64>,
61}
62
63#[zenoh_macros::unstable]
64impl HistoryConfig {
65    /// Enable detection of late joiner publishers and query for their historical data.
66    ///
67    /// Late joiner detection can only be achieved for [`AdvancedPublishers`](crate::AdvancedPublisher) that enable publisher_detection.
68    /// History can only be retransmitted by [`AdvancedPublishers`](crate::AdvancedPublisher) that enable [`cache`](crate::AdvancedPublisherBuilder::cache).
69    #[inline]
70    #[zenoh_macros::unstable]
71    pub fn detect_late_publishers(mut self) -> Self {
72        self.liveliness = true;
73        self
74    }
75
76    /// Specify how many samples to query for each resource.
77    #[zenoh_macros::unstable]
78    pub fn max_samples(mut self, depth: usize) -> Self {
79        self.sample_depth = Some(depth);
80        self
81    }
82
83    /// Specify the maximum age of samples to query.
84    #[zenoh_macros::unstable]
85    pub fn max_age(mut self, seconds: f64) -> Self {
86        self.age = Some(seconds);
87        self
88    }
89}
90
91#[derive(Debug, Default, Clone, Copy)]
92/// Configure retransmission.
93#[zenoh_macros::unstable]
94pub struct RecoveryConfig<const CONFIGURED: bool = true> {
95    periodic_queries: Option<Duration>,
96    heartbeat: bool,
97}
98
99#[zenoh_macros::unstable]
100impl RecoveryConfig<false> {
101    /// Enable periodic queries for not yet received Samples and specify their period.
102    ///
103    /// This allows to retrieve the last Sample(s) if the last Sample(s) is/are lost.
104    /// So it is useful for sporadic publications but useless for periodic publications
105    /// with a period smaller or equal to this period.
106    /// Retransmission can only be achieved by [`AdvancedPublishers`](crate::AdvancedPublisher)
107    /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
108    /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
109    #[zenoh_macros::unstable]
110    #[inline]
111    pub fn periodic_queries(self, period: Duration) -> RecoveryConfig<true> {
112        RecoveryConfig {
113            periodic_queries: Some(period),
114            heartbeat: false,
115        }
116    }
117
118    /// Subscribe to heartbeats of [`AdvancedPublishers`](crate::AdvancedPublisher).
119    ///
120    /// This allows to receive the last published Sample's sequence number and check for misses.
121    /// Heartbeat subscriber must be paired with [`AdvancedPublishers`](crate::AdvancedPublisher)
122    /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
123    /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection) with
124    /// [`heartbeat`](crate::advanced_publisher::MissDetectionConfig::heartbeat) or
125    /// [`sporadic_heartbeat`](crate::advanced_publisher::MissDetectionConfig::sporadic_heartbeat).
126    #[zenoh_macros::unstable]
127    #[inline]
128    pub fn heartbeat(self) -> RecoveryConfig<true> {
129        RecoveryConfig {
130            periodic_queries: None,
131            heartbeat: true,
132        }
133    }
134}
135
136/// The builder of an [`AdvancedSubscriber`], allowing to configure it.
137#[zenoh_macros::unstable]
138pub struct AdvancedSubscriberBuilder<'a, 'b, 'c, Handler, const BACKGROUND: bool = false> {
139    pub(crate) session: &'a Session,
140    pub(crate) key_expr: ZResult<KeyExpr<'b>>,
141    pub(crate) origin: Locality,
142    pub(crate) retransmission: Option<RecoveryConfig>,
143    pub(crate) query_target: QueryTarget,
144    pub(crate) query_timeout: Duration,
145    pub(crate) history: Option<HistoryConfig>,
146    pub(crate) liveliness: bool,
147    pub(crate) meta_key_expr: Option<ZResult<KeyExpr<'c>>>,
148    pub(crate) handler: Handler,
149}
150
151#[zenoh_macros::unstable]
152impl<'a, 'b, Handler> AdvancedSubscriberBuilder<'a, 'b, '_, Handler> {
153    #[zenoh_macros::unstable]
154    pub(crate) fn new(builder: SubscriberBuilder<'a, 'b, Handler>) -> Self {
155        AdvancedSubscriberBuilder {
156            session: builder.session,
157            key_expr: builder.key_expr,
158            origin: builder.origin,
159            handler: builder.handler,
160            retransmission: None,
161            query_target: QueryTarget::All,
162            query_timeout: Duration::from_secs(10),
163            history: None,
164            liveliness: false,
165            meta_key_expr: None,
166        }
167    }
168}
169
170#[zenoh_macros::unstable]
171impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, DefaultHandler> {
172    /// Add callback to AdvancedSubscriber.
173    #[inline]
174    #[zenoh_macros::unstable]
175    pub fn callback<F>(self, callback: F) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>>
176    where
177        F: Fn(Sample) + Send + Sync + 'static,
178    {
179        self.with(Callback::from(callback))
180    }
181
182    /// Add callback to `AdvancedSubscriber`.
183    ///
184    /// Using this guarantees that your callback will never be called concurrently.
185    /// If your callback is also accepted by the [`callback`](AdvancedSubscriberBuilder::callback) method, we suggest you use it instead of `callback_mut`
186    #[inline]
187    #[zenoh_macros::unstable]
188    pub fn callback_mut<F>(
189        self,
190        callback: F,
191    ) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>>
192    where
193        F: FnMut(Sample) + Send + Sync + 'static,
194    {
195        self.callback(locked(callback))
196    }
197
198    /// Make the built AdvancedSubscriber an [`AdvancedSubscriber`](AdvancedSubscriber).
199    #[inline]
200    #[zenoh_macros::unstable]
201    pub fn with<Handler>(self, handler: Handler) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Handler>
202    where
203        Handler: IntoHandler<Sample>,
204    {
205        AdvancedSubscriberBuilder {
206            session: self.session,
207            key_expr: self.key_expr,
208            origin: self.origin,
209            retransmission: self.retransmission,
210            query_target: self.query_target,
211            query_timeout: self.query_timeout,
212            history: self.history,
213            liveliness: self.liveliness,
214            meta_key_expr: self.meta_key_expr,
215            handler,
216        }
217    }
218}
219
220#[zenoh_macros::unstable]
221impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>> {
222    /// Register the subscriber callback to be run in background until the session is closed.
223    ///
224    /// Background builder doesn't return a `AdvancedSubscriber` object anymore.
225    pub fn background(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>, true> {
226        AdvancedSubscriberBuilder {
227            session: self.session,
228            key_expr: self.key_expr,
229            origin: self.origin,
230            retransmission: self.retransmission,
231            query_target: self.query_target,
232            query_timeout: self.query_timeout,
233            history: self.history,
234            liveliness: self.liveliness,
235            meta_key_expr: self.meta_key_expr,
236            handler: self.handler,
237        }
238    }
239}
240
241#[zenoh_macros::unstable]
242impl<'a, 'c, Handler, const BACKGROUND: bool>
243    AdvancedSubscriberBuilder<'a, '_, 'c, Handler, BACKGROUND>
244{
245    /// Restrict the matching publications that will be receive by this [`Subscriber`]
246    /// to the ones that have the given [`Locality`](crate::prelude::Locality).
247    #[zenoh_macros::unstable]
248    #[inline]
249    pub fn allowed_origin(mut self, origin: Locality) -> Self {
250        self.origin = origin;
251        self
252    }
253
254    /// Ask for retransmission of detected lost Samples.
255    ///
256    /// Retransmission can only be achieved by [`AdvancedPublishers`](crate::AdvancedPublisher)
257    /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
258    /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
259    #[zenoh_macros::unstable]
260    #[inline]
261    pub fn recovery(mut self, conf: RecoveryConfig) -> Self {
262        self.retransmission = Some(conf);
263        self
264    }
265
266    // /// Change the target to be used for queries.
267
268    // #[inline]
269    // pub fn query_target(mut self, query_target: QueryTarget) -> Self {
270    //     self.query_target = query_target;
271    //     self
272    // }
273
274    /// Change the timeout to be used for queries (history, retransmission).
275    #[zenoh_macros::unstable]
276    #[inline]
277    pub fn query_timeout(mut self, query_timeout: Duration) -> Self {
278        self.query_timeout = query_timeout;
279        self
280    }
281
282    /// Enable query for historical data.
283    ///
284    /// History can only be retransmitted by [`AdvancedPublishers`](crate::AdvancedPublisher) that enable [`cache`](crate::AdvancedPublisherBuilder::cache).
285    #[zenoh_macros::unstable]
286    #[inline]
287    pub fn history(mut self, config: HistoryConfig) -> Self {
288        self.history = Some(config);
289        self
290    }
291
292    /// Allow this subscriber to be detected through liveliness.
293    #[zenoh_macros::unstable]
294    pub fn subscriber_detection(mut self) -> Self {
295        self.liveliness = true;
296        self
297    }
298
299    /// A key expression added to the liveliness token key expression.
300    /// It can be used to convey meta data.
301    #[zenoh_macros::unstable]
302    pub fn subscriber_detection_metadata<TryIntoKeyExpr>(mut self, meta: TryIntoKeyExpr) -> Self
303    where
304        TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
305        <TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<zenoh::Error>,
306    {
307        self.meta_key_expr = Some(meta.try_into().map_err(Into::into));
308        self
309    }
310
311    #[zenoh_macros::unstable]
312    fn with_static_keys(self) -> AdvancedSubscriberBuilder<'a, 'static, 'static, Handler> {
313        AdvancedSubscriberBuilder {
314            session: self.session,
315            key_expr: self.key_expr.map(|s| s.into_owned()),
316            origin: self.origin,
317            retransmission: self.retransmission,
318            query_target: self.query_target,
319            query_timeout: self.query_timeout,
320            history: self.history,
321            liveliness: self.liveliness,
322            meta_key_expr: self.meta_key_expr.map(|s| s.map(|s| s.into_owned())),
323            handler: self.handler,
324        }
325    }
326}
327
328#[zenoh_macros::unstable]
329impl<Handler> Resolvable for AdvancedSubscriberBuilder<'_, '_, '_, Handler>
330where
331    Handler: IntoHandler<Sample>,
332    Handler::Handler: Send,
333{
334    type To = ZResult<AdvancedSubscriber<Handler::Handler>>;
335}
336
337#[zenoh_macros::unstable]
338impl<Handler> Wait for AdvancedSubscriberBuilder<'_, '_, '_, Handler>
339where
340    Handler: IntoHandler<Sample> + Send,
341    Handler::Handler: Send,
342{
343    #[zenoh_macros::unstable]
344    fn wait(self) -> <Self as Resolvable>::To {
345        AdvancedSubscriber::new(self.with_static_keys())
346    }
347}
348
349#[zenoh_macros::unstable]
350impl<Handler> IntoFuture for AdvancedSubscriberBuilder<'_, '_, '_, Handler>
351where
352    Handler: IntoHandler<Sample> + Send,
353    Handler::Handler: Send,
354{
355    type Output = <Self as Resolvable>::To;
356    type IntoFuture = Ready<<Self as Resolvable>::To>;
357
358    #[zenoh_macros::unstable]
359    fn into_future(self) -> Self::IntoFuture {
360        std::future::ready(self.wait())
361    }
362}
363
364#[zenoh_macros::unstable]
365impl Resolvable for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
366    type To = ZResult<()>;
367}
368
369#[zenoh_macros::unstable]
370impl Wait for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
371    #[zenoh_macros::unstable]
372    fn wait(self) -> <Self as Resolvable>::To {
373        let mut sub = AdvancedSubscriber::new(self.with_static_keys())?;
374        sub.set_background_impl(true);
375        Ok(())
376    }
377}
378
379#[zenoh_macros::unstable]
380impl IntoFuture for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
381    type Output = <Self as Resolvable>::To;
382    type IntoFuture = Ready<<Self as Resolvable>::To>;
383
384    #[zenoh_macros::unstable]
385    fn into_future(self) -> Self::IntoFuture {
386        std::future::ready(self.wait())
387    }
388}
389
390#[zenoh_macros::unstable]
391struct Period {
392    timer: Timer,
393    period: Duration,
394}
395
396#[zenoh_macros::unstable]
397struct State {
398    next_id: usize,
399    global_pending_queries: u64,
400    sequenced_states: HashMap<EntityGlobalId, SourceState<u32>>,
401    timestamped_states: HashMap<ID, SourceState<Timestamp>>,
402    session: Session,
403    key_expr: KeyExpr<'static>,
404    retransmission: bool,
405    period: Option<Period>,
406    history_depth: usize,
407    query_target: QueryTarget,
408    query_timeout: Duration,
409    callback: Callback<Sample>,
410    miss_handlers: HashMap<usize, Callback<Miss>>,
411    token: Option<LivelinessToken>,
412}
413
414#[zenoh_macros::unstable]
415impl State {
416    #[zenoh_macros::unstable]
417    fn register_miss_callback(&mut self, callback: Callback<Miss>) -> usize {
418        let id = self.next_id;
419        self.next_id += 1;
420        self.miss_handlers.insert(id, callback);
421        id
422    }
423    #[zenoh_macros::unstable]
424    fn unregister_miss_callback(&mut self, id: &usize) {
425        self.miss_handlers.remove(id);
426    }
427}
428
429macro_rules! spawn_periodoic_queries {
430    ($p:expr,$s:expr,$r:expr) => {{
431        if let Some(period) = &$p.period {
432            period.timer.add(TimedEvent::periodic(
433                period.period,
434                PeriodicQuery {
435                    source_id: $s,
436                    statesref: $r,
437                },
438            ))
439        }
440    }};
441}
442
443#[zenoh_macros::unstable]
444struct SourceState<T> {
445    last_delivered: Option<T>,
446    pending_queries: u64,
447    pending_samples: BTreeMap<T, Sample>,
448}
449
450/// [`AdvancedSubscriber`].
451#[zenoh_macros::unstable]
452pub struct AdvancedSubscriber<Receiver> {
453    statesref: Arc<Mutex<State>>,
454    subscriber: Subscriber<()>,
455    receiver: Receiver,
456    liveliness_subscriber: Option<Subscriber<()>>,
457    heartbeat_subscriber: Option<Subscriber<()>>,
458}
459
460#[zenoh_macros::unstable]
461impl<Receiver> std::ops::Deref for AdvancedSubscriber<Receiver> {
462    type Target = Receiver;
463    fn deref(&self) -> &Self::Target {
464        &self.receiver
465    }
466}
467
468#[zenoh_macros::unstable]
469impl<Receiver> std::ops::DerefMut for AdvancedSubscriber<Receiver> {
470    fn deref_mut(&mut self) -> &mut Self::Target {
471        &mut self.receiver
472    }
473}
474
475#[zenoh_macros::unstable]
476fn handle_sample(states: &mut State, sample: Sample) -> bool {
477    if let (Some(source_id), Some(source_sn)) = (
478        sample.source_info().source_id(),
479        sample.source_info().source_sn(),
480    ) {
481        #[inline]
482        fn deliver_and_flush(
483            sample: Sample,
484            mut source_sn: SourceSn,
485            callback: &Callback<Sample>,
486            state: &mut SourceState<u32>,
487        ) {
488            callback.call(sample);
489            state.last_delivered = Some(source_sn);
490            while let Some(sample) = state.pending_samples.remove(&(source_sn + 1)) {
491                callback.call(sample);
492                source_sn += 1;
493                state.last_delivered = Some(source_sn);
494            }
495        }
496
497        let entry = states.sequenced_states.entry(*source_id);
498        let new = matches!(&entry, Entry::Vacant(_));
499        let state = entry.or_insert(SourceState::<u32> {
500            last_delivered: None,
501            pending_queries: 0,
502            pending_samples: BTreeMap::new(),
503        });
504        if state.last_delivered.is_none() && states.global_pending_queries != 0 {
505            // Avoid going through the Map if history_depth == 1
506            if states.history_depth == 1 {
507                state.last_delivered = Some(source_sn);
508                states.callback.call(sample);
509            } else {
510                state.pending_samples.insert(source_sn, sample);
511                if state.pending_samples.len() >= states.history_depth {
512                    if let Some((sn, sample)) = state.pending_samples.pop_first() {
513                        deliver_and_flush(sample, sn, &states.callback, state);
514                    }
515                }
516            }
517        } else if state.last_delivered.is_some() && source_sn != state.last_delivered.unwrap() + 1 {
518            if source_sn > state.last_delivered.unwrap() {
519                if states.retransmission {
520                    state.pending_samples.insert(source_sn, sample);
521                } else {
522                    tracing::info!(
523                        "Sample missed: missed {} samples from {:?}.",
524                        source_sn - state.last_delivered.unwrap() - 1,
525                        source_id,
526                    );
527                    for miss_callback in states.miss_handlers.values() {
528                        miss_callback.call(Miss {
529                            source: *source_id,
530                            nb: source_sn - state.last_delivered.unwrap() - 1,
531                        });
532                    }
533                    states.callback.call(sample);
534                    state.last_delivered = Some(source_sn);
535                }
536            }
537        } else {
538            deliver_and_flush(sample, source_sn, &states.callback, state);
539        }
540        new
541    } else if let Some(timestamp) = sample.timestamp() {
542        let entry = states.timestamped_states.entry(*timestamp.get_id());
543        let state = entry.or_insert(SourceState::<Timestamp> {
544            last_delivered: None,
545            pending_queries: 0,
546            pending_samples: BTreeMap::new(),
547        });
548        if state.last_delivered.map(|t| t < *timestamp).unwrap_or(true) {
549            if (states.global_pending_queries == 0 && state.pending_queries == 0)
550                || states.history_depth == 1
551            {
552                state.last_delivered = Some(*timestamp);
553                states.callback.call(sample);
554            } else {
555                state.pending_samples.entry(*timestamp).or_insert(sample);
556                if state.pending_samples.len() >= states.history_depth {
557                    flush_timestamped_source(state, &states.callback);
558                }
559            }
560        }
561        false
562    } else {
563        states.callback.call(sample);
564        false
565    }
566}
567
568#[zenoh_macros::unstable]
569fn seq_num_range(start: Option<u32>, end: Option<u32>) -> String {
570    match (start, end) {
571        (Some(start), Some(end)) => format!("_sn={start}..{end}"),
572        (Some(start), None) => format!("_sn={start}.."),
573        (None, Some(end)) => format!("_sn=..{end}"),
574        (None, None) => "_sn=..".to_string(),
575    }
576}
577
578#[zenoh_macros::unstable]
579#[derive(Clone)]
580struct PeriodicQuery {
581    source_id: EntityGlobalId,
582    statesref: Arc<Mutex<State>>,
583}
584
585#[zenoh_macros::unstable]
586#[async_trait]
587impl Timed for PeriodicQuery {
588    async fn run(&mut self) {
589        let mut lock = zlock!(self.statesref);
590        let states = &mut *lock;
591        if let Some(state) = states.sequenced_states.get_mut(&self.source_id) {
592            state.pending_queries += 1;
593            let query_expr = &states.key_expr
594                / KE_ADV_PREFIX
595                / KE_STAR
596                / &self.source_id.zid().into_keyexpr()
597                / &KeyExpr::try_from(self.source_id.eid().to_string()).unwrap()
598                / KE_STARSTAR;
599            let seq_num_range = seq_num_range(state.last_delivered.map(|s| s + 1), None);
600
601            let session = states.session.clone();
602            let key_expr = states.key_expr.clone().into_owned();
603            let query_target = states.query_target;
604            let query_timeout = states.query_timeout;
605
606            tracing::trace!(
607                "AdvancedSubscriber{{key_expr: {}}}: Querying undelivered samples {}?{}",
608                states.key_expr,
609                query_expr,
610                seq_num_range
611            );
612            drop(lock);
613
614            let handler = SequencedRepliesHandler {
615                source_id: self.source_id,
616                statesref: self.statesref.clone(),
617            };
618            let _ = session
619                .get(Selector::from((query_expr, seq_num_range)))
620                .callback({
621                    move |r: Reply| {
622                        if let Ok(s) = r.into_result() {
623                            if key_expr.intersects(s.key_expr()) {
624                                let states = &mut *zlock!(handler.statesref);
625                                tracing::trace!(
626                                    "AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}",
627                                    states.key_expr,
628                                    s.source_info(),
629                                    s.timestamp()
630                                );
631                                handle_sample(states, s);
632                            }
633                        }
634                    }
635                })
636                .consolidation(ConsolidationMode::None)
637                .accept_replies(ReplyKeyExpr::Any)
638                .target(query_target)
639                .timeout(query_timeout)
640                .wait();
641        }
642    }
643}
644
645#[zenoh_macros::unstable]
646impl<Handler> AdvancedSubscriber<Handler> {
647    fn new<H>(conf: AdvancedSubscriberBuilder<'_, '_, '_, H>) -> ZResult<Self>
648    where
649        H: IntoHandler<Sample, Handler = Handler> + Send,
650    {
651        let (callback, receiver) = conf.handler.into_handler();
652        let key_expr = conf.key_expr?;
653        let meta = match conf.meta_key_expr {
654            Some(meta) => Some(meta?),
655            None => None,
656        };
657        let retransmission = conf.retransmission;
658        let query_target = conf.query_target;
659        let query_timeout = conf.query_timeout;
660        let session = conf.session.clone();
661        let statesref = Arc::new(Mutex::new(State {
662            next_id: 0,
663            sequenced_states: HashMap::new(),
664            timestamped_states: HashMap::new(),
665            global_pending_queries: if conf.history.is_some() { 1 } else { 0 },
666            session,
667            period: retransmission.as_ref().and_then(|r| {
668                let _rt = ZRuntime::Application.enter();
669                r.periodic_queries.map(|p| Period {
670                    timer: Timer::new(false),
671                    period: p,
672                })
673            }),
674            key_expr: key_expr.clone().into_owned(),
675            retransmission: retransmission.is_some(),
676            history_depth: conf
677                .history
678                .as_ref()
679                .and_then(|h| h.sample_depth)
680                .unwrap_or_default(),
681            query_target: conf.query_target,
682            query_timeout: conf.query_timeout,
683            callback: callback.clone(),
684            miss_handlers: HashMap::new(),
685            token: None,
686        }));
687
688        let sub_callback = {
689            let statesref = statesref.clone();
690            let session = conf.session.clone();
691            let key_expr = key_expr.clone().into_owned();
692
693            move |s: Sample| {
694                let mut lock = zlock!(statesref);
695                let states = &mut *lock;
696                let source_id = s.source_info().source_id().cloned();
697                let new = handle_sample(states, s);
698
699                if let Some(source_id) = source_id {
700                    if new {
701                        spawn_periodoic_queries!(states, source_id, statesref.clone());
702                    }
703
704                    if let Some(state) = states.sequenced_states.get_mut(&source_id) {
705                        if retransmission.is_some()
706                            && state.pending_queries == 0
707                            && !state.pending_samples.is_empty()
708                        {
709                            state.pending_queries += 1;
710                            let query_expr = &key_expr
711                                / KE_ADV_PREFIX
712                                / KE_STAR
713                                / &source_id.zid().into_keyexpr()
714                                / &KeyExpr::try_from(source_id.eid().to_string()).unwrap()
715                                / KE_STARSTAR;
716                            let seq_num_range =
717                                seq_num_range(state.last_delivered.map(|s| s + 1), None);
718                            tracing::trace!(
719                                "AdvancedSubscriber{{key_expr: {}}}: Querying missing samples {}?{}",
720                                states.key_expr,
721                                query_expr,
722                                seq_num_range
723                            );
724                            drop(lock);
725                            let handler = SequencedRepliesHandler {
726                                source_id,
727                                statesref: statesref.clone(),
728                            };
729                            let _ = session
730                                .get(Selector::from((query_expr, seq_num_range)))
731                                .callback({
732                                    let key_expr = key_expr.clone().into_owned();
733                                    move |r: Reply| {
734                                        if let Ok(s) = r.into_result() {
735                                            if key_expr.intersects(s.key_expr()) {
736                                                let states = &mut *zlock!(handler.statesref);
737                                                tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
738                                                handle_sample(states, s);
739                                            }
740                                        }
741                                    }
742                                })
743                                .consolidation(ConsolidationMode::None)
744                                .accept_replies(ReplyKeyExpr::Any)
745                                .target(query_target)
746                                .timeout(query_timeout)
747                                .wait();
748                        }
749                    }
750                }
751            }
752        };
753
754        let subscriber = conf
755            .session
756            .declare_subscriber(&key_expr)
757            .callback(sub_callback)
758            .allowed_origin(conf.origin)
759            .wait()?;
760
761        tracing::debug!("Create AdvancedSubscriber{{key_expr: {}}}", key_expr,);
762
763        if let Some(historyconf) = conf.history.as_ref() {
764            let handler = InitialRepliesHandler {
765                statesref: statesref.clone(),
766            };
767            let mut params = Parameters::empty();
768            if let Some(max) = historyconf.sample_depth {
769                params.insert("_max", max.to_string());
770            }
771            if let Some(age) = historyconf.age {
772                params.set_time_range(TimeRange {
773                    start: TimeBound::Inclusive(TimeExpr::Now { offset_secs: -age }),
774                    end: TimeBound::Unbounded,
775                });
776            }
777            tracing::trace!(
778                "AdvancedSubscriber{{key_expr: {}}} Querying historical samples {}?{}",
779                key_expr,
780                &key_expr / KE_ADV_PREFIX / KE_STARSTAR,
781                params
782            );
783            let _ = conf
784                .session
785                .get(Selector::from((
786                    &key_expr / KE_ADV_PREFIX / KE_STARSTAR,
787                    params,
788                )))
789                .callback({
790                    let key_expr = key_expr.clone().into_owned();
791                    move |r: Reply| {
792                        if let Ok(s) = r.into_result() {
793                            if key_expr.intersects(s.key_expr()) {
794                                let states = &mut *zlock!(handler.statesref);
795                                tracing::trace!(
796                                    "AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}",
797                                    states.key_expr,
798                                    s.source_info(),
799                                    s.timestamp()
800                                );
801                                handle_sample(states, s);
802                            }
803                        }
804                    }
805                })
806                .consolidation(ConsolidationMode::None)
807                .accept_replies(ReplyKeyExpr::Any)
808                .target(query_target)
809                .timeout(query_timeout)
810                .wait();
811        }
812
813        let liveliness_subscriber = if let Some(historyconf) = conf.history.as_ref() {
814            if historyconf.liveliness {
815                let live_callback = {
816                    let session = conf.session.clone();
817                    let statesref = statesref.clone();
818                    let key_expr = key_expr.clone().into_owned();
819                    let historyconf = historyconf.clone();
820                    move |s: Sample| {
821                        if s.kind() == SampleKind::Put {
822                            if let Ok(parsed) = ke_liveliness::parse(s.key_expr().as_keyexpr()) {
823                                if let Ok(zid) = ZenohId::from_str(parsed.zid().as_str()) {
824                                    // TODO : If we already have a state associated to this discovered source
825                                    // we should query with the appropriate range to avoid unnecessary retransmissions
826                                    if parsed.eid() == KE_UHLC {
827                                        let mut lock = zlock!(statesref);
828                                        let states = &mut *lock;
829                                        tracing::trace!(
830                                            "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers with zid={}",
831                                            states.key_expr,
832                                            parsed.zid().as_str()
833                                        );
834                                        let entry = states.timestamped_states.entry(ID::from(zid));
835                                        let state = entry.or_insert(SourceState::<Timestamp> {
836                                            last_delivered: None,
837                                            pending_queries: 0,
838                                            pending_samples: BTreeMap::new(),
839                                        });
840                                        state.pending_queries += 1;
841
842                                        let mut params = Parameters::empty();
843                                        if let Some(max) = historyconf.sample_depth {
844                                            params.insert("_max", max.to_string());
845                                        }
846                                        if let Some(age) = historyconf.age {
847                                            params.set_time_range(TimeRange {
848                                                start: TimeBound::Inclusive(TimeExpr::Now {
849                                                    offset_secs: -age,
850                                                }),
851                                                end: TimeBound::Unbounded,
852                                            });
853                                        }
854                                        tracing::trace!(
855                                            "AdvancedSubscriber{{key_expr: {}}}: Querying historical samples {}?{}",
856                                            states.key_expr,
857                                            s.key_expr(),
858                                            params
859                                        );
860                                        drop(lock);
861
862                                        let handler = TimestampedRepliesHandler {
863                                            id: ID::from(zid),
864                                            statesref: statesref.clone(),
865                                            callback: callback.clone(),
866                                        };
867                                        let _ = session
868                                            .get(Selector::from((s.key_expr(), params)))
869                                            .callback({
870                                                let key_expr = key_expr.clone().into_owned();
871                                                move |r: Reply| {
872                                                    if let Ok(s) = r.into_result() {
873                                                        if key_expr.intersects(s.key_expr()) {
874                                                            let states =
875                                                                &mut *zlock!(handler.statesref);
876                                                            tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
877                                                            handle_sample(states, s);
878                                                        }
879                                                    }
880                                                }
881                                            })
882                                            .consolidation(ConsolidationMode::None)
883                                            .accept_replies(ReplyKeyExpr::Any)
884                                            .target(query_target)
885                                            .timeout(query_timeout)
886                                            .wait();
887                                    } else if let Ok(eid) =
888                                        EntityId::from_str(parsed.eid().as_str())
889                                    {
890                                        let source_id = EntityGlobalId::new(zid, eid);
891                                        let mut lock = zlock!(statesref);
892                                        let states = &mut *lock;
893                                        tracing::trace!(
894                                            "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers with zid={}",
895                                            states.key_expr,
896                                            parsed.zid().as_str()
897                                        );
898                                        let entry = states.sequenced_states.entry(source_id);
899                                        let new = matches!(&entry, Entry::Vacant(_));
900                                        let state = entry.or_insert(SourceState::<u32> {
901                                            last_delivered: None,
902                                            pending_queries: 0,
903                                            pending_samples: BTreeMap::new(),
904                                        });
905                                        state.pending_queries += 1;
906
907                                        let mut params = Parameters::empty();
908                                        if let Some(max) = historyconf.sample_depth {
909                                            params.insert("_max", max.to_string());
910                                        }
911                                        if let Some(age) = historyconf.age {
912                                            params.set_time_range(TimeRange {
913                                                start: TimeBound::Inclusive(TimeExpr::Now {
914                                                    offset_secs: -age,
915                                                }),
916                                                end: TimeBound::Unbounded,
917                                            });
918                                        }
919                                        tracing::trace!(
920                                            "AdvancedSubscriber{{key_expr: {}}}: Querying historical samples {}?{}",
921                                            states.key_expr,
922                                            s.key_expr(),
923                                            params,
924                                        );
925                                        drop(lock);
926
927                                        let handler = SequencedRepliesHandler {
928                                            source_id,
929                                            statesref: statesref.clone(),
930                                        };
931                                        let _ = session
932                                            .get(Selector::from((s.key_expr(), params)))
933                                            .callback({
934                                                let key_expr = key_expr.clone().into_owned();
935                                                move |r: Reply| {
936                                                    if let Ok(s) = r.into_result() {
937                                                        if key_expr.intersects(s.key_expr()) {
938                                                            let states =
939                                                                &mut *zlock!(handler.statesref);
940                                                            tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
941                                                            handle_sample(states, s);
942                                                        }
943                                                    }
944                                                }
945                                            })
946                                            .consolidation(ConsolidationMode::None)
947                                            .accept_replies(ReplyKeyExpr::Any)
948                                            .target(query_target)
949                                            .timeout(query_timeout)
950                                            .wait();
951
952                                        if new {
953                                            spawn_periodoic_queries!(
954                                                zlock!(statesref),
955                                                source_id,
956                                                statesref.clone()
957                                            );
958                                        }
959                                    }
960                                } else {
961                                    let mut lock = zlock!(statesref);
962                                    let states = &mut *lock;
963                                    tracing::trace!(
964                                        "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers with zid={}",
965                                        states.key_expr,
966                                        parsed.zid().as_str()
967                                    );
968                                    states.global_pending_queries += 1;
969
970                                    let mut params = Parameters::empty();
971                                    if let Some(max) = historyconf.sample_depth {
972                                        params.insert("_max", max.to_string());
973                                    }
974                                    if let Some(age) = historyconf.age {
975                                        params.set_time_range(TimeRange {
976                                            start: TimeBound::Inclusive(TimeExpr::Now {
977                                                offset_secs: -age,
978                                            }),
979                                            end: TimeBound::Unbounded,
980                                        });
981                                    }
982                                    tracing::trace!(
983                                        "AdvancedSubscriber{{key_expr: {}}}: Querying historical samples {}?{}",
984                                        states.key_expr,
985                                        s.key_expr(),
986                                        params,
987                                    );
988                                    drop(lock);
989
990                                    let handler = InitialRepliesHandler {
991                                        statesref: statesref.clone(),
992                                    };
993                                    let _ = session
994                                        .get(Selector::from((s.key_expr(), params)))
995                                        .callback({
996                                            let key_expr = key_expr.clone().into_owned();
997                                            move |r: Reply| {
998                                                if let Ok(s) = r.into_result() {
999                                                    if key_expr.intersects(s.key_expr()) {
1000                                                        let states =
1001                                                            &mut *zlock!(handler.statesref);
1002                                                        tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
1003                                                        handle_sample(states, s);
1004                                                    }
1005                                                }
1006                                            }
1007                                        })
1008                                        .consolidation(ConsolidationMode::None)
1009                                        .accept_replies(ReplyKeyExpr::Any)
1010                                        .target(query_target)
1011                                        .timeout(query_timeout)
1012                                        .wait();
1013                                }
1014                            } else {
1015                                tracing::warn!(
1016                                    "AdvancedSubscriber{{}}: Received malformed liveliness token key expression: {}",
1017                                    s.key_expr()
1018                                );
1019                            }
1020                        }
1021                    }
1022                };
1023
1024                tracing::debug!(
1025                    "AdvancedSubscriber{{key_expr: {}}}: Detect late joiner publishers on {}",
1026                    key_expr,
1027                    &key_expr / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR
1028                );
1029                Some(
1030                    conf.session
1031                        .liveliness()
1032                        .declare_subscriber(&key_expr / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR)
1033                        // .declare_subscriber(keformat!(ke_liveliness_all::formatter(), zid = 0, eid = 0, remaining = key_expr).unwrap())
1034                        .history(true)
1035                        .callback(live_callback)
1036                        .wait()?,
1037                )
1038            } else {
1039                None
1040            }
1041        } else {
1042            None
1043        };
1044
1045        let heartbeat_subscriber = if retransmission.is_some_and(|r| r.heartbeat) {
1046            let ke_heartbeat_sub = &key_expr / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR;
1047            let statesref = statesref.clone();
1048            tracing::debug!(
1049                "AdvancedSubscriber{{key_expr: {}}}: Enable heartbeat subscriber on {}",
1050                key_expr,
1051                ke_heartbeat_sub
1052            );
1053            let heartbeat_sub = conf
1054                .session
1055                .declare_subscriber(ke_heartbeat_sub)
1056                .callback(move |sample_hb| {
1057                    if sample_hb.kind() != SampleKind::Put {
1058                        return;
1059                    }
1060
1061                    let heartbeat_keyexpr = sample_hb.key_expr().as_keyexpr();
1062                    let Ok(parsed_keyexpr) = ke_liveliness::parse(heartbeat_keyexpr) else {
1063                        return;
1064                    };
1065                    let source_id = {
1066                        let Ok(zid) = ZenohId::from_str(parsed_keyexpr.zid().as_str()) else {
1067                            return;
1068                        };
1069                        let Ok(eid) = EntityId::from_str(parsed_keyexpr.eid().as_str()) else {
1070                            return;
1071                        };
1072                        EntityGlobalId::new(zid, eid)
1073                    };
1074
1075                    let Ok(heartbeat_sn) = z_deserialize::<u32>(sample_hb.payload()) else {
1076                        tracing::debug!(
1077                            "AdvancedSubscriber{{}}: Skipping invalid heartbeat payload on '{}'",
1078                            heartbeat_keyexpr
1079                        );
1080                        return;
1081                    };
1082
1083                    let mut lock = zlock!(statesref);
1084                    let states = &mut *lock;
1085                    let entry = states.sequenced_states.entry(source_id);
1086                    if matches!(&entry, Entry::Vacant(_)) {
1087                        // NOTE: API does not allow both heartbeat and periodic_queries
1088                        spawn_periodoic_queries!(states, source_id, statesref.clone());
1089                        if states.global_pending_queries > 0 {
1090                            tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Skipping heartbeat on '{}' from publisher that is currently being pulled by global query", states.key_expr, heartbeat_keyexpr);
1091                            return;
1092                        }
1093                    }
1094
1095                    let state = entry.or_insert(SourceState::<u32> {
1096                        last_delivered: None,
1097                        pending_queries: 0,
1098                        pending_samples: BTreeMap::new(),
1099                    });
1100
1101                    // check that it's not an old sn, and that there are no pending queries
1102                    if (state.last_delivered.is_none()
1103                        || state.last_delivered.is_some_and(|sn| heartbeat_sn > sn))
1104                        && state.pending_queries == 0
1105                    {
1106                        let seq_num_range = seq_num_range(
1107                            state.last_delivered.map(|s| s + 1),
1108                            Some(heartbeat_sn),
1109                        );
1110
1111                        let session = states.session.clone();
1112                        let key_expr = states.key_expr.clone().into_owned();
1113                        let query_target = states.query_target;
1114                        let query_timeout = states.query_timeout;
1115                        state.pending_queries += 1;
1116
1117                        tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Querying missing samples {}?{}", states.key_expr, heartbeat_keyexpr, seq_num_range);
1118                        drop(lock);
1119
1120                        let handler = SequencedRepliesHandler {
1121                            source_id,
1122                            statesref: statesref.clone(),
1123                        };
1124                        let _ = session
1125                            .get(Selector::from((heartbeat_keyexpr, seq_num_range)))
1126                            .callback({
1127                                move |r: Reply| {
1128                                    if let Ok(s) = r.into_result() {
1129                                        if key_expr.intersects(s.key_expr()) {
1130                                            let states = &mut *zlock!(handler.statesref);
1131                                            tracing::trace!("AdvancedSubscriber{{key_expr: {}}}: Received reply with Sample{{info:{:?}, ts:{:?}}}", states.key_expr, s.source_info(), s.timestamp());
1132                                            handle_sample(states, s);
1133                                        }
1134                                    }
1135                                }
1136                            })
1137                            .consolidation(ConsolidationMode::None)
1138                            .accept_replies(ReplyKeyExpr::Any)
1139                            .target(query_target)
1140                            .timeout(query_timeout)
1141                            .wait();
1142                    }
1143                })
1144                .allowed_origin(conf.origin)
1145                .wait()?;
1146            Some(heartbeat_sub)
1147        } else {
1148            None
1149        };
1150
1151        if conf.liveliness {
1152            let suffix = KE_ADV_PREFIX
1153                / KE_SUB
1154                / &subscriber.id().zid().into_keyexpr()
1155                / &KeyExpr::try_from(subscriber.id().eid().to_string()).unwrap();
1156            let suffix = match meta {
1157                Some(meta) => suffix / &meta,
1158                // We need this empty chunk because af a routing matching bug
1159                _ => suffix / KE_EMPTY,
1160            };
1161            tracing::debug!(
1162                "AdvancedSubscriber{{key_expr: {}}}: Declare liveliness token {}",
1163                key_expr,
1164                &key_expr / &suffix,
1165            );
1166            let token = conf
1167                .session
1168                .liveliness()
1169                .declare_token(&key_expr / &suffix)
1170                .wait()?;
1171            zlock!(statesref).token = Some(token)
1172        }
1173
1174        let reliable_subscriber = AdvancedSubscriber {
1175            statesref,
1176            subscriber,
1177            receiver,
1178            liveliness_subscriber,
1179            heartbeat_subscriber,
1180        };
1181
1182        Ok(reliable_subscriber)
1183    }
1184
1185    /// Returns the [`EntityGlobalId`] of this AdvancedSubscriber.
1186    #[zenoh_macros::unstable]
1187    pub fn id(&self) -> EntityGlobalId {
1188        self.subscriber.id()
1189    }
1190
1191    /// Returns the [`KeyExpr`] this subscriber subscribes to.
1192    #[zenoh_macros::unstable]
1193    pub fn key_expr(&self) -> &KeyExpr<'static> {
1194        self.subscriber.key_expr()
1195    }
1196
1197    /// Returns a reference to this subscriber's handler.
1198    /// An handler is anything that implements [`zenoh::handlers::IntoHandler`].
1199    /// The default handler is [`zenoh::handlers::DefaultHandler`].
1200    #[zenoh_macros::unstable]
1201    pub fn handler(&self) -> &Handler {
1202        &self.receiver
1203    }
1204
1205    /// Returns a mutable reference to this subscriber's handler.
1206    /// An handler is anything that implements [`zenoh::handlers::IntoHandler`].
1207    /// The default handler is [`zenoh::handlers::DefaultHandler`].
1208    #[zenoh_macros::unstable]
1209    pub fn handler_mut(&mut self) -> &mut Handler {
1210        &mut self.receiver
1211    }
1212
1213    /// Declares a listener to detect missed samples.
1214    ///
1215    /// Missed samples can only be detected from [`AdvancedPublisher`](crate::AdvancedPublisher) that
1216    /// enable [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
1217    #[zenoh_macros::unstable]
1218    pub fn sample_miss_listener(&self) -> SampleMissListenerBuilder<'_, DefaultHandler> {
1219        SampleMissListenerBuilder {
1220            statesref: &self.statesref,
1221            handler: DefaultHandler::default(),
1222        }
1223    }
1224
1225    /// Declares a listener to detect matching publishers.
1226    ///
1227    /// Only [`AdvancedPublisher`](crate::AdvancedPublisher) that enable
1228    /// [`publisher_detection`](crate::AdvancedPublisherBuilder::publisher_detection) can be detected.
1229    #[zenoh_macros::unstable]
1230    pub fn detect_publishers(&self) -> LivelinessSubscriberBuilder<'_, '_, DefaultHandler> {
1231        self.subscriber
1232            .session()
1233            .liveliness()
1234            .declare_subscriber(self.subscriber.key_expr() / KE_ADV_PREFIX / KE_PUB / KE_STARSTAR)
1235    }
1236
1237    /// Undeclares this AdvancedSubscriber
1238    #[inline]
1239    #[zenoh_macros::unstable]
1240    pub fn undeclare(self) -> impl Resolve<ZResult<()>> {
1241        tracing::debug!(
1242            "AdvancedSubscriber{{key_expr: {}}}: Undeclare",
1243            self.key_expr()
1244        );
1245        self.subscriber.undeclare()
1246    }
1247
1248    fn set_background_impl(&mut self, background: bool) {
1249        self.subscriber.set_background(background);
1250        if let Some(mut liveliness_sub) = self.liveliness_subscriber.take() {
1251            liveliness_sub.set_background(background);
1252        }
1253        if let Some(mut heartbeat_sub) = self.heartbeat_subscriber.take() {
1254            heartbeat_sub.set_background(background);
1255        }
1256    }
1257
1258    #[zenoh_macros::internal]
1259    pub fn set_background(&mut self, background: bool) {
1260        self.set_background_impl(background)
1261    }
1262}
1263
1264#[zenoh_macros::unstable]
1265#[inline]
1266fn flush_sequenced_source(
1267    state: &mut SourceState<u32>,
1268    callback: &Callback<Sample>,
1269    source_id: &EntityGlobalId,
1270    miss_handlers: &HashMap<usize, Callback<Miss>>,
1271) {
1272    if state.pending_queries == 0 && !state.pending_samples.is_empty() {
1273        let mut pending_samples = BTreeMap::new();
1274        std::mem::swap(&mut state.pending_samples, &mut pending_samples);
1275        for (seq_num, sample) in pending_samples {
1276            match state.last_delivered {
1277                None => {
1278                    state.last_delivered = Some(seq_num);
1279                    callback.call(sample);
1280                }
1281                Some(last) if seq_num == last + 1 => {
1282                    state.last_delivered = Some(seq_num);
1283                    callback.call(sample);
1284                }
1285                Some(last) if seq_num > last + 1 => {
1286                    tracing::warn!(
1287                        "Sample missed: missed {} samples from {:?}.",
1288                        seq_num - last - 1,
1289                        source_id,
1290                    );
1291                    for miss_callback in miss_handlers.values() {
1292                        miss_callback.call(Miss {
1293                            source: *source_id,
1294                            nb: seq_num - last - 1,
1295                        })
1296                    }
1297                    state.last_delivered = Some(seq_num);
1298                    callback.call(sample);
1299                }
1300                _ => {
1301                    // duplicate
1302                }
1303            }
1304        }
1305    }
1306}
1307
1308#[zenoh_macros::unstable]
1309#[inline]
1310fn flush_timestamped_source(state: &mut SourceState<Timestamp>, callback: &Callback<Sample>) {
1311    if state.pending_queries == 0 && !state.pending_samples.is_empty() {
1312        let mut pending_samples = BTreeMap::new();
1313        std::mem::swap(&mut state.pending_samples, &mut pending_samples);
1314        for (timestamp, sample) in pending_samples {
1315            if state
1316                .last_delivered
1317                .map(|last| timestamp > last)
1318                .unwrap_or(true)
1319            {
1320                state.last_delivered = Some(timestamp);
1321                callback.call(sample);
1322            }
1323        }
1324    }
1325}
1326
1327#[zenoh_macros::unstable]
1328#[derive(Clone)]
1329struct InitialRepliesHandler {
1330    statesref: Arc<Mutex<State>>,
1331}
1332
1333#[zenoh_macros::unstable]
1334impl Drop for InitialRepliesHandler {
1335    fn drop(&mut self) {
1336        let states = &mut *zlock!(self.statesref);
1337        states.global_pending_queries = states.global_pending_queries.saturating_sub(1);
1338        tracing::trace!(
1339            "AdvancedSubscriber{{key_expr: {}}}: Flush initial replies",
1340            states.key_expr
1341        );
1342
1343        if states.global_pending_queries == 0 {
1344            for (source_id, state) in states.sequenced_states.iter_mut() {
1345                flush_sequenced_source(state, &states.callback, source_id, &states.miss_handlers);
1346                spawn_periodoic_queries!(states, *source_id, self.statesref.clone());
1347            }
1348            for state in states.timestamped_states.values_mut() {
1349                flush_timestamped_source(state, &states.callback);
1350            }
1351        }
1352    }
1353}
1354
1355#[zenoh_macros::unstable]
1356#[derive(Clone)]
1357struct SequencedRepliesHandler {
1358    source_id: EntityGlobalId,
1359    statesref: Arc<Mutex<State>>,
1360}
1361
1362#[zenoh_macros::unstable]
1363impl Drop for SequencedRepliesHandler {
1364    fn drop(&mut self) {
1365        let states = &mut *zlock!(self.statesref);
1366        if let Some(state) = states.sequenced_states.get_mut(&self.source_id) {
1367            state.pending_queries = state.pending_queries.saturating_sub(1);
1368            if states.global_pending_queries == 0 {
1369                tracing::trace!(
1370                    "AdvancedSubscriber{{key_expr: {}}}: Flush sequenced samples",
1371                    states.key_expr
1372                );
1373                flush_sequenced_source(
1374                    state,
1375                    &states.callback,
1376                    &self.source_id,
1377                    &states.miss_handlers,
1378                )
1379            }
1380        }
1381    }
1382}
1383
1384#[zenoh_macros::unstable]
1385#[derive(Clone)]
1386struct TimestampedRepliesHandler {
1387    id: ID,
1388    statesref: Arc<Mutex<State>>,
1389    callback: Callback<Sample>,
1390}
1391
1392#[zenoh_macros::unstable]
1393impl Drop for TimestampedRepliesHandler {
1394    fn drop(&mut self) {
1395        let states = &mut *zlock!(self.statesref);
1396        if let Some(state) = states.timestamped_states.get_mut(&self.id) {
1397            state.pending_queries = state.pending_queries.saturating_sub(1);
1398            if states.global_pending_queries == 0 {
1399                tracing::trace!(
1400                    "AdvancedSubscriber{{key_expr: {}}}: Flush timestamped samples",
1401                    states.key_expr
1402                );
1403                flush_timestamped_source(state, &self.callback);
1404            }
1405        }
1406    }
1407}
1408
1409/// A struct that represent missed samples.
1410#[zenoh_macros::unstable]
1411#[derive(Debug, Clone)]
1412pub struct Miss {
1413    source: EntityGlobalId,
1414    nb: u32,
1415}
1416
1417impl Miss {
1418    /// The source of missed samples.
1419    pub fn source(&self) -> EntityGlobalId {
1420        self.source
1421    }
1422
1423    /// The number of missed samples.
1424    pub fn nb(&self) -> u32 {
1425        self.nb
1426    }
1427}
1428
1429impl CallbackParameter for Miss {
1430    type Message<'a> = Self;
1431
1432    fn from_message(msg: Self::Message<'_>) -> Self {
1433        msg
1434    }
1435}
1436
1437/// A listener to detect missed samples.
1438///
1439/// Missed samples can only be detected from [`AdvancedPublisher`](crate::AdvancedPublisher) that
1440/// enable [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
1441#[zenoh_macros::unstable]
1442pub struct SampleMissListener<Handler> {
1443    id: usize,
1444    statesref: Arc<Mutex<State>>,
1445    handler: Handler,
1446    undeclare_on_drop: bool,
1447}
1448
1449#[zenoh_macros::unstable]
1450impl<Handler> SampleMissListener<Handler> {
1451    #[inline]
1452    pub fn undeclare(self) -> SampleMissHandlerUndeclaration<Handler>
1453    where
1454        Handler: Send,
1455    {
1456        // self.undeclare_inner(())
1457        SampleMissHandlerUndeclaration(self)
1458    }
1459
1460    fn undeclare_impl(&mut self) -> ZResult<()> {
1461        // set the flag first to avoid double panic if this function panic
1462        self.undeclare_on_drop = false;
1463        zlock!(self.statesref).unregister_miss_callback(&self.id);
1464        Ok(())
1465    }
1466
1467    #[zenoh_macros::internal]
1468    pub fn set_background(&mut self, background: bool) {
1469        self.undeclare_on_drop = !background;
1470    }
1471}
1472
1473#[cfg(feature = "unstable")]
1474impl<Handler> Drop for SampleMissListener<Handler> {
1475    fn drop(&mut self) {
1476        if self.undeclare_on_drop {
1477            if let Err(error) = self.undeclare_impl() {
1478                tracing::error!(error);
1479            }
1480        }
1481    }
1482}
1483
1484// #[zenoh_macros::unstable]
1485// impl<Handler: Send> UndeclarableSealed<()> for SampleMissHandler<Handler> {
1486//     type Undeclaration = SampleMissHandlerUndeclaration<Handler>;
1487
1488//     fn undeclare_inner(self, _: ()) -> Self::Undeclaration {
1489//         SampleMissHandlerUndeclaration(self)
1490//     }
1491// }
1492
1493#[zenoh_macros::unstable]
1494impl<Handler> std::ops::Deref for SampleMissListener<Handler> {
1495    type Target = Handler;
1496
1497    fn deref(&self) -> &Self::Target {
1498        &self.handler
1499    }
1500}
1501#[zenoh_macros::unstable]
1502impl<Handler> std::ops::DerefMut for SampleMissListener<Handler> {
1503    fn deref_mut(&mut self) -> &mut Self::Target {
1504        &mut self.handler
1505    }
1506}
1507
1508/// A [`Resolvable`] returned when undeclaring a [`SampleMissListener`].
1509#[zenoh_macros::unstable]
1510pub struct SampleMissHandlerUndeclaration<Handler>(SampleMissListener<Handler>);
1511
1512#[zenoh_macros::unstable]
1513impl<Handler> Resolvable for SampleMissHandlerUndeclaration<Handler> {
1514    type To = ZResult<()>;
1515}
1516
1517#[zenoh_macros::unstable]
1518impl<Handler> Wait for SampleMissHandlerUndeclaration<Handler> {
1519    fn wait(mut self) -> <Self as Resolvable>::To {
1520        self.0.undeclare_impl()
1521    }
1522}
1523
1524#[zenoh_macros::unstable]
1525impl<Handler> IntoFuture for SampleMissHandlerUndeclaration<Handler> {
1526    type Output = <Self as Resolvable>::To;
1527    type IntoFuture = Ready<<Self as Resolvable>::To>;
1528
1529    fn into_future(self) -> Self::IntoFuture {
1530        std::future::ready(self.wait())
1531    }
1532}
1533
1534/// A builder for initializing a [`SampleMissListener`].
1535#[zenoh_macros::unstable]
1536pub struct SampleMissListenerBuilder<'a, Handler, const BACKGROUND: bool = false> {
1537    statesref: &'a Arc<Mutex<State>>,
1538    handler: Handler,
1539}
1540
1541#[zenoh_macros::unstable]
1542impl<'a> SampleMissListenerBuilder<'a, DefaultHandler> {
1543    /// Receive the sample miss notification with a callback.
1544    #[inline]
1545    #[zenoh_macros::unstable]
1546    pub fn callback<F>(self, callback: F) -> SampleMissListenerBuilder<'a, Callback<Miss>>
1547    where
1548        F: Fn(Miss) + Send + Sync + 'static,
1549    {
1550        self.with(Callback::from(callback))
1551    }
1552
1553    /// Receive the sample miss notification with a mutable callback.
1554    #[inline]
1555    #[zenoh_macros::unstable]
1556    pub fn callback_mut<F>(self, callback: F) -> SampleMissListenerBuilder<'a, Callback<Miss>>
1557    where
1558        F: FnMut(Miss) + Send + Sync + 'static,
1559    {
1560        self.callback(zenoh::handlers::locked(callback))
1561    }
1562
1563    /// Receive the sample miss notification with a [`Handler`](IntoHandler).
1564    #[inline]
1565    #[zenoh_macros::unstable]
1566    pub fn with<Handler>(self, handler: Handler) -> SampleMissListenerBuilder<'a, Handler>
1567    where
1568        Handler: IntoHandler<Miss>,
1569    {
1570        SampleMissListenerBuilder {
1571            statesref: self.statesref,
1572            handler,
1573        }
1574    }
1575}
1576
1577#[zenoh_macros::unstable]
1578impl<'a> SampleMissListenerBuilder<'a, Callback<Miss>> {
1579    /// Register the sample miss notification callback to be run in background until the adanced subscriber is undeclared.
1580    ///
1581    /// Background builder doesn't return a `SampleMissHandler` object anymore.
1582    #[zenoh_macros::unstable]
1583    pub fn background(self) -> SampleMissListenerBuilder<'a, Callback<Miss>, true> {
1584        SampleMissListenerBuilder {
1585            statesref: self.statesref,
1586            handler: self.handler,
1587        }
1588    }
1589}
1590
1591#[zenoh_macros::unstable]
1592impl<Handler> Resolvable for SampleMissListenerBuilder<'_, Handler>
1593where
1594    Handler: IntoHandler<Miss> + Send,
1595    Handler::Handler: Send,
1596{
1597    type To = ZResult<SampleMissListener<Handler::Handler>>;
1598}
1599
1600#[zenoh_macros::unstable]
1601impl<Handler> Wait for SampleMissListenerBuilder<'_, Handler>
1602where
1603    Handler: IntoHandler<Miss> + Send,
1604    Handler::Handler: Send,
1605{
1606    #[zenoh_macros::unstable]
1607    fn wait(self) -> <Self as Resolvable>::To {
1608        let (callback, handler) = self.handler.into_handler();
1609        let id = zlock!(self.statesref).register_miss_callback(callback);
1610        Ok(SampleMissListener {
1611            id,
1612            statesref: self.statesref.clone(),
1613            handler,
1614            undeclare_on_drop: true,
1615        })
1616    }
1617}
1618
1619#[zenoh_macros::unstable]
1620impl<Handler> IntoFuture for SampleMissListenerBuilder<'_, Handler>
1621where
1622    Handler: IntoHandler<Miss> + Send,
1623    Handler::Handler: Send,
1624{
1625    type Output = <Self as Resolvable>::To;
1626    type IntoFuture = Ready<<Self as Resolvable>::To>;
1627
1628    #[zenoh_macros::unstable]
1629    fn into_future(self) -> Self::IntoFuture {
1630        std::future::ready(self.wait())
1631    }
1632}
1633
1634#[zenoh_macros::unstable]
1635impl Resolvable for SampleMissListenerBuilder<'_, Callback<Miss>, true> {
1636    type To = ZResult<()>;
1637}
1638
1639#[zenoh_macros::unstable]
1640impl Wait for SampleMissListenerBuilder<'_, Callback<Miss>, true> {
1641    #[zenoh_macros::unstable]
1642    fn wait(self) -> <Self as Resolvable>::To {
1643        let (callback, _) = self.handler.into_handler();
1644        zlock!(self.statesref).register_miss_callback(callback);
1645        Ok(())
1646    }
1647}
1648
1649#[zenoh_macros::unstable]
1650impl IntoFuture for SampleMissListenerBuilder<'_, Callback<Miss>, true> {
1651    type Output = <Self as Resolvable>::To;
1652    type IntoFuture = Ready<<Self as Resolvable>::To>;
1653
1654    #[zenoh_macros::unstable]
1655    fn into_future(self) -> Self::IntoFuture {
1656        std::future::ready(self.wait())
1657    }
1658}