1use std::marker::PhantomData;
15
16use uhlc::Timestamp;
17use zenoh_core::zresult;
18use zenoh_protocol::core::CongestionControl;
19#[cfg(feature = "unstable")]
20use zenoh_protocol::core::Reliability;
21
22use crate::api::{
23 bytes::{OptionZBytes, ZBytes},
24 encoding::Encoding,
25 key_expr::KeyExpr,
26 publisher::Priority,
27 sample::{QoS, QoSBuilder, Sample, SampleKind},
28};
29#[zenoh_macros::internal]
30use crate::pubsub::{
31 PublicationBuilder, PublicationBuilderDelete, PublicationBuilderPut, Publisher,
32};
33#[cfg(feature = "unstable")]
34use crate::sample::SourceInfo;
35pub trait QoSBuilderTrait {
36 fn congestion_control(self, congestion_control: CongestionControl) -> Self;
38 fn priority(self, priority: Priority) -> Self;
40 fn express(self, is_express: bool) -> Self;
44}
45
46pub trait TimestampBuilderTrait {
47 fn timestamp<T: Into<Option<Timestamp>>>(self, timestamp: T) -> Self;
49}
50
51pub trait SampleBuilderTrait {
52 #[zenoh_macros::unstable]
54 fn source_info<T: Into<Option<SourceInfo>>>(self, source_info: T) -> Self;
55 fn attachment<T: Into<OptionZBytes>>(self, attachment: T) -> Self;
57}
58
59pub trait EncodingBuilderTrait {
60 fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self;
62}
63
64#[derive(Clone, Debug)]
66pub struct SampleBuilderPut;
67#[derive(Clone, Debug)]
69pub struct SampleBuilderDelete;
70#[derive(Clone, Debug)]
77pub struct SampleBuilderAny;
78
79#[derive(Clone, Debug)]
85pub struct SampleBuilder<T> {
86 sample: Sample,
87 _t: PhantomData<T>,
88}
89
90impl SampleBuilder<SampleBuilderPut> {
91 pub fn put<IntoKeyExpr, IntoZBytes>(
92 key_expr: IntoKeyExpr,
93 payload: IntoZBytes,
94 ) -> SampleBuilder<SampleBuilderPut>
95 where
96 IntoKeyExpr: Into<KeyExpr<'static>>,
97 IntoZBytes: Into<ZBytes>,
98 {
99 Self {
100 sample: Sample {
101 key_expr: key_expr.into(),
102 payload: payload.into(),
103 kind: SampleKind::Put,
104 encoding: Encoding::default(),
105 timestamp: None,
106 qos: QoS::default(),
107 #[cfg(feature = "unstable")]
108 reliability: Reliability::DEFAULT,
109 #[cfg(feature = "unstable")]
110 source_info: None,
111 attachment: None,
112 },
113 _t: PhantomData::<SampleBuilderPut>,
114 }
115 }
116
117 pub fn payload<IntoZBytes>(mut self, payload: IntoZBytes) -> Self
118 where
119 IntoZBytes: Into<ZBytes>,
120 {
121 self.sample.payload = payload.into();
122 self
123 }
124}
125
126impl SampleBuilder<SampleBuilderDelete> {
127 pub fn delete<IntoKeyExpr>(key_expr: IntoKeyExpr) -> SampleBuilder<SampleBuilderDelete>
128 where
129 IntoKeyExpr: Into<KeyExpr<'static>>,
130 {
131 Self {
132 sample: Sample {
133 key_expr: key_expr.into(),
134 payload: ZBytes::new(),
135 kind: SampleKind::Delete,
136 encoding: Encoding::default(),
137 timestamp: None,
138 qos: QoS::default(),
139 #[cfg(feature = "unstable")]
140 reliability: Reliability::DEFAULT,
141 #[cfg(feature = "unstable")]
142 source_info: None,
143 attachment: None,
144 },
145 _t: PhantomData::<SampleBuilderDelete>,
146 }
147 }
148}
149
150impl<T> SampleBuilder<T> {
151 pub fn keyexpr<IntoKeyExpr>(self, key_expr: IntoKeyExpr) -> Self
153 where
154 IntoKeyExpr: Into<KeyExpr<'static>>,
155 {
156 Self {
157 sample: Sample {
158 key_expr: key_expr.into(),
159 ..self.sample
160 },
161 _t: PhantomData::<T>,
162 }
163 }
164
165 pub(crate) fn qos(self, qos: QoS) -> Self {
167 Self {
168 sample: Sample { qos, ..self.sample },
169 _t: PhantomData::<T>,
170 }
171 }
172
173 #[zenoh_macros::unstable]
174 pub fn reliability(self, reliability: Reliability) -> Self {
175 Self {
176 sample: Sample {
177 reliability,
178 ..self.sample
179 },
180 _t: PhantomData::<T>,
181 }
182 }
183}
184
185#[zenoh_macros::internal_trait]
186impl<T> TimestampBuilderTrait for SampleBuilder<T> {
187 fn timestamp<U: Into<Option<Timestamp>>>(self, timestamp: U) -> Self {
188 Self {
189 sample: Sample {
190 timestamp: timestamp.into(),
191 ..self.sample
192 },
193 _t: PhantomData::<T>,
194 }
195 }
196}
197
198#[zenoh_macros::internal_trait]
199impl<T> SampleBuilderTrait for SampleBuilder<T> {
200 #[zenoh_macros::unstable]
201 fn source_info<S: Into<Option<SourceInfo>>>(self, source_info: S) -> Self {
202 Self {
203 sample: Sample {
204 source_info: source_info.into(),
205 ..self.sample
206 },
207 _t: PhantomData::<T>,
208 }
209 }
210
211 fn attachment<U: Into<OptionZBytes>>(self, attachment: U) -> Self {
212 let attachment: OptionZBytes = attachment.into();
213 Self {
214 sample: Sample {
215 attachment: attachment.into(),
216 ..self.sample
217 },
218 _t: PhantomData::<T>,
219 }
220 }
221}
222
223#[zenoh_macros::internal_trait]
224impl<T> QoSBuilderTrait for SampleBuilder<T> {
225 fn congestion_control(self, congestion_control: CongestionControl) -> Self {
226 let qos: QoSBuilder = self.sample.qos.into();
227 let qos = qos.congestion_control(congestion_control).into();
228 Self {
229 sample: Sample { qos, ..self.sample },
230 _t: PhantomData::<T>,
231 }
232 }
233 fn priority(self, priority: Priority) -> Self {
234 let qos: QoSBuilder = self.sample.qos.into();
235 let qos = qos.priority(priority).into();
236 Self {
237 sample: Sample { qos, ..self.sample },
238 _t: PhantomData::<T>,
239 }
240 }
241 fn express(self, is_express: bool) -> Self {
242 let qos: QoSBuilder = self.sample.qos.into();
243 let qos = qos.express(is_express).into();
244 Self {
245 sample: Sample { qos, ..self.sample },
246 _t: PhantomData::<T>,
247 }
248 }
249}
250
251#[zenoh_macros::internal_trait]
252impl EncodingBuilderTrait for SampleBuilder<SampleBuilderPut> {
253 fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
254 Self {
255 sample: Sample {
256 encoding: encoding.into(),
257 ..self.sample
258 },
259 _t: PhantomData::<SampleBuilderPut>,
260 }
261 }
262}
263
264impl From<Sample> for SampleBuilder<SampleBuilderAny> {
265 fn from(sample: Sample) -> Self {
266 SampleBuilder {
267 sample,
268 _t: PhantomData::<SampleBuilderAny>,
269 }
270 }
271}
272
273impl TryFrom<Sample> for SampleBuilder<SampleBuilderPut> {
274 type Error = zresult::Error;
275 fn try_from(sample: Sample) -> Result<Self, Self::Error> {
276 if sample.kind != SampleKind::Put {
277 bail!("Sample is not a put sample")
278 }
279 Ok(SampleBuilder {
280 sample,
281 _t: PhantomData::<SampleBuilderPut>,
282 })
283 }
284}
285
286impl TryFrom<Sample> for SampleBuilder<SampleBuilderDelete> {
287 type Error = zresult::Error;
288 fn try_from(sample: Sample) -> Result<Self, Self::Error> {
289 if sample.kind != SampleKind::Delete {
290 bail!("Sample is not a delete sample")
291 }
292 Ok(SampleBuilder {
293 sample,
294 _t: PhantomData::<SampleBuilderDelete>,
295 })
296 }
297}
298
299impl<T> From<SampleBuilder<T>> for Sample {
300 fn from(sample_builder: SampleBuilder<T>) -> Self {
301 sample_builder.sample
302 }
303}
304
305#[zenoh_macros::internal]
306impl From<&PublicationBuilder<&Publisher<'_>, PublicationBuilderPut>> for Sample {
307 fn from(builder: &PublicationBuilder<&Publisher<'_>, PublicationBuilderPut>) -> Self {
308 Sample {
309 key_expr: builder.publisher.key_expr.clone().into_owned(),
310 payload: builder.kind.payload.clone(),
311 kind: SampleKind::Put,
312 encoding: builder.kind.encoding.clone(),
313 timestamp: builder.timestamp,
314 qos: QoSBuilder::from(QoS::default())
315 .congestion_control(builder.publisher.congestion_control)
316 .priority(builder.publisher.priority)
317 .express(builder.publisher.is_express)
318 .into(),
319 #[cfg(feature = "unstable")]
320 reliability: builder.publisher.reliability,
321 #[cfg(feature = "unstable")]
322 source_info: builder.source_info.clone(),
323 attachment: builder.attachment.clone(),
324 }
325 }
326}
327
328#[zenoh_macros::internal]
329impl From<&PublicationBuilder<&Publisher<'_>, PublicationBuilderDelete>> for Sample {
330 fn from(builder: &PublicationBuilder<&Publisher<'_>, PublicationBuilderDelete>) -> Self {
331 Sample {
332 key_expr: builder.publisher.key_expr.clone().into_owned(),
333 payload: ZBytes::new(),
334 kind: SampleKind::Put,
335 encoding: Encoding::ZENOH_BYTES,
336 timestamp: builder.timestamp,
337 qos: QoSBuilder::from(QoS::default())
338 .congestion_control(builder.publisher.congestion_control)
339 .priority(builder.publisher.priority)
340 .express(builder.publisher.is_express)
341 .into(),
342 #[cfg(feature = "unstable")]
343 reliability: builder.publisher.reliability,
344 #[cfg(feature = "unstable")]
345 source_info: builder.source_info.clone(),
346 attachment: builder.attachment.clone(),
347 }
348 }
349}