use std::future::{IntoFuture, Ready};
use zenoh_core::{Resolvable, Result as ZResult, Wait};
use zenoh_protocol::core::CongestionControl;
#[cfg(feature = "unstable")]
use zenoh_protocol::core::Reliability;
#[cfg(feature = "unstable")]
use crate::api::sample::SourceInfo;
use crate::{
api::{
builders::sample::{
EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait,
},
bytes::{OptionZBytes, ZBytes},
cancellation::SyncGroup,
encoding::Encoding,
key_expr::KeyExpr,
publisher::{Priority, Publisher},
sample::{Locality, SampleKind},
},
Session,
};
pub type SessionPutBuilder<'a, 'b> =
PublicationBuilder<PublisherBuilder<'a, 'b>, PublicationBuilderPut>;
pub type SessionDeleteBuilder<'a, 'b> =
PublicationBuilder<PublisherBuilder<'a, 'b>, PublicationBuilderDelete>;
pub type PublisherPutBuilder<'a> = PublicationBuilder<&'a Publisher<'a>, PublicationBuilderPut>;
pub type PublisherDeleteBuilder<'a> =
PublicationBuilder<&'a Publisher<'a>, PublicationBuilderDelete>;
#[derive(Debug, Clone)]
pub struct PublicationBuilderPut {
pub(crate) payload: ZBytes,
pub(crate) encoding: Encoding,
}
#[derive(Debug, Clone)]
pub struct PublicationBuilderDelete;
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
#[derive(Debug, Clone)]
pub struct PublicationBuilder<P, T> {
pub(crate) publisher: P,
pub(crate) kind: T,
pub(crate) timestamp: Option<uhlc::Timestamp>,
#[cfg(feature = "unstable")]
pub(crate) source_info: Option<SourceInfo>,
pub(crate) attachment: Option<ZBytes>,
}
#[zenoh_macros::internal_trait]
impl<T> QoSBuilderTrait for PublicationBuilder<PublisherBuilder<'_, '_>, T> {
#[inline]
fn congestion_control(self, congestion_control: CongestionControl) -> Self {
Self {
publisher: self.publisher.congestion_control(congestion_control),
..self
}
}
#[inline]
fn priority(self, priority: Priority) -> Self {
Self {
publisher: self.publisher.priority(priority),
..self
}
}
#[inline]
fn express(self, is_express: bool) -> Self {
Self {
publisher: self.publisher.express(is_express),
..self
}
}
}
impl<T> PublicationBuilder<PublisherBuilder<'_, '_>, T> {
#[inline]
pub fn allowed_destination(mut self, destination: Locality) -> Self {
self.publisher = self.publisher.allowed_destination(destination);
self
}
#[zenoh_macros::unstable]
#[inline]
pub fn reliability(self, reliability: Reliability) -> Self {
Self {
publisher: self.publisher.reliability(reliability),
..self
}
}
}
#[zenoh_macros::internal_trait]
impl EncodingBuilderTrait for PublisherBuilder<'_, '_> {
fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
Self {
encoding: encoding.into(),
..self
}
}
}
#[zenoh_macros::internal_trait]
impl<P> EncodingBuilderTrait for PublicationBuilder<P, PublicationBuilderPut> {
fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
Self {
kind: PublicationBuilderPut {
encoding: encoding.into(),
..self.kind
},
..self
}
}
}
#[zenoh_macros::internal_trait]
impl<P, T> SampleBuilderTrait for PublicationBuilder<P, T> {
#[zenoh_macros::unstable]
fn source_info<TS: Into<Option<SourceInfo>>>(self, source_info: TS) -> Self {
Self {
source_info: source_info.into(),
..self
}
}
fn attachment<TA: Into<OptionZBytes>>(self, attachment: TA) -> Self {
let attachment: OptionZBytes = attachment.into();
Self {
attachment: attachment.into(),
..self
}
}
}
#[zenoh_macros::internal_trait]
impl<P, T> TimestampBuilderTrait for PublicationBuilder<P, T> {
fn timestamp<TS: Into<Option<uhlc::Timestamp>>>(self, timestamp: TS) -> Self {
Self {
timestamp: timestamp.into(),
..self
}
}
}
impl<P, T> Resolvable for PublicationBuilder<P, T> {
type To = ZResult<()>;
}
impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderPut> {
#[inline]
fn wait(mut self) -> <Self as Resolvable>::To {
self.publisher = self.publisher.apply_qos_overwrites();
self.publisher.session.resolve_put(
&self.publisher.key_expr?,
self.kind.payload,
SampleKind::Put,
self.kind.encoding,
self.publisher.congestion_control,
self.publisher.priority,
self.publisher.is_express,
self.publisher.destination,
#[cfg(feature = "unstable")]
self.publisher.reliability,
self.timestamp,
#[cfg(feature = "unstable")]
self.source_info,
self.attachment,
)
}
}
impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderDelete> {
#[inline]
fn wait(mut self) -> <Self as Resolvable>::To {
self.publisher = self.publisher.apply_qos_overwrites();
self.publisher.session.resolve_put(
&self.publisher.key_expr?,
ZBytes::new(),
SampleKind::Delete,
Encoding::ZENOH_BYTES,
self.publisher.congestion_control,
self.publisher.priority,
self.publisher.is_express,
self.publisher.destination,
#[cfg(feature = "unstable")]
self.publisher.reliability,
self.timestamp,
#[cfg(feature = "unstable")]
self.source_info,
self.attachment,
)
}
}
impl IntoFuture for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderPut> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;
fn into_future(self) -> Self::IntoFuture {
std::future::ready(self.wait())
}
}
impl IntoFuture for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderDelete> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;
fn into_future(self) -> Self::IntoFuture {
std::future::ready(self.wait())
}
}
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
#[derive(Debug)]
pub struct PublisherBuilder<'a, 'b> {
#[cfg(feature = "internal")]
pub session: &'a Session,
#[cfg(not(feature = "internal"))]
pub(crate) session: &'a Session,
#[cfg(feature = "internal")]
pub key_expr: ZResult<KeyExpr<'b>>,
#[cfg(not(feature = "internal"))]
pub(crate) key_expr: ZResult<KeyExpr<'b>>,
#[cfg(feature = "internal")]
pub encoding: Encoding,
#[cfg(not(feature = "internal"))]
pub(crate) encoding: Encoding,
#[cfg(feature = "internal")]
pub congestion_control: CongestionControl,
#[cfg(not(feature = "internal"))]
pub(crate) congestion_control: CongestionControl,
#[cfg(feature = "internal")]
pub priority: Priority,
#[cfg(not(feature = "internal"))]
pub(crate) priority: Priority,
#[cfg(feature = "internal")]
pub is_express: bool,
#[cfg(not(feature = "internal"))]
pub(crate) is_express: bool,
#[cfg(feature = "internal")]
#[cfg(feature = "unstable")]
pub reliability: Reliability,
#[cfg(not(feature = "internal"))]
#[cfg(feature = "unstable")]
pub(crate) reliability: Reliability,
#[cfg(feature = "internal")]
pub destination: Locality,
#[cfg(not(feature = "internal"))]
pub(crate) destination: Locality,
}
impl Clone for PublisherBuilder<'_, '_> {
fn clone(&self) -> Self {
Self {
session: self.session,
key_expr: match &self.key_expr {
Ok(k) => Ok(k.clone()),
Err(e) => Err(zerror!("Cloned KE Error: {}", e).into()),
},
encoding: self.encoding.clone(),
congestion_control: self.congestion_control,
priority: self.priority,
is_express: self.is_express,
#[cfg(feature = "unstable")]
reliability: self.reliability,
destination: self.destination,
}
}
}
#[zenoh_macros::internal_trait]
impl QoSBuilderTrait for PublisherBuilder<'_, '_> {
#[inline]
fn congestion_control(self, congestion_control: CongestionControl) -> Self {
Self {
congestion_control,
..self
}
}
#[inline]
fn priority(self, priority: Priority) -> Self {
Self { priority, ..self }
}
#[inline]
fn express(self, is_express: bool) -> Self {
Self { is_express, ..self }
}
}
impl PublisherBuilder<'_, '_> {
pub(crate) fn apply_qos_overwrites(self) -> Self {
let qos_overwrites = self.key_expr.as_ref().map_or(Default::default(), |ke| {
self.session.get_publisher_qos_overwrite(ke)
});
Self {
congestion_control: qos_overwrites
.congestion_control
.map(|cc| cc.into())
.unwrap_or(self.congestion_control),
priority: qos_overwrites
.priority
.map(|p| p.into())
.unwrap_or(self.priority),
is_express: qos_overwrites.express.unwrap_or(self.is_express),
#[cfg(feature = "unstable")]
reliability: qos_overwrites
.reliability
.map(|r| r.into())
.unwrap_or(self.reliability),
#[cfg(feature = "unstable")]
destination: qos_overwrites
.allowed_destination
.map(|d| d.into())
.unwrap_or(self.destination),
..self
}
}
#[inline]
pub fn allowed_destination(mut self, destination: Locality) -> Self {
self.destination = destination;
self
}
#[zenoh_macros::unstable]
#[inline]
pub fn reliability(self, reliability: Reliability) -> Self {
Self {
reliability,
..self
}
}
}
impl<'b> Resolvable for PublisherBuilder<'_, 'b> {
type To = ZResult<Publisher<'b>>;
}
impl Wait for PublisherBuilder<'_, '_> {
fn wait(mut self) -> <Self as Resolvable>::To {
self = self.apply_qos_overwrites();
let mut key_expr = self.key_expr?;
key_expr = self.session.declare_keyexpr(key_expr).wait()?;
let id = self
.session
.declare_publisher_inner(key_expr.clone(), self.destination)?;
Ok(Publisher {
session: self.session.downgrade(),
id,
key_expr,
encoding: self.encoding,
congestion_control: self.congestion_control,
priority: self.priority,
is_express: self.is_express,
destination: self.destination,
#[cfg(feature = "unstable")]
reliability: self.reliability,
matching_listeners: Default::default(),
undeclare_on_drop: true,
sync_group: SyncGroup::default(),
})
}
}
impl IntoFuture for PublisherBuilder<'_, '_> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;
fn into_future(self) -> Self::IntoFuture {
std::future::ready(self.wait())
}
}
impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderPut> {
fn wait(self) -> <Self as Resolvable>::To {
self.publisher.session.resolve_put(
&self.publisher.key_expr,
self.kind.payload,
SampleKind::Put,
self.kind.encoding,
self.publisher.congestion_control,
self.publisher.priority,
self.publisher.is_express,
self.publisher.destination,
#[cfg(feature = "unstable")]
self.publisher.reliability,
self.timestamp,
#[cfg(feature = "unstable")]
self.source_info,
self.attachment,
)
}
}
impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderDelete> {
fn wait(self) -> <Self as Resolvable>::To {
self.publisher.session.resolve_put(
&self.publisher.key_expr,
ZBytes::new(),
SampleKind::Delete,
Encoding::ZENOH_BYTES,
self.publisher.congestion_control,
self.publisher.priority,
self.publisher.is_express,
self.publisher.destination,
#[cfg(feature = "unstable")]
self.publisher.reliability,
self.timestamp,
#[cfg(feature = "unstable")]
self.source_info,
self.attachment,
)
}
}
impl IntoFuture for PublicationBuilder<&Publisher<'_>, PublicationBuilderPut> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;
fn into_future(self) -> Self::IntoFuture {
std::future::ready(self.wait())
}
}
impl IntoFuture for PublicationBuilder<&Publisher<'_>, PublicationBuilderDelete> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;
fn into_future(self) -> Self::IntoFuture {
std::future::ready(self.wait())
}
}