zenoh_ext/
advanced_publisher.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{
15    future::{IntoFuture, Ready},
16    sync::{
17        atomic::{AtomicU32, Ordering},
18        Arc,
19    },
20    time::Duration,
21};
22
23use zenoh::{
24    bytes::{Encoding, OptionZBytes, ZBytes},
25    internal::{
26        bail,
27        runtime::ZRuntime,
28        traits::{
29            EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait,
30        },
31        TerminatableTask,
32    },
33    key_expr::{keyexpr, KeyExpr},
34    liveliness::LivelinessToken,
35    pubsub::{
36        PublicationBuilder, PublicationBuilderDelete, PublicationBuilderPut, Publisher,
37        PublisherBuilder,
38    },
39    qos::{CongestionControl, Priority, Reliability},
40    sample::{Locality, SourceInfo},
41    session::EntityGlobalId,
42    Resolvable, Resolve, Result as ZResult, Session, Wait, KE_ADV_PREFIX, KE_EMPTY,
43};
44use zenoh_macros::ke;
45
46use crate::{
47    advanced_cache::{AdvancedCache, AdvancedCacheBuilder, CacheConfig, KE_UHLC},
48    z_serialize,
49};
50
/// Key expression chunk (`pub`) placed right after `KE_ADV_PREFIX` when building the
/// suffix that an advanced publisher uses for its cache queryable, liveliness token
/// and heartbeat publications.
pub(crate) static KE_PUB: &keyexpr = ke!("pub");
52
53#[derive(PartialEq)]
54#[zenoh_macros::unstable]
55pub(crate) enum Sequencing {
56    None,
57    Timestamp,
58    SequenceNumber,
59}
60
/// Configuration for sample miss detection
///
/// Enabling [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection) in [`AdvancedPublisher`](crate::AdvancedPublisher)
/// allows [`AdvancedSubscribers`](crate::AdvancedSubscriber) to detect missed samples
/// through [`sample_miss_listener`](crate::AdvancedSubscriber::sample_miss_listener)
/// and to recover missed samples through [`recovery`](crate::AdvancedSubscriberBuilder::recovery).
#[zenoh_macros::unstable]
#[derive(Debug, Default, Clone)]
pub struct MissDetectionConfig {
    // Heartbeat settings as `(period, sporadic)`: `sporadic` is `false` when set through
    // `heartbeat` and `true` when set through `sporadic_heartbeat`.
    // `None` (the default) means no heartbeat is published.
    pub(crate) state_publisher: Option<(Duration, bool)>,
}
72
73#[zenoh_macros::unstable]
74impl MissDetectionConfig {
75    /// Allow last sample miss detection through periodic heartbeat.
76    ///
77    /// Periodically send the last published sample's sequence number to allow last sample recovery.
78    ///
79    /// [`heartbeat`](MissDetectionConfig::heartbeat) and [`sporadic_heartbeat`](MissDetectionConfig::sporadic_heartbeat)
80    /// are mutually exclusive. Enabling one will disable the other.
81    ///
82    /// [`AdvancedSubscribers`](crate::AdvancedSubscriber) can recover last sample with the
83    /// [`heartbeat`](crate::advanced_subscriber::RecoveryConfig::heartbeat) option.
84    #[zenoh_macros::unstable]
85    pub fn heartbeat(mut self, period: Duration) -> Self {
86        self.state_publisher = Some((period, false));
87        self
88    }
89
90    /// Allow last sample miss detection through sporadic heartbeat.
91    ///
92    /// Each period, the last published sample's sequence number is sent with [`CongestionControl::Block`]
93    /// but only if it changed since the last period.
94    ///
95    /// [`heartbeat`](MissDetectionConfig::heartbeat) and [`sporadic_heartbeat`](MissDetectionConfig::sporadic_heartbeat)
96    /// are mutually exclusive. Enabling one will disable the other.
97    ///
98    /// [`AdvancedSubscribers`](crate::AdvancedSubscriber) can recover last sample with the
99    /// [`heartbeat`](crate::advanced_subscriber::RecoveryConfig::heartbeat) option.
100    #[zenoh_macros::unstable]
101    pub fn sporadic_heartbeat(mut self, period: Duration) -> Self {
102        self.state_publisher = Some((period, true));
103        self
104    }
105}
106
107/// The builder of AdvancedPublisher, allowing to configure it.
108#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
109#[zenoh_macros::unstable]
110pub struct AdvancedPublisherBuilder<'a, 'b, 'c> {
111    session: &'a Session,
112    pub_key_expr: ZResult<KeyExpr<'b>>,
113    encoding: Encoding,
114    destination: Locality,
115    reliability: Reliability,
116    congestion_control: CongestionControl,
117    priority: Priority,
118    is_express: bool,
119    meta_key_expr: Option<ZResult<KeyExpr<'c>>>,
120    sequencing: Sequencing,
121    miss_config: Option<MissDetectionConfig>,
122    liveliness: bool,
123    cache: bool,
124    history: CacheConfig,
125}
126
127#[zenoh_macros::unstable]
128impl<'a, 'b, 'c> AdvancedPublisherBuilder<'a, 'b, 'c> {
129    #[zenoh_macros::unstable]
130    pub(crate) fn new(builder: PublisherBuilder<'a, 'b>) -> AdvancedPublisherBuilder<'a, 'b, 'c> {
131        AdvancedPublisherBuilder {
132            session: builder.session,
133            pub_key_expr: builder.key_expr,
134            encoding: builder.encoding,
135            destination: builder.destination,
136            reliability: builder.reliability,
137            congestion_control: builder.congestion_control,
138            priority: builder.priority,
139            is_express: builder.is_express,
140            meta_key_expr: None,
141            sequencing: Sequencing::None,
142            miss_config: None,
143            liveliness: false,
144            cache: false,
145            history: CacheConfig::default(),
146        }
147    }
148
149    /// Changes the [`zenoh::sample::Locality`] applied when routing the data.
150    ///
151    /// This restricts the matching subscribers that will receive the published data to the ones
152    /// that have the given [`zenoh::sample::Locality`].
153    #[zenoh_macros::unstable]
154    #[inline]
155    pub fn allowed_destination(mut self, destination: Locality) -> Self {
156        self.destination = destination;
157        self
158    }
159
160    /// Changes the [`zenoh::qos::Reliability`] to apply when routing the data.
161    ///
162    /// **NOTE**: Currently `reliability` does not trigger any data retransmission on the wire. It
163    ///   is rather used as a marker on the wire and it may be used to select the best link
164    ///   available (e.g. TCP for reliable data and UDP for best effort data).
165    #[zenoh_macros::unstable]
166    #[inline]
167    pub fn reliability(self, reliability: Reliability) -> Self {
168        Self {
169            reliability,
170            ..self
171        }
172    }
173
174    /// Allow matching [`AdvancedSubscribers`](crate::AdvancedSubscriber) to detect lost samples and optionally ask for retransmission.
175    ///
176    /// Retransmission can only be achieved if [`cache`](crate::AdvancedPublisherBuilder::cache) is enabled.
177    #[zenoh_macros::unstable]
178    pub fn sample_miss_detection(mut self, config: MissDetectionConfig) -> Self {
179        self.sequencing = Sequencing::SequenceNumber;
180        self.miss_config = Some(config);
181        self
182    }
183
184    /// Attach a cache to this [`AdvancedPublisher`].
185    ///
186    /// The cache can be used for history and/or recovery.
187    #[zenoh_macros::unstable]
188    pub fn cache(mut self, config: CacheConfig) -> Self {
189        self.cache = true;
190        if self.sequencing == Sequencing::None {
191            self.sequencing = Sequencing::Timestamp;
192        }
193        self.history = config;
194        self
195    }
196
197    /// Allow this [`AdvancedPublisher`] to be detected by [`AdvancedSubscribers`](crate::AdvancedSubscriber).
198    ///
199    /// This allows [`AdvancedSubscribers`](crate::AdvancedSubscriber) to retrieve the local history.
200    #[zenoh_macros::unstable]
201    pub fn publisher_detection(mut self) -> Self {
202        self.liveliness = true;
203        self
204    }
205
206    /// A key expression added to the liveliness token key expression
207    /// and to the cache queryable key expression.
208    /// It can be used to convey meta data.
209    #[zenoh_macros::unstable]
210    pub fn publisher_detection_metadata<TryIntoKeyExpr>(mut self, meta: TryIntoKeyExpr) -> Self
211    where
212        TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
213        <TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<zenoh::Error>,
214    {
215        self.meta_key_expr = Some(meta.try_into().map_err(Into::into));
216        self
217    }
218}
219
220#[zenoh_macros::internal_trait]
221#[zenoh_macros::unstable]
222impl EncodingBuilderTrait for AdvancedPublisherBuilder<'_, '_, '_> {
223    #[zenoh_macros::unstable]
224    fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
225        Self {
226            encoding: encoding.into(),
227            ..self
228        }
229    }
230}
231
232#[zenoh_macros::internal_trait]
233#[zenoh_macros::unstable]
234impl QoSBuilderTrait for AdvancedPublisherBuilder<'_, '_, '_> {
235    /// Changes the [`zenoh::qos::CongestionControl`] to apply when routing the data.
236    #[inline]
237    #[zenoh_macros::unstable]
238    fn congestion_control(self, congestion_control: CongestionControl) -> Self {
239        Self {
240            congestion_control,
241            ..self
242        }
243    }
244
245    /// Changes the [`zenoh::qos::Priority`] of the written data.
246    #[inline]
247    #[zenoh_macros::unstable]
248    fn priority(self, priority: Priority) -> Self {
249        Self { priority, ..self }
250    }
251
252    /// Changes the Express policy to apply when routing the data.
253    ///
254    /// When express is set to `true`, then the message will not be batched.
255    /// This usually has a positive impact on latency but negative impact on throughput.
256    #[inline]
257    #[zenoh_macros::unstable]
258    fn express(self, is_express: bool) -> Self {
259        Self { is_express, ..self }
260    }
261}
262
// Resolving the builder yields the constructed `AdvancedPublisher`, bound to the
// lifetime of the publication key expression, or the error that prevented its creation.
#[zenoh_macros::unstable]
impl<'b> Resolvable for AdvancedPublisherBuilder<'_, 'b, '_> {
    type To = ZResult<AdvancedPublisher<'b>>;
}
267
// Synchronous resolution: all the work is delegated to `AdvancedPublisher::new`.
#[zenoh_macros::unstable]
impl Wait for AdvancedPublisherBuilder<'_, '_, '_> {
    #[zenoh_macros::unstable]
    fn wait(self) -> <Self as Resolvable>::To {
        AdvancedPublisher::new(self)
    }
}
275
// Asynchronous resolution: the builder is resolved eagerly through `wait()` when it is
// converted into a future, and the result is wrapped in an already-completed `Ready`.
#[zenoh_macros::unstable]
impl IntoFuture for AdvancedPublisherBuilder<'_, '_, '_> {
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<<Self as Resolvable>::To>;

    #[zenoh_macros::unstable]
    fn into_future(self) -> Self::IntoFuture {
        std::future::ready(self.wait())
    }
}
286
/// The extension to a [`Publisher`](zenoh::pubsub::Publisher) providing advanced functionalities.
///
/// The `AdvancedPublisher` is constructed over a regular [`Publisher`](zenoh::pubsub::Publisher) through
/// [`advanced`](crate::AdvancedPublisherBuilderExt::advanced) method or by using
/// any other method of [`AdvancedPublisherBuilder`](crate::AdvancedPublisherBuilder).
///
/// The `AdvancedPublisher` works with [`AdvancedSubscriber`](crate::AdvancedSubscriber) to provide additional functionalities such as:
/// - [`cache`](crate::AdvancedPublisherBuilderExt::cache) last published samples to be retrieved by
///   [`AdvancedSubscriber`](crate::AdvancedSubscriber)'s [`history`](crate::AdvancedSubscriberBuilderExt::history) mechanism
/// - [`sample_miss_detection`](crate::AdvancedPublisherBuilderExt::sample_miss_detection) to allow detecting missed samples
///   using [`AdvancedSubscriber`](crate::AdvancedSubscriber)'s [`sample_miss_listener`](crate::AdvancedSubscriber::sample_miss_listener)
/// - [`publisher_detection`](crate::AdvancedPublisherBuilderExt::publisher_detection) to create a Liveliness token to assert its presence and
///   allow it to be requested for missed samples if [`detect_late_publishers`](crate::HistoryConfig::detect_late_publishers) is enabled
///
/// # Example
/// ```no_run
/// # #[tokio::main]
/// # async fn main() {
/// use zenoh_ext::{AdvancedPublisherBuilderExt, CacheConfig, MissDetectionConfig};
/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
/// let publisher = session
///     .declare_publisher("key/expression")
///     .cache(CacheConfig::default().max_samples(10))
///     .sample_miss_detection(
///         MissDetectionConfig::default().heartbeat(std::time::Duration::from_secs(1))
///     )
///     .publisher_detection()
///     .await
///     .unwrap();
/// publisher.put("Value").await.unwrap();
/// # }
/// ```
#[zenoh_macros::unstable]
pub struct AdvancedPublisher<'a> {
    // Underlying publisher that actually sends the data.
    publisher: Publisher<'a>,
    // Next sequence number to attach to a publication; `Some` only when
    // sequence-number sequencing is in use. Shared with the heartbeat task.
    seqnum: Option<Arc<AtomicU32>>,
    // Cache receiving a copy of every published sample, when enabled.
    cache: Option<AdvancedCache>,
    // Liveliness token, never read after construction — presumably kept only
    // so that it stays declared for the lifetime of the publisher.
    _token: Option<LivelinessToken>,
    // Heartbeat task handle, never read after construction — presumably kept only
    // so that the task ends when the publisher is dropped.
    _state_publisher: Option<TerminatableTask>,
}
327
328#[zenoh_macros::unstable]
329impl<'a> AdvancedPublisher<'a> {
330    #[zenoh_macros::unstable]
331    fn new(conf: AdvancedPublisherBuilder<'_, 'a, '_>) -> ZResult<Self> {
332        let key_expr = conf.pub_key_expr?;
333        let meta = match conf.meta_key_expr {
334            Some(meta) => Some(meta?),
335            None => None,
336        };
337        tracing::debug!("Create AdvancedPublisher{{key_expr: {}}}", &key_expr);
338
339        let publisher = conf
340            .session
341            .declare_publisher(key_expr.clone())
342            .encoding(conf.encoding)
343            .allowed_destination(conf.destination)
344            .reliability(conf.reliability)
345            .congestion_control(conf.congestion_control)
346            .priority(conf.priority)
347            .express(conf.is_express)
348            .wait()?;
349        let id = publisher.id();
350        let suffix = KE_ADV_PREFIX / KE_PUB / &id.zid().into_keyexpr();
351        let suffix = match conf.sequencing {
352            Sequencing::SequenceNumber => {
353                suffix / &KeyExpr::try_from(id.eid().to_string()).unwrap()
354            }
355            _ => suffix / KE_UHLC,
356        };
357        let suffix = match meta {
358            Some(meta) => suffix / &meta,
359            // We need this empty chunk because of a routing matching bug
360            _ => suffix / KE_EMPTY,
361        };
362
363        let seqnum = match conf.sequencing {
364            Sequencing::SequenceNumber => Some(Arc::new(AtomicU32::new(0))),
365            Sequencing::Timestamp => {
366                if conf.session.hlc().is_none() {
367                    bail!(
368                        "Cannot create AdvancedPublisher {} with Sequencing::Timestamp: \
369                            the 'timestamping' setting must be enabled in the Zenoh configuration.",
370                        key_expr,
371                    )
372                }
373                None
374            }
375            _ => None,
376        };
377
378        let cache = if conf.cache {
379            Some(
380                AdvancedCacheBuilder::new(conf.session, Ok(key_expr.clone()))
381                    .history(conf.history)
382                    .queryable_suffix(&suffix)
383                    .wait()?,
384            )
385        } else {
386            None
387        };
388
389        let token = if conf.liveliness {
390            tracing::debug!(
391                "AdvancedPublisher{{key_expr: {}}}: Declare liveliness token {}",
392                key_expr,
393                &key_expr / &suffix,
394            );
395            Some(
396                conf.session
397                    .liveliness()
398                    .declare_token(&key_expr / &suffix)
399                    .wait()?,
400            )
401        } else {
402            None
403        };
404
405        let state_publisher = if let Some((period, sporadic)) =
406            conf.miss_config.as_ref().and_then(|c| c.state_publisher)
407        {
408            if let Some(seqnum) = seqnum.as_ref() {
409                tracing::debug!(
410                    "AdvancedPublisher{{key_expr: {}}}: Enable {}heartbeat on {} with period {:?}",
411                    key_expr,
412                    if sporadic { "sporadic " } else { "" },
413                    &key_expr / &suffix,
414                    period
415                );
416                let seqnum = seqnum.clone();
417                if !sporadic {
418                    let publisher = conf.session.declare_publisher(&key_expr / &suffix).wait()?;
419                    Some(TerminatableTask::spawn_abortable(
420                        ZRuntime::Net,
421                        async move {
422                            loop {
423                                tokio::time::sleep(period).await;
424                                let seqnum = seqnum.load(Ordering::Relaxed);
425                                if seqnum > 0 {
426                                    let _ = publisher.put(z_serialize(&(seqnum - 1))).await;
427                                }
428                            }
429                        },
430                    ))
431                } else {
432                    let mut last_seqnum = 0;
433                    let publisher = conf
434                        .session
435                        .declare_publisher(&key_expr / &suffix)
436                        .congestion_control(CongestionControl::Block)
437                        .wait()?;
438                    Some(TerminatableTask::spawn_abortable(
439                        ZRuntime::Net,
440                        async move {
441                            loop {
442                                tokio::time::sleep(period).await;
443                                let seqnum = seqnum.load(Ordering::Relaxed);
444                                if seqnum > last_seqnum {
445                                    let _ = publisher.put(z_serialize(&(seqnum - 1))).await;
446                                    last_seqnum = seqnum;
447                                }
448                            }
449                        },
450                    ))
451                }
452            } else {
453                None
454            }
455        } else {
456            None
457        };
458
459        Ok(AdvancedPublisher {
460            publisher,
461            seqnum,
462            cache,
463            _token: token,
464            _state_publisher: state_publisher,
465        })
466    }
467
    /// Returns the [`EntityGlobalId`] of this Publisher.
    ///
    /// Wraps the [`Publisher::id`](zenoh::pubsub::Publisher::id) method of the underlying publisher.
    #[zenoh_macros::unstable]
    pub fn id(&self) -> EntityGlobalId {
        self.publisher.id()
    }
475
    /// Returns the [`KeyExpr`] of this Publisher.
    ///
    /// Wraps the [`Publisher::key_expr`](zenoh::pubsub::Publisher::key_expr) method of the underlying publisher.
    #[inline]
    #[zenoh_macros::unstable]
    pub fn key_expr(&self) -> &KeyExpr<'a> {
        self.publisher.key_expr()
    }
