use std::{
future::{IntoFuture, Ready},
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
time::Duration,
};
use zenoh::{
bytes::{Encoding, OptionZBytes, ZBytes},
internal::{
bail,
runtime::ZRuntime,
traits::{
EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait,
},
TerminatableTask,
},
key_expr::{keyexpr, KeyExpr},
liveliness::LivelinessToken,
pubsub::{
PublicationBuilder, PublicationBuilderDelete, PublicationBuilderPut, Publisher,
PublisherBuilder,
},
qos::{CongestionControl, Priority, Reliability},
sample::{Locality, SourceInfo},
session::EntityGlobalId,
Resolvable, Resolve, Result as ZResult, Session, Wait, KE_ADV_PREFIX, KE_AT, KE_EMPTY,
};
use zenoh_macros::ke;
use crate::{
advanced_cache::{AdvancedCache, AdvancedCacheBuilder, CacheConfig, KE_UHLC},
z_serialize,
};
/// Key expression chunk `pub`. It is appended right after `KE_ADV_PREFIX` when building the
/// prefix used for an advanced publisher's cache queryable, liveliness token and heartbeat.
pub(crate) static KE_PUB: &keyexpr = ke!("pub");
/// How the samples emitted by an advanced publisher are sequenced.
///
/// Field-less enum, so it is cheap to copy and has a total equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[zenoh_macros::unstable]
pub(crate) enum Sequencing {
    /// No sequencing requirement (the builder default).
    None,
    /// Timestamp based; selected when the cache is enabled and nothing stronger was requested.
    /// Requires the session to have an HLC.
    Timestamp,
    /// Per-publisher sequence numbers; selected when sample miss detection is enabled.
    SequenceNumber,
}
/// Configuration of the sample miss detection performed on behalf of subscribers.
///
/// The default configuration has no heartbeat.
#[derive(Debug, Default, Clone)]
#[zenoh_macros::unstable]
pub struct MissDetectionConfig {
    /// Period of the heartbeat task; `None` means no heartbeat is spawned.
    pub(crate) state_publisher: Option<Duration>,
}
#[zenoh_macros::unstable]
impl MissDetectionConfig {
    /// Enables the heartbeat with the given `period`.
    ///
    /// When the resulting configuration is passed to
    /// `AdvancedPublisherBuilder::sample_miss_detection`, the publisher spawns a task that
    /// wakes up every `period` and publishes the last sequence number it has assigned.
    #[zenoh_macros::unstable]
    pub fn heartbeat(self, period: Duration) -> Self {
        Self {
            state_publisher: Some(period),
        }
    }
}
/// Builder of an [`AdvancedPublisher`].
///
/// It starts from the settings of a plain [`PublisherBuilder`] and adds the advanced features
/// (sample miss detection, cache, publisher detection), all of which are disabled by default.
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
#[zenoh_macros::unstable]
pub struct AdvancedPublisherBuilder<'a, 'b, 'c> {
    /// Session the publisher (and its auxiliary entities) is declared on.
    session: &'a Session,
    /// Key expression to publish on, or the error raised while converting it.
    pub_key_expr: ZResult<KeyExpr<'b>>,
    /// Encoding forwarded to the underlying publisher.
    encoding: Encoding,
    /// Allowed destination forwarded to the underlying publisher.
    destination: Locality,
    /// Reliability forwarded to the underlying publisher.
    reliability: Reliability,
    /// Congestion control forwarded to the underlying publisher.
    congestion_control: CongestionControl,
    /// Priority forwarded to the underlying publisher.
    priority: Priority,
    /// Express flag forwarded to the underlying publisher.
    is_express: bool,
    /// Optional metadata key expression inserted into the advanced prefix
    /// (or the error raised while converting it).
    meta_key_expr: Option<ZResult<KeyExpr<'c>>>,
    /// Sequencing mode implied by the enabled features.
    sequencing: Sequencing,
    /// Sample miss detection configuration, if enabled.
    miss_config: Option<MissDetectionConfig>,
    /// Whether a liveliness token must be declared for the publisher.
    liveliness: bool,
    /// Whether published samples must be stored in a cache.
    cache: bool,
    /// Configuration of the cache (only used when `cache` is `true`).
    history: CacheConfig,
}
#[zenoh_macros::unstable]
impl<'a, 'b, 'c> AdvancedPublisherBuilder<'a, 'b, 'c> {
#[zenoh_macros::unstable]
pub(crate) fn new(builder: PublisherBuilder<'a, 'b>) -> AdvancedPublisherBuilder<'a, 'b, 'c> {
AdvancedPublisherBuilder {
session: builder.session,
pub_key_expr: builder.key_expr,
encoding: builder.encoding,
destination: builder.destination,
reliability: builder.reliability,
congestion_control: builder.congestion_control,
priority: builder.priority,
is_express: builder.is_express,
meta_key_expr: None,
sequencing: Sequencing::None,
miss_config: None,
liveliness: false,
cache: false,
history: CacheConfig::default(),
}
}
#[zenoh_macros::unstable]
#[inline]
pub fn allowed_destination(mut self, destination: Locality) -> Self {
self.destination = destination;
self
}
#[zenoh_macros::unstable]
#[inline]
pub fn reliability(self, reliability: Reliability) -> Self {
Self {
reliability,
..self
}
}
#[zenoh_macros::unstable]
pub fn sample_miss_detection(mut self, config: MissDetectionConfig) -> Self {
self.sequencing = Sequencing::SequenceNumber;
self.miss_config = Some(config);
self
}
#[zenoh_macros::unstable]
pub fn cache(mut self, config: CacheConfig) -> Self {
self.cache = true;
if self.sequencing == Sequencing::None {
self.sequencing = Sequencing::Timestamp;
}
self.history = config;
self
}
#[zenoh_macros::unstable]
pub fn publisher_detection(mut self) -> Self {
self.liveliness = true;
self
}
#[zenoh_macros::unstable]
pub fn publisher_detection_metadata<TryIntoKeyExpr>(mut self, meta: TryIntoKeyExpr) -> Self
where
TryIntoKeyExpr: TryInto<KeyExpr<'c>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'c>>>::Error: Into<zenoh::Error>,
{
self.meta_key_expr = Some(meta.try_into().map_err(Into::into));
self
}
}
#[zenoh_macros::internal_trait]
#[zenoh_macros::unstable]
impl EncodingBuilderTrait for AdvancedPublisherBuilder<'_, '_, '_> {
    /// Sets the encoding forwarded to the underlying publisher.
    #[zenoh_macros::unstable]
    fn encoding<T: Into<Encoding>>(mut self, encoding: T) -> Self {
        self.encoding = encoding.into();
        self
    }
}
#[zenoh_macros::internal_trait]
#[zenoh_macros::unstable]
impl QoSBuilderTrait for AdvancedPublisherBuilder<'_, '_, '_> {
    /// Sets the congestion control forwarded to the underlying publisher.
    #[inline]
    #[zenoh_macros::unstable]
    fn congestion_control(mut self, congestion_control: CongestionControl) -> Self {
        self.congestion_control = congestion_control;
        self
    }

    /// Sets the priority forwarded to the underlying publisher.
    #[inline]
    #[zenoh_macros::unstable]
    fn priority(mut self, priority: Priority) -> Self {
        self.priority = priority;
        self
    }

    /// Sets the express flag forwarded to the underlying publisher.
    #[inline]
    #[zenoh_macros::unstable]
    fn express(mut self, is_express: bool) -> Self {
        self.is_express = is_express;
        self
    }
}
// Resolving the builder yields the `AdvancedPublisher`, bound to the lifetime of the
// publication key expression, or the error that prevented its creation.
#[zenoh_macros::unstable]
impl<'b> Resolvable for AdvancedPublisherBuilder<'_, 'b, '_> {
type To = ZResult<AdvancedPublisher<'b>>;
}
#[zenoh_macros::unstable]
impl Wait for AdvancedPublisherBuilder<'_, '_, '_> {
    /// Synchronously resolves the builder by constructing the [`AdvancedPublisher`] it
    /// describes.
    #[zenoh_macros::unstable]
    fn wait(self) -> <Self as Resolvable>::To {
        let builder = self;
        AdvancedPublisher::new(builder)
    }
}
#[zenoh_macros::unstable]
impl IntoFuture for AdvancedPublisherBuilder<'_, '_, '_> {
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<<Self as Resolvable>::To>;

    /// Resolves the builder eagerly through [`Wait::wait`] and wraps the outcome in an
    /// already-completed future.
    #[zenoh_macros::unstable]
    fn into_future(self) -> Self::IntoFuture {
        let resolved = self.wait();
        std::future::ready(resolved)
    }
}
/// A publisher wrapping a plain [`Publisher`] with optional sequence numbering, sample
/// caching, a liveliness token and a periodic heartbeat.
#[zenoh_macros::unstable]
pub struct AdvancedPublisher<'a> {
/// Underlying publisher every sample goes through.
publisher: Publisher<'a>,
/// Next sequence number to assign; `Some` only with sequence-number sequencing.
/// Shared with the heartbeat task when one is running.
seqnum: Option<Arc<AtomicU32>>,
/// Cache receiving a copy of every published sample, when enabled.
cache: Option<AdvancedCache>,
/// Liveliness token, stored only so that it lives as long as the publisher.
_token: Option<LivelinessToken>,
/// Heartbeat task, stored only so that it lives as long as the publisher.
_state_publisher: Option<TerminatableTask>,
}
#[zenoh_macros::unstable]
impl<'a> AdvancedPublisher<'a> {
#[zenoh_macros::unstable]
fn new(conf: AdvancedPublisherBuilder<'_, 'a, '_>) -> ZResult<Self> {
let key_expr = conf.pub_key_expr?;
let meta = match conf.meta_key_expr {
Some(meta) => Some(meta?),
None => None,
};
let publisher = conf
.session
.declare_publisher(key_expr.clone())
.encoding(conf.encoding)
.allowed_destination(conf.destination)
.reliability(conf.reliability)
.congestion_control(conf.congestion_control)
.priority(conf.priority)
.express(conf.is_express)
.wait()?;
let id = publisher.id();
let prefix = KE_ADV_PREFIX / KE_PUB / &id.zid().into_keyexpr();
let prefix = match conf.sequencing {
Sequencing::SequenceNumber => {
prefix / &KeyExpr::try_from(id.eid().to_string()).unwrap()
}
_ => prefix / KE_UHLC,
};
let prefix = match meta {
Some(meta) => prefix / &meta / KE_AT,
_ => prefix / KE_EMPTY / KE_AT,
};
let seqnum = match conf.sequencing {
Sequencing::SequenceNumber => Some(Arc::new(AtomicU32::new(0))),
Sequencing::Timestamp => {
if conf.session.hlc().is_none() {
bail!(
"Cannot create AdvancedPublisher {} with Sequencing::Timestamp: \
the 'timestamping' setting must be enabled in the Zenoh configuration.",
key_expr,
)
}
None
}
_ => None,
};
let cache = if conf.cache {
Some(
AdvancedCacheBuilder::new(conf.session, Ok(key_expr.clone()))
.history(conf.history)
.queryable_prefix(&prefix)
.wait()?,
)
} else {
None
};
let token = if conf.liveliness {
Some(
conf.session
.liveliness()
.declare_token(&prefix / &key_expr)
.wait()?,
)
} else {
None
};
let state_publisher = if let Some(period) = conf.miss_config.and_then(|c| c.state_publisher)
{
if let Some(seqnum) = seqnum.as_ref() {
let seqnum = seqnum.clone();
let publisher = conf.session.declare_publisher(prefix / &key_expr).wait()?;
Some(TerminatableTask::spawn_abortable(
ZRuntime::Net,
async move {
loop {
tokio::time::sleep(period).await;
let seqnum = seqnum.load(Ordering::Relaxed);
if seqnum > 0 {
let _ = publisher.put(z_serialize(&(seqnum - 1))).await;
}
}
},
))
} else {
None
}
} else {
None
};
Ok(AdvancedPublisher {
publisher,
seqnum,
cache,
_token: token,
_state_publisher: state_publisher,
})
}
#[zenoh_macros::unstable]
pub fn id(&self) -> EntityGlobalId {
self.publisher.id()
}
#[inline]
#[zenoh_macros::unstable]
pub fn key_expr(&self) -> &KeyExpr<'a> {
self.publisher.key_expr()
}
#[inline]
#[zenoh_macros::unstable]
pub fn encoding(&self) -> &Encoding {
self.publisher.encoding()
}
#[inline]
#[zenoh_macros::unstable]
pub fn congestion_control(&self) -> CongestionControl {
self.publisher.congestion_control()
}
#[inline]
#[zenoh_macros::unstable]
pub fn priority(&self) -> Priority {
self.publisher.priority()
}
#[inline]
#[zenoh_macros::unstable]
pub fn put<IntoZBytes>(&self, payload: IntoZBytes) -> AdvancedPublisherPutBuilder<'_>
where
IntoZBytes: Into<ZBytes>,
{
let mut builder = self.publisher.put(payload);
if let Some(seqnum) = &self.seqnum {
builder = builder.source_info(SourceInfo::new(
Some(self.publisher.id()),
Some(seqnum.fetch_add(1, Ordering::Relaxed)),
));
}
if let Some(hlc) = self.publisher.session().hlc() {
builder = builder.timestamp(hlc.new_timestamp());
}
AdvancedPublisherPutBuilder {
builder,
cache: self.cache.as_ref(),
}
}
#[zenoh_macros::unstable]
pub fn delete(&self) -> AdvancedPublisherDeleteBuilder<'_> {
let mut builder = self.publisher.delete();
if let Some(seqnum) = &self.seqnum {
builder = builder.source_info(SourceInfo::new(
Some(self.publisher.id()),
Some(seqnum.fetch_add(1, Ordering::Relaxed)),
));
}
if let Some(hlc) = self.publisher.session().hlc() {
builder = builder.timestamp(hlc.new_timestamp());
}
AdvancedPublisherDeleteBuilder {
builder,
cache: self.cache.as_ref(),
}
}
#[zenoh_macros::unstable]
pub fn matching_status(&self) -> impl Resolve<ZResult<zenoh::matching::MatchingStatus>> + '_ {
self.publisher.matching_status()
}
#[zenoh_macros::unstable]
pub fn matching_listener(
&self,
) -> zenoh::matching::MatchingListenerBuilder<'_, zenoh::handlers::DefaultHandler> {
self.publisher.matching_listener()
}
#[zenoh_macros::unstable]
pub fn undeclare(self) -> impl Resolve<ZResult<()>> + 'a {
self.publisher.undeclare()
}
}
/// Publication builder returned by [`AdvancedPublisher::put`].
#[zenoh_macros::unstable]
pub type AdvancedPublisherPutBuilder<'a> = AdvancedPublicationBuilder<'a, PublicationBuilderPut>;
/// Publication builder returned by [`AdvancedPublisher::delete`].
#[zenoh_macros::unstable]
pub type AdvancedPublisherDeleteBuilder<'a> =
AdvancedPublicationBuilder<'a, PublicationBuilderDelete>;
/// Builder of a single publication (put or delete) issued through an [`AdvancedPublisher`].
///
/// It wraps the plain publication builder and, when the publisher has a cache, stores the
/// sample in that cache at resolution time, before handing it to the underlying builder.
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
#[derive(Clone)]
#[zenoh_macros::unstable]
pub struct AdvancedPublicationBuilder<'a, P> {
/// Underlying publication builder.
pub(crate) builder: PublicationBuilder<&'a Publisher<'a>, P>,
/// Cache of the originating publisher, if it has one.
pub(crate) cache: Option<&'a AdvancedCache>,
}
#[zenoh_macros::internal_trait]
#[zenoh_macros::unstable]
impl EncodingBuilderTrait for AdvancedPublicationBuilder<'_, PublicationBuilderPut> {
    /// Sets the encoding on the wrapped publication builder.
    #[zenoh_macros::unstable]
    fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
        let Self { builder, cache } = self;
        Self {
            builder: builder.encoding(encoding),
            cache,
        }
    }
}
#[zenoh_macros::internal_trait]
#[zenoh_macros::unstable]
impl<P> SampleBuilderTrait for AdvancedPublicationBuilder<'_, P> {
    /// Sets the source info on the wrapped publication builder.
    #[zenoh_macros::unstable]
    fn source_info(self, source_info: SourceInfo) -> Self {
        let Self { builder, cache } = self;
        Self {
            builder: builder.source_info(source_info),
            cache,
        }
    }

    /// Converts `attachment` to [`OptionZBytes`] and sets it on the wrapped publication
    /// builder.
    #[zenoh_macros::unstable]
    fn attachment<TA: Into<OptionZBytes>>(self, attachment: TA) -> Self {
        let Self { builder, cache } = self;
        let builder = builder.attachment(Into::<OptionZBytes>::into(attachment));
        Self { builder, cache }
    }
}
#[zenoh_macros::internal_trait]
#[zenoh_macros::unstable]
impl<P> TimestampBuilderTrait for AdvancedPublicationBuilder<'_, P> {
    /// Sets (or clears, with `None`) the timestamp on the wrapped publication builder.
    #[zenoh_macros::unstable]
    fn timestamp<TS: Into<Option<uhlc::Timestamp>>>(self, timestamp: TS) -> Self {
        let Self { builder, cache } = self;
        Self {
            builder: builder.timestamp(timestamp),
            cache,
        }
    }
}
// Resolving a publication yields `Ok(())` on success, or the error reported by the
// underlying publication builder.
#[zenoh_macros::unstable]
impl<P> Resolvable for AdvancedPublicationBuilder<'_, P> {
type To = ZResult<()>;
}
#[zenoh_macros::unstable]
impl Wait for AdvancedPublisherPutBuilder<'_> {
    /// Stores the sample in the publisher's cache (if any), then performs the put.
    ///
    /// The sample is cached before the put is attempted, so it stays cached even when the
    /// put itself returns an error.
    #[inline]
    #[zenoh_macros::unstable]
    fn wait(self) -> <Self as Resolvable>::To {
        let Self { builder, cache } = self;
        if let Some(cache) = cache {
            let sample = zenoh::sample::Sample::from(&builder);
            cache.cache_sample(sample);
        }
        builder.wait()
    }
}
#[zenoh_macros::unstable]
impl Wait for AdvancedPublisherDeleteBuilder<'_> {
    /// Stores the delete sample in the publisher's cache (if any), then performs the delete.
    ///
    /// The sample is cached before the delete is attempted, so it stays cached even when the
    /// delete itself returns an error.
    #[inline]
    #[zenoh_macros::unstable]
    fn wait(self) -> <Self as Resolvable>::To {
        let Self { builder, cache } = self;
        if let Some(cache) = cache {
            let sample = zenoh::sample::Sample::from(&builder);
            cache.cache_sample(sample);
        }
        builder.wait()
    }
}
#[zenoh_macros::unstable]
impl IntoFuture for AdvancedPublisherPutBuilder<'_> {
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<<Self as Resolvable>::To>;

    /// Performs the put eagerly through [`Wait::wait`] and wraps the outcome in an
    /// already-completed future.
    #[zenoh_macros::unstable]
    fn into_future(self) -> Self::IntoFuture {
        let outcome = self.wait();
        std::future::ready(outcome)
    }
}
#[zenoh_macros::unstable]
impl IntoFuture for AdvancedPublisherDeleteBuilder<'_> {
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<<Self as Resolvable>::To>;

    /// Performs the delete eagerly through [`Wait::wait`] and wraps the outcome in an
    /// already-completed future.
    #[zenoh_macros::unstable]
    fn into_future(self) -> Self::IntoFuture {
        let outcome = self.wait();
        std::future::ready(outcome)
    }
}