zenoh/api/builders/
publisher.rs

1//
2// Copyright (c) 2024 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::future::{IntoFuture, Ready};
15
16use itertools::Itertools;
17use zenoh_config::qos::PublisherQoSConfig;
18use zenoh_core::{Resolvable, Result as ZResult, Wait};
19use zenoh_keyexpr::keyexpr_tree::{IKeyExprTree, IKeyExprTreeNode};
20use zenoh_protocol::core::CongestionControl;
21#[cfg(feature = "unstable")]
22use zenoh_protocol::core::Reliability;
23
24#[cfg(feature = "unstable")]
25use crate::api::sample::SourceInfo;
26use crate::{
27    api::{
28        builders::sample::{
29            EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait,
30        },
31        bytes::{OptionZBytes, ZBytes},
32        encoding::Encoding,
33        key_expr::KeyExpr,
34        publisher::{Priority, Publisher},
35        sample::{Locality, SampleKind},
36    },
37    Session,
38};
39
/// A [`PublicationBuilder`] for a put operation whose publisher settings are carried by a
/// [`PublisherBuilder`] instead of an already declared [`Publisher`].
pub type SessionPutBuilder<'a, 'b> =
    PublicationBuilder<PublisherBuilder<'a, 'b>, PublicationBuilderPut>;
42
/// A [`PublicationBuilder`] for a delete operation whose publisher settings are carried by a
/// [`PublisherBuilder`] instead of an already declared [`Publisher`].
pub type SessionDeleteBuilder<'a, 'b> =
    PublicationBuilder<PublisherBuilder<'a, 'b>, PublicationBuilderDelete>;
45
/// A [`PublicationBuilder`] for a put operation issued through a borrowed [`Publisher`].
pub type PublisherPutBuilder<'a> = PublicationBuilder<&'a Publisher<'a>, PublicationBuilderPut>;
47
/// A [`PublicationBuilder`] for a delete operation issued through a borrowed [`Publisher`].
pub type PublisherDeleteBuilder<'a> =
    PublicationBuilder<&'a Publisher<'a>, PublicationBuilderDelete>;