484
    /// Get the [`Encoding`] used when publishing data.
    ///
    /// Wraps the [`Publisher::encoding`](zenoh::pubsub::Publisher::encoding) method of the underlying publisher.
    #[inline]
    #[zenoh_macros::unstable]
    pub fn encoding(&self) -> &Encoding {
        self.publisher.encoding()
    }
493
    /// Get the [`CongestionControl`] applied when routing the data.
    ///
    /// Wraps the [`Publisher::congestion_control`](zenoh::pubsub::Publisher::congestion_control) method
    /// of the underlying publisher.
    #[inline]
    #[zenoh_macros::unstable]
    pub fn congestion_control(&self) -> CongestionControl {
        self.publisher.congestion_control()
    }
502
    /// Get the [`Priority`] of the written data.
    ///
    /// Wraps the [`Publisher::priority`](zenoh::pubsub::Publisher::priority) method of the underlying publisher.
    #[inline]
    #[zenoh_macros::unstable]
    pub fn priority(&self) -> Priority {
        self.publisher.priority()
    }
511
512    /// Put data.
513    ///
514    /// Wraps [`Publisher::put`](zenoh::pubsub::Publisher::put) method
515    ///
516    /// # Examples
517    /// ```
518    /// # #[tokio::main]
519    /// # async fn main() {
520    /// use zenoh_ext::AdvancedPublisherBuilderExt;
521    ///
522    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
523    /// let publisher = session.declare_publisher("key/expression").advanced().await.unwrap();
524    /// publisher.put("value").await.unwrap();
525    /// # }
526    /// ```
527    #[inline]
528    #[zenoh_macros::unstable]
529    pub fn put<IntoZBytes>(&self, payload: IntoZBytes) -> AdvancedPublisherPutBuilder<'_>
530    where
531        IntoZBytes: Into<ZBytes>,
532    {
533        let mut builder = self.publisher.put(payload);
534        if let Some(seqnum) = &self.seqnum {
535            let info = Some(SourceInfo::new(
536                self.publisher.id(),
537                seqnum.fetch_add(1, Ordering::Relaxed),
538            ));
539            tracing::trace!(
540                "AdvancedPublisher{{key_expr: {}}}: Put data with {:?}",
541                self.publisher.key_expr(),
542                info
543            );
544            builder = builder.source_info(info);
545        }
546        if let Some(hlc) = self.publisher.session().hlc() {
547            builder = builder.timestamp(hlc.new_timestamp());
548        }
549        AdvancedPublisherPutBuilder {
550            builder,
551            cache: self.cache.as_ref(),
552        }
553    }
554
555    /// Delete data.
556    ///
557    /// Wraps [`Publisher::delete`](zenoh::pubsub::Publisher::delete) method
558    ///
559    /// # Examples
560    /// ```
561    /// # #[tokio::main]
562    /// # async fn main() {
563    /// use zenoh_ext::AdvancedPublisherBuilderExt;
564    ///
565    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
566    /// let publisher = session.declare_publisher("key/expression").advanced().await.unwrap();
567    /// publisher.delete().await.unwrap();
568    /// # }
569    /// ```
570    #[zenoh_macros::unstable]
571    pub fn delete(&self) -> AdvancedPublisherDeleteBuilder<'_> {
572        let mut builder = self.publisher.delete();
573        if let Some(seqnum) = &self.seqnum {
574            builder = builder.source_info(Some(SourceInfo::new(
575                self.publisher.id(),
576                seqnum.fetch_add(1, Ordering::Relaxed),
577            )));
578        }
579        if let Some(hlc) = self.publisher.session().hlc() {
580            builder = builder.timestamp(hlc.new_timestamp());
581        }
582        AdvancedPublisherDeleteBuilder {
583            builder,
584            cache: self.cache.as_ref(),
585        }
586    }
587
    /// Return the [`MatchingStatus`](zenoh::matching::MatchingStatus) of the publisher.
    ///
    /// Wraps [`Publisher::matching_status`](zenoh::pubsub::Publisher::matching_status) method.
    ///
    /// [`MatchingStatus::matching`](zenoh::matching::MatchingStatus::matching)
    /// will return true if there exist Subscribers matching the Publisher's key expression and false otherwise.
    ///
    /// # Examples
    /// ```
    /// # #[tokio::main]
    /// # async fn main() {
    /// use zenoh_ext::AdvancedPublisherBuilderExt;
    ///
    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
    /// let publisher = session.declare_publisher("key/expression").advanced().await.unwrap();
    /// let matching_subscribers: bool = publisher
    ///     .matching_status()
    ///     .await
    ///     .unwrap()
    ///     .matching();
    /// # }
    /// ```
    #[zenoh_macros::unstable]
    pub fn matching_status(&self) -> impl Resolve<ZResult<zenoh::matching::MatchingStatus>> + '_ {
        // The returned resolvable borrows `self` (hence the `'_` bound).
        self.publisher.matching_status()
    }
