zenoh/api/builders/
publisher.rs

1//
2// Copyright (c) 2024 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::future::{IntoFuture, Ready};
15
16use itertools::Itertools;
17use zenoh_config::qos::PublisherQoSConfig;
18use zenoh_core::{Resolvable, Result as ZResult, Wait};
19use zenoh_keyexpr::keyexpr_tree::{IKeyExprTree, IKeyExprTreeNode};
20use zenoh_protocol::core::CongestionControl;
21#[cfg(feature = "unstable")]
22use zenoh_protocol::core::Reliability;
23
24#[cfg(feature = "unstable")]
25use crate::api::sample::SourceInfo;
26use crate::{
27    api::{
28        builders::sample::{
29            EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait,
30        },
31        bytes::{OptionZBytes, ZBytes},
32        encoding::Encoding,
33        key_expr::KeyExpr,
34        publisher::{Priority, Publisher},
35        sample::{Locality, SampleKind},
36    },
37    Session,
38};
39
/// The alias for the [`PublicationBuilder`] specialization
/// returned by the [`Session::put`](crate::session::Session::put) method.
///
/// It wraps a [`PublisherBuilder`] together with the [`PublicationBuilderPut`] type-modifier.
pub type SessionPutBuilder<'a, 'b> =
    PublicationBuilder<PublisherBuilder<'a, 'b>, PublicationBuilderPut>;
44
/// The alias for the [`PublicationBuilder`] specialization
/// returned by the [`Session::delete`](crate::session::Session::delete) method.
///
/// It wraps a [`PublisherBuilder`] together with the [`PublicationBuilderDelete`] type-modifier.
pub type SessionDeleteBuilder<'a, 'b> =
    PublicationBuilder<PublisherBuilder<'a, 'b>, PublicationBuilderDelete>;
49
/// The alias for the [`PublicationBuilder`] specialization
/// returned by the [`Publisher::put`](crate::pubsub::Publisher::put) method.
///
/// It borrows an already declared [`Publisher`] and carries the [`PublicationBuilderPut`]
/// type-modifier.
pub type PublisherPutBuilder<'a> = PublicationBuilder<&'a Publisher<'a>, PublicationBuilderPut>;
53
/// The alias for the [`PublicationBuilder`] specialization
/// returned by the [`Publisher::delete`](crate::pubsub::Publisher::delete) method.
///
/// It borrows an already declared [`Publisher`] and carries the [`PublicationBuilderDelete`]
/// type-modifier.
pub type PublisherDeleteBuilder<'a> =
    PublicationBuilder<&'a Publisher<'a>, PublicationBuilderDelete>;
