Skip to main content

zenoh/api/builders/
sample.rs

1//
2// Copyright (c) 2024 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::marker::PhantomData;
15
16use uhlc::Timestamp;
17use zenoh_core::zresult;
18use zenoh_protocol::core::CongestionControl;
19#[cfg(feature = "unstable")]
20use zenoh_protocol::core::Reliability;
21
22use crate::api::{
23    bytes::{OptionZBytes, ZBytes},
24    encoding::Encoding,
25    key_expr::KeyExpr,
26    publisher::Priority,
27    sample::{QoS, QoSBuilder, Sample, SampleKind},
28};
29#[zenoh_macros::internal]
30use crate::pubsub::{
31    PublicationBuilder, PublicationBuilderDelete, PublicationBuilderPut, Publisher,
32};
33#[cfg(feature = "unstable")]
34use crate::sample::SourceInfo;
35pub trait QoSBuilderTrait {
36    /// Change the `congestion_control` to apply when routing the data.
37    fn congestion_control(self, congestion_control: CongestionControl) -> Self;
38    /// Change the priority of the written data.
39    fn priority(self, priority: Priority) -> Self;
40    /// Change the `express` policy to apply when routing the data.
41    /// When express is set to `true`, then the message will not be batched.
42    /// This usually has a positive impact on latency but negative impact on throughput.
43    fn express(self, is_express: bool) -> Self;
44}
45
46pub trait TimestampBuilderTrait {
47    /// Sets or clears the timestamp
48    fn timestamp<T: Into<Option<Timestamp>>>(self, timestamp: T) -> Self;
49}
50
51pub trait SampleBuilderTrait {
52    /// Attach source information
53    #[zenoh_macros::unstable]
54    fn source_info<T: Into<Option<SourceInfo>>>(self, source_info: T) -> Self;
55    /// Attach user-provided data in key-value format
56    fn attachment<T: Into<OptionZBytes>>(self, attachment: T) -> Self;
57}
58
59pub trait EncodingBuilderTrait {
60    /// Set the [`Encoding`]
61    fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self;
62}
63
64/// The type modifier for a [`SampleBuilder`] to create a [`Put`](crate::sample::SampleKind::Put) sample.
65#[derive(Clone, Debug)]
66pub struct SampleBuilderPut;
67/// The type modifier for a [`SampleBuilder`] to create a [`Delete`](crate::sample::SampleKind::Delete) sample.
68#[derive(Clone, Debug)]
69pub struct SampleBuilderDelete;
70/// The type modifier for a [`SampleBuilder`] for the building stage
71/// when the sample [`kind`](crate::sample::Sample::kind) is not yet specified.
72///
73/// With this modifier the `SampleBuilder` can't be resolved;
74/// the selection of the kind must be done with [`SampleBuilder::put`] or
75/// [`SampleBuilder::delete`].
76#[derive(Clone, Debug)]
77pub struct SampleBuilderAny;
78
79/// A builder for [`Sample`](crate::sample::Sample)
80///
81/// As the `Sample` struct is not mutable, the `SampleBuilder` can be used to
82/// create or modify an existing `Sample` instance or to create a new one from scratch
83/// for storing it for later use.
84#[derive(Clone, Debug)]
85pub struct SampleBuilder<T> {
86    sample: Sample,
87    _t: PhantomData<T>,
88}
89
90impl SampleBuilder<SampleBuilderPut> {
91    pub fn put<IntoKeyExpr, IntoZBytes>(
92        key_expr: IntoKeyExpr,
93        payload: IntoZBytes,
94    ) -> SampleBuilder<SampleBuilderPut>
95    where
96        IntoKeyExpr: Into<KeyExpr<'static>>,
97        IntoZBytes: Into<ZBytes>,
98    {
99        Self {
100            sample: Sample {
101                key_expr: key_expr.into(),
102                payload: payload.into(),
103                kind: SampleKind::Put,
104                encoding: Encoding::default(),
105                timestamp: None,
106                qos: QoS::default(),
107                #[cfg(feature = "unstable")]
108                reliability: Reliability::DEFAULT,
109                #[cfg(feature = "unstable")]
110                source_info: None,
111                attachment: None,
112            },
113            _t: PhantomData::<SampleBuilderPut>,
114        }
115    }
116
117    pub fn payload<IntoZBytes>(mut self, payload: IntoZBytes) -> Self
118    where
119        IntoZBytes: Into<ZBytes>,
120    {
121        self.sample.payload = payload.into();
122        self
123    }
124}
125
126impl SampleBuilder<SampleBuilderDelete> {
127    pub fn delete<IntoKeyExpr>(key_expr: IntoKeyExpr) -> SampleBuilder<SampleBuilderDelete>
128    where
129        IntoKeyExpr: Into<KeyExpr<'static>>,
130    {
131        Self {
132            sample: Sample {
133                key_expr: key_expr.into(),
134                payload: ZBytes::new(),
135                kind: SampleKind::Delete,
136                encoding: Encoding::default(),
137                timestamp: None,
138                qos: QoS::default(),
139                #[cfg(feature = "unstable")]
140                reliability: Reliability::DEFAULT,
141                #[cfg(feature = "unstable")]
142                source_info: None,
143                attachment: None,
144            },
145            _t: PhantomData::<SampleBuilderDelete>,
146        }
147    }
148}
149
150impl<T> SampleBuilder<T> {
151    /// Allows changing the key expression of a [`Sample`]
152    pub fn keyexpr<IntoKeyExpr>(self, key_expr: IntoKeyExpr) -> Self
153    where
154        IntoKeyExpr: Into<KeyExpr<'static>>,
155    {
156        Self {
157            sample: Sample {
158                key_expr: key_expr.into(),
159                ..self.sample
160            },
161            _t: PhantomData::<T>,
162        }
163    }
164
165    // Allows changing the QoS of a [`Sample`] as a whole
166    pub(crate) fn qos(self, qos: QoS) -> Self {
167        Self {
168            sample: Sample { qos, ..self.sample },
169            _t: PhantomData::<T>,
170        }
171    }
172
173    #[zenoh_macros::unstable]
174    pub fn reliability(self, reliability: Reliability) -> Self {
175        Self {
176            sample: Sample {
177                reliability,
178                ..self.sample
179            },
180            _t: PhantomData::<T>,
181        }
182    }
183}
184
185#[zenoh_macros::internal_trait]
186impl<T> TimestampBuilderTrait for SampleBuilder<T> {
187    fn timestamp<U: Into<Option<Timestamp>>>(self, timestamp: U) -> Self {
188        Self {
189            sample: Sample {
190                timestamp: timestamp.into(),
191                ..self.sample
192            },
193            _t: PhantomData::<T>,
194        }
195    }
196}
197
198#[zenoh_macros::internal_trait]
199impl<T> SampleBuilderTrait for SampleBuilder<T> {
200    #[zenoh_macros::unstable]
201    fn source_info<S: Into<Option<SourceInfo>>>(self, source_info: S) -> Self {
202        Self {
203            sample: Sample {
204                source_info: source_info.into(),
205                ..self.sample
206            },
207            _t: PhantomData::<T>,
208        }
209    }
210
211    fn attachment<U: Into<OptionZBytes>>(self, attachment: U) -> Self {
212        let attachment: OptionZBytes = attachment.into();
213        Self {
214            sample: Sample {
215                attachment: attachment.into(),
216                ..self.sample
217            },
218            _t: PhantomData::<T>,
219        }
220    }
221}
222
223#[zenoh_macros::internal_trait]
224impl<T> QoSBuilderTrait for SampleBuilder<T> {
225    fn congestion_control(self, congestion_control: CongestionControl) -> Self {
226        let qos: QoSBuilder = self.sample.qos.into();
227        let qos = qos.congestion_control(congestion_control).into();
228        Self {
229            sample: Sample { qos, ..self.sample },
230            _t: PhantomData::<T>,
231        }
232    }
233    fn priority(self, priority: Priority) -> Self {
234        let qos: QoSBuilder = self.sample.qos.into();
235        let qos = qos.priority(priority).into();
236        Self {
237            sample: Sample { qos, ..self.sample },
238            _t: PhantomData::<T>,
239        }
240    }
241    fn express(self, is_express: bool) -> Self {
242        let qos: QoSBuilder = self.sample.qos.into();
243        let qos = qos.express(is_express).into();
244        Self {
245            sample: Sample { qos, ..self.sample },
246            _t: PhantomData::<T>,
247        }
248    }
249}
250
251#[zenoh_macros::internal_trait]
252impl EncodingBuilderTrait for SampleBuilder<SampleBuilderPut> {
253    fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
254        Self {
255            sample: Sample {
256                encoding: encoding.into(),
257                ..self.sample
258            },
259            _t: PhantomData::<SampleBuilderPut>,
260        }
261    }
262}
263
264impl From<Sample> for SampleBuilder<SampleBuilderAny> {
265    fn from(sample: Sample) -> Self {
266        SampleBuilder {
267            sample,
268            _t: PhantomData::<SampleBuilderAny>,
269        }
270    }
271}
272
273impl TryFrom<Sample> for SampleBuilder<SampleBuilderPut> {
274    type Error = zresult::Error;
275    fn try_from(sample: Sample) -> Result<Self, Self::Error> {
276        if sample.kind != SampleKind::Put {
277            bail!("Sample is not a put sample")
278        }
279        Ok(SampleBuilder {
280            sample,
281            _t: PhantomData::<SampleBuilderPut>,
282        })
283    }
284}
285
286impl TryFrom<Sample> for SampleBuilder<SampleBuilderDelete> {
287    type Error = zresult::Error;
288    fn try_from(sample: Sample) -> Result<Self, Self::Error> {
289        if sample.kind != SampleKind::Delete {
290            bail!("Sample is not a delete sample")
291        }
292        Ok(SampleBuilder {
293            sample,
294            _t: PhantomData::<SampleBuilderDelete>,
295        })
296    }
297}
298
299impl<T> From<SampleBuilder<T>> for Sample {
300    fn from(sample_builder: SampleBuilder<T>) -> Self {
301        sample_builder.sample
302    }
303}
304
305#[zenoh_macros::internal]
306impl From<&PublicationBuilder<&Publisher<'_>, PublicationBuilderPut>> for Sample {
307    fn from(builder: &PublicationBuilder<&Publisher<'_>, PublicationBuilderPut>) -> Self {
308        Sample {
309            key_expr: builder.publisher.key_expr.clone().into_owned(),
310            payload: builder.kind.payload.clone(),
311            kind: SampleKind::Put,
312            encoding: builder.kind.encoding.clone(),
313            timestamp: builder.timestamp,
314            qos: QoSBuilder::from(QoS::default())
315                .congestion_control(builder.publisher.congestion_control)
316                .priority(builder.publisher.priority)
317                .express(builder.publisher.is_express)
318                .into(),
319            #[cfg(feature = "unstable")]
320            reliability: builder.publisher.reliability,
321            #[cfg(feature = "unstable")]
322            source_info: builder.source_info.clone(),
323            attachment: builder.attachment.clone(),
324        }
325    }
326}
327
328#[zenoh_macros::internal]
329impl From<&PublicationBuilder<&Publisher<'_>, PublicationBuilderDelete>> for Sample {
330    fn from(builder: &PublicationBuilder<&Publisher<'_>, PublicationBuilderDelete>) -> Self {
331        Sample {
332            key_expr: builder.publisher.key_expr.clone().into_owned(),
333            payload: ZBytes::new(),
334            kind: SampleKind::Put,
335            encoding: Encoding::ZENOH_BYTES,
336            timestamp: builder.timestamp,
337            qos: QoSBuilder::from(QoS::default())
338                .congestion_control(builder.publisher.congestion_control)
339                .priority(builder.publisher.priority)
340                .express(builder.publisher.is_express)
341                .into(),
342            #[cfg(feature = "unstable")]
343            reliability: builder.publisher.reliability,
344            #[cfg(feature = "unstable")]
345            source_info: builder.source_info.clone(),
346            attachment: builder.attachment.clone(),
347        }
348    }
349}