614
    /// Return a [`MatchingListener`](zenoh::matching::MatchingListener) for this Publisher.
    ///
    /// Wraps [`Publisher::matching_listener`](zenoh::pubsub::Publisher::matching_listener) method.
    ///
    /// The [`MatchingListener`](zenoh::matching::MatchingListener) will send a notification each time
    /// the [`MatchingStatus`](zenoh::matching::MatchingStatus) of the Publisher changes.
    ///
    /// # Examples
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() {
    /// use zenoh_ext::AdvancedPublisherBuilderExt;
    ///
    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
    /// let publisher = session.declare_publisher("key/expression").advanced().await.unwrap();
    /// let matching_listener = publisher.matching_listener().await.unwrap();
    /// while let Ok(matching_status) = matching_listener.recv_async().await {
    ///     if matching_status.matching() {
    ///         println!("Publisher has matching subscribers.");
    ///     } else {
    ///         println!("Publisher has NO MORE matching subscribers.");
    ///     }
    /// }
    /// # }
    /// ```
    #[zenoh_macros::unstable]
    pub fn matching_listener(
        &self,
    ) -> zenoh::matching::MatchingListenerBuilder<'_, zenoh::handlers::DefaultHandler> {
        self.publisher.matching_listener()
    }
646
    /// Undeclares the [`Publisher`], informing the network that it needn't optimize publications for its key expression anymore.
    ///
    /// Wraps [`Publisher::undeclare`](zenoh::pubsub::Publisher::undeclare) method
    ///
    /// # Examples
    /// ```
    /// # #[tokio::main]
    /// # async fn main() {
    /// use zenoh_ext::AdvancedPublisherBuilderExt;
    ///
    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
    /// let publisher = session.declare_publisher("key/expression").advanced().await.unwrap();
    /// publisher.undeclare().await.unwrap();
    /// # }
    /// ```
    #[zenoh_macros::unstable]
    pub fn undeclare(self) -> impl Resolve<ZResult<()>> + 'a {
        tracing::debug!(
            "AdvancedPublisher{{key_expr: {}}}: Undeclare",
            self.key_expr()
        );
        // Only the inner publisher is moved into the returned resolvable; the other
        // fields of `self` (cache, liveliness token, heartbeat task) are dropped when
        // this function returns.
        self.publisher.undeclare()
    }
670}
671
/// The builder returned by [`AdvancedPublisher::put`].
#[zenoh_macros::unstable]
pub type AdvancedPublisherPutBuilder<'a> = AdvancedPublicationBuilder<'a, PublicationBuilderPut>;
/// The builder returned by [`AdvancedPublisher::delete`].
#[zenoh_macros::unstable]
pub type AdvancedPublisherDeleteBuilder<'a> =
    AdvancedPublicationBuilder<'a, PublicationBuilderDelete>;
677
/// A publication (put or delete) being built by an [`AdvancedPublisher`].
///
/// It wraps the publication builder of the underlying publisher and, when the
/// publisher has a cache, a reference to it so that the sample can be cached
/// when the publication is resolved.
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
#[derive(Clone)]
#[zenoh_macros::unstable]
pub struct AdvancedPublicationBuilder<'a, P> {
    // Publication builder of the underlying publisher.
    pub(crate) builder: PublicationBuilder<&'a Publisher<'a>, P>,
    // Cache of the originating publisher, if it has one.
    pub(crate) cache: Option<&'a AdvancedCache>,
}
685
686#[zenoh_macros::internal_trait]
687#[zenoh_macros::unstable]
688impl EncodingBuilderTrait for AdvancedPublicationBuilder<'_, PublicationBuilderPut> {
689    #[zenoh_macros::unstable]
690    fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
691        Self {
692            builder: self.builder.encoding(encoding),
693            ..self
694        }
695    }
696}
697
698#[zenoh_macros::internal_trait]
699#[zenoh_macros::unstable]
700impl<P> SampleBuilderTrait for AdvancedPublicationBuilder<'_, P> {
701    #[zenoh_macros::unstable]
702    fn source_info<TS: Into<Option<SourceInfo>>>(self, source_info: TS) -> Self {
703        Self {
704            builder: self.builder.source_info(source_info),
705            ..self
706        }
707    }
708    #[zenoh_macros::unstable]
709    fn attachment<TA: Into<OptionZBytes>>(self, attachment: TA) -> Self {
710        let attachment: OptionZBytes = attachment.into();
711        Self {
712            builder: self.builder.attachment(attachment),
713            ..self
714        }
715    }
716}
717
718#[zenoh_macros::internal_trait]
719#[zenoh_macros::unstable]
720impl<P> TimestampBuilderTrait for AdvancedPublicationBuilder<'_, P> {
721    #[zenoh_macros::unstable]
722    fn timestamp<TS: Into<Option<uhlc::Timestamp>>>(self, timestamp: TS) -> Self {
723        Self {
724            builder: self.builder.timestamp(timestamp),
725            ..self
726        }
727    }
728}
729
// Resolving a publication yields no value, only success or failure.
#[zenoh_macros::unstable]
impl<P> Resolvable for AdvancedPublicationBuilder<'_, P> {
    type To = ZResult<()>;
}
734
735#[zenoh_macros::unstable]
736impl Wait for AdvancedPublisherPutBuilder<'_> {
737    #[inline]
738    #[zenoh_macros::unstable]
739    fn wait(self) -> <Self as Resolvable>::To {
740        if let Some(cache) = self.cache {
741            cache.cache_sample(zenoh::sample::Sample::from(&self.builder));
742        }
743        self.builder.wait()
744    }
745}
746
747#[zenoh_macros::unstable]
748impl Wait for AdvancedPublisherDeleteBuilder<'_> {
749    #[inline]
750    #[zenoh_macros::unstable]
751    fn wait(self) -> <Self as Resolvable>::To {
752        if let Some(cache) = self.cache {
753            cache.cache_sample(zenoh::sample::Sample::from(&self.builder));
754        }
755        self.builder.wait()
756    }
757}
758
// Asynchronous resolution of a put: the publication is performed eagerly through
// `wait()` when the builder is converted into a future, and the result is wrapped
// in an already-completed `Ready`.
#[zenoh_macros::unstable]
impl IntoFuture for AdvancedPublisherPutBuilder<'_> {
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<<Self as Resolvable>::To>;

    #[zenoh_macros::unstable]
    fn into_future(self) -> Self::IntoFuture {
        std::future::ready(self.wait())
    }
}
769
// Asynchronous resolution of a delete: the publication is performed eagerly through
// `wait()` when the builder is converted into a future, and the result is wrapped
// in an already-completed `Ready`.
#[zenoh_macros::unstable]
impl IntoFuture for AdvancedPublisherDeleteBuilder<'_> {
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<<Self as Resolvable>::To>;

    #[zenoh_macros::unstable]
    fn into_future(self) -> Self::IntoFuture {
        std::future::ready(self.wait())
    }
}