50
/// Put-specific state of a [`PublicationBuilder`]: the payload to publish and its encoding.
#[derive(Debug, Clone)]
pub struct PublicationBuilderPut {
    // Data sent as the body of the put.
    pub(crate) payload: ZBytes,
    // Encoding forwarded together with the payload.
    pub(crate) encoding: Encoding,
}
/// Marker selecting the delete flavor of a [`PublicationBuilder`]; it carries no data.
#[derive(Debug, Clone)]
pub struct PublicationBuilderDelete;
58
/// A publication builder.
///
/// This object is returned by the following methods:
/// - [`crate::session::Session::put`]
/// - [`crate::session::Session::delete`]
/// - [`crate::pubsub::Publisher::put`]
/// - [`crate::pubsub::Publisher::delete`]
///
/// `P` describes the publisher side (in this module either a [`PublisherBuilder`] or a
/// `&Publisher`), and `T` selects the operation ([`PublicationBuilderPut`] or
/// [`PublicationBuilderDelete`]).
///
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use zenoh::{bytes::Encoding, qos::CongestionControl};
///
/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
/// session
///     .put("key/expression", "payload")
///     .encoding(Encoding::TEXT_PLAIN)
///     .congestion_control(CongestionControl::Block)
///     .await
///     .unwrap();
/// # }
/// ```
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
#[derive(Debug, Clone)]
pub struct PublicationBuilder<P, T> {
    // Publisher description supplying the session, key expression and QoS settings.
    pub(crate) publisher: P,
    // Operation-specific state (put payload/encoding, or the delete marker).
    pub(crate) kind: T,
    // Optional timestamp passed along with the publication.
    pub(crate) timestamp: Option<uhlc::Timestamp>,
    // Source information passed along with the publication (only with the `unstable` feature).
    #[cfg(feature = "unstable")]
    pub(crate) source_info: SourceInfo,
    // Optional attachment passed along with the publication.
    pub(crate) attachment: Option<ZBytes>,
}
92
93#[zenoh_macros::internal_trait]
94impl<T> QoSBuilderTrait for PublicationBuilder<PublisherBuilder<'_, '_>, T> {
95    /// Changes the [`crate::qos::CongestionControl`] to apply when routing the data.
96    #[inline]
97    fn congestion_control(self, congestion_control: CongestionControl) -> Self {
98        Self {
99            publisher: self.publisher.congestion_control(congestion_control),
100            ..self
101        }
102    }
103
104    /// Changes the [`crate::qos::Priority`] of the written data.
105    #[inline]
106    fn priority(self, priority: Priority) -> Self {
107        Self {
108            publisher: self.publisher.priority(priority),
109            ..self
110        }
111    }
112
113    /// Changes the Express policy to apply when routing the data.
114    ///
115    /// When express is set to `true`, then the message will not be batched.
116    /// This usually has a positive impact on latency but negative impact on throughput.
117    #[inline]
118    fn express(self, is_express: bool) -> Self {
119        Self {
120            publisher: self.publisher.express(is_express),
121            ..self
122        }
123    }
124}
125
126impl<T> PublicationBuilder<PublisherBuilder<'_, '_>, T> {
127    /// Changes the [`crate::sample::Locality`] applied when routing the data.
128    ///
129    /// This restricts the matching subscribers that will receive the published data to the ones
130    /// that have the given [`crate::sample::Locality`].
131    #[zenoh_macros::unstable]
132    #[inline]
133    pub fn allowed_destination(mut self, destination: Locality) -> Self {
134        self.publisher = self.publisher.allowed_destination(destination);
135        self
136    }
137
138    /// Changes the [`crate::qos::Reliability`] to apply when routing the data.
139    ///
140    /// **NOTE**: Currently `reliability` does not trigger any data retransmission on the wire. It
141    ///   is rather used as a marker on the wire and it may be used to select the best link
142    ///   available (e.g. TCP for reliable data and UDP for best effort data).
143    #[zenoh_macros::unstable]
144    #[inline]
145    pub fn reliability(self, reliability: Reliability) -> Self {
146        Self {
147            publisher: self.publisher.reliability(reliability),
148            ..self
149        }
150    }
151}
152
153#[zenoh_macros::internal_trait]
154impl EncodingBuilderTrait for PublisherBuilder<'_, '_> {
155    fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
156        Self {
157            encoding: encoding.into(),
158            ..self
159        }
160    }
161}
162
163#[zenoh_macros::internal_trait]
164impl<P> EncodingBuilderTrait for PublicationBuilder<P, PublicationBuilderPut> {
165    fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
166        Self {
167            kind: PublicationBuilderPut {
168                encoding: encoding.into(),
169                ..self.kind
170            },
171            ..self
172        }
173    }
174}
175
176#[zenoh_macros::internal_trait]
177impl<P, T> SampleBuilderTrait for PublicationBuilder<P, T> {
178    #[cfg(feature = "unstable")]
179    fn source_info(self, source_info: SourceInfo) -> Self {
180        Self {
181            source_info,
182            ..self
183        }
184    }
185    fn attachment<TA: Into<OptionZBytes>>(self, attachment: TA) -> Self {
186        let attachment: OptionZBytes = attachment.into();
187        Self {
188            attachment: attachment.into(),
189            ..self
190        }
191    }
192}
193
194#[zenoh_macros::internal_trait]
195impl<P, T> TimestampBuilderTrait for PublicationBuilder<P, T> {
196    fn timestamp<TS: Into<Option<uhlc::Timestamp>>>(self, timestamp: TS) -> Self {
197        Self {
198            timestamp: timestamp.into(),
199            ..self
200        }
201    }
202}
203
/// Resolving any publication builder yields `Ok(())` on success or a zenoh error.
impl<P, T> Resolvable for PublicationBuilder<P, T> {
    type To = ZResult<()>;
}
207
208impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderPut> {
209    #[inline]
210    fn wait(mut self) -> <Self as Resolvable>::To {
211        self.publisher = self.publisher.apply_qos_overwrites();
212        self.publisher.session.0.resolve_put(
213            &self.publisher.key_expr?,
214            self.kind.payload,
215            SampleKind::Put,
216            self.kind.encoding,
217            self.publisher.congestion_control,
218            self.publisher.priority,
219            self.publisher.is_express,
220            self.publisher.destination,
221            #[cfg(feature = "unstable")]
222            self.publisher.reliability,
223            self.timestamp,
224            #[cfg(feature = "unstable")]
225            self.source_info,
226            self.attachment,
227        )
228    }
229}
230
231impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderDelete> {
232    #[inline]
233    fn wait(mut self) -> <Self as Resolvable>::To {
234        self.publisher = self.publisher.apply_qos_overwrites();
235        self.publisher.session.0.resolve_put(
236            &self.publisher.key_expr?,
237            ZBytes::new(),
238            SampleKind::Delete,
239            Encoding::ZENOH_BYTES,
240            self.publisher.congestion_control,
241            self.publisher.priority,
242            self.publisher.is_express,
243            self.publisher.destination,
244            #[cfg(feature = "unstable")]
245            self.publisher.reliability,
246            self.timestamp,
247            #[cfg(feature = "unstable")]
248            self.source_info,
249            self.attachment,
250        )
251    }
252}
253
254impl IntoFuture for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderPut> {
255    type Output = <Self as Resolvable>::To;
256    type IntoFuture = Ready<<Self as Resolvable>::To>;
257
258    fn into_future(self) -> Self::IntoFuture {
259        std::future::ready(self.wait())
260    }
261}
262
263impl IntoFuture for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderDelete> {
264    type Output = <Self as Resolvable>::To;
265    type IntoFuture = Ready<<Self as Resolvable>::To>;
266
267    fn into_future(self) -> Self::IntoFuture {
268        std::future::ready(self.wait())
269    }
270}
271
/// A builder for initializing a [`Publisher`].
///
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use zenoh::qos::CongestionControl;
///
/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
/// let publisher = session
///     .declare_publisher("key/expression")
///     .congestion_control(CongestionControl::Block)
///     .await
///     .unwrap();
/// # }
/// ```
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
#[derive(Debug)]
pub struct PublisherBuilder<'a, 'b> {
    // Every field is declared twice under complementary `cfg`s: it is `pub` when the
    // `internal` feature is enabled and `pub(crate)` otherwise.

    // Session used to declare the publisher or to send the publication.
    #[cfg(feature = "internal")]
    pub session: &'a Session,
    #[cfg(not(feature = "internal"))]
    pub(crate) session: &'a Session,

    // Key expression to publish on. It is kept as a `ZResult` so that an invalid key
    // expression is only reported when the builder is resolved.
    #[cfg(feature = "internal")]
    pub key_expr: ZResult<KeyExpr<'b>>,
    #[cfg(not(feature = "internal"))]
    pub(crate) key_expr: ZResult<KeyExpr<'b>>,

    // Encoding handed over to the declared `Publisher`.
    #[cfg(feature = "internal")]
    pub encoding: Encoding,
    #[cfg(not(feature = "internal"))]
    pub(crate) encoding: Encoding,
    // Congestion control setting; may be replaced by `apply_qos_overwrites`.
    #[cfg(feature = "internal")]
    pub congestion_control: CongestionControl,
    #[cfg(not(feature = "internal"))]
    pub(crate) congestion_control: CongestionControl,
    // Priority setting; may be replaced by `apply_qos_overwrites`.
    #[cfg(feature = "internal")]
    pub priority: Priority,
    #[cfg(not(feature = "internal"))]
    pub(crate) priority: Priority,
    // Express setting; may be replaced by `apply_qos_overwrites`.
    #[cfg(feature = "internal")]
    pub is_express: bool,
    #[cfg(not(feature = "internal"))]
    pub(crate) is_express: bool,
    // Reliability setting; the field only exists with the `unstable` feature.
    #[cfg(feature = "internal")]
    #[cfg(feature = "unstable")]
    pub reliability: Reliability,
    #[cfg(not(feature = "internal"))]
    #[cfg(feature = "unstable")]
    pub(crate) reliability: Reliability,
    // Locality restriction passed on when declaring the publisher or sending the publication.
    #[cfg(feature = "internal")]
    pub destination: Locality,
    #[cfg(not(feature = "internal"))]
    pub(crate) destination: Locality,
}
328
329impl Clone for PublisherBuilder<'_, '_> {
330    fn clone(&self) -> Self {
331        Self {
332            session: self.session,
333            key_expr: match &self.key_expr {
334                Ok(k) => Ok(k.clone()),
335                Err(e) => Err(zerror!("Cloned KE Error: {}", e).into()),
336            },
337            encoding: self.encoding.clone(),
338            congestion_control: self.congestion_control,
339            priority: self.priority,
340            is_express: self.is_express,
341            #[cfg(feature = "unstable")]
342            reliability: self.reliability,
343            destination: self.destination,
344        }
345    }
346}
347
348#[zenoh_macros::internal_trait]
349impl QoSBuilderTrait for PublisherBuilder<'_, '_> {
350    /// Changes the [`crate::qos::CongestionControl`] to apply when routing the data.
351    #[inline]
352    fn congestion_control(self, congestion_control: CongestionControl) -> Self {
353        Self {
354            congestion_control,
355            ..self
356        }
357    }
358
359    /// Changes the [`crate::qos::Priority`] of the written data.
360    #[inline]
361    fn priority(self, priority: Priority) -> Self {
362        Self { priority, ..self }
363    }
364
365    /// Changes the Express policy to apply when routing the data.
366    ///
367    /// When express is set to `true`, then the message will not be batched.
368    /// This usually has a positive impact on latency but negative impact on throughput.
369    #[inline]
370    fn express(self, is_express: bool) -> Self {
371        Self { is_express, ..self }
372    }
373}
374
375impl PublisherBuilder<'_, '_> {
376    /// Looks up if any configured QoS overwrites apply on the builder's key expression.
377    /// Returns a new builder with the overwritten QoS parameters.
378    pub(crate) fn apply_qos_overwrites(self) -> Self {
379        let mut qos_overwrites = PublisherQoSConfig::default();
380        if let Ok(key_expr) = &self.key_expr {
381            // get overwritten builder
382            let state = zread!(self.session.0.state);
383            let nodes_including = state
384                .publisher_qos_tree
385                .nodes_including(key_expr)
386                .filter(|n| n.weight().is_some())
387                .collect_vec();
388            if let Some(node) = nodes_including.first() {
389                qos_overwrites = node
390                    .weight()
391                    .expect("first node weight should not be None")
392                    .clone();
393                if nodes_including.len() > 1 {
394                    tracing::warn!(
395                        "Publisher declared on `{}` which is included by multiple key_exprs in qos config ({}). Using qos config for `{}`",
396                        key_expr,
397                        nodes_including.iter().map(|n| n.keyexpr().to_string()).join(", "),
398                        node.keyexpr(),
399                    );
400                }
401            }
402        }
403
404        Self {
405            congestion_control: qos_overwrites
406                .congestion_control
407                .map(|cc| cc.into())
408                .unwrap_or(self.congestion_control),
409            priority: qos_overwrites
410                .priority
411                .map(|p| p.into())
412                .unwrap_or(self.priority),
413            is_express: qos_overwrites.express.unwrap_or(self.is_express),
414            #[cfg(feature = "unstable")]
415            reliability: qos_overwrites
416                .reliability
417                .map(|r| r.into())
418                .unwrap_or(self.reliability),
419            #[cfg(feature = "unstable")]
420            destination: qos_overwrites
421                .allowed_destination
422                .map(|d| d.into())
423                .unwrap_or(self.destination),
424            ..self
425        }
426    }
427
428    /// Changes the [`crate::sample::Locality`] applied when routing the data.
429    ///
430    /// This restricts the matching subscribers that will receive the published data to the ones
431    /// that have the given [`crate::sample::Locality`].
432    #[zenoh_macros::unstable]
433    #[inline]
434    pub fn allowed_destination(mut self, destination: Locality) -> Self {
435        self.destination = destination;
436        self
437    }
438
439    /// Changes the [`crate::qos::Reliability`] to apply when routing the data.
440    ///
441    /// **NOTE**: Currently `reliability` does not trigger any data retransmission on the wire. It
442    ///   is rather used as a marker on the wire and it may be used to select the best link
443    ///   available (e.g. TCP for reliable data and UDP for best effort data).
444    #[zenoh_macros::unstable]
445    #[inline]
446    pub fn reliability(self, reliability: Reliability) -> Self {
447        Self {
448            reliability,
449            ..self
450        }
451    }
452}
453
/// Resolving the builder yields the declared [`Publisher`], which borrows from the key
/// expression lifetime `'b` only, or a zenoh error.
impl<'b> Resolvable for PublisherBuilder<'_, 'b> {
    type To = ZResult<Publisher<'b>>;
}
457
458impl Wait for PublisherBuilder<'_, '_> {
459    fn wait(mut self) -> <Self as Resolvable>::To {
460        self = self.apply_qos_overwrites();
461        let mut key_expr = self.key_expr?;
462        if !key_expr.is_fully_optimized(&self.session.0) {
463            key_expr = self.session.declare_keyexpr(key_expr).wait()?;
464        }
465        let id = self
466            .session
467            .0
468            .declare_publisher_inner(key_expr.clone(), self.destination)?;
469        Ok(Publisher {
470            session: self.session.downgrade(),
471            id,
472            key_expr,
473            encoding: self.encoding,
474            congestion_control: self.congestion_control,
475            priority: self.priority,
476            is_express: self.is_express,
477            destination: self.destination,
478            #[cfg(feature = "unstable")]
479            reliability: self.reliability,
480            #[cfg(feature = "unstable")]
481            matching_listeners: Default::default(),
482            undeclare_on_drop: true,
483        })
484    }
485}
486
487impl IntoFuture for PublisherBuilder<'_, '_> {
488    type Output = <Self as Resolvable>::To;
489    type IntoFuture = Ready<<Self as Resolvable>::To>;
490
491    fn into_future(self) -> Self::IntoFuture {
492        std::future::ready(self.wait())
493    }
494}
495
496impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderPut> {
497    fn wait(self) -> <Self as Resolvable>::To {
498        self.publisher.session.resolve_put(
499            &self.publisher.key_expr,
500            self.kind.payload,
501            SampleKind::Put,
502            self.kind.encoding,
503            self.publisher.congestion_control,
504            self.publisher.priority,
505            self.publisher.is_express,
506            self.publisher.destination,
507            #[cfg(feature = "unstable")]
508            self.publisher.reliability,
509            self.timestamp,
510            #[cfg(feature = "unstable")]
511            self.source_info,
512            self.attachment,
513        )
514    }
515}
516
517impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderDelete> {
518    fn wait(self) -> <Self as Resolvable>::To {
519        self.publisher.session.resolve_put(
520            &self.publisher.key_expr,
521            ZBytes::new(),
522            SampleKind::Delete,
523            Encoding::ZENOH_BYTES,
524            self.publisher.congestion_control,
525            self.publisher.priority,
526            self.publisher.is_express,
527            self.publisher.destination,
528            #[cfg(feature = "unstable")]
529            self.publisher.reliability,
530            self.timestamp,
531            #[cfg(feature = "unstable")]
532            self.source_info,
533            self.attachment,
534        )
535    }
536}
537
538impl IntoFuture for PublicationBuilder<&Publisher<'_>, PublicationBuilderPut> {
539    type Output = <Self as Resolvable>::To;
540    type IntoFuture = Ready<<Self as Resolvable>::To>;
541
542    fn into_future(self) -> Self::IntoFuture {
543        std::future::ready(self.wait())
544    }
545}
546
547impl IntoFuture for PublicationBuilder<&Publisher<'_>, PublicationBuilderDelete> {
548    type Output = <Self as Resolvable>::To;
549    type IntoFuture = Ready<<Self as Resolvable>::To>;
550
551    fn into_future(self) -> Self::IntoFuture {
552        std::future::ready(self.wait())
553    }
554}