58
59/// The type-modifier for a [`PublicationBuilder`] for a `Put` operation.
60///
61/// Makes the publication builder make a sample of a [`kind`](crate::sample::Sample::kind) [`SampleKind::Put`].
62#[derive(Debug, Clone)]
63pub struct PublicationBuilderPut {
64    pub(crate) payload: ZBytes,
65    pub(crate) encoding: Encoding,
66}
67
/// Type-modifier selecting the `Delete` flavour of a [`PublicationBuilder`].
///
/// A builder parameterised with this type makes a sample of a
/// [`kind`](crate::sample::Sample::kind) [`SampleKind::Delete`].
#[derive(Clone, Debug)]
pub struct PublicationBuilderDelete;
73
74/// Publication builder
75///
76/// A publication builder object is returned by the following methods:
77/// - [`Session::put`](crate::session::Session::put)
78/// - [`Session::delete`](crate::session::Session::delete)
79/// - [`Publisher::put`](crate::pubsub::Publisher::put)
80/// - [`Publisher::delete`](crate::pubsub::Publisher::delete)
81///
82/// It resolves to `ZResult<()>` when awaited or when calling `.wait()`.
83///
84/// # Examples
85/// ```
86/// # #[tokio::main]
87/// # async fn main() {
88/// use zenoh::{bytes::Encoding, qos::CongestionControl};
89///
90/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
91/// session
92///     .put("key/expression", "payload")
93///     .encoding(Encoding::TEXT_PLAIN)
94///     .congestion_control(CongestionControl::Block)
95///     .await
96///     .unwrap();
97/// # }
98/// ```
99#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
100#[derive(Debug, Clone)]
101pub struct PublicationBuilder<P, T> {
102    pub(crate) publisher: P,
103    pub(crate) kind: T,
104    pub(crate) timestamp: Option<uhlc::Timestamp>,
105    #[cfg(feature = "unstable")]
106    pub(crate) source_info: Option<SourceInfo>,
107    pub(crate) attachment: Option<ZBytes>,
108}
109
110#[zenoh_macros::internal_trait]
111impl<T> QoSBuilderTrait for PublicationBuilder<PublisherBuilder<'_, '_>, T> {
112    /// Changes the [`CongestionControl`](crate::qos::CongestionControl) to apply when routing the data.
113    #[inline]
114    fn congestion_control(self, congestion_control: CongestionControl) -> Self {
115        Self {
116            publisher: self.publisher.congestion_control(congestion_control),
117            ..self
118        }
119    }
120
121    /// Changes the [`Priority`](crate::qos::Priority) of the written data.
122    #[inline]
123    fn priority(self, priority: Priority) -> Self {
124        Self {
125            publisher: self.publisher.priority(priority),
126            ..self
127        }
128    }
129
130    /// Changes the Express policy to apply when routing the data.
131    ///
132    /// When express is set to `true`, then the message will not be batched.
133    /// This usually has a positive impact on latency but a negative impact on throughput.
134    #[inline]
135    fn express(self, is_express: bool) -> Self {
136        Self {
137            publisher: self.publisher.express(is_express),
138            ..self
139        }
140    }
141}
142
143impl<T> PublicationBuilder<PublisherBuilder<'_, '_>, T> {
144    /// Changes the [`Locality`](crate::sample::Locality) applied when routing the data.
145    ///
146    /// This restricts the matching subscribers that will receive the published data to the ones
147    /// that have the given [`Locality`](crate::sample::Locality).
148    #[zenoh_macros::unstable]
149    #[inline]
150    pub fn allowed_destination(mut self, destination: Locality) -> Self {
151        self.publisher = self.publisher.allowed_destination(destination);
152        self
153    }
154
155    /// Changes the [`Reliability`](crate::qos::Reliability) to apply when routing the data.
156    ///
157    /// **NOTE**: Currently `reliability` does not trigger any data retransmission on the wire. It
158    ///   is rather used as a marker on the wire and it may be used to select the best link
159    ///   available (e.g. TCP for reliable data and UDP for best effort data).
160    #[zenoh_macros::unstable]
161    #[inline]
162    pub fn reliability(self, reliability: Reliability) -> Self {
163        Self {
164            publisher: self.publisher.reliability(reliability),
165            ..self
166        }
167    }
168}
169
170#[zenoh_macros::internal_trait]
171impl EncodingBuilderTrait for PublisherBuilder<'_, '_> {
172    /// Sets the default [`Encoding`](crate::bytes::Encoding) of the payload generated by this publisher.
173    fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
174        Self {
175            encoding: encoding.into(),
176            ..self
177        }
178    }
179}
180
181#[zenoh_macros::internal_trait]
182impl<P> EncodingBuilderTrait for PublicationBuilder<P, PublicationBuilderPut> {
183    /// Sets the [`Encoding`](crate::bytes::Encoding) of the payload of this publication.
184    fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
185        Self {
186            kind: PublicationBuilderPut {
187                encoding: encoding.into(),
188                ..self.kind
189            },
190            ..self
191        }
192    }
193}
194
195#[zenoh_macros::internal_trait]
196impl<P, T> SampleBuilderTrait for PublicationBuilder<P, T> {
197    /// Sets an optional [`SourceInfo`](crate::sample::SourceInfo) to be sent along with the publication.
198    #[zenoh_macros::unstable]
199    fn source_info<TS: Into<Option<SourceInfo>>>(self, source_info: TS) -> Self {
200        Self {
201            source_info: source_info.into(),
202            ..self
203        }
204    }
205    /// Sets an optional attachment to be sent along with the publication.
206    /// The method accepts both `Into<ZBytes>` and `Option<Into<ZBytes>>`.
207    fn attachment<TA: Into<OptionZBytes>>(self, attachment: TA) -> Self {
208        let attachment: OptionZBytes = attachment.into();
209        Self {
210            attachment: attachment.into(),
211            ..self
212        }
213    }
214}
215
216#[zenoh_macros::internal_trait]
217impl<P, T> TimestampBuilderTrait for PublicationBuilder<P, T> {
218    /// Sets an optional timestamp to be sent along with the publication.
219    fn timestamp<TS: Into<Option<uhlc::Timestamp>>>(self, timestamp: TS) -> Self {
220        Self {
221            timestamp: timestamp.into(),
222            ..self
223        }
224    }
225}
226
// Every publication builder, whatever its publisher and kind parameters, resolves to `ZResult<()>`.
impl<P, T> Resolvable for PublicationBuilder<P, T> {
    type To = ZResult<()>;
}
230
231impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderPut> {
232    #[inline]
233    fn wait(mut self) -> <Self as Resolvable>::To {
234        self.publisher = self.publisher.apply_qos_overwrites();
235        self.publisher.session.0.resolve_put(
236            &self.publisher.key_expr?,
237            self.kind.payload,
238            SampleKind::Put,
239            self.kind.encoding,
240            self.publisher.congestion_control,
241            self.publisher.priority,
242            self.publisher.is_express,
243            self.publisher.destination,
244            #[cfg(feature = "unstable")]
245            self.publisher.reliability,
246            self.timestamp,
247            #[cfg(feature = "unstable")]
248            self.source_info,
249            self.attachment,
250        )
251    }
252}
253
254impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderDelete> {
255    #[inline]
256    fn wait(mut self) -> <Self as Resolvable>::To {
257        self.publisher = self.publisher.apply_qos_overwrites();
258        self.publisher.session.0.resolve_put(
259            &self.publisher.key_expr?,
260            ZBytes::new(),
261            SampleKind::Delete,
262            Encoding::ZENOH_BYTES,
263            self.publisher.congestion_control,
264            self.publisher.priority,
265            self.publisher.is_express,
266            self.publisher.destination,
267            #[cfg(feature = "unstable")]
268            self.publisher.reliability,
269            self.timestamp,
270            #[cfg(feature = "unstable")]
271            self.source_info,
272            self.attachment,
273        )
274    }
275}
276
277impl IntoFuture for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderPut> {
278    type Output = <Self as Resolvable>::To;
279    type IntoFuture = Ready<<Self as Resolvable>::To>;
280
281    fn into_future(self) -> Self::IntoFuture {
282        std::future::ready(self.wait())
283    }
284}
285
286impl IntoFuture for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderDelete> {
287    type Output = <Self as Resolvable>::To;
288    type IntoFuture = Ready<<Self as Resolvable>::To>;
289
290    fn into_future(self) -> Self::IntoFuture {
291        std::future::ready(self.wait())
292    }
293}
294
295/// A builder for initializing a [`Publisher`].
296/// Returned by the
297/// [`Session::declare_publisher`](crate::Session::declare_publisher) method.
298///
299/// # Examples
300/// ```
301/// # #[tokio::main]
302/// # async fn main() {
303/// use zenoh::qos::CongestionControl;
304///
305/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
306/// let publisher = session
307///     .declare_publisher("key/expression")
308///     .congestion_control(CongestionControl::Block)
309///     .await
310///     .unwrap();
311/// # }
312/// ```
313#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
314#[derive(Debug)]
315pub struct PublisherBuilder<'a, 'b> {
316    #[cfg(feature = "internal")]
317    pub session: &'a Session,
318    #[cfg(not(feature = "internal"))]
319    pub(crate) session: &'a Session,
320
321    #[cfg(feature = "internal")]
322    pub key_expr: ZResult<KeyExpr<'b>>,
323    #[cfg(not(feature = "internal"))]
324    pub(crate) key_expr: ZResult<KeyExpr<'b>>,
325
326    #[cfg(feature = "internal")]
327    pub encoding: Encoding,
328    #[cfg(not(feature = "internal"))]
329    pub(crate) encoding: Encoding,
330    #[cfg(feature = "internal")]
331    pub congestion_control: CongestionControl,
332    #[cfg(not(feature = "internal"))]
333    pub(crate) congestion_control: CongestionControl,
334    #[cfg(feature = "internal")]
335    pub priority: Priority,
336    #[cfg(not(feature = "internal"))]
337    pub(crate) priority: Priority,
338    #[cfg(feature = "internal")]
339    pub is_express: bool,
340    #[cfg(not(feature = "internal"))]
341    pub(crate) is_express: bool,
342    #[cfg(feature = "internal")]
343    #[cfg(feature = "unstable")]
344    pub reliability: Reliability,
345    #[cfg(not(feature = "internal"))]
346    #[cfg(feature = "unstable")]
347    pub(crate) reliability: Reliability,
348    #[cfg(feature = "internal")]
349    pub destination: Locality,
350    #[cfg(not(feature = "internal"))]
351    pub(crate) destination: Locality,
352}
353
354impl Clone for PublisherBuilder<'_, '_> {
355    fn clone(&self) -> Self {
356        Self {
357            session: self.session,
358            key_expr: match &self.key_expr {
359                Ok(k) => Ok(k.clone()),
360                Err(e) => Err(zerror!("Cloned KE Error: {}", e).into()),
361            },
362            encoding: self.encoding.clone(),
363            congestion_control: self.congestion_control,
364            priority: self.priority,
365            is_express: self.is_express,
366            #[cfg(feature = "unstable")]
367            reliability: self.reliability,
368            destination: self.destination,
369        }
370    }
371}
372
373#[zenoh_macros::internal_trait]
374impl QoSBuilderTrait for PublisherBuilder<'_, '_> {
375    /// Changes the [`CongestionControl`](crate::qos::CongestionControl) to apply when routing the data.
376    #[inline]
377    fn congestion_control(self, congestion_control: CongestionControl) -> Self {
378        Self {
379            congestion_control,
380            ..self
381        }
382    }
383
384    /// Changes the [`Priority`](crate::qos::Priority) of the written data.
385    #[inline]
386    fn priority(self, priority: Priority) -> Self {
387        Self { priority, ..self }
388    }
389
390    /// Changes the Express policy to apply when routing the data.
391    ///
392    /// When express is set to `true`, then the message will not be batched.
393    /// This usually has a positive impact on latency but a negative impact on throughput.
394    #[inline]
395    fn express(self, is_express: bool) -> Self {
396        Self { is_express, ..self }
397    }
398}
399
400impl PublisherBuilder<'_, '_> {
401    /// Looks up whether any configured QoS overwrites apply to the builder's key expression.
402    /// Returns a new builder with the overwritten QoS parameters.
403    pub(crate) fn apply_qos_overwrites(self) -> Self {
404        let mut qos_overwrites = PublisherQoSConfig::default();
405        if let Ok(key_expr) = &self.key_expr {
406            // get overwritten builder
407            let state = zread!(self.session.0.state);
408            let nodes_including = state
409                .publisher_qos_tree
410                .nodes_including(key_expr)
411                .filter(|n| n.weight().is_some())
412                .collect_vec();
413            if let Some(node) = nodes_including.first() {
414                qos_overwrites = node
415                    .weight()
416                    .expect("first node weight should not be None")
417                    .clone();
418                if nodes_including.len() > 1 {
419                    tracing::warn!(
420                        "Publisher declared on `{}` which is included by multiple key_exprs in qos config ({}). Using qos config for `{}`",
421                        key_expr,
422                        nodes_including.iter().map(|n| n.keyexpr().to_string()).join(", "),
423                        node.keyexpr(),
424                    );
425                }
426            }
427        }
428
429        Self {
430            congestion_control: qos_overwrites
431                .congestion_control
432                .map(|cc| cc.into())
433                .unwrap_or(self.congestion_control),
434            priority: qos_overwrites
435                .priority
436                .map(|p| p.into())
437                .unwrap_or(self.priority),
438            is_express: qos_overwrites.express.unwrap_or(self.is_express),
439            #[cfg(feature = "unstable")]
440            reliability: qos_overwrites
441                .reliability
442                .map(|r| r.into())
443                .unwrap_or(self.reliability),
444            #[cfg(feature = "unstable")]
445            destination: qos_overwrites
446                .allowed_destination
447                .map(|d| d.into())
448                .unwrap_or(self.destination),
449            ..self
450        }
451    }
452
453    /// Changes the [`Locality`](crate::sample::Locality) applied when routing the data.
454    ///
455    /// This restricts the matching subscribers that will receive the published data to the ones
456    /// that have the given [`Locality`](crate::sample::Locality).
457    #[inline]
458    pub fn allowed_destination(mut self, destination: Locality) -> Self {
459        self.destination = destination;
460        self
461    }
462
463    /// Changes the [`Reliability`](crate::qos::Reliability) to apply when routing the data.
464    ///
465    /// **NOTE**: Currently `reliability` does not trigger any data retransmission on the wire. It
466    ///   is rather used as a marker on the wire and it may be used to select the best link
467    ///   available (e.g. TCP for reliable data and UDP for best effort data).
468    #[zenoh_macros::unstable]
469    #[inline]
470    pub fn reliability(self, reliability: Reliability) -> Self {
471        Self {
472            reliability,
473            ..self
474        }
475    }
476}
477
// Resolving the builder yields the declared `Publisher`, which keeps the key expression
// lifetime `'b` but not the session borrow lifetime.
impl<'b> Resolvable for PublisherBuilder<'_, 'b> {
    type To = ZResult<Publisher<'b>>;
}
481
482impl Wait for PublisherBuilder<'_, '_> {
483    fn wait(mut self) -> <Self as Resolvable>::To {
484        self = self.apply_qos_overwrites();
485        let mut key_expr = self.key_expr?;
486        key_expr = self.session.declare_keyexpr(key_expr).wait()?;
487        let id = self
488            .session
489            .0
490            .declare_publisher_inner(key_expr.clone(), self.destination)?;
491        Ok(Publisher {
492            session: self.session.downgrade(),
493            id,
494            key_expr,
495            encoding: self.encoding,
496            congestion_control: self.congestion_control,
497            priority: self.priority,
498            is_express: self.is_express,
499            destination: self.destination,
500            #[cfg(feature = "unstable")]
501            reliability: self.reliability,
502            matching_listeners: Default::default(),
503            undeclare_on_drop: true,
504        })
505    }
506}
507
508impl IntoFuture for PublisherBuilder<'_, '_> {
509    type Output = <Self as Resolvable>::To;
510    type IntoFuture = Ready<<Self as Resolvable>::To>;
511
512    fn into_future(self) -> Self::IntoFuture {
513        std::future::ready(self.wait())
514    }
515}
516
517impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderPut> {
518    fn wait(self) -> <Self as Resolvable>::To {
519        self.publisher.session.resolve_put(
520            &self.publisher.key_expr,
521            self.kind.payload,
522            SampleKind::Put,
523            self.kind.encoding,
524            self.publisher.congestion_control,
525            self.publisher.priority,
526            self.publisher.is_express,
527            self.publisher.destination,
528            #[cfg(feature = "unstable")]
529            self.publisher.reliability,
530            self.timestamp,
531            #[cfg(feature = "unstable")]
532            self.source_info,
533            self.attachment,
534        )
535    }
536}
537
538impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderDelete> {
539    fn wait(self) -> <Self as Resolvable>::To {
540        self.publisher.session.resolve_put(
541            &self.publisher.key_expr,
542            ZBytes::new(),
543            SampleKind::Delete,
544            Encoding::ZENOH_BYTES,
545            self.publisher.congestion_control,
546            self.publisher.priority,
547            self.publisher.is_express,
548            self.publisher.destination,
549            #[cfg(feature = "unstable")]
550            self.publisher.reliability,
551            self.timestamp,
552            #[cfg(feature = "unstable")]
553            self.source_info,
554            self.attachment,
555        )
556    }
557}
558
559impl IntoFuture for PublicationBuilder<&Publisher<'_>, PublicationBuilderPut> {
560    type Output = <Self as Resolvable>::To;
561    type IntoFuture = Ready<<Self as Resolvable>::To>;
562
563    fn into_future(self) -> Self::IntoFuture {
564        std::future::ready(self.wait())
565    }
566}
567
568impl IntoFuture for PublicationBuilder<&Publisher<'_>, PublicationBuilderDelete> {
569    type Output = <Self as Resolvable>::To;
570    type IntoFuture = Ready<<Self as Resolvable>::To>;
571
572    fn into_future(self) -> Self::IntoFuture {
573        std::future::ready(self.wait())
574    }
575}