1use std::{
15 future::{IntoFuture, Ready},
16 sync::{
17 atomic::{AtomicU32, Ordering},
18 Arc,
19 },
20 time::Duration,
21};
22
23use zenoh::{
24 bytes::{Encoding, OptionZBytes, ZBytes},
25 internal::{
26 bail,
27 runtime::ZRuntime,
28 traits::{
29 EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait,
30 },
31 TerminatableTask,
32 },
33 key_expr::{keyexpr, KeyExpr},
34 liveliness::LivelinessToken,
35 pubsub::{
36 PublicationBuilder, PublicationBuilderDelete, PublicationBuilderPut, Publisher,
37 PublisherBuilder,
38 },
39 qos::{CongestionControl, Priority, Reliability},
40 sample::{Locality, SourceInfo},
41 session::EntityGlobalId,
42 Resolvable, Resolve, Result as ZResult, Session, Wait, KE_ADV_PREFIX, KE_EMPTY,
43};
44use zenoh_macros::ke;
45
46use crate::{
47 advanced_cache::{AdvancedCache, AdvancedCacheBuilder, CacheConfig, KE_UHLC},
48 z_serialize,
49};
50
51pub(crate) static KE_PUB: &keyexpr = ke!("pub");
52
53#[derive(PartialEq)]
54#[zenoh_macros::unstable]
55pub(crate) enum Sequencing {
56 None,
57 Timestamp,
58 SequenceNumber,
59}
60
61#[zenoh_macros::unstable]
68#[derive(Debug, Default, Clone)]
69pub struct MissDetectionConfig {
70 pub(crate) state_publisher: Option<(Duration, bool)>,
71}
72
73#[zenoh_macros::unstable]
74impl MissDetectionConfig {
75 #[zenoh_macros::unstable]
85 pub fn heartbeat(mut self, period: Duration) -> Self {
86 self.state_publisher = Some((period, false));
87 self
88 }
89
90 #[zenoh_macros::unstable]
101 pub fn sporadic_heartbeat(mut self, period: Duration) -> Self {
102 self.state_publisher = Some((period, true));
103 self
104 }
105}
106
107#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
109#[zenoh_macros::unstable]
110pub struct AdvancedPublisherBuilder<'a, 'b, 'c> {
111 session: &'a Session,
112 pub_key_expr: ZResult<KeyExpr<'b>>,
113 encoding: Encoding,
114 destination: Locality,
115 reliability: Reliability,
116 congestion_control: CongestionControl,
117 priority: Priority,
118 is_express: bool,
119 meta_key_expr: Option<ZResult<KeyExpr<'c>>>,
120 sequencing: Sequencing,
121 miss_config: Option<MissDetectionConfig>,
122 liveliness: bool,
123 cache: bool,
124 history: CacheConfig,
125}
126
127#[zenoh_macros::unstable]
128impl<'a, 'b, 'c> AdvancedPublisherBuilder<'a, 'b, 'c> {
129 #[zenoh_macros::unstable]
130 pub(crate) fn new(builder: PublisherBuilder<'a, 'b>) -> AdvancedPublisherBuilder<'a, 'b, 'c> {
131 AdvancedPublisherBuilder {
132 session: builder.session,
133 pub_key_expr: builder.key_expr,
134 encoding: builder.encoding,
135 destination: builder.destination,
136 reliability: builder.reliability,
137 congestion_control: builder.congestion_control,
138 priority: builder.priority,
139 is_express: builder.is_express,
140 meta_key_expr: None,
141 sequencing: Sequencing::None,
142 miss_config: None,
143 liveliness: false,
144 cache: false,
145 history: CacheConfig::default(),
146 }
147 }
148
149 #[zenoh_macros::unstable]
154 #[inline]
155 pub fn allowed_destination(mut self, destination: Locality) -> Self {
156 self.destination = destination;
157 self
158 }
159
160 #[zenoh_macros::unstable]
166 #[inline]
167 pub fn reliability(self, reliability: Reliability) -> Self {
168 Self {
169 reliability,
170 ..self
171 }
172 }
173
174 #[zenoh_macros::unstable]
178 pub fn sample_miss_detection(mut self, config: MissDetectionConfig) -> Self {
179 self.sequencing = Sequencing::SequenceNumber;
180 self.miss_config = Some(config);
181 self
182 }
183
184 #[zenoh_macros::unstable]
188 pub fn cache(mut self, config: CacheConfig) -> Self {
189 self.cache = true;
190 if self.sequencing == Sequencing::None {
191 self.sequencing = Sequencing::Timestamp;
192 }
193 self.history = config;
194 self
195 }
196
197 #[zenoh_macros::unstable]
201 pub fn publisher_detection(mut self) -> Self {
202 self.liveliness = true;
203 self
204 }
205
206 #[zenoh_macros::unstable]
210 pub fn publisher_detection_metadata<TryIntoKeyExpr>(mut self, meta: TryIntoKeyExpr) -> Self
211 where
212 TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
213 <TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<zenoh::Error>,
214 {
215 self.meta_key_expr = Some(meta.try_into().map_err(Into::into));
216 self
217 }
218}
219
220#[zenoh_macros::internal_trait]
221#[zenoh_macros::unstable]
222impl EncodingBuilderTrait for AdvancedPublisherBuilder<'_, '_, '_> {
223 #[zenoh_macros::unstable]
224 fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
225 Self {
226 encoding: encoding.into(),
227 ..self
228 }
229 }
230}
231
232#[zenoh_macros::internal_trait]
233#[zenoh_macros::unstable]
234impl QoSBuilderTrait for AdvancedPublisherBuilder<'_, '_, '_> {
235 #[inline]
237 #[zenoh_macros::unstable]
238 fn congestion_control(self, congestion_control: CongestionControl) -> Self {
239 Self {
240 congestion_control,
241 ..self
242 }
243 }
244
245 #[inline]
247 #[zenoh_macros::unstable]
248 fn priority(self, priority: Priority) -> Self {
249 Self { priority, ..self }
250 }
251
252 #[inline]
257 #[zenoh_macros::unstable]
258 fn express(self, is_express: bool) -> Self {
259 Self { is_express, ..self }
260 }
261}
262
263#[zenoh_macros::unstable]
264impl<'b> Resolvable for AdvancedPublisherBuilder<'_, 'b, '_> {
265 type To = ZResult<AdvancedPublisher<'b>>;
266}
267
268#[zenoh_macros::unstable]
269impl Wait for AdvancedPublisherBuilder<'_, '_, '_> {
270 #[zenoh_macros::unstable]
271 fn wait(self) -> <Self as Resolvable>::To {
272 AdvancedPublisher::new(self)
273 }
274}
275
276#[zenoh_macros::unstable]
277impl IntoFuture for AdvancedPublisherBuilder<'_, '_, '_> {
278 type Output = <Self as Resolvable>::To;
279 type IntoFuture = Ready<<Self as Resolvable>::To>;
280
281 #[zenoh_macros::unstable]
282 fn into_future(self) -> Self::IntoFuture {
283 std::future::ready(self.wait())
284 }
285}
286
287#[zenoh_macros::unstable]
320pub struct AdvancedPublisher<'a> {
321 publisher: Publisher<'a>,
322 seqnum: Option<Arc<AtomicU32>>,
323 cache: Option<AdvancedCache>,
324 _token: Option<LivelinessToken>,
325 _state_publisher: Option<TerminatableTask>,
326}
327
328#[zenoh_macros::unstable]
329impl<'a> AdvancedPublisher<'a> {
330 #[zenoh_macros::unstable]
331 fn new(conf: AdvancedPublisherBuilder<'_, 'a, '_>) -> ZResult<Self> {
332 let key_expr = conf.pub_key_expr?;
333 let meta = match conf.meta_key_expr {
334 Some(meta) => Some(meta?),
335 None => None,
336 };
337 tracing::debug!("Create AdvancedPublisher{{key_expr: {}}}", &key_expr);
338
339 let publisher = conf
340 .session
341 .declare_publisher(key_expr.clone())
342 .encoding(conf.encoding)
343 .allowed_destination(conf.destination)
344 .reliability(conf.reliability)
345 .congestion_control(conf.congestion_control)
346 .priority(conf.priority)
347 .express(conf.is_express)
348 .wait()?;
349 let id = publisher.id();
350 let suffix = KE_ADV_PREFIX / KE_PUB / &id.zid().into_keyexpr();
351 let suffix = match conf.sequencing {
352 Sequencing::SequenceNumber => {
353 suffix / &KeyExpr::try_from(id.eid().to_string()).unwrap()
354 }
355 _ => suffix / KE_UHLC,
356 };
357 let suffix = match meta {
358 Some(meta) => suffix / &meta,
359 _ => suffix / KE_EMPTY,
361 };
362
363 let seqnum = match conf.sequencing {
364 Sequencing::SequenceNumber => Some(Arc::new(AtomicU32::new(0))),
365 Sequencing::Timestamp => {
366 if conf.session.hlc().is_none() {
367 bail!(
368 "Cannot create AdvancedPublisher {} with Sequencing::Timestamp: \
369 the 'timestamping' setting must be enabled in the Zenoh configuration.",
370 key_expr,
371 )
372 }
373 None
374 }
375 _ => None,
376 };
377
378 let cache = if conf.cache {
379 Some(
380 AdvancedCacheBuilder::new(conf.session, Ok(key_expr.clone()))
381 .history(conf.history)
382 .queryable_suffix(&suffix)
383 .wait()?,
384 )
385 } else {
386 None
387 };
388
389 let token = if conf.liveliness {
390 tracing::debug!(
391 "AdvancedPublisher{{key_expr: {}}}: Declare liveliness token {}",
392 key_expr,
393 &key_expr / &suffix,
394 );
395 Some(
396 conf.session
397 .liveliness()
398 .declare_token(&key_expr / &suffix)
399 .wait()?,
400 )
401 } else {
402 None
403 };
404
405 let state_publisher = if let Some((period, sporadic)) =
406 conf.miss_config.as_ref().and_then(|c| c.state_publisher)
407 {
408 if let Some(seqnum) = seqnum.as_ref() {
409 tracing::debug!(
410 "AdvancedPublisher{{key_expr: {}}}: Enable {}heartbeat on {} with period {:?}",
411 key_expr,
412 if sporadic { "sporadic " } else { "" },
413 &key_expr / &suffix,
414 period
415 );
416 let seqnum = seqnum.clone();
417 if !sporadic {
418 let publisher = conf.session.declare_publisher(&key_expr / &suffix).wait()?;
419 Some(TerminatableTask::spawn_abortable(
420 ZRuntime::Net,
421 async move {
422 loop {
423 tokio::time::sleep(period).await;
424 let seqnum = seqnum.load(Ordering::Relaxed);
425 if seqnum > 0 {
426 let _ = publisher.put(z_serialize(&(seqnum - 1))).await;
427 }
428 }
429 },
430 ))
431 } else {
432 let mut last_seqnum = 0;
433 let publisher = conf
434 .session
435 .declare_publisher(&key_expr / &suffix)
436 .congestion_control(CongestionControl::Block)
437 .wait()?;
438 Some(TerminatableTask::spawn_abortable(
439 ZRuntime::Net,
440 async move {
441 loop {
442 tokio::time::sleep(period).await;
443 let seqnum = seqnum.load(Ordering::Relaxed);
444 if seqnum > last_seqnum {
445 let _ = publisher.put(z_serialize(&(seqnum - 1))).await;
446 last_seqnum = seqnum;
447 }
448 }
449 },
450 ))
451 }
452 } else {
453 None
454 }
455 } else {
456 None
457 };
458
459 Ok(AdvancedPublisher {
460 publisher,
461 seqnum,
462 cache,
463 _token: token,
464 _state_publisher: state_publisher,
465 })
466 }
467
468 #[zenoh_macros::unstable]
472 pub fn id(&self) -> EntityGlobalId {
473 self.publisher.id()
474 }
475
476 #[inline]
480 #[zenoh_macros::unstable]
481 pub fn key_expr(&self) -> &KeyExpr<'a> {
482 self.publisher.key_expr()
483 }
484
485 #[inline]
489 #[zenoh_macros::unstable]
490 pub fn encoding(&self) -> &Encoding {
491 self.publisher.encoding()
492 }
493
494 #[inline]
498 #[zenoh_macros::unstable]
499 pub fn congestion_control(&self) -> CongestionControl {
500 self.publisher.congestion_control()
501 }
502
503 #[inline]
507 #[zenoh_macros::unstable]
508 pub fn priority(&self) -> Priority {
509 self.publisher.priority()
510 }
511
512 #[inline]
528 #[zenoh_macros::unstable]
529 pub fn put<IntoZBytes>(&self, payload: IntoZBytes) -> AdvancedPublisherPutBuilder<'_>
530 where
531 IntoZBytes: Into<ZBytes>,
532 {
533 let mut builder = self.publisher.put(payload);
534 if let Some(seqnum) = &self.seqnum {
535 let info = Some(SourceInfo::new(
536 self.publisher.id(),
537 seqnum.fetch_add(1, Ordering::Relaxed),
538 ));
539 tracing::trace!(
540 "AdvancedPublisher{{key_expr: {}}}: Put data with {:?}",
541 self.publisher.key_expr(),
542 info
543 );
544 builder = builder.source_info(info);
545 }
546 if let Some(hlc) = self.publisher.session().hlc() {
547 builder = builder.timestamp(hlc.new_timestamp());
548 }
549 AdvancedPublisherPutBuilder {
550 builder,
551 cache: self.cache.as_ref(),
552 }
553 }
554
555 #[zenoh_macros::unstable]
571 pub fn delete(&self) -> AdvancedPublisherDeleteBuilder<'_> {
572 let mut builder = self.publisher.delete();
573 if let Some(seqnum) = &self.seqnum {
574 builder = builder.source_info(Some(SourceInfo::new(
575 self.publisher.id(),
576 seqnum.fetch_add(1, Ordering::Relaxed),
577 )));
578 }
579 if let Some(hlc) = self.publisher.session().hlc() {
580 builder = builder.timestamp(hlc.new_timestamp());
581 }
582 AdvancedPublisherDeleteBuilder {
583 builder,
584 cache: self.cache.as_ref(),
585 }
586 }
587
588 #[zenoh_macros::unstable]
611 pub fn matching_status(&self) -> impl Resolve<ZResult<zenoh::matching::MatchingStatus>> + '_ {
612 self.publisher.matching_status()
613 }
614
615 #[zenoh_macros::unstable]
641 pub fn matching_listener(
642 &self,
643 ) -> zenoh::matching::MatchingListenerBuilder<'_, zenoh::handlers::DefaultHandler> {
644 self.publisher.matching_listener()
645 }
646
647 #[zenoh_macros::unstable]
663 pub fn undeclare(self) -> impl Resolve<ZResult<()>> + 'a {
664 tracing::debug!(
665 "AdvancedPublisher{{key_expr: {}}}: Undeclare",
666 self.key_expr()
667 );
668 self.publisher.undeclare()
669 }
670}
671
672#[zenoh_macros::unstable]
673pub type AdvancedPublisherPutBuilder<'a> = AdvancedPublicationBuilder<'a, PublicationBuilderPut>;
674#[zenoh_macros::unstable]
675pub type AdvancedPublisherDeleteBuilder<'a> =
676 AdvancedPublicationBuilder<'a, PublicationBuilderDelete>;
677
678#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
679#[derive(Clone)]
680#[zenoh_macros::unstable]
681pub struct AdvancedPublicationBuilder<'a, P> {
682 pub(crate) builder: PublicationBuilder<&'a Publisher<'a>, P>,
683 pub(crate) cache: Option<&'a AdvancedCache>,
684}
685
686#[zenoh_macros::internal_trait]
687#[zenoh_macros::unstable]
688impl EncodingBuilderTrait for AdvancedPublicationBuilder<'_, PublicationBuilderPut> {
689 #[zenoh_macros::unstable]
690 fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
691 Self {
692 builder: self.builder.encoding(encoding),
693 ..self
694 }
695 }
696}
697
698#[zenoh_macros::internal_trait]
699#[zenoh_macros::unstable]
700impl<P> SampleBuilderTrait for AdvancedPublicationBuilder<'_, P> {
701 #[zenoh_macros::unstable]
702 fn source_info<TS: Into<Option<SourceInfo>>>(self, source_info: TS) -> Self {
703 Self {
704 builder: self.builder.source_info(source_info),
705 ..self
706 }
707 }
708 #[zenoh_macros::unstable]
709 fn attachment<TA: Into<OptionZBytes>>(self, attachment: TA) -> Self {
710 let attachment: OptionZBytes = attachment.into();
711 Self {
712 builder: self.builder.attachment(attachment),
713 ..self
714 }
715 }
716}
717
718#[zenoh_macros::internal_trait]
719#[zenoh_macros::unstable]
720impl<P> TimestampBuilderTrait for AdvancedPublicationBuilder<'_, P> {
721 #[zenoh_macros::unstable]
722 fn timestamp<TS: Into<Option<uhlc::Timestamp>>>(self, timestamp: TS) -> Self {
723 Self {
724 builder: self.builder.timestamp(timestamp),
725 ..self
726 }
727 }
728}
729
730#[zenoh_macros::unstable]
731impl<P> Resolvable for AdvancedPublicationBuilder<'_, P> {
732 type To = ZResult<()>;
733}
734
735#[zenoh_macros::unstable]
736impl Wait for AdvancedPublisherPutBuilder<'_> {
737 #[inline]
738 #[zenoh_macros::unstable]
739 fn wait(self) -> <Self as Resolvable>::To {
740 if let Some(cache) = self.cache {
741 cache.cache_sample(zenoh::sample::Sample::from(&self.builder));
742 }
743 self.builder.wait()
744 }
745}
746
747#[zenoh_macros::unstable]
748impl Wait for AdvancedPublisherDeleteBuilder<'_> {
749 #[inline]
750 #[zenoh_macros::unstable]
751 fn wait(self) -> <Self as Resolvable>::To {
752 if let Some(cache) = self.cache {
753 cache.cache_sample(zenoh::sample::Sample::from(&self.builder));
754 }
755 self.builder.wait()
756 }
757}
758
759#[zenoh_macros::unstable]
760impl IntoFuture for AdvancedPublisherPutBuilder<'_> {
761 type Output = <Self as Resolvable>::To;
762 type IntoFuture = Ready<<Self as Resolvable>::To>;
763
764 #[zenoh_macros::unstable]
765 fn into_future(self) -> Self::IntoFuture {
766 std::future::ready(self.wait())
767 }
768}
769
770#[zenoh_macros::unstable]
771impl IntoFuture for AdvancedPublisherDeleteBuilder<'_> {
772 type Output = <Self as Resolvable>::To;
773 type IntoFuture = Ready<<Self as Resolvable>::To>;
774
775 #[zenoh_macros::unstable]
776 fn into_future(self) -> Self::IntoFuture {
777 std::future::ready(self.wait())
778 }
779}