1use std::future::{IntoFuture, Ready};
15
16use itertools::Itertools;
17use zenoh_config::qos::PublisherQoSConfig;
18use zenoh_core::{Resolvable, Result as ZResult, Wait};
19use zenoh_keyexpr::keyexpr_tree::{IKeyExprTree, IKeyExprTreeNode};
20use zenoh_protocol::core::CongestionControl;
21#[cfg(feature = "unstable")]
22use zenoh_protocol::core::Reliability;
23
24#[cfg(feature = "unstable")]
25use crate::api::sample::SourceInfo;
26use crate::{
27 api::{
28 builders::sample::{
29 EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait,
30 },
31 bytes::{OptionZBytes, ZBytes},
32 encoding::Encoding,
33 key_expr::KeyExpr,
34 publisher::{Priority, Publisher},
35 sample::{Locality, SampleKind},
36 },
37 Session,
38};
39
40pub type SessionPutBuilder<'a, 'b> =
43 PublicationBuilder<PublisherBuilder<'a, 'b>, PublicationBuilderPut>;
44
45pub type SessionDeleteBuilder<'a, 'b> =
48 PublicationBuilder<PublisherBuilder<'a, 'b>, PublicationBuilderDelete>;
49
50pub type PublisherPutBuilder<'a> = PublicationBuilder<&'a Publisher<'a>, PublicationBuilderPut>;
53
54pub type PublisherDeleteBuilder<'a> =
57 PublicationBuilder<&'a Publisher<'a>, PublicationBuilderDelete>;
58
59#[derive(Debug, Clone)]
63pub struct PublicationBuilderPut {
64 pub(crate) payload: ZBytes,
65 pub(crate) encoding: Encoding,
66}
67
68#[derive(Debug, Clone)]
72pub struct PublicationBuilderDelete;
73
74#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
100#[derive(Debug, Clone)]
101pub struct PublicationBuilder<P, T> {
102 pub(crate) publisher: P,
103 pub(crate) kind: T,
104 pub(crate) timestamp: Option<uhlc::Timestamp>,
105 #[cfg(feature = "unstable")]
106 pub(crate) source_info: Option<SourceInfo>,
107 pub(crate) attachment: Option<ZBytes>,
108}
109
110#[zenoh_macros::internal_trait]
111impl<T> QoSBuilderTrait for PublicationBuilder<PublisherBuilder<'_, '_>, T> {
112 #[inline]
114 fn congestion_control(self, congestion_control: CongestionControl) -> Self {
115 Self {
116 publisher: self.publisher.congestion_control(congestion_control),
117 ..self
118 }
119 }
120
121 #[inline]
123 fn priority(self, priority: Priority) -> Self {
124 Self {
125 publisher: self.publisher.priority(priority),
126 ..self
127 }
128 }
129
130 #[inline]
135 fn express(self, is_express: bool) -> Self {
136 Self {
137 publisher: self.publisher.express(is_express),
138 ..self
139 }
140 }
141}
142
143impl<T> PublicationBuilder<PublisherBuilder<'_, '_>, T> {
144 #[zenoh_macros::unstable]
149 #[inline]
150 pub fn allowed_destination(mut self, destination: Locality) -> Self {
151 self.publisher = self.publisher.allowed_destination(destination);
152 self
153 }
154
155 #[zenoh_macros::unstable]
161 #[inline]
162 pub fn reliability(self, reliability: Reliability) -> Self {
163 Self {
164 publisher: self.publisher.reliability(reliability),
165 ..self
166 }
167 }
168}
169
170#[zenoh_macros::internal_trait]
171impl EncodingBuilderTrait for PublisherBuilder<'_, '_> {
172 fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
174 Self {
175 encoding: encoding.into(),
176 ..self
177 }
178 }
179}
180
181#[zenoh_macros::internal_trait]
182impl<P> EncodingBuilderTrait for PublicationBuilder<P, PublicationBuilderPut> {
183 fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
185 Self {
186 kind: PublicationBuilderPut {
187 encoding: encoding.into(),
188 ..self.kind
189 },
190 ..self
191 }
192 }
193}
194
195#[zenoh_macros::internal_trait]
196impl<P, T> SampleBuilderTrait for PublicationBuilder<P, T> {
197 #[zenoh_macros::unstable]
199 fn source_info<TS: Into<Option<SourceInfo>>>(self, source_info: TS) -> Self {
200 Self {
201 source_info: source_info.into(),
202 ..self
203 }
204 }
205 fn attachment<TA: Into<OptionZBytes>>(self, attachment: TA) -> Self {
208 let attachment: OptionZBytes = attachment.into();
209 Self {
210 attachment: attachment.into(),
211 ..self
212 }
213 }
214}
215
216#[zenoh_macros::internal_trait]
217impl<P, T> TimestampBuilderTrait for PublicationBuilder<P, T> {
218 fn timestamp<TS: Into<Option<uhlc::Timestamp>>>(self, timestamp: TS) -> Self {
220 Self {
221 timestamp: timestamp.into(),
222 ..self
223 }
224 }
225}
226
227impl<P, T> Resolvable for PublicationBuilder<P, T> {
228 type To = ZResult<()>;
229}
230
231impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderPut> {
232 #[inline]
233 fn wait(mut self) -> <Self as Resolvable>::To {
234 self.publisher = self.publisher.apply_qos_overwrites();
235 self.publisher.session.0.resolve_put(
236 &self.publisher.key_expr?,
237 self.kind.payload,
238 SampleKind::Put,
239 self.kind.encoding,
240 self.publisher.congestion_control,
241 self.publisher.priority,
242 self.publisher.is_express,
243 self.publisher.destination,
244 #[cfg(feature = "unstable")]
245 self.publisher.reliability,
246 self.timestamp,
247 #[cfg(feature = "unstable")]
248 self.source_info,
249 self.attachment,
250 )
251 }
252}
253
254impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderDelete> {
255 #[inline]
256 fn wait(mut self) -> <Self as Resolvable>::To {
257 self.publisher = self.publisher.apply_qos_overwrites();
258 self.publisher.session.0.resolve_put(
259 &self.publisher.key_expr?,
260 ZBytes::new(),
261 SampleKind::Delete,
262 Encoding::ZENOH_BYTES,
263 self.publisher.congestion_control,
264 self.publisher.priority,
265 self.publisher.is_express,
266 self.publisher.destination,
267 #[cfg(feature = "unstable")]
268 self.publisher.reliability,
269 self.timestamp,
270 #[cfg(feature = "unstable")]
271 self.source_info,
272 self.attachment,
273 )
274 }
275}
276
277impl IntoFuture for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderPut> {
278 type Output = <Self as Resolvable>::To;
279 type IntoFuture = Ready<<Self as Resolvable>::To>;
280
281 fn into_future(self) -> Self::IntoFuture {
282 std::future::ready(self.wait())
283 }
284}
285
286impl IntoFuture for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderDelete> {
287 type Output = <Self as Resolvable>::To;
288 type IntoFuture = Ready<<Self as Resolvable>::To>;
289
290 fn into_future(self) -> Self::IntoFuture {
291 std::future::ready(self.wait())
292 }
293}
294
295#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
314#[derive(Debug)]
315pub struct PublisherBuilder<'a, 'b> {
316 #[cfg(feature = "internal")]
317 pub session: &'a Session,
318 #[cfg(not(feature = "internal"))]
319 pub(crate) session: &'a Session,
320
321 #[cfg(feature = "internal")]
322 pub key_expr: ZResult<KeyExpr<'b>>,
323 #[cfg(not(feature = "internal"))]
324 pub(crate) key_expr: ZResult<KeyExpr<'b>>,
325
326 #[cfg(feature = "internal")]
327 pub encoding: Encoding,
328 #[cfg(not(feature = "internal"))]
329 pub(crate) encoding: Encoding,
330 #[cfg(feature = "internal")]
331 pub congestion_control: CongestionControl,
332 #[cfg(not(feature = "internal"))]
333 pub(crate) congestion_control: CongestionControl,
334 #[cfg(feature = "internal")]
335 pub priority: Priority,
336 #[cfg(not(feature = "internal"))]
337 pub(crate) priority: Priority,
338 #[cfg(feature = "internal")]
339 pub is_express: bool,
340 #[cfg(not(feature = "internal"))]
341 pub(crate) is_express: bool,
342 #[cfg(feature = "internal")]
343 #[cfg(feature = "unstable")]
344 pub reliability: Reliability,
345 #[cfg(not(feature = "internal"))]
346 #[cfg(feature = "unstable")]
347 pub(crate) reliability: Reliability,
348 #[cfg(feature = "internal")]
349 pub destination: Locality,
350 #[cfg(not(feature = "internal"))]
351 pub(crate) destination: Locality,
352}
353
354impl Clone for PublisherBuilder<'_, '_> {
355 fn clone(&self) -> Self {
356 Self {
357 session: self.session,
358 key_expr: match &self.key_expr {
359 Ok(k) => Ok(k.clone()),
360 Err(e) => Err(zerror!("Cloned KE Error: {}", e).into()),
361 },
362 encoding: self.encoding.clone(),
363 congestion_control: self.congestion_control,
364 priority: self.priority,
365 is_express: self.is_express,
366 #[cfg(feature = "unstable")]
367 reliability: self.reliability,
368 destination: self.destination,
369 }
370 }
371}
372
373#[zenoh_macros::internal_trait]
374impl QoSBuilderTrait for PublisherBuilder<'_, '_> {
375 #[inline]
377 fn congestion_control(self, congestion_control: CongestionControl) -> Self {
378 Self {
379 congestion_control,
380 ..self
381 }
382 }
383
384 #[inline]
386 fn priority(self, priority: Priority) -> Self {
387 Self { priority, ..self }
388 }
389
390 #[inline]
395 fn express(self, is_express: bool) -> Self {
396 Self { is_express, ..self }
397 }
398}
399
400impl PublisherBuilder<'_, '_> {
401 pub(crate) fn apply_qos_overwrites(self) -> Self {
404 let mut qos_overwrites = PublisherQoSConfig::default();
405 if let Ok(key_expr) = &self.key_expr {
406 let state = zread!(self.session.0.state);
408 let nodes_including = state
409 .publisher_qos_tree
410 .nodes_including(key_expr)
411 .filter(|n| n.weight().is_some())
412 .collect_vec();
413 if let Some(node) = nodes_including.first() {
414 qos_overwrites = node
415 .weight()
416 .expect("first node weight should not be None")
417 .clone();
418 if nodes_including.len() > 1 {
419 tracing::warn!(
420 "Publisher declared on `{}` which is included by multiple key_exprs in qos config ({}). Using qos config for `{}`",
421 key_expr,
422 nodes_including.iter().map(|n| n.keyexpr().to_string()).join(", "),
423 node.keyexpr(),
424 );
425 }
426 }
427 }
428
429 Self {
430 congestion_control: qos_overwrites
431 .congestion_control
432 .map(|cc| cc.into())
433 .unwrap_or(self.congestion_control),
434 priority: qos_overwrites
435 .priority
436 .map(|p| p.into())
437 .unwrap_or(self.priority),
438 is_express: qos_overwrites.express.unwrap_or(self.is_express),
439 #[cfg(feature = "unstable")]
440 reliability: qos_overwrites
441 .reliability
442 .map(|r| r.into())
443 .unwrap_or(self.reliability),
444 #[cfg(feature = "unstable")]
445 destination: qos_overwrites
446 .allowed_destination
447 .map(|d| d.into())
448 .unwrap_or(self.destination),
449 ..self
450 }
451 }
452
453 #[inline]
458 pub fn allowed_destination(mut self, destination: Locality) -> Self {
459 self.destination = destination;
460 self
461 }
462
463 #[zenoh_macros::unstable]
469 #[inline]
470 pub fn reliability(self, reliability: Reliability) -> Self {
471 Self {
472 reliability,
473 ..self
474 }
475 }
476}
477
478impl<'b> Resolvable for PublisherBuilder<'_, 'b> {
479 type To = ZResult<Publisher<'b>>;
480}
481
482impl Wait for PublisherBuilder<'_, '_> {
483 fn wait(mut self) -> <Self as Resolvable>::To {
484 self = self.apply_qos_overwrites();
485 let mut key_expr = self.key_expr?;
486 key_expr = self.session.declare_keyexpr(key_expr).wait()?;
487 let id = self
488 .session
489 .0
490 .declare_publisher_inner(key_expr.clone(), self.destination)?;
491 Ok(Publisher {
492 session: self.session.downgrade(),
493 id,
494 key_expr,
495 encoding: self.encoding,
496 congestion_control: self.congestion_control,
497 priority: self.priority,
498 is_express: self.is_express,
499 destination: self.destination,
500 #[cfg(feature = "unstable")]
501 reliability: self.reliability,
502 matching_listeners: Default::default(),
503 undeclare_on_drop: true,
504 })
505 }
506}
507
508impl IntoFuture for PublisherBuilder<'_, '_> {
509 type Output = <Self as Resolvable>::To;
510 type IntoFuture = Ready<<Self as Resolvable>::To>;
511
512 fn into_future(self) -> Self::IntoFuture {
513 std::future::ready(self.wait())
514 }
515}
516
517impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderPut> {
518 fn wait(self) -> <Self as Resolvable>::To {
519 self.publisher.session.resolve_put(
520 &self.publisher.key_expr,
521 self.kind.payload,
522 SampleKind::Put,
523 self.kind.encoding,
524 self.publisher.congestion_control,
525 self.publisher.priority,
526 self.publisher.is_express,
527 self.publisher.destination,
528 #[cfg(feature = "unstable")]
529 self.publisher.reliability,
530 self.timestamp,
531 #[cfg(feature = "unstable")]
532 self.source_info,
533 self.attachment,
534 )
535 }
536}
537
538impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderDelete> {
539 fn wait(self) -> <Self as Resolvable>::To {
540 self.publisher.session.resolve_put(
541 &self.publisher.key_expr,
542 ZBytes::new(),
543 SampleKind::Delete,
544 Encoding::ZENOH_BYTES,
545 self.publisher.congestion_control,
546 self.publisher.priority,
547 self.publisher.is_express,
548 self.publisher.destination,
549 #[cfg(feature = "unstable")]
550 self.publisher.reliability,
551 self.timestamp,
552 #[cfg(feature = "unstable")]
553 self.source_info,
554 self.attachment,
555 )
556 }
557}
558
559impl IntoFuture for PublicationBuilder<&Publisher<'_>, PublicationBuilderPut> {
560 type Output = <Self as Resolvable>::To;
561 type IntoFuture = Ready<<Self as Resolvable>::To>;
562
563 fn into_future(self) -> Self::IntoFuture {
564 std::future::ready(self.wait())
565 }
566}
567
568impl IntoFuture for PublicationBuilder<&Publisher<'_>, PublicationBuilderDelete> {
569 type Output = <Self as Resolvable>::To;
570 type IntoFuture = Ready<<Self as Resolvable>::To>;
571
572 fn into_future(self) -> Self::IntoFuture {
573 std::future::ready(self.wait())
574 }
575}