use std::{convert::TryFrom, fmt, mem};
use serde::{Deserialize, Serialize};
use zenoh_config::qos::PublisherLocalityConf;
#[cfg(feature = "unstable")]
use zenoh_config::wrappers::EntityGlobalId;
#[cfg(feature = "unstable")]
use zenoh_protocol::core::Reliability;
use zenoh_protocol::{
core::{CongestionControl, Timestamp},
network::{declare::ext::QoSType, push},
zenoh::PushBody,
};
use crate::api::{
builders::sample::QoSBuilderTrait, bytes::ZBytes, encoding::Encoding,
handlers::CallbackParameter, key_expr::KeyExpr, publisher::Priority,
};
#[zenoh_macros::unstable]
pub type SourceSn = u32;
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub enum Locality {
SessionLocal,
Remote,
#[default]
Any,
}
impl From<PublisherLocalityConf> for Locality {
fn from(value: PublisherLocalityConf) -> Self {
match value {
PublisherLocalityConf::SessionLocal => Self::SessionLocal,
PublisherLocalityConf::Remote => Self::Remote,
PublisherLocalityConf::Any => Self::Any,
}
}
}
impl From<Locality> for PublisherLocalityConf {
fn from(value: Locality) -> Self {
match value {
Locality::SessionLocal => Self::SessionLocal,
Locality::Remote => Self::Remote,
Locality::Any => Self::Any,
}
}
}
#[zenoh_macros::unstable]
#[derive(Debug, Clone)]
pub struct SourceInfo {
pub(crate) source_id: EntityGlobalId,
pub(crate) source_sn: SourceSn,
}
#[zenoh_macros::unstable]
impl SourceInfo {
#[zenoh_macros::unstable]
pub fn new(source_id: EntityGlobalId, source_sn: SourceSn) -> Self {
Self {
source_id,
source_sn,
}
}
#[zenoh_macros::unstable]
pub fn source_id(&self) -> &EntityGlobalId {
&self.source_id
}
#[zenoh_macros::unstable]
pub fn source_sn(&self) -> SourceSn {
self.source_sn
}
}
#[zenoh_macros::unstable]
impl<const ID: u8> From<zenoh_protocol::zenoh::ext::SourceInfoType<ID>> for SourceInfo {
fn from(value: zenoh_protocol::zenoh::ext::SourceInfoType<ID>) -> Self {
SourceInfo {
source_id: value.id.into(),
source_sn: value.sn,
}
}
}
#[zenoh_macros::unstable]
impl<const ID: u8> From<SourceInfo> for zenoh_protocol::zenoh::ext::SourceInfoType<ID> {
fn from(value: SourceInfo) -> Self {
zenoh_protocol::zenoh::ext::SourceInfoType {
id: value.source_id.into(),
sn: value.source_sn,
}
}
}
#[repr(u8)]
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Deserialize, Serialize)]
pub enum SampleKind {
#[default]
Put = 0,
Delete = 1,
}
impl fmt::Display for SampleKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SampleKind::Put => write!(f, "PUT"),
SampleKind::Delete => write!(f, "DELETE"),
}
}
}
impl TryFrom<u64> for SampleKind {
type Error = u64;
fn try_from(kind: u64) -> Result<Self, u64> {
match kind {
0 => Ok(SampleKind::Put),
1 => Ok(SampleKind::Delete),
_ => Err(kind),
}
}
}
#[derive(Debug, Clone)]
pub struct SampleFields {
pub key_expr: KeyExpr<'static>,
pub payload: ZBytes,
pub kind: SampleKind,
pub encoding: Encoding,
pub timestamp: Option<Timestamp>,
pub express: bool,
pub priority: Priority,
pub congestion_control: CongestionControl,
#[cfg(feature = "unstable")]
pub reliability: Reliability,
#[cfg(feature = "unstable")]
pub source_info: Option<SourceInfo>,
pub attachment: Option<ZBytes>,
}
impl From<Sample> for SampleFields {
fn from(sample: Sample) -> Self {
SampleFields {
key_expr: sample.key_expr,
payload: sample.payload,
kind: sample.kind,
encoding: sample.encoding,
timestamp: sample.timestamp,
express: sample.qos.express(),
priority: sample.qos.priority(),
congestion_control: sample.qos.congestion_control(),
#[cfg(feature = "unstable")]
reliability: sample.reliability,
#[cfg(feature = "unstable")]
source_info: sample.source_info,
attachment: sample.attachment,
}
}
}
#[non_exhaustive]
#[derive(Clone, Debug)]
pub struct Sample {
pub(crate) key_expr: KeyExpr<'static>,
pub(crate) payload: ZBytes,
pub(crate) kind: SampleKind,
pub(crate) encoding: Encoding,
pub(crate) timestamp: Option<Timestamp>,
pub(crate) qos: QoS,
#[cfg(feature = "unstable")]
pub(crate) reliability: Reliability,
#[cfg(feature = "unstable")]
pub(crate) source_info: Option<SourceInfo>,
pub(crate) attachment: Option<ZBytes>,
}
impl Sample {
#[inline]
pub fn key_expr(&self) -> &KeyExpr<'static> {
&self.key_expr
}
#[inline]
pub fn payload(&self) -> &ZBytes {
&self.payload
}
#[inline]
pub fn payload_mut(&mut self) -> &mut ZBytes {
&mut self.payload
}
#[inline]
pub fn kind(&self) -> SampleKind {
self.kind
}
#[inline]
pub fn encoding(&self) -> &Encoding {
&self.encoding
}
#[inline]
pub fn timestamp(&self) -> Option<&Timestamp> {
self.timestamp.as_ref()
}
pub fn congestion_control(&self) -> CongestionControl {
self.qos.congestion_control()
}
pub fn priority(&self) -> Priority {
self.qos.priority()
}
#[zenoh_macros::unstable]
pub fn reliability(&self) -> Reliability {
self.reliability
}
pub fn express(&self) -> bool {
self.qos.express()
}
#[zenoh_macros::unstable]
#[inline]
pub fn source_info(&self) -> Option<&SourceInfo> {
self.source_info.as_ref()
}
#[inline]
pub fn attachment(&self) -> Option<&ZBytes> {
self.attachment.as_ref()
}
#[inline]
pub fn attachment_mut(&mut self) -> Option<&mut ZBytes> {
self.attachment.as_mut()
}
#[zenoh_macros::internal]
pub fn empty() -> Self {
Sample {
key_expr: KeyExpr::dummy(),
payload: ZBytes::new(),
kind: SampleKind::Put,
encoding: Encoding::default(),
timestamp: None,
qos: QoS::default(),
#[cfg(feature = "unstable")]
reliability: Reliability::default(),
#[cfg(feature = "unstable")]
source_info: None,
attachment: None,
}
}
pub(crate) fn from_push(
key_expr: KeyExpr<'static>,
qos: push::ext::QoSType,
body: &mut PushBody,
#[cfg(feature = "unstable")] reliability: Reliability,
) -> Self {
match body {
PushBody::Put(put) => Self {
key_expr,
payload: mem::take(&mut put.payload).into(),
kind: SampleKind::Put,
encoding: mem::take(&mut put.encoding).into(),
timestamp: put.timestamp,
qos: qos.into(),
#[cfg(feature = "unstable")]
reliability,
#[cfg(feature = "unstable")]
source_info: put.ext_sinfo.map(Into::into),
attachment: mem::take(&mut put.ext_attachment).map(Into::into),
},
PushBody::Del(del) => Self {
key_expr,
payload: Default::default(),
kind: SampleKind::Delete,
encoding: Default::default(),
timestamp: del.timestamp,
qos: qos.into(),
#[cfg(feature = "unstable")]
reliability,
#[cfg(feature = "unstable")]
source_info: del.ext_sinfo.map(Into::into),
attachment: mem::take(&mut del.ext_attachment).map(Into::into),
},
}
}
}
impl CallbackParameter for Sample {
#[cfg(feature = "unstable")]
type Message<'a> = (
KeyExpr<'static>,
push::ext::QoSType,
&'a mut PushBody,
Reliability,
);
#[cfg(not(feature = "unstable"))]
type Message<'a> = (KeyExpr<'static>, push::ext::QoSType, &'a mut PushBody);
fn from_message(msg: Self::Message<'_>) -> Self {
Self::from_push(
msg.0,
msg.1,
msg.2,
#[cfg(feature = "unstable")]
msg.3,
)
}
}
#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)]
pub(crate) struct QoS {
inner: QoSType,
}
#[derive(Debug)]
pub(crate) struct QoSBuilder(QoS);
impl From<QoS> for QoSBuilder {
fn from(qos: QoS) -> Self {
QoSBuilder(qos)
}
}
impl From<QoSType> for QoSBuilder {
fn from(qos: QoSType) -> Self {
QoSBuilder(QoS { inner: qos })
}
}
impl From<QoSBuilder> for QoS {
fn from(builder: QoSBuilder) -> Self {
builder.0
}
}
#[zenoh_macros::internal_trait]
impl QoSBuilderTrait for QoSBuilder {
fn congestion_control(self, congestion_control: CongestionControl) -> Self {
let mut inner = self.0.inner;
inner.set_congestion_control(congestion_control);
Self(QoS { inner })
}
fn priority(self, priority: Priority) -> Self {
let mut inner = self.0.inner;
inner.set_priority(priority.into());
Self(QoS { inner })
}
fn express(self, is_express: bool) -> Self {
let mut inner = self.0.inner;
inner.set_is_express(is_express);
Self(QoS { inner })
}
}
impl QoS {
pub fn priority(&self) -> Priority {
match Priority::try_from(self.inner.get_priority()) {
Ok(p) => p,
Err(e) => {
tracing::trace!(
"Failed to convert priority: {}; replacing with default value",
e.to_string()
);
Priority::default()
}
}
}
pub fn congestion_control(&self) -> CongestionControl {
self.inner.get_congestion_control()
}
pub fn express(&self) -> bool {
self.inner.is_express()
}
}
impl From<QoSType> for QoS {
fn from(qos: QoSType) -> Self {
QoS { inner: qos }
}
}
impl From<QoS> for QoSType {
fn from(qos: QoS) -> Self {
qos.inner
}
}