use std::{
collections::HashSet,
convert::TryFrom,
fmt,
future::{IntoFuture, Ready},
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
};
use futures::Sink;
use serde::Deserialize;
use tracing::error;
use zenoh_config::qos::PriorityConf;
use zenoh_core::{Resolvable, Resolve, Wait};
use zenoh_protocol::core::CongestionControl;
use zenoh_result::{Error, ZResult};
#[cfg(feature = "unstable")]
use {
zenoh_config::wrappers::EntityGlobalId, zenoh_protocol::core::EntityGlobalIdProto,
zenoh_protocol::core::Reliability,
};
use crate::api::{
builders::{
matching_listener::MatchingListenerBuilder,
publisher::{
PublicationBuilder, PublicationBuilderDelete, PublicationBuilderPut,
PublisherDeleteBuilder, PublisherPutBuilder,
},
},
bytes::ZBytes,
cancellation::SyncGroup,
encoding::Encoding,
handlers::DefaultHandler,
key_expr::KeyExpr,
matching::{MatchingStatus, MatchingStatusType},
sample::{Locality, Sample, SampleFields},
session::{UndeclarableSealed, WeakSession},
Id,
};
pub(crate) struct PublisherState {
pub(crate) id: Id,
pub(crate) remote_id: Id,
pub(crate) key_expr: KeyExpr<'static>,
pub(crate) destination: Locality,
}
impl fmt::Debug for PublisherState {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Publisher")
.field("id", &self.id)
.field("key_expr", &self.key_expr)
.finish()
}
}
#[derive(Debug)]
pub struct Publisher<'a> {
pub(crate) session: WeakSession,
pub(crate) id: Id,
pub(crate) key_expr: KeyExpr<'a>,
pub(crate) encoding: Encoding,
pub(crate) congestion_control: CongestionControl,
pub(crate) priority: Priority,
pub(crate) is_express: bool,
pub(crate) destination: Locality,
#[cfg(feature = "unstable")]
pub(crate) reliability: Reliability,
pub(crate) matching_listeners: Arc<Mutex<HashSet<Id>>>,
pub(crate) undeclare_on_drop: bool,
pub(crate) sync_group: SyncGroup,
}
impl<'a> Publisher<'a> {
#[zenoh_macros::unstable]
pub fn id(&self) -> EntityGlobalId {
EntityGlobalIdProto {
zid: self.session.zid().into(),
eid: self.id,
}
.into()
}
#[inline]
pub fn key_expr(&self) -> &KeyExpr<'a> {
&self.key_expr
}
#[inline]
pub fn encoding(&self) -> &Encoding {
&self.encoding
}
#[inline]
pub fn congestion_control(&self) -> CongestionControl {
self.congestion_control
}
#[inline]
pub fn priority(&self) -> Priority {
self.priority
}
#[zenoh_macros::unstable]
#[inline]
pub fn reliability(&self) -> Reliability {
self.reliability
}
#[inline]
pub fn put<IntoZBytes>(&self, payload: IntoZBytes) -> PublisherPutBuilder<'_>
where
IntoZBytes: Into<ZBytes>,
{
PublicationBuilder {
publisher: self,
kind: PublicationBuilderPut {
payload: payload.into(),
encoding: self.encoding.clone(),
},
timestamp: None,
#[cfg(feature = "unstable")]
source_info: None,
attachment: None,
}
}
pub fn delete(&self) -> PublisherDeleteBuilder<'_> {
PublicationBuilder {
publisher: self,
kind: PublicationBuilderDelete,
timestamp: None,
#[cfg(feature = "unstable")]
source_info: None,
attachment: None,
}
}
pub fn matching_status(&self) -> impl Resolve<ZResult<MatchingStatus>> + '_ {
zenoh_core::ResolveFuture::new(async move {
self.session.matching_status(
self.key_expr(),
self.destination,
MatchingStatusType::Subscribers,
)
})
}
pub fn matching_listener(&self) -> MatchingListenerBuilder<'_, DefaultHandler> {
MatchingListenerBuilder {
session: &self.session,
key_expr: &self.key_expr,
destination: self.destination,
matching_listeners: &self.matching_listeners,
matching_status_type: MatchingStatusType::Subscribers,
handler: DefaultHandler::default(),
parent_callback_sync_group_notifier: self.sync_group.notifier(),
}
}
pub fn undeclare(self) -> PublisherUndeclaration<'a> {
UndeclarableSealed::undeclare_inner(self, ())
}
fn undeclare_impl(&mut self) -> ZResult<()> {
self.undeclare_on_drop = false;
let ids: Vec<Id> = zlock!(self.matching_listeners).drain().collect();
for id in ids {
self.session.undeclare_matches_listener_inner(id)?
}
self.session.undeclare_publisher_inner(self.id)
}
#[zenoh_macros::internal]
pub fn session(&self) -> &WeakSession {
&self.session
}
}
impl<'a> UndeclarableSealed<()> for Publisher<'a> {
type Undeclaration = PublisherUndeclaration<'a>;
fn undeclare_inner(self, _: ()) -> Self::Undeclaration {
PublisherUndeclaration {
publisher: self,
wait_callbacks: false,
}
}
}
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
pub struct PublisherUndeclaration<'a> {
publisher: Publisher<'a>,
wait_callbacks: bool,
}
impl<'a> PublisherUndeclaration<'a> {
#[zenoh_macros::internal_or_unstable]
pub fn wait_callbacks(mut self) -> Self {
self.wait_callbacks = true;
self
}
}
impl Resolvable for PublisherUndeclaration<'_> {
type To = ZResult<()>;
}
impl Wait for PublisherUndeclaration<'_> {
fn wait(mut self) -> <Self as Resolvable>::To {
self.publisher.undeclare_impl()?;
if self.wait_callbacks {
self.publisher.sync_group.wait();
}
Ok(())
}
}
impl IntoFuture for PublisherUndeclaration<'_> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;
fn into_future(self) -> Self::IntoFuture {
std::future::ready(self.wait())
}
}
impl Drop for Publisher<'_> {
fn drop(&mut self) {
if self.undeclare_on_drop {
if let Err(error) = self.undeclare_impl() {
error!(error);
}
}
}
}
impl Sink<Sample> for Publisher<'_> {
type Error = Error;
#[inline]
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
#[inline]
fn start_send(self: Pin<&mut Self>, item: Sample) -> Result<(), Self::Error> {
let SampleFields {
payload,
kind,
encoding,
attachment,
..
} = item.into();
self.session.resolve_put(
&self.key_expr,
payload,
kind,
encoding,
self.congestion_control,
self.priority,
self.is_express,
self.destination,
#[cfg(feature = "unstable")]
self.reliability,
None,
#[cfg(feature = "unstable")]
None,
attachment,
)
}
#[inline]
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
#[inline]
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
}
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Deserialize)]
#[repr(u8)]
pub enum Priority {
RealTime = 1,
InteractiveHigh = 2,
InteractiveLow = 3,
DataHigh = 4,
#[default]
Data = 5,
DataLow = 6,
Background = 7,
}
impl Priority {
pub const DEFAULT: Self = Self::Data;
#[zenoh_macros::internal]
pub const MIN: Self = Self::MIN_;
const MIN_: Self = Self::Background;
#[zenoh_macros::internal]
pub const MAX: Self = Self::MAX_;
const MAX_: Self = Self::RealTime;
#[zenoh_macros::internal]
pub const NUM: usize = 1 + Self::MIN_ as usize - Self::MAX_ as usize;
}
impl TryFrom<u8> for Priority {
type Error = zenoh_result::Error;
fn try_from(priority: u8) -> Result<Self, Self::Error> {
match priority {
1 => Ok(Priority::RealTime),
2 => Ok(Priority::InteractiveHigh),
3 => Ok(Priority::InteractiveLow),
4 => Ok(Priority::DataHigh),
5 => Ok(Priority::Data),
6 => Ok(Priority::DataLow),
7 => Ok(Priority::Background),
unknown => bail!(
"{} is not a valid priority value. Admitted values are: [{}-{}].",
unknown,
Self::MAX_ as u8,
Self::MIN_ as u8
),
}
}
}
impl From<PriorityConf> for Priority {
fn from(value: PriorityConf) -> Self {
match value {
PriorityConf::RealTime => Self::RealTime,
PriorityConf::InteractiveHigh => Self::InteractiveHigh,
PriorityConf::InteractiveLow => Self::InteractiveLow,
PriorityConf::DataHigh => Self::DataHigh,
PriorityConf::Data => Self::Data,
PriorityConf::DataLow => Self::DataLow,
PriorityConf::Background => Self::Background,
}
}
}
impl From<Priority> for PriorityConf {
fn from(value: Priority) -> Self {
match value {
Priority::RealTime => Self::RealTime,
Priority::InteractiveHigh => Self::InteractiveHigh,
Priority::InteractiveLow => Self::InteractiveLow,
Priority::DataHigh => Self::DataHigh,
Priority::Data => Self::Data,
Priority::DataLow => Self::DataLow,
Priority::Background => Self::Background,
}
}
}
type ProtocolPriority = zenoh_protocol::core::Priority;
impl From<Priority> for ProtocolPriority {
fn from(prio: Priority) -> Self {
unsafe { std::mem::transmute::<Priority, zenoh_protocol::core::Priority>(prio) }
}
}
impl TryFrom<ProtocolPriority> for Priority {
type Error = zenoh_result::Error;
fn try_from(priority: ProtocolPriority) -> Result<Self, Self::Error> {
match priority {
ProtocolPriority::Control => bail!("'Control' is not a valid priority value."),
ProtocolPriority::RealTime => Ok(Priority::RealTime),
ProtocolPriority::InteractiveHigh => Ok(Priority::InteractiveHigh),
ProtocolPriority::InteractiveLow => Ok(Priority::InteractiveLow),
ProtocolPriority::DataHigh => Ok(Priority::DataHigh),
ProtocolPriority::Data => Ok(Priority::Data),
ProtocolPriority::DataLow => Ok(Priority::DataLow),
ProtocolPriority::Background => Ok(Priority::Background),
}
}
}
#[cfg(test)]
mod tests {
use crate::{sample::SampleKind, Config, Wait};
#[cfg(feature = "internal")]
#[test]
fn priority_from() {
use std::convert::TryInto;
use zenoh_protocol::core::Priority as TPrio;
use super::Priority as APrio;
for i in APrio::MAX as u8..=APrio::MIN as u8 {
let p: APrio = i.try_into().unwrap();
match p {
APrio::RealTime => assert_eq!(p as u8, TPrio::RealTime as u8),
APrio::InteractiveHigh => assert_eq!(p as u8, TPrio::InteractiveHigh as u8),
APrio::InteractiveLow => assert_eq!(p as u8, TPrio::InteractiveLow as u8),
APrio::DataHigh => assert_eq!(p as u8, TPrio::DataHigh as u8),
APrio::Data => assert_eq!(p as u8, TPrio::Data as u8),
APrio::DataLow => assert_eq!(p as u8, TPrio::DataLow as u8),
APrio::Background => assert_eq!(p as u8, TPrio::Background as u8),
}
let t: TPrio = p.into();
assert_eq!(p as u8, t as u8);
}
}
#[test]
fn sample_kind_integrity_in_publication() {
use crate::api::session::open;
const KEY_EXPR: &str = "test/sample_kind_integrity/publication";
const VALUE: &str = "zenoh";
fn sample_kind_integrity_in_publication_with(kind: SampleKind) {
let session = open(Config::default()).wait().unwrap();
let sub = session.declare_subscriber(KEY_EXPR).wait().unwrap();
let pub_ = session.declare_publisher(KEY_EXPR).wait().unwrap();
match kind {
SampleKind::Put => pub_.put(VALUE).wait().unwrap(),
SampleKind::Delete => pub_.delete().wait().unwrap(),
}
let sample = sub.recv().unwrap();
assert_eq!(sample.kind, kind);
if let SampleKind::Put = kind {
assert_eq!(sample.payload.try_to_string().unwrap(), VALUE);
}
}
sample_kind_integrity_in_publication_with(SampleKind::Put);
sample_kind_integrity_in_publication_with(SampleKind::Delete);
}
#[test]
fn sample_kind_integrity_in_put_builder() {
use crate::api::session::open;
const KEY_EXPR: &str = "test/sample_kind_integrity/put_builder";
const VALUE: &str = "zenoh";
fn sample_kind_integrity_in_put_builder_with(kind: SampleKind) {
let session = open(Config::default()).wait().unwrap();
let sub = session.declare_subscriber(KEY_EXPR).wait().unwrap();
match kind {
SampleKind::Put => session.put(KEY_EXPR, VALUE).wait().unwrap(),
SampleKind::Delete => session.delete(KEY_EXPR).wait().unwrap(),
}
let sample = sub.recv().unwrap();
assert_eq!(sample.kind, kind);
if let SampleKind::Put = kind {
assert_eq!(sample.payload.try_to_string().unwrap(), VALUE);
}
}
sample_kind_integrity_in_put_builder_with(SampleKind::Put);
sample_kind_integrity_in_put_builder_with(SampleKind::Delete);
}
}