Skip to main content

zenoh_ext/
advanced_publisher.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{
15    future::{IntoFuture, Ready},
16    sync::{
17        atomic::{AtomicU32, Ordering},
18        Arc,
19    },
20    time::Duration,
21};
22
23use zenoh::{
24    bytes::{Encoding, OptionZBytes, ZBytes},
25    internal::{
26        bail,
27        runtime::ZRuntime,
28        traits::{
29            EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait,
30        },
31        TerminatableTask,
32    },
33    key_expr::{keyexpr, KeyExpr},
34    liveliness::LivelinessToken,
35    pubsub::{
36        PublicationBuilder, PublicationBuilderDelete, PublicationBuilderPut, Publisher,
37        PublisherBuilder, PublisherUndeclaration,
38    },
39    qos::{CongestionControl, Priority, Reliability},
40    sample::{Locality, SourceInfo},
41    session::EntityGlobalId,
42    Resolvable, Resolve, Result as ZResult, Session, Wait, KE_ADV_PREFIX, KE_EMPTY,
43};
44use zenoh_macros::ke;
45
46use crate::{
47    advanced_cache::{AdvancedCache, AdvancedCacheBuilder, CacheConfig, KE_UHLC},
48    z_serialize,
49};
50
51pub(crate) static KE_PUB: &keyexpr = ke!("pub");
52
53#[derive(PartialEq)]
54#[zenoh_macros::unstable]
55pub(crate) enum Sequencing {
56    None,
57    Timestamp,
58    SequenceNumber,
59}
60
61/// Configuration for sample miss detection
62///
63/// Enabling [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection) in [`AdvancedPublisher`](crate::AdvancedPublisher)
64/// allows [`AdvancedSubscribers`](crate::AdvancedSubscriber) to detect missed samples
65/// through [`sample_miss_listener`](crate::AdvancedSubscriber::sample_miss_listener)
66/// and to recover missed samples through [`recovery`](crate::AdvancedSubscriberBuilder::recovery).
67#[zenoh_macros::unstable]
68#[derive(Debug, Default, Clone)]
69pub struct MissDetectionConfig {
70    pub(crate) state_publisher: Option<(Duration, bool)>,
71}
72
73#[zenoh_macros::unstable]
74impl MissDetectionConfig {
75    /// Allow last sample miss detection through periodic heartbeat.
76    ///
77    /// Periodically send the last published sample's sequence number to allow last sample recovery.
78    ///
79    /// [`heartbeat`](MissDetectionConfig::heartbeat) and [`sporadic_heartbeat`](MissDetectionConfig::sporadic_heartbeat)
80    /// are mutually exclusive. Enabling one will disable the other.
81    ///
82    /// [`AdvancedSubscribers`](crate::AdvancedSubscriber) can recover last sample with the
83    /// [`heartbeat`](crate::advanced_subscriber::RecoveryConfig::heartbeat) option.
84    #[zenoh_macros::unstable]
85    pub fn heartbeat(mut self, period: Duration) -> Self {
86        self.state_publisher = Some((period, false));
87        self
88    }
89
90    /// Allow last sample miss detection through sporadic heartbeat.
91    ///
92    /// Each period, the last published sample's sequence number is sent with [`CongestionControl::Block`]
93    /// but only if it changed since the last period.
94    ///
95    /// [`heartbeat`](MissDetectionConfig::heartbeat) and [`sporadic_heartbeat`](MissDetectionConfig::sporadic_heartbeat)
96    /// are mutually exclusive. Enabling one will disable the other.
97    ///
98    /// [`AdvancedSubscribers`](crate::AdvancedSubscriber) can recover last sample with the
99    /// [`heartbeat`](crate::advanced_subscriber::RecoveryConfig::heartbeat) option.
100    #[zenoh_macros::unstable]
101    pub fn sporadic_heartbeat(mut self, period: Duration) -> Self {
102        self.state_publisher = Some((period, true));
103        self
104    }
105}
106
107/// The builder of AdvancedPublisher, allowing to configure it.
108#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
109#[zenoh_macros::unstable]
110pub struct AdvancedPublisherBuilder<'a, 'b, 'c> {
111    session: &'a Session,
112    pub_key_expr: ZResult<KeyExpr<'b>>,
113    encoding: Encoding,
114    destination: Locality,
115    reliability: Reliability,
116    congestion_control: CongestionControl,
117    priority: Priority,
118    is_express: bool,
119    meta_key_expr: Option<ZResult<KeyExpr<'c>>>,
120    sequencing: Sequencing,
121    miss_config: Option<MissDetectionConfig>,
122    liveliness: bool,
123    cache: bool,
124    history: CacheConfig,
125}
126
127#[zenoh_macros::unstable]
128impl<'a, 'b, 'c> AdvancedPublisherBuilder<'a, 'b, 'c> {
129    #[zenoh_macros::unstable]
130    pub(crate) fn new(builder: PublisherBuilder<'a, 'b>) -> AdvancedPublisherBuilder<'a, 'b, 'c> {
131        AdvancedPublisherBuilder {
132            session: builder.session,
133            pub_key_expr: builder.key_expr,
134            encoding: builder.encoding,
135            destination: builder.destination,
136            reliability: builder.reliability,
137            congestion_control: builder.congestion_control,
138            priority: builder.priority,
139            is_express: builder.is_express,
140            meta_key_expr: None,
141            sequencing: Sequencing::None,
142            miss_config: None,
143            liveliness: false,
144            cache: false,
145            history: CacheConfig::default(),
146        }
147    }
148
149    /// Changes the [`zenoh::sample::Locality`] applied when routing the data.
150    ///
151    /// This restricts the matching subscribers that will receive the published data to the ones
152    /// that have the given [`zenoh::sample::Locality`].
153    #[zenoh_macros::unstable]
154    #[inline]
155    pub fn allowed_destination(mut self, destination: Locality) -> Self {
156        self.destination = destination;
157        self
158    }
159
160    /// Changes the [`zenoh::qos::Reliability`] to apply when routing the data.
161    ///
162    /// **NOTE**: Currently `reliability` does not trigger any data retransmission on the wire. It
163    ///   is rather used as a marker on the wire and it may be used to select the best link
164    ///   available (e.g. TCP for reliable data and UDP for best effort data).
165    #[zenoh_macros::unstable]
166    #[inline]
167    pub fn reliability(self, reliability: Reliability) -> Self {
168        Self {
169            reliability,
170            ..self
171        }
172    }
173
174    /// Allow matching [`AdvancedSubscribers`](crate::AdvancedSubscriber) to detect lost samples and optionally ask for retransmission.
175    ///
176    /// Retransmission can only be achieved if [`cache`](crate::AdvancedPublisherBuilder::cache) is enabled.
177    #[zenoh_macros::unstable]
178    pub fn sample_miss_detection(mut self, config: MissDetectionConfig) -> Self {
179        self.sequencing = Sequencing::SequenceNumber;
180        self.miss_config = Some(config);
181        self
182    }
183
184    /// Attach a cache to this [`AdvancedPublisher`].
185    ///
186    /// The cache can be used for history and/or recovery.
187    #[zenoh_macros::unstable]
188    pub fn cache(mut self, config: CacheConfig) -> Self {
189        self.cache = true;
190        if self.sequencing == Sequencing::None {
191            self.sequencing = Sequencing::Timestamp;
192        }
193        self.history = config;
194        self
195    }
196
197    /// Allow this [`AdvancedPublisher`] to be detected by [`AdvancedSubscribers`](crate::AdvancedSubscriber).
198    ///
199    /// This allows [`AdvancedSubscribers`](crate::AdvancedSubscriber) to retrieve the local history.
200    #[zenoh_macros::unstable]
201    pub fn publisher_detection(mut self) -> Self {
202        self.liveliness = true;
203        self
204    }
205
206    /// A key expression added to the liveliness token key expression and to the cache queryable key expression.
207    ///
208    /// It can be used to convey meta data.
209    #[zenoh_macros::unstable]
210    pub fn publisher_detection_metadata<TryIntoKeyExpr>(mut self, meta: TryIntoKeyExpr) -> Self
211    where
212        TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
213        <TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<zenoh::Error>,
214    {
215        self.meta_key_expr = Some(meta.try_into().map_err(Into::into));
216        self
217    }
218}
219
220#[zenoh_macros::internal_trait]
221#[zenoh_macros::unstable]
222impl EncodingBuilderTrait for AdvancedPublisherBuilder<'_, '_, '_> {
223    /// Set the [`Encoding`]
224    #[zenoh_macros::unstable]
225    fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
226        Self {
227            encoding: encoding.into(),
228            ..self
229        }
230    }
231}
232
233#[zenoh_macros::internal_trait]
234#[zenoh_macros::unstable]
235impl QoSBuilderTrait for AdvancedPublisherBuilder<'_, '_, '_> {
236    /// Changes the [`CongestionControl`] to apply when routing the data.
237    #[inline]
238    #[zenoh_macros::unstable]
239    fn congestion_control(self, congestion_control: CongestionControl) -> Self {
240        Self {
241            congestion_control,
242            ..self
243        }
244    }
245
246    /// Changes the [`Priority`] to apply when routing the data.
247    #[inline]
248    #[zenoh_macros::unstable]
249    fn priority(self, priority: Priority) -> Self {
250        Self { priority, ..self }
251    }
252
253    /// Changes the Express policy to apply when routing the data.
254    ///
255    /// When express is set to `true`, then the message will not be batched.
256    /// This usually has a positive impact on latency but negative impact on throughput.
257    #[inline]
258    #[zenoh_macros::unstable]
259    fn express(self, is_express: bool) -> Self {
260        Self { is_express, ..self }
261    }
262}
263
264#[zenoh_macros::unstable]
265impl<'b> Resolvable for AdvancedPublisherBuilder<'_, 'b, '_> {
266    type To = ZResult<AdvancedPublisher<'b>>;
267}
268
269#[zenoh_macros::unstable]
270impl Wait for AdvancedPublisherBuilder<'_, '_, '_> {
271    #[zenoh_macros::unstable]
272    fn wait(self) -> <Self as Resolvable>::To {
273        AdvancedPublisher::new(self)
274    }
275}
276
277#[zenoh_macros::unstable]
278impl IntoFuture for AdvancedPublisherBuilder<'_, '_, '_> {
279    type Output = <Self as Resolvable>::To;
280    type IntoFuture = Ready<<Self as Resolvable>::To>;
281
282    #[zenoh_macros::unstable]
283    fn into_future(self) -> Self::IntoFuture {
284        std::future::ready(self.wait())
285    }
286}
287
288/// The extension to a [`Publisher`](zenoh::pubsub::Publisher) providing advanced functionalities.
289///
290/// The `AdvancedPublisher` is constructed over a regular [`Publisher`](zenoh::pubsub::Publisher) through
291/// [`advanced`](crate::AdvancedPublisherBuilderExt::advanced) method or by using
292/// any other method of [`AdvancedPublisherBuilder`](crate::AdvancedPublisherBuilder).
293///
294/// The `AdvancedPublisher` works with [`AdvancedSubscriber`](crate::AdvancedSubscriber) to provide additional functionalities such as:
295/// - [`cache`](crate::AdvancedPublisherBuilderExt::cache) last published samples to be retrieved by
296///   [`AdvancedSubscriber`](crate::AdvancedSubscriber)'s [`history`](crate::AdvancedSubscriberBuilderExt::history) mechanism
297/// - [`sample_miss_detection`](crate::AdvancedPublisherBuilderExt::sample_miss_detection) to allow detecting missed samples
298///   using [`AdvancedSubscriber`](crate::AdvancedSubscriber)'s [`sample_miss_listener`](crate::AdvancedSubscriber::sample_miss_listener)
299/// - [`publisher_detection`](crate::AdvancedPublisherBuilderExt::publisher_detection) to create a Liveliness token to assert its presence and
300///   allow it to be requested for missed samples if [`detect_late_publishers`](crate::HistoryConfig::detect_late_publishers) is enabled
301///
302/// # Example
303/// ```no_run
304/// # #[tokio::main]
305/// # async fn main() {
306/// use zenoh_ext::{AdvancedPublisherBuilderExt, CacheConfig, MissDetectionConfig};
307/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
308/// let publisher = session
309///     .declare_publisher("key/expression")
310///     .cache(CacheConfig::default().max_samples(10))
311///     .sample_miss_detection(
312///         MissDetectionConfig::default().heartbeat(std::time::Duration::from_secs(1))
313///     )
314///     .publisher_detection()
315///     .await
316///     .unwrap();
317/// publisher.put("Value").await.unwrap();
318/// # }
319/// ```
320#[zenoh_macros::unstable]
321pub struct AdvancedPublisher<'a> {
322    publisher: Publisher<'a>,
323    seqnum: Option<Arc<AtomicU32>>,
324    cache: Option<AdvancedCache>,
325    _token: Option<LivelinessToken>,
326    _state_publisher: Option<TerminatableTask>,
327}
328
329#[zenoh_macros::unstable]
330impl<'a> AdvancedPublisher<'a> {
331    #[zenoh_macros::unstable]
332    fn new(conf: AdvancedPublisherBuilder<'_, 'a, '_>) -> ZResult<Self> {
333        let key_expr = conf.pub_key_expr?;
334        let meta = match conf.meta_key_expr {
335            Some(meta) => Some(meta?),
336            None => None,
337        };
338        tracing::debug!("Create AdvancedPublisher{{key_expr: {}}}", &key_expr);
339
340        let publisher = conf
341            .session
342            .declare_publisher(key_expr.clone())
343            .encoding(conf.encoding)
344            .allowed_destination(conf.destination)
345            .reliability(conf.reliability)
346            .congestion_control(conf.congestion_control)
347            .priority(conf.priority)
348            .express(conf.is_express)
349            .wait()?;
350        let id = publisher.id();
351        let suffix = KE_ADV_PREFIX / KE_PUB / &id.zid().into_keyexpr();
352        let suffix = match conf.sequencing {
353            Sequencing::SequenceNumber => {
354                suffix / &KeyExpr::try_from(id.eid().to_string()).unwrap()
355            }
356            _ => suffix / KE_UHLC,
357        };
358        let suffix = match meta {
359            Some(meta) => suffix / &meta,
360            // We need this empty chunk because of a routing matching bug
361            _ => suffix / KE_EMPTY,
362        };
363
364        let seqnum = match conf.sequencing {
365            Sequencing::SequenceNumber => Some(Arc::new(AtomicU32::new(0))),
366            Sequencing::Timestamp => {
367                if conf.session.hlc().is_none() {
368                    bail!(
369                        "Cannot create AdvancedPublisher {} with Sequencing::Timestamp: \
370                            the 'timestamping' setting must be enabled in the Zenoh configuration.",
371                        key_expr,
372                    )
373                }
374                None
375            }
376            _ => None,
377        };
378
379        let cache = if conf.cache {
380            Some(
381                AdvancedCacheBuilder::new(conf.session, Ok(key_expr.clone()))
382                    .history(conf.history)
383                    .queryable_suffix(&suffix)
384                    .wait()?,
385            )
386        } else {
387            None
388        };
389
390        let token = if conf.liveliness {
391            tracing::debug!(
392                "AdvancedPublisher{{key_expr: {}}}: Declare liveliness token {}",
393                key_expr,
394                &key_expr / &suffix,
395            );
396            Some(
397                conf.session
398                    .liveliness()
399                    .declare_token(&key_expr / &suffix)
400                    .wait()?,
401            )
402        } else {
403            None
404        };
405
406        let state_publisher = if let Some((period, sporadic)) =
407            conf.miss_config.as_ref().and_then(|c| c.state_publisher)
408        {
409            if let Some(seqnum) = seqnum.as_ref() {
410                tracing::debug!(
411                    "AdvancedPublisher{{key_expr: {}}}: Enable {}heartbeat on {} with period {:?}",
412                    key_expr,
413                    if sporadic { "sporadic " } else { "" },
414                    &key_expr / &suffix,
415                    period
416                );
417                let seqnum = seqnum.clone();
418                if !sporadic {
419                    let publisher = conf.session.declare_publisher(&key_expr / &suffix).wait()?;
420                    Some(TerminatableTask::spawn_abortable(
421                        ZRuntime::Net,
422                        async move {
423                            loop {
424                                tokio::time::sleep(period).await;
425                                let seqnum = seqnum.load(Ordering::Relaxed);
426                                if seqnum > 0 {
427                                    let _ = publisher.put(z_serialize(&(seqnum - 1))).await;
428                                }
429                            }
430                        },
431                    ))
432                } else {
433                    let mut last_seqnum = 0;
434                    let publisher = conf
435                        .session
436                        .declare_publisher(&key_expr / &suffix)
437                        .congestion_control(CongestionControl::Block)
438                        .wait()?;
439                    Some(TerminatableTask::spawn_abortable(
440                        ZRuntime::Net,
441                        async move {
442                            loop {
443                                tokio::time::sleep(period).await;
444                                let seqnum = seqnum.load(Ordering::Relaxed);
445                                if seqnum > last_seqnum {
446                                    let _ = publisher.put(z_serialize(&(seqnum - 1))).await;
447                                    last_seqnum = seqnum;
448                                }
449                            }
450                        },
451                    ))
452                }
453            } else {
454                None
455            }
456        } else {
457            None
458        };
459
460        Ok(AdvancedPublisher {
461            publisher,
462            seqnum,
463            cache,
464            _token: token,
465            _state_publisher: state_publisher,
466        })
467    }
468
469    /// Returns the [`EntityGlobalId`] of this Publisher.
470    ///
471    /// Wraps [`Publisher::id`](zenoh::pubsub::Publisher::id) method
472    #[zenoh_macros::unstable]
473    pub fn id(&self) -> EntityGlobalId {
474        self.publisher.id()
475    }
476
477    /// Returns the [`KeyExpr`] of this Publisher.
478    ///
479    /// Wraps [`Publisher::key_expr`](zenoh::pubsub::Publisher::key_expr) method
480    #[inline]
481    #[zenoh_macros::unstable]
482    pub fn key_expr(&self) -> &KeyExpr<'a> {
483        self.publisher.key_expr()
484    }
485
486    /// Get the [`Encoding`] used when publishing data.
487    ///
488    /// Wraps [`Publisher::encoding`](zenoh::pubsub::Publisher::encoding) method
489    #[inline]
490    #[zenoh_macros::unstable]
491    pub fn encoding(&self) -> &Encoding {
492        self.publisher.encoding()
493    }
494
495    /// Get the `congestion_control` applied when routing the data.
496    ///
497    /// Wraps [`Publisher::congestion_control`](zenoh::pubsub::Publisher::congestion_control) method
498    #[inline]
499    #[zenoh_macros::unstable]
500    pub fn congestion_control(&self) -> CongestionControl {
501        self.publisher.congestion_control()
502    }
503
504    /// Get the priority of the written data.
505    ///
506    /// Wraps [`Publisher::priority`](zenoh::pubsub::Publisher::priority) method
507    #[inline]
508    #[zenoh_macros::unstable]
509    pub fn priority(&self) -> Priority {
510        self.publisher.priority()
511    }
512
513    /// Put data.
514    ///
515    /// Wraps [`Publisher::put`](zenoh::pubsub::Publisher::put) method
516    ///
517    /// # Examples
518    /// ```
519    /// # #[tokio::main]
520    /// # async fn main() {
521    /// use zenoh_ext::AdvancedPublisherBuilderExt;
522    ///
523    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
524    /// let publisher = session.declare_publisher("key/expression").advanced().await.unwrap();
525    /// publisher.put("value").await.unwrap();
526    /// # }
527    /// ```
528    #[inline]
529    #[zenoh_macros::unstable]
530    pub fn put<IntoZBytes>(&self, payload: IntoZBytes) -> AdvancedPublisherPutBuilder<'_>
531    where
532        IntoZBytes: Into<ZBytes>,
533    {
534        let mut builder = self.publisher.put(payload);
535        if let Some(seqnum) = &self.seqnum {
536            let info = Some(SourceInfo::new(
537                self.publisher.id(),
538                seqnum.fetch_add(1, Ordering::Relaxed),
539            ));
540            tracing::trace!(
541                "AdvancedPublisher{{key_expr: {}}}: Put data with {:?}",
542                self.publisher.key_expr(),
543                info
544            );
545            builder = builder.source_info(info);
546        }
547        if let Some(hlc) = self.publisher.session().hlc() {
548            builder = builder.timestamp(hlc.new_timestamp());
549        }
550        AdvancedPublisherPutBuilder {
551            builder,
552            cache: self.cache.as_ref(),
553        }
554    }
555
556    /// Delete data.
557    ///
558    /// Wraps [`Publisher::delete`](zenoh::pubsub::Publisher::delete) method
559    ///
560    /// # Examples
561    /// ```
562    /// # #[tokio::main]
563    /// # async fn main() {
564    /// use zenoh_ext::AdvancedPublisherBuilderExt;
565    ///
566    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
567    /// let publisher = session.declare_publisher("key/expression").advanced().await.unwrap();
568    /// publisher.delete().await.unwrap();
569    /// # }
570    /// ```
571    #[zenoh_macros::unstable]
572    pub fn delete(&self) -> AdvancedPublisherDeleteBuilder<'_> {
573        let mut builder = self.publisher.delete();
574        if let Some(seqnum) = &self.seqnum {
575            builder = builder.source_info(Some(SourceInfo::new(
576                self.publisher.id(),
577                seqnum.fetch_add(1, Ordering::Relaxed),
578            )));
579        }
580        if let Some(hlc) = self.publisher.session().hlc() {
581            builder = builder.timestamp(hlc.new_timestamp());
582        }
583        AdvancedPublisherDeleteBuilder {
584            builder,
585            cache: self.cache.as_ref(),
586        }
587    }
588
589    /// Return the [`MatchingStatus`](zenoh::matching::MatchingStatus) of the publisher.
590    ///
591    /// Wraps [`Publisher::matching_status`](zenoh::pubsub::Publisher::matching_status) method.
592    ///
593    /// [`MatchingStatus::matching`](zenoh::matching::MatchingStatus::matching)
594    /// will return true if there exist Subscribers matching the Publisher's key expression and false otherwise.
595    ///
596    /// # Examples
597    /// ```
598    /// # #[tokio::main]
599    /// # async fn main() {
600    /// use zenoh_ext::AdvancedPublisherBuilderExt;
601    ///
602    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
603    /// let publisher = session.declare_publisher("key/expression").advanced().await.unwrap();
604    /// let matching_subscribers: bool = publisher
605    ///     .matching_status()
606    ///     .await
607    ///     .unwrap()
608    ///     .matching();
609    /// # }
610    /// ```
611    #[zenoh_macros::unstable]
612    pub fn matching_status(&self) -> impl Resolve<ZResult<zenoh::matching::MatchingStatus>> + '_ {
613        self.publisher.matching_status()
614    }
615
616    /// Return a [`MatchingListener`](zenoh::matching::MatchingStatus) for this Publisher.
617    ///
618    /// Wraps [`Publisher::matching_listener`](zenoh::pubsub::Publisher::matching_listener) method.
619    ///
620    /// The [`MatchingListener`](zenoh::matching::MatchingStatus) that will send a notification each time
621    /// the [`MatchingStatus`](zenoh::matching::MatchingStatus) of the Publisher changes.
622    ///
623    /// # Examples
624    /// ```no_run
625    /// # #[tokio::main]
626    /// # async fn main() {
627    /// use zenoh_ext::AdvancedPublisherBuilderExt;
628    ///
629    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
630    /// let publisher = session.declare_publisher("key/expression").advanced().await.unwrap();
631    /// let matching_listener = publisher.matching_listener().await.unwrap();
632    /// while let Ok(matching_status) = matching_listener.recv_async().await {
633    ///     if matching_status.matching() {
634    ///         println!("Publisher has matching subscribers.");
635    ///     } else {
636    ///         println!("Publisher has NO MORE matching subscribers.");
637    ///     }
638    /// }
639    /// # }
640    /// ```
641    #[zenoh_macros::unstable]
642    pub fn matching_listener(
643        &self,
644    ) -> zenoh::matching::MatchingListenerBuilder<'_, zenoh::handlers::DefaultHandler> {
645        self.publisher.matching_listener()
646    }
647
648    /// Undeclares the [`Publisher`], informing the network that it needn't optimize publications for its key expression anymore.
649    ///
650    /// Wraps [`Publisher::undeclare`](zenoh::pubsub::Publisher::undeclare) method
651    ///
652    /// # Examples
653    /// ```
654    /// # #[tokio::main]
655    /// # async fn main() {
656    /// use zenoh_ext::AdvancedPublisherBuilderExt;
657    ///
658    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
659    /// let publisher = session.declare_publisher("key/expression").advanced().await.unwrap();
660    /// publisher.undeclare().await.unwrap();
661    /// # }
662    /// ```
663    #[zenoh_macros::unstable]
664    pub fn undeclare(self) -> PublisherUndeclaration<'a> {
665        tracing::debug!(
666            "AdvancedPublisher{{key_expr: {}}}: Undeclare",
667            self.key_expr()
668        );
669        self.publisher.undeclare()
670    }
671}
672
673#[zenoh_macros::unstable]
674pub type AdvancedPublisherPutBuilder<'a> = AdvancedPublicationBuilder<'a, PublicationBuilderPut>;
675#[zenoh_macros::unstable]
676pub type AdvancedPublisherDeleteBuilder<'a> =
677    AdvancedPublicationBuilder<'a, PublicationBuilderDelete>;
678
679#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
680#[derive(Clone)]
681#[zenoh_macros::unstable]
682pub struct AdvancedPublicationBuilder<'a, P> {
683    pub(crate) builder: PublicationBuilder<&'a Publisher<'a>, P>,
684    pub(crate) cache: Option<&'a AdvancedCache>,
685}
686
687#[zenoh_macros::internal_trait]
688#[zenoh_macros::unstable]
689impl EncodingBuilderTrait for AdvancedPublicationBuilder<'_, PublicationBuilderPut> {
690    /// Set the [`Encoding`]
691    #[zenoh_macros::unstable]
692    fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
693        Self {
694            builder: self.builder.encoding(encoding),
695            ..self
696        }
697    }
698}
699
700#[zenoh_macros::internal_trait]
701#[zenoh_macros::unstable]
702impl<P> SampleBuilderTrait for AdvancedPublicationBuilder<'_, P> {
703    #[zenoh_macros::unstable]
704    /// Sets an optional [`SourceInfo`](zenoh::sample::SourceInfo) to be sent along with the publication.
705    fn source_info<TS: Into<Option<SourceInfo>>>(self, source_info: TS) -> Self {
706        Self {
707            builder: self.builder.source_info(source_info),
708            ..self
709        }
710    }
711    #[zenoh_macros::unstable]
712    /// Sets an optional attachment to be sent along with the publication.
713    ///
714    /// The argument is converted via [`OptionZBytes`], which supports both `T: Into<ZBytes>`
715    /// and `Option<T>` where `T: Into<ZBytes>`.
716    fn attachment<TA: Into<OptionZBytes>>(self, attachment: TA) -> Self {
717        let attachment: OptionZBytes = attachment.into();
718        Self {
719            builder: self.builder.attachment(attachment),
720            ..self
721        }
722    }
723}
724
725#[zenoh_macros::internal_trait]
726#[zenoh_macros::unstable]
727impl<P> TimestampBuilderTrait for AdvancedPublicationBuilder<'_, P> {
728    /// Sets an optional timestamp to be sent along with the publication.
729    #[zenoh_macros::unstable]
730    fn timestamp<TS: Into<Option<uhlc::Timestamp>>>(self, timestamp: TS) -> Self {
731        Self {
732            builder: self.builder.timestamp(timestamp),
733            ..self
734        }
735    }
736}
737
738#[zenoh_macros::unstable]
739impl<P> Resolvable for AdvancedPublicationBuilder<'_, P> {
740    type To = ZResult<()>;
741}
742
743#[zenoh_macros::unstable]
744impl Wait for AdvancedPublisherPutBuilder<'_> {
745    #[inline]
746    #[zenoh_macros::unstable]
747    fn wait(self) -> <Self as Resolvable>::To {
748        if let Some(cache) = self.cache {
749            cache.cache_sample(zenoh::sample::Sample::from(&self.builder));
750        }
751        self.builder.wait()
752    }
753}
754
755#[zenoh_macros::unstable]
756impl Wait for AdvancedPublisherDeleteBuilder<'_> {
757    #[inline]
758    #[zenoh_macros::unstable]
759    fn wait(self) -> <Self as Resolvable>::To {
760        if let Some(cache) = self.cache {
761            cache.cache_sample(zenoh::sample::Sample::from(&self.builder));
762        }
763        self.builder.wait()
764    }
765}
766
767#[zenoh_macros::unstable]
768impl IntoFuture for AdvancedPublisherPutBuilder<'_> {
769    type Output = <Self as Resolvable>::To;
770    type IntoFuture = Ready<<Self as Resolvable>::To>;
771
772    #[zenoh_macros::unstable]
773    fn into_future(self) -> Self::IntoFuture {
774        std::future::ready(self.wait())
775    }
776}
777
778#[zenoh_macros::unstable]
779impl IntoFuture for AdvancedPublisherDeleteBuilder<'_> {
780    type Output = <Self as Resolvable>::To;
781    type IntoFuture = Ready<<Self as Resolvable>::To>;
782
783    #[zenoh_macros::unstable]
784    fn into_future(self) -> Self::IntoFuture {
785        std::future::ready(self.wait())
786    }
787}