1use std::{
15 future::{IntoFuture, Ready},
16 sync::{
17 atomic::{AtomicU32, Ordering},
18 Arc,
19 },
20 time::Duration,
21};
22
23use zenoh::{
24 bytes::{Encoding, OptionZBytes, ZBytes},
25 internal::{
26 bail,
27 runtime::ZRuntime,
28 traits::{
29 EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait,
30 },
31 TerminatableTask,
32 },
33 key_expr::{keyexpr, KeyExpr},
34 liveliness::LivelinessToken,
35 pubsub::{
36 PublicationBuilder, PublicationBuilderDelete, PublicationBuilderPut, Publisher,
37 PublisherBuilder, PublisherUndeclaration,
38 },
39 qos::{CongestionControl, Priority, Reliability},
40 sample::{Locality, SourceInfo},
41 session::EntityGlobalId,
42 Resolvable, Resolve, Result as ZResult, Session, Wait, KE_ADV_PREFIX, KE_EMPTY,
43};
44use zenoh_macros::ke;
45
46use crate::{
47 advanced_cache::{AdvancedCache, AdvancedCacheBuilder, CacheConfig, KE_UHLC},
48 z_serialize,
49};
50
/// Key-expression chunk `pub`, placed right after `KE_ADV_PREFIX` in the
/// metadata suffix that `AdvancedPublisher::new` builds for each publisher.
pub(crate) static KE_PUB: &keyexpr = ke!("pub");
52
/// How an advanced publisher orders the samples it emits.
#[derive(PartialEq)]
#[zenoh_macros::unstable]
pub(crate) enum Sequencing {
    /// Builder default: no sequence counter is created for the publisher.
    None,
    /// Selected by `AdvancedPublisherBuilder::cache` when nothing else was
    /// chosen; publisher creation fails if the session has no HLC.
    Timestamp,
    /// Selected by `AdvancedPublisherBuilder::sample_miss_detection`; every
    /// `put`/`delete` is tagged with an incrementing `u32` sequence number.
    SequenceNumber,
}
60
#[zenoh_macros::unstable]
/// Configuration passed to `AdvancedPublisherBuilder::sample_miss_detection`.
///
/// The default value enables sequence numbering without any heartbeat.
#[derive(Debug, Default, Clone)]
pub struct MissDetectionConfig {
    /// Heartbeat settings as `(period, sporadic)`; `None` (the default) means
    /// no heartbeat task is spawned when the publisher is created.
    pub(crate) state_publisher: Option<(Duration, bool)>,
}
72
73#[zenoh_macros::unstable]
74impl MissDetectionConfig {
75 #[zenoh_macros::unstable]
85 pub fn heartbeat(mut self, period: Duration) -> Self {
86 self.state_publisher = Some((period, false));
87 self
88 }
89
90 #[zenoh_macros::unstable]
101 pub fn sporadic_heartbeat(mut self, period: Duration) -> Self {
102 self.state_publisher = Some((period, true));
103 self
104 }
105}
106
107#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
109#[zenoh_macros::unstable]
110pub struct AdvancedPublisherBuilder<'a, 'b, 'c> {
111 session: &'a Session,
112 pub_key_expr: ZResult<KeyExpr<'b>>,
113 encoding: Encoding,
114 destination: Locality,
115 reliability: Reliability,
116 congestion_control: CongestionControl,
117 priority: Priority,
118 is_express: bool,
119 meta_key_expr: Option<ZResult<KeyExpr<'c>>>,
120 sequencing: Sequencing,
121 miss_config: Option<MissDetectionConfig>,
122 liveliness: bool,
123 cache: bool,
124 history: CacheConfig,
125}
126
127#[zenoh_macros::unstable]
128impl<'a, 'b, 'c> AdvancedPublisherBuilder<'a, 'b, 'c> {
129 #[zenoh_macros::unstable]
130 pub(crate) fn new(builder: PublisherBuilder<'a, 'b>) -> AdvancedPublisherBuilder<'a, 'b, 'c> {
131 AdvancedPublisherBuilder {
132 session: builder.session,
133 pub_key_expr: builder.key_expr,
134 encoding: builder.encoding,
135 destination: builder.destination,
136 reliability: builder.reliability,
137 congestion_control: builder.congestion_control,
138 priority: builder.priority,
139 is_express: builder.is_express,
140 meta_key_expr: None,
141 sequencing: Sequencing::None,
142 miss_config: None,
143 liveliness: false,
144 cache: false,
145 history: CacheConfig::default(),
146 }
147 }
148
149 #[zenoh_macros::unstable]
154 #[inline]
155 pub fn allowed_destination(mut self, destination: Locality) -> Self {
156 self.destination = destination;
157 self
158 }
159
160 #[zenoh_macros::unstable]
166 #[inline]
167 pub fn reliability(self, reliability: Reliability) -> Self {
168 Self {
169 reliability,
170 ..self
171 }
172 }
173
174 #[zenoh_macros::unstable]
178 pub fn sample_miss_detection(mut self, config: MissDetectionConfig) -> Self {
179 self.sequencing = Sequencing::SequenceNumber;
180 self.miss_config = Some(config);
181 self
182 }
183
184 #[zenoh_macros::unstable]
188 pub fn cache(mut self, config: CacheConfig) -> Self {
189 self.cache = true;
190 if self.sequencing == Sequencing::None {
191 self.sequencing = Sequencing::Timestamp;
192 }
193 self.history = config;
194 self
195 }
196
197 #[zenoh_macros::unstable]
201 pub fn publisher_detection(mut self) -> Self {
202 self.liveliness = true;
203 self
204 }
205
206 #[zenoh_macros::unstable]
210 pub fn publisher_detection_metadata<TryIntoKeyExpr>(mut self, meta: TryIntoKeyExpr) -> Self
211 where
212 TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
213 <TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<zenoh::Error>,
214 {
215 self.meta_key_expr = Some(meta.try_into().map_err(Into::into));
216 self
217 }
218}
219
220#[zenoh_macros::internal_trait]
221#[zenoh_macros::unstable]
222impl EncodingBuilderTrait for AdvancedPublisherBuilder<'_, '_, '_> {
223 #[zenoh_macros::unstable]
225 fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
226 Self {
227 encoding: encoding.into(),
228 ..self
229 }
230 }
231}
232
233#[zenoh_macros::internal_trait]
234#[zenoh_macros::unstable]
235impl QoSBuilderTrait for AdvancedPublisherBuilder<'_, '_, '_> {
236 #[inline]
238 #[zenoh_macros::unstable]
239 fn congestion_control(self, congestion_control: CongestionControl) -> Self {
240 Self {
241 congestion_control,
242 ..self
243 }
244 }
245
246 #[inline]
248 #[zenoh_macros::unstable]
249 fn priority(self, priority: Priority) -> Self {
250 Self { priority, ..self }
251 }
252
253 #[inline]
258 #[zenoh_macros::unstable]
259 fn express(self, is_express: bool) -> Self {
260 Self { is_express, ..self }
261 }
262}
263
#[zenoh_macros::unstable]
impl<'b> Resolvable for AdvancedPublisherBuilder<'_, 'b, '_> {
    // Resolving the builder yields the publisher, whose lifetime is the one of
    // the publication key expression, or the error met while creating it.
    type To = ZResult<AdvancedPublisher<'b>>;
}
268
#[zenoh_macros::unstable]
impl Wait for AdvancedPublisherBuilder<'_, '_, '_> {
    /// Synchronously creates the publisher by handing this builder to
    /// `AdvancedPublisher::new`.
    #[zenoh_macros::unstable]
    fn wait(self) -> <Self as Resolvable>::To {
        AdvancedPublisher::new(self)
    }
}
276
#[zenoh_macros::unstable]
impl IntoFuture for AdvancedPublisherBuilder<'_, '_, '_> {
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<<Self as Resolvable>::To>;

    /// Makes the builder awaitable. The publisher is created eagerly, inside
    /// this call, and the returned future is already resolved.
    #[zenoh_macros::unstable]
    fn into_future(self) -> Self::IntoFuture {
        std::future::ready(self.wait())
    }
}
287
#[zenoh_macros::unstable]
/// A publisher wrapping a regular `Publisher` with optional sequence
/// numbering, sample caching, liveliness token and heartbeat.
pub struct AdvancedPublisher<'a> {
    /// Underlying publisher that every operation is delegated to.
    publisher: Publisher<'a>,
    /// Next sequence number to assign; `Some` only when sequence numbering is
    /// enabled. Shared with the heartbeat task when there is one.
    seqnum: Option<Arc<AtomicU32>>,
    /// Cache receiving a copy of each resolved put/delete; `Some` only when
    /// caching was enabled on the builder.
    cache: Option<AdvancedCache>,
    /// Liveliness token declared when publisher detection is enabled. Never
    /// read; stored only so that it lives as long as the publisher.
    _token: Option<LivelinessToken>,
    /// Handle of the heartbeat task, if one was spawned. Never read.
    /// NOTE(review): presumably the task stops when this handle is dropped;
    /// confirm against `TerminatableTask`.
    _state_publisher: Option<TerminatableTask>,
}
328
329#[zenoh_macros::unstable]
330impl<'a> AdvancedPublisher<'a> {
331 #[zenoh_macros::unstable]
332 fn new(conf: AdvancedPublisherBuilder<'_, 'a, '_>) -> ZResult<Self> {
333 let key_expr = conf.pub_key_expr?;
334 let meta = match conf.meta_key_expr {
335 Some(meta) => Some(meta?),
336 None => None,
337 };
338 tracing::debug!("Create AdvancedPublisher{{key_expr: {}}}", &key_expr);
339
340 let publisher = conf
341 .session
342 .declare_publisher(key_expr.clone())
343 .encoding(conf.encoding)
344 .allowed_destination(conf.destination)
345 .reliability(conf.reliability)
346 .congestion_control(conf.congestion_control)
347 .priority(conf.priority)
348 .express(conf.is_express)
349 .wait()?;
350 let id = publisher.id();
351 let suffix = KE_ADV_PREFIX / KE_PUB / &id.zid().into_keyexpr();
352 let suffix = match conf.sequencing {
353 Sequencing::SequenceNumber => {
354 suffix / &KeyExpr::try_from(id.eid().to_string()).unwrap()
355 }
356 _ => suffix / KE_UHLC,
357 };
358 let suffix = match meta {
359 Some(meta) => suffix / &meta,
360 _ => suffix / KE_EMPTY,
362 };
363
364 let seqnum = match conf.sequencing {
365 Sequencing::SequenceNumber => Some(Arc::new(AtomicU32::new(0))),
366 Sequencing::Timestamp => {
367 if conf.session.hlc().is_none() {
368 bail!(
369 "Cannot create AdvancedPublisher {} with Sequencing::Timestamp: \
370 the 'timestamping' setting must be enabled in the Zenoh configuration.",
371 key_expr,
372 )
373 }
374 None
375 }
376 _ => None,
377 };
378
379 let cache = if conf.cache {
380 Some(
381 AdvancedCacheBuilder::new(conf.session, Ok(key_expr.clone()))
382 .history(conf.history)
383 .queryable_suffix(&suffix)
384 .wait()?,
385 )
386 } else {
387 None
388 };
389
390 let token = if conf.liveliness {
391 tracing::debug!(
392 "AdvancedPublisher{{key_expr: {}}}: Declare liveliness token {}",
393 key_expr,
394 &key_expr / &suffix,
395 );
396 Some(
397 conf.session
398 .liveliness()
399 .declare_token(&key_expr / &suffix)
400 .wait()?,
401 )
402 } else {
403 None
404 };
405
406 let state_publisher = if let Some((period, sporadic)) =
407 conf.miss_config.as_ref().and_then(|c| c.state_publisher)
408 {
409 if let Some(seqnum) = seqnum.as_ref() {
410 tracing::debug!(
411 "AdvancedPublisher{{key_expr: {}}}: Enable {}heartbeat on {} with period {:?}",
412 key_expr,
413 if sporadic { "sporadic " } else { "" },
414 &key_expr / &suffix,
415 period
416 );
417 let seqnum = seqnum.clone();
418 if !sporadic {
419 let publisher = conf.session.declare_publisher(&key_expr / &suffix).wait()?;
420 Some(TerminatableTask::spawn_abortable(
421 ZRuntime::Net,
422 async move {
423 loop {
424 tokio::time::sleep(period).await;
425 let seqnum = seqnum.load(Ordering::Relaxed);
426 if seqnum > 0 {
427 let _ = publisher.put(z_serialize(&(seqnum - 1))).await;
428 }
429 }
430 },
431 ))
432 } else {
433 let mut last_seqnum = 0;
434 let publisher = conf
435 .session
436 .declare_publisher(&key_expr / &suffix)
437 .congestion_control(CongestionControl::Block)
438 .wait()?;
439 Some(TerminatableTask::spawn_abortable(
440 ZRuntime::Net,
441 async move {
442 loop {
443 tokio::time::sleep(period).await;
444 let seqnum = seqnum.load(Ordering::Relaxed);
445 if seqnum > last_seqnum {
446 let _ = publisher.put(z_serialize(&(seqnum - 1))).await;
447 last_seqnum = seqnum;
448 }
449 }
450 },
451 ))
452 }
453 } else {
454 None
455 }
456 } else {
457 None
458 };
459
460 Ok(AdvancedPublisher {
461 publisher,
462 seqnum,
463 cache,
464 _token: token,
465 _state_publisher: state_publisher,
466 })
467 }
468
469 #[zenoh_macros::unstable]
473 pub fn id(&self) -> EntityGlobalId {
474 self.publisher.id()
475 }
476
477 #[inline]
481 #[zenoh_macros::unstable]
482 pub fn key_expr(&self) -> &KeyExpr<'a> {
483 self.publisher.key_expr()
484 }
485
486 #[inline]
490 #[zenoh_macros::unstable]
491 pub fn encoding(&self) -> &Encoding {
492 self.publisher.encoding()
493 }
494
495 #[inline]
499 #[zenoh_macros::unstable]
500 pub fn congestion_control(&self) -> CongestionControl {
501 self.publisher.congestion_control()
502 }
503
504 #[inline]
508 #[zenoh_macros::unstable]
509 pub fn priority(&self) -> Priority {
510 self.publisher.priority()
511 }
512
513 #[inline]
529 #[zenoh_macros::unstable]
530 pub fn put<IntoZBytes>(&self, payload: IntoZBytes) -> AdvancedPublisherPutBuilder<'_>
531 where
532 IntoZBytes: Into<ZBytes>,
533 {
534 let mut builder = self.publisher.put(payload);
535 if let Some(seqnum) = &self.seqnum {
536 let info = Some(SourceInfo::new(
537 self.publisher.id(),
538 seqnum.fetch_add(1, Ordering::Relaxed),
539 ));
540 tracing::trace!(
541 "AdvancedPublisher{{key_expr: {}}}: Put data with {:?}",
542 self.publisher.key_expr(),
543 info
544 );
545 builder = builder.source_info(info);
546 }
547 if let Some(hlc) = self.publisher.session().hlc() {
548 builder = builder.timestamp(hlc.new_timestamp());
549 }
550 AdvancedPublisherPutBuilder {
551 builder,
552 cache: self.cache.as_ref(),
553 }
554 }
555
556 #[zenoh_macros::unstable]
572 pub fn delete(&self) -> AdvancedPublisherDeleteBuilder<'_> {
573 let mut builder = self.publisher.delete();
574 if let Some(seqnum) = &self.seqnum {
575 builder = builder.source_info(Some(SourceInfo::new(
576 self.publisher.id(),
577 seqnum.fetch_add(1, Ordering::Relaxed),
578 )));
579 }
580 if let Some(hlc) = self.publisher.session().hlc() {
581 builder = builder.timestamp(hlc.new_timestamp());
582 }
583 AdvancedPublisherDeleteBuilder {
584 builder,
585 cache: self.cache.as_ref(),
586 }
587 }
588
589 #[zenoh_macros::unstable]
612 pub fn matching_status(&self) -> impl Resolve<ZResult<zenoh::matching::MatchingStatus>> + '_ {
613 self.publisher.matching_status()
614 }
615
616 #[zenoh_macros::unstable]
642 pub fn matching_listener(
643 &self,
644 ) -> zenoh::matching::MatchingListenerBuilder<'_, zenoh::handlers::DefaultHandler> {
645 self.publisher.matching_listener()
646 }
647
648 #[zenoh_macros::unstable]
664 pub fn undeclare(self) -> PublisherUndeclaration<'a> {
665 tracing::debug!(
666 "AdvancedPublisher{{key_expr: {}}}: Undeclare",
667 self.key_expr()
668 );
669 self.publisher.undeclare()
670 }
671}
672
// Builder returned by `AdvancedPublisher::put`.
#[zenoh_macros::unstable]
pub type AdvancedPublisherPutBuilder<'a> = AdvancedPublicationBuilder<'a, PublicationBuilderPut>;
// Builder returned by `AdvancedPublisher::delete`.
#[zenoh_macros::unstable]
pub type AdvancedPublisherDeleteBuilder<'a> =
    AdvancedPublicationBuilder<'a, PublicationBuilderDelete>;
678
/// Builder of a single put or delete issued through an `AdvancedPublisher`.
///
/// `P` selects the kind of publication (`PublicationBuilderPut` or
/// `PublicationBuilderDelete`).
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
#[derive(Clone)]
#[zenoh_macros::unstable]
pub struct AdvancedPublicationBuilder<'a, P> {
    /// Publication builder of the underlying publisher.
    pub(crate) builder: PublicationBuilder<&'a Publisher<'a>, P>,
    /// Cache of the owning publisher, if it has one; it receives a copy of
    /// the sample when the builder is resolved.
    pub(crate) cache: Option<&'a AdvancedCache>,
}
686
687#[zenoh_macros::internal_trait]
688#[zenoh_macros::unstable]
689impl EncodingBuilderTrait for AdvancedPublicationBuilder<'_, PublicationBuilderPut> {
690 #[zenoh_macros::unstable]
692 fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
693 Self {
694 builder: self.builder.encoding(encoding),
695 ..self
696 }
697 }
698}
699
700#[zenoh_macros::internal_trait]
701#[zenoh_macros::unstable]
702impl<P> SampleBuilderTrait for AdvancedPublicationBuilder<'_, P> {
703 #[zenoh_macros::unstable]
704 fn source_info<TS: Into<Option<SourceInfo>>>(self, source_info: TS) -> Self {
706 Self {
707 builder: self.builder.source_info(source_info),
708 ..self
709 }
710 }
711 #[zenoh_macros::unstable]
712 fn attachment<TA: Into<OptionZBytes>>(self, attachment: TA) -> Self {
717 let attachment: OptionZBytes = attachment.into();
718 Self {
719 builder: self.builder.attachment(attachment),
720 ..self
721 }
722 }
723}
724
725#[zenoh_macros::internal_trait]
726#[zenoh_macros::unstable]
727impl<P> TimestampBuilderTrait for AdvancedPublicationBuilder<'_, P> {
728 #[zenoh_macros::unstable]
730 fn timestamp<TS: Into<Option<uhlc::Timestamp>>>(self, timestamp: TS) -> Self {
731 Self {
732 builder: self.builder.timestamp(timestamp),
733 ..self
734 }
735 }
736}
737
#[zenoh_macros::unstable]
impl<P> Resolvable for AdvancedPublicationBuilder<'_, P> {
    // A publication resolves to nothing on success, or to the publish error.
    type To = ZResult<()>;
}
742
743#[zenoh_macros::unstable]
744impl Wait for AdvancedPublisherPutBuilder<'_> {
745 #[inline]
746 #[zenoh_macros::unstable]
747 fn wait(self) -> <Self as Resolvable>::To {
748 if let Some(cache) = self.cache {
749 cache.cache_sample(zenoh::sample::Sample::from(&self.builder));
750 }
751 self.builder.wait()
752 }
753}
754
755#[zenoh_macros::unstable]
756impl Wait for AdvancedPublisherDeleteBuilder<'_> {
757 #[inline]
758 #[zenoh_macros::unstable]
759 fn wait(self) -> <Self as Resolvable>::To {
760 if let Some(cache) = self.cache {
761 cache.cache_sample(zenoh::sample::Sample::from(&self.builder));
762 }
763 self.builder.wait()
764 }
765}
766
#[zenoh_macros::unstable]
impl IntoFuture for AdvancedPublisherPutBuilder<'_> {
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<<Self as Resolvable>::To>;

    /// Makes the put awaitable. The publication is performed eagerly, inside
    /// this call, and the returned future is already resolved.
    #[zenoh_macros::unstable]
    fn into_future(self) -> Self::IntoFuture {
        std::future::ready(self.wait())
    }
}
777
#[zenoh_macros::unstable]
impl IntoFuture for AdvancedPublisherDeleteBuilder<'_> {
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<<Self as Resolvable>::To>;

    /// Makes the delete awaitable. The publication is performed eagerly,
    /// inside this call, and the returned future is already resolved.
    #[zenoh_macros::unstable]
    fn into_future(self) -> Self::IntoFuture {
        std::future::ready(self.wait())
    }
}