1use std::future::{IntoFuture, Ready};
15
16use itertools::Itertools;
17use zenoh_config::qos::PublisherQoSConfig;
18use zenoh_core::{Resolvable, Result as ZResult, Wait};
19use zenoh_keyexpr::keyexpr_tree::{IKeyExprTree, IKeyExprTreeNode};
20use zenoh_protocol::core::CongestionControl;
21#[cfg(feature = "unstable")]
22use zenoh_protocol::core::Reliability;
23
24#[cfg(feature = "unstable")]
25use crate::api::sample::SourceInfo;
26use crate::{
27 api::{
28 builders::sample::{
29 EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait,
30 },
31 bytes::{OptionZBytes, ZBytes},
32 encoding::Encoding,
33 key_expr::KeyExpr,
34 publisher::{Priority, Publisher},
35 sample::{Locality, SampleKind},
36 },
37 Session,
38};
39
40pub type SessionPutBuilder<'a, 'b> =
41 PublicationBuilder<PublisherBuilder<'a, 'b>, PublicationBuilderPut>;
42
43pub type SessionDeleteBuilder<'a, 'b> =
44 PublicationBuilder<PublisherBuilder<'a, 'b>, PublicationBuilderDelete>;
45
46pub type PublisherPutBuilder<'a> = PublicationBuilder<&'a Publisher<'a>, PublicationBuilderPut>;
47
48pub type PublisherDeleteBuilder<'a> =
49 PublicationBuilder<&'a Publisher<'a>, PublicationBuilderDelete>;
50
51#[derive(Debug, Clone)]
52pub struct PublicationBuilderPut {
53 pub(crate) payload: ZBytes,
54 pub(crate) encoding: Encoding,
55}
/// Publication kind used by [`PublicationBuilder`] for delete operations.
///
/// It carries no data: when the builder is resolved, `resolve_put` is called with
/// `ZBytes::new()`, `Encoding::ZENOH_BYTES` and `SampleKind::Delete`.
#[derive(Debug, Clone)]
pub struct PublicationBuilderDelete;
58
59#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
83#[derive(Debug, Clone)]
84pub struct PublicationBuilder<P, T> {
85 pub(crate) publisher: P,
86 pub(crate) kind: T,
87 pub(crate) timestamp: Option<uhlc::Timestamp>,
88 #[cfg(feature = "unstable")]
89 pub(crate) source_info: SourceInfo,
90 pub(crate) attachment: Option<ZBytes>,
91}
92
93#[zenoh_macros::internal_trait]
94impl<T> QoSBuilderTrait for PublicationBuilder<PublisherBuilder<'_, '_>, T> {
95 #[inline]
97 fn congestion_control(self, congestion_control: CongestionControl) -> Self {
98 Self {
99 publisher: self.publisher.congestion_control(congestion_control),
100 ..self
101 }
102 }
103
104 #[inline]
106 fn priority(self, priority: Priority) -> Self {
107 Self {
108 publisher: self.publisher.priority(priority),
109 ..self
110 }
111 }
112
113 #[inline]
118 fn express(self, is_express: bool) -> Self {
119 Self {
120 publisher: self.publisher.express(is_express),
121 ..self
122 }
123 }
124}
125
126impl<T> PublicationBuilder<PublisherBuilder<'_, '_>, T> {
127 #[zenoh_macros::unstable]
132 #[inline]
133 pub fn allowed_destination(mut self, destination: Locality) -> Self {
134 self.publisher = self.publisher.allowed_destination(destination);
135 self
136 }
137
138 #[zenoh_macros::unstable]
144 #[inline]
145 pub fn reliability(self, reliability: Reliability) -> Self {
146 Self {
147 publisher: self.publisher.reliability(reliability),
148 ..self
149 }
150 }
151}
152
153#[zenoh_macros::internal_trait]
154impl EncodingBuilderTrait for PublisherBuilder<'_, '_> {
155 fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
156 Self {
157 encoding: encoding.into(),
158 ..self
159 }
160 }
161}
162
163#[zenoh_macros::internal_trait]
164impl<P> EncodingBuilderTrait for PublicationBuilder<P, PublicationBuilderPut> {
165 fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
166 Self {
167 kind: PublicationBuilderPut {
168 encoding: encoding.into(),
169 ..self.kind
170 },
171 ..self
172 }
173 }
174}
175
176#[zenoh_macros::internal_trait]
177impl<P, T> SampleBuilderTrait for PublicationBuilder<P, T> {
178 #[cfg(feature = "unstable")]
179 fn source_info(self, source_info: SourceInfo) -> Self {
180 Self {
181 source_info,
182 ..self
183 }
184 }
185 fn attachment<TA: Into<OptionZBytes>>(self, attachment: TA) -> Self {
186 let attachment: OptionZBytes = attachment.into();
187 Self {
188 attachment: attachment.into(),
189 ..self
190 }
191 }
192}
193
194#[zenoh_macros::internal_trait]
195impl<P, T> TimestampBuilderTrait for PublicationBuilder<P, T> {
196 fn timestamp<TS: Into<Option<uhlc::Timestamp>>>(self, timestamp: TS) -> Self {
197 Self {
198 timestamp: timestamp.into(),
199 ..self
200 }
201 }
202}
203
204impl<P, T> Resolvable for PublicationBuilder<P, T> {
205 type To = ZResult<()>;
206}
207
208impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderPut> {
209 #[inline]
210 fn wait(mut self) -> <Self as Resolvable>::To {
211 self.publisher = self.publisher.apply_qos_overwrites();
212 self.publisher.session.0.resolve_put(
213 &self.publisher.key_expr?,
214 self.kind.payload,
215 SampleKind::Put,
216 self.kind.encoding,
217 self.publisher.congestion_control,
218 self.publisher.priority,
219 self.publisher.is_express,
220 self.publisher.destination,
221 #[cfg(feature = "unstable")]
222 self.publisher.reliability,
223 self.timestamp,
224 #[cfg(feature = "unstable")]
225 self.source_info,
226 self.attachment,
227 )
228 }
229}
230
231impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderDelete> {
232 #[inline]
233 fn wait(mut self) -> <Self as Resolvable>::To {
234 self.publisher = self.publisher.apply_qos_overwrites();
235 self.publisher.session.0.resolve_put(
236 &self.publisher.key_expr?,
237 ZBytes::new(),
238 SampleKind::Delete,
239 Encoding::ZENOH_BYTES,
240 self.publisher.congestion_control,
241 self.publisher.priority,
242 self.publisher.is_express,
243 self.publisher.destination,
244 #[cfg(feature = "unstable")]
245 self.publisher.reliability,
246 self.timestamp,
247 #[cfg(feature = "unstable")]
248 self.source_info,
249 self.attachment,
250 )
251 }
252}
253
254impl IntoFuture for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderPut> {
255 type Output = <Self as Resolvable>::To;
256 type IntoFuture = Ready<<Self as Resolvable>::To>;
257
258 fn into_future(self) -> Self::IntoFuture {
259 std::future::ready(self.wait())
260 }
261}
262
263impl IntoFuture for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderDelete> {
264 type Output = <Self as Resolvable>::To;
265 type IntoFuture = Ready<<Self as Resolvable>::To>;
266
267 fn into_future(self) -> Self::IntoFuture {
268 std::future::ready(self.wait())
269 }
270}
271
272#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
289#[derive(Debug)]
290pub struct PublisherBuilder<'a, 'b> {
291 #[cfg(feature = "internal")]
292 pub session: &'a Session,
293 #[cfg(not(feature = "internal"))]
294 pub(crate) session: &'a Session,
295
296 #[cfg(feature = "internal")]
297 pub key_expr: ZResult<KeyExpr<'b>>,
298 #[cfg(not(feature = "internal"))]
299 pub(crate) key_expr: ZResult<KeyExpr<'b>>,
300
301 #[cfg(feature = "internal")]
302 pub encoding: Encoding,
303 #[cfg(not(feature = "internal"))]
304 pub(crate) encoding: Encoding,
305 #[cfg(feature = "internal")]
306 pub congestion_control: CongestionControl,
307 #[cfg(not(feature = "internal"))]
308 pub(crate) congestion_control: CongestionControl,
309 #[cfg(feature = "internal")]
310 pub priority: Priority,
311 #[cfg(not(feature = "internal"))]
312 pub(crate) priority: Priority,
313 #[cfg(feature = "internal")]
314 pub is_express: bool,
315 #[cfg(not(feature = "internal"))]
316 pub(crate) is_express: bool,
317 #[cfg(feature = "internal")]
318 #[cfg(feature = "unstable")]
319 pub reliability: Reliability,
320 #[cfg(not(feature = "internal"))]
321 #[cfg(feature = "unstable")]
322 pub(crate) reliability: Reliability,
323 #[cfg(feature = "internal")]
324 pub destination: Locality,
325 #[cfg(not(feature = "internal"))]
326 pub(crate) destination: Locality,
327}
328
329impl Clone for PublisherBuilder<'_, '_> {
330 fn clone(&self) -> Self {
331 Self {
332 session: self.session,
333 key_expr: match &self.key_expr {
334 Ok(k) => Ok(k.clone()),
335 Err(e) => Err(zerror!("Cloned KE Error: {}", e).into()),
336 },
337 encoding: self.encoding.clone(),
338 congestion_control: self.congestion_control,
339 priority: self.priority,
340 is_express: self.is_express,
341 #[cfg(feature = "unstable")]
342 reliability: self.reliability,
343 destination: self.destination,
344 }
345 }
346}
347
348#[zenoh_macros::internal_trait]
349impl QoSBuilderTrait for PublisherBuilder<'_, '_> {
350 #[inline]
352 fn congestion_control(self, congestion_control: CongestionControl) -> Self {
353 Self {
354 congestion_control,
355 ..self
356 }
357 }
358
359 #[inline]
361 fn priority(self, priority: Priority) -> Self {
362 Self { priority, ..self }
363 }
364
365 #[inline]
370 fn express(self, is_express: bool) -> Self {
371 Self { is_express, ..self }
372 }
373}
374
375impl PublisherBuilder<'_, '_> {
376 pub(crate) fn apply_qos_overwrites(self) -> Self {
379 let mut qos_overwrites = PublisherQoSConfig::default();
380 if let Ok(key_expr) = &self.key_expr {
381 let state = zread!(self.session.0.state);
383 let nodes_including = state
384 .publisher_qos_tree
385 .nodes_including(key_expr)
386 .filter(|n| n.weight().is_some())
387 .collect_vec();
388 if let Some(node) = nodes_including.first() {
389 qos_overwrites = node
390 .weight()
391 .expect("first node weight should not be None")
392 .clone();
393 if nodes_including.len() > 1 {
394 tracing::warn!(
395 "Publisher declared on `{}` which is included by multiple key_exprs in qos config ({}). Using qos config for `{}`",
396 key_expr,
397 nodes_including.iter().map(|n| n.keyexpr().to_string()).join(", "),
398 node.keyexpr(),
399 );
400 }
401 }
402 }
403
404 Self {
405 congestion_control: qos_overwrites
406 .congestion_control
407 .map(|cc| cc.into())
408 .unwrap_or(self.congestion_control),
409 priority: qos_overwrites
410 .priority
411 .map(|p| p.into())
412 .unwrap_or(self.priority),
413 is_express: qos_overwrites.express.unwrap_or(self.is_express),
414 #[cfg(feature = "unstable")]
415 reliability: qos_overwrites
416 .reliability
417 .map(|r| r.into())
418 .unwrap_or(self.reliability),
419 #[cfg(feature = "unstable")]
420 destination: qos_overwrites
421 .allowed_destination
422 .map(|d| d.into())
423 .unwrap_or(self.destination),
424 ..self
425 }
426 }
427
428 #[zenoh_macros::unstable]
433 #[inline]
434 pub fn allowed_destination(mut self, destination: Locality) -> Self {
435 self.destination = destination;
436 self
437 }
438
439 #[zenoh_macros::unstable]
445 #[inline]
446 pub fn reliability(self, reliability: Reliability) -> Self {
447 Self {
448 reliability,
449 ..self
450 }
451 }
452}
453
454impl<'b> Resolvable for PublisherBuilder<'_, 'b> {
455 type To = ZResult<Publisher<'b>>;
456}
457
458impl Wait for PublisherBuilder<'_, '_> {
459 fn wait(mut self) -> <Self as Resolvable>::To {
460 self = self.apply_qos_overwrites();
461 let mut key_expr = self.key_expr?;
462 if !key_expr.is_fully_optimized(&self.session.0) {
463 key_expr = self.session.declare_keyexpr(key_expr).wait()?;
464 }
465 let id = self
466 .session
467 .0
468 .declare_publisher_inner(key_expr.clone(), self.destination)?;
469 Ok(Publisher {
470 session: self.session.downgrade(),
471 id,
472 key_expr,
473 encoding: self.encoding,
474 congestion_control: self.congestion_control,
475 priority: self.priority,
476 is_express: self.is_express,
477 destination: self.destination,
478 #[cfg(feature = "unstable")]
479 reliability: self.reliability,
480 #[cfg(feature = "unstable")]
481 matching_listeners: Default::default(),
482 undeclare_on_drop: true,
483 })
484 }
485}
486
487impl IntoFuture for PublisherBuilder<'_, '_> {
488 type Output = <Self as Resolvable>::To;
489 type IntoFuture = Ready<<Self as Resolvable>::To>;
490
491 fn into_future(self) -> Self::IntoFuture {
492 std::future::ready(self.wait())
493 }
494}
495
496impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderPut> {
497 fn wait(self) -> <Self as Resolvable>::To {
498 self.publisher.session.resolve_put(
499 &self.publisher.key_expr,
500 self.kind.payload,
501 SampleKind::Put,
502 self.kind.encoding,
503 self.publisher.congestion_control,
504 self.publisher.priority,
505 self.publisher.is_express,
506 self.publisher.destination,
507 #[cfg(feature = "unstable")]
508 self.publisher.reliability,
509 self.timestamp,
510 #[cfg(feature = "unstable")]
511 self.source_info,
512 self.attachment,
513 )
514 }
515}
516
517impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderDelete> {
518 fn wait(self) -> <Self as Resolvable>::To {
519 self.publisher.session.resolve_put(
520 &self.publisher.key_expr,
521 ZBytes::new(),
522 SampleKind::Delete,
523 Encoding::ZENOH_BYTES,
524 self.publisher.congestion_control,
525 self.publisher.priority,
526 self.publisher.is_express,
527 self.publisher.destination,
528 #[cfg(feature = "unstable")]
529 self.publisher.reliability,
530 self.timestamp,
531 #[cfg(feature = "unstable")]
532 self.source_info,
533 self.attachment,
534 )
535 }
536}
537
538impl IntoFuture for PublicationBuilder<&Publisher<'_>, PublicationBuilderPut> {
539 type Output = <Self as Resolvable>::To;
540 type IntoFuture = Ready<<Self as Resolvable>::To>;
541
542 fn into_future(self) -> Self::IntoFuture {
543 std::future::ready(self.wait())
544 }
545}
546
547impl IntoFuture for PublicationBuilder<&Publisher<'_>, PublicationBuilderDelete> {
548 type Output = <Self as Resolvable>::To;
549 type IntoFuture = Ready<<Self as Resolvable>::To>;
550
551 fn into_future(self) -> Self::IntoFuture {
552 std::future::ready(self.wait())
553 }
554}