pub mod declare;
pub mod interest;
pub mod oam;
pub mod push;
pub mod request;
pub mod response;
use core::fmt;
pub use declare::{
Declare, DeclareBody, DeclareFinal, DeclareKeyExpr, DeclareQueryable, DeclareSubscriber,
DeclareToken, UndeclareKeyExpr, UndeclareQueryable, UndeclareSubscriber, UndeclareToken,
};
pub use interest::Interest;
pub use oam::Oam;
pub use push::Push;
pub use request::{AtomicRequestId, Request, RequestId};
pub use response::{Response, ResponseFinal};
use crate::core::{CongestionControl, Priority, Reliability, WireExpr};
#[cfg(feature = "shared-memory")]
use crate::zenoh::{PushBody, RequestBody, ResponseBody};
pub mod id {
pub const OAM: u8 = 0x1f;
pub const DECLARE: u8 = 0x1e;
pub const PUSH: u8 = 0x1d;
pub const REQUEST: u8 = 0x1c;
pub const RESPONSE: u8 = 0x1b;
pub const RESPONSE_FINAL: u8 = 0x1a;
pub const INTEREST: u8 = 0x19;
}
#[repr(u8)]
#[derive(Debug, Default, Clone, Copy, Hash, PartialEq, Eq)]
pub enum Mapping {
#[default]
Receiver = 0,
Sender = 1,
}
impl Mapping {
pub const DEFAULT: Self = Self::Receiver;
#[cfg(feature = "test")]
#[doc(hidden)]
pub fn rand() -> Self {
use rand::Rng;
let mut rng = rand::thread_rng();
if rng.gen_bool(0.5) {
Mapping::Sender
} else {
Mapping::Receiver
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NetworkBody {
Push(Push),
Request(Request),
Response(Response),
ResponseFinal(ResponseFinal),
Interest(Interest),
Declare(Declare),
OAM(Oam),
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum NetworkBodyRef<'a> {
Push(&'a Push),
Request(&'a Request),
Response(&'a Response),
ResponseFinal(&'a ResponseFinal),
Interest(&'a Interest),
Declare(&'a Declare),
OAM(&'a Oam),
}
#[derive(Debug, PartialEq, Eq)]
pub enum NetworkBodyMut<'a> {
Push(&'a mut Push),
Request(&'a mut Request),
Response(&'a mut Response),
ResponseFinal(&'a mut ResponseFinal),
Interest(&'a mut Interest),
Declare(&'a mut Declare),
OAM(&'a mut Oam),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NetworkMessage {
pub body: NetworkBody,
pub reliability: Reliability,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct NetworkMessageRef<'a> {
pub body: NetworkBodyRef<'a>,
pub reliability: Reliability,
}
#[derive(Debug, PartialEq, Eq)]
pub struct NetworkMessageMut<'a> {
pub body: NetworkBodyMut<'a>,
pub reliability: Reliability,
}
pub trait NetworkMessageExt {
#[doc(hidden)]
fn body(&self) -> NetworkBodyRef<'_>;
#[doc(hidden)]
fn reliability(&self) -> Reliability;
#[inline]
fn is_reliable(&self) -> bool {
self.reliability() == Reliability::Reliable
}
#[inline]
fn is_express(&self) -> bool {
match self.body() {
NetworkBodyRef::Push(msg) => msg.ext_qos.is_express(),
NetworkBodyRef::Request(msg) => msg.ext_qos.is_express(),
NetworkBodyRef::Response(msg) => msg.ext_qos.is_express(),
NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.is_express(),
NetworkBodyRef::Interest(msg) => msg.ext_qos.is_express(),
NetworkBodyRef::Declare(msg) => msg.ext_qos.is_express(),
NetworkBodyRef::OAM(msg) => msg.ext_qos.is_express(),
}
}
#[inline]
fn congestion_control(&self) -> CongestionControl {
match self.body() {
NetworkBodyRef::Push(msg) => msg.ext_qos.get_congestion_control(),
NetworkBodyRef::Request(msg) => msg.ext_qos.get_congestion_control(),
NetworkBodyRef::Response(msg) => msg.ext_qos.get_congestion_control(),
NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.get_congestion_control(),
NetworkBodyRef::Interest(msg) => msg.ext_qos.get_congestion_control(),
NetworkBodyRef::Declare(msg) => msg.ext_qos.get_congestion_control(),
NetworkBodyRef::OAM(msg) => msg.ext_qos.get_congestion_control(),
}
}
#[inline]
#[cfg(feature = "shared-memory")]
fn is_shm(&self) -> bool {
match self.body() {
NetworkBodyRef::Push(Push { payload, .. }) => match payload {
PushBody::Put(p) => p.ext_shm.is_some(),
PushBody::Del(_) => false,
},
NetworkBodyRef::Request(Request { payload, .. }) => match payload {
RequestBody::Query(b) => b.ext_body.as_ref().is_some_and(|b| b.ext_shm.is_some()),
},
NetworkBodyRef::Response(Response { payload, .. }) => match payload {
ResponseBody::Reply(b) => match &b.payload {
PushBody::Put(p) => p.ext_shm.is_some(),
PushBody::Del(_) => false,
},
ResponseBody::Err(e) => e.ext_shm.is_some(),
},
NetworkBodyRef::ResponseFinal(_)
| NetworkBodyRef::Interest(_)
| NetworkBodyRef::Declare(_)
| NetworkBodyRef::OAM(_) => false,
}
}
#[inline]
fn is_droppable(&self) -> bool {
!self.is_reliable() || self.congestion_control() == CongestionControl::Drop
}
#[inline]
fn priority(&self) -> Priority {
match self.body() {
NetworkBodyRef::Push(msg) => msg.ext_qos.get_priority(),
NetworkBodyRef::Request(msg) => msg.ext_qos.get_priority(),
NetworkBodyRef::Response(msg) => msg.ext_qos.get_priority(),
NetworkBodyRef::ResponseFinal(msg) => msg.ext_qos.get_priority(),
NetworkBodyRef::Interest(msg) => msg.ext_qos.get_priority(),
NetworkBodyRef::Declare(msg) => msg.ext_qos.get_priority(),
NetworkBodyRef::OAM(msg) => msg.ext_qos.get_priority(),
}
}
#[inline]
fn wire_expr(&self) -> Option<&WireExpr<'_>> {
match &self.body() {
NetworkBodyRef::Push(m) => Some(&m.wire_expr),
NetworkBodyRef::Request(m) => Some(&m.wire_expr),
NetworkBodyRef::Response(m) => Some(&m.wire_expr),
NetworkBodyRef::ResponseFinal(_) => None,
NetworkBodyRef::Interest(m) => m.wire_expr.as_ref(),
NetworkBodyRef::Declare(m) => match &m.body {
DeclareBody::DeclareKeyExpr(m) => Some(&m.wire_expr),
DeclareBody::UndeclareKeyExpr(_) => None,
DeclareBody::DeclareSubscriber(m) => Some(&m.wire_expr),
DeclareBody::UndeclareSubscriber(m) => Some(&m.ext_wire_expr.wire_expr),
DeclareBody::DeclareQueryable(m) => Some(&m.wire_expr),
DeclareBody::UndeclareQueryable(m) => Some(&m.ext_wire_expr.wire_expr),
DeclareBody::DeclareToken(m) => Some(&m.wire_expr),
DeclareBody::UndeclareToken(m) => Some(&m.ext_wire_expr.wire_expr),
DeclareBody::DeclareFinal(_) => None,
},
NetworkBodyRef::OAM(_) => None,
}
}
#[inline]
fn payload_size(&self) -> Option<usize> {
match &self.body() {
NetworkBodyRef::Push(p) => Some(p.payload_size()),
NetworkBodyRef::Request(r) => Some(r.payload_size()),
NetworkBodyRef::Response(r) => Some(r.payload_size()),
NetworkBodyRef::ResponseFinal(_)
| NetworkBodyRef::Interest(_)
| NetworkBodyRef::Declare(_)
| NetworkBodyRef::OAM(_) => None,
}
}
#[inline]
fn as_ref(&self) -> NetworkMessageRef<'_> {
NetworkMessageRef {
body: self.body(),
reliability: self.reliability(),
}
}
#[inline]
fn to_owned(&self) -> NetworkMessage {
NetworkMessage {
body: match self.body() {
NetworkBodyRef::Push(msg) => NetworkBody::Push(msg.clone()),
NetworkBodyRef::Request(msg) => NetworkBody::Request(msg.clone()),
NetworkBodyRef::Response(msg) => NetworkBody::Response(msg.clone()),
NetworkBodyRef::ResponseFinal(msg) => NetworkBody::ResponseFinal(msg.clone()),
NetworkBodyRef::Interest(msg) => NetworkBody::Interest(msg.clone()),
NetworkBodyRef::Declare(msg) => NetworkBody::Declare(msg.clone()),
NetworkBodyRef::OAM(msg) => NetworkBody::OAM(msg.clone()),
},
reliability: self.reliability(),
}
}
}
impl<M: NetworkMessageExt> NetworkMessageExt for &M {
fn body(&self) -> NetworkBodyRef<'_> {
(**self).body()
}
fn reliability(&self) -> Reliability {
(**self).reliability()
}
}
impl<M: NetworkMessageExt> NetworkMessageExt for &mut M {
fn body(&self) -> NetworkBodyRef<'_> {
(**self).body()
}
fn reliability(&self) -> Reliability {
(**self).reliability()
}
}
impl NetworkMessageExt for NetworkMessage {
fn body(&self) -> NetworkBodyRef<'_> {
match &self.body {
NetworkBody::Push(body) => NetworkBodyRef::Push(body),
NetworkBody::Request(body) => NetworkBodyRef::Request(body),
NetworkBody::Response(body) => NetworkBodyRef::Response(body),
NetworkBody::ResponseFinal(body) => NetworkBodyRef::ResponseFinal(body),
NetworkBody::Interest(body) => NetworkBodyRef::Interest(body),
NetworkBody::Declare(body) => NetworkBodyRef::Declare(body),
NetworkBody::OAM(body) => NetworkBodyRef::OAM(body),
}
}
fn reliability(&self) -> Reliability {
self.reliability
}
}
impl NetworkMessageExt for NetworkMessageRef<'_> {
fn body(&self) -> NetworkBodyRef<'_> {
self.body
}
fn reliability(&self) -> Reliability {
self.reliability
}
}
impl NetworkMessageExt for NetworkMessageMut<'_> {
fn body(&self) -> NetworkBodyRef<'_> {
match &self.body {
NetworkBodyMut::Push(body) => NetworkBodyRef::Push(body),
NetworkBodyMut::Request(body) => NetworkBodyRef::Request(body),
NetworkBodyMut::Response(body) => NetworkBodyRef::Response(body),
NetworkBodyMut::ResponseFinal(body) => NetworkBodyRef::ResponseFinal(body),
NetworkBodyMut::Interest(body) => NetworkBodyRef::Interest(body),
NetworkBodyMut::Declare(body) => NetworkBodyRef::Declare(body),
NetworkBodyMut::OAM(body) => NetworkBodyRef::OAM(body),
}
}
fn reliability(&self) -> Reliability {
self.reliability
}
}
impl NetworkMessage {
#[cfg(feature = "test")]
#[doc(hidden)]
pub fn rand() -> Self {
use rand::Rng;
let mut rng = rand::thread_rng();
let body = match rng.gen_range(0..6) {
0 => NetworkBody::Push(Push::rand()),
1 => NetworkBody::Request(Request::rand()),
2 => NetworkBody::Response(Response::rand()),
3 => NetworkBody::ResponseFinal(ResponseFinal::rand()),
4 => NetworkBody::Declare(Declare::rand()),
5 => NetworkBody::OAM(Oam::rand()),
_ => unreachable!(),
};
body.into()
}
#[inline]
pub fn as_mut(&mut self) -> NetworkMessageMut<'_> {
let body = match &mut self.body {
NetworkBody::Push(body) => NetworkBodyMut::Push(body),
NetworkBody::Request(body) => NetworkBodyMut::Request(body),
NetworkBody::Response(body) => NetworkBodyMut::Response(body),
NetworkBody::ResponseFinal(body) => NetworkBodyMut::ResponseFinal(body),
NetworkBody::Interest(body) => NetworkBodyMut::Interest(body),
NetworkBody::Declare(body) => NetworkBodyMut::Declare(body),
NetworkBody::OAM(body) => NetworkBodyMut::OAM(body),
};
NetworkMessageMut {
body,
reliability: self.reliability,
}
}
}
impl NetworkMessageMut<'_> {
#[inline]
pub fn as_mut(&mut self) -> NetworkMessageMut<'_> {
let body = match &mut self.body {
NetworkBodyMut::Push(body) => NetworkBodyMut::Push(body),
NetworkBodyMut::Request(body) => NetworkBodyMut::Request(body),
NetworkBodyMut::Response(body) => NetworkBodyMut::Response(body),
NetworkBodyMut::ResponseFinal(body) => NetworkBodyMut::ResponseFinal(body),
NetworkBodyMut::Interest(body) => NetworkBodyMut::Interest(body),
NetworkBodyMut::Declare(body) => NetworkBodyMut::Declare(body),
NetworkBodyMut::OAM(body) => NetworkBodyMut::OAM(body),
};
NetworkMessageMut {
body,
reliability: self.reliability,
}
}
}
impl fmt::Display for NetworkMessageRef<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match &self.body {
NetworkBodyRef::OAM(_) => write!(f, "OAM"),
NetworkBodyRef::Push(_) => write!(f, "Push"),
NetworkBodyRef::Request(_) => write!(f, "Request"),
NetworkBodyRef::Response(_) => write!(f, "Response"),
NetworkBodyRef::ResponseFinal(_) => write!(f, "ResponseFinal"),
NetworkBodyRef::Interest(_) => write!(f, "Interest"),
NetworkBodyRef::Declare(_) => write!(f, "Declare"),
}
}
}
impl fmt::Display for NetworkMessage {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.as_ref().fmt(f)
}
}
impl fmt::Display for NetworkMessageMut<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.as_ref().fmt(f)
}
}
impl From<NetworkBody> for NetworkMessage {
#[inline]
fn from(body: NetworkBody) -> Self {
Self {
body,
reliability: Reliability::DEFAULT,
}
}
}
#[cfg(feature = "test")]
impl From<Push> for NetworkMessage {
fn from(push: Push) -> Self {
NetworkBody::Push(push).into()
}
}
pub mod ext {
use core::fmt;
use crate::{
common::{imsg, ZExtZ64},
core::{CongestionControl, EntityId, Priority, ZenohIdProto},
};
#[repr(transparent)]
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct QoSType<const ID: u8> {
inner: u8,
}
impl<const ID: u8> QoSType<{ ID }> {
const P_MASK: u8 = 0b00000111;
const D_FLAG: u8 = 0b00001000;
const E_FLAG: u8 = 0b00010000;
const F_FLAG: u8 = 0b00100000;
pub const DEFAULT: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false);
pub const DECLARE: Self =
Self::new(Priority::Control, CongestionControl::DEFAULT_DECLARE, false);
pub const INTEREST: Self = Self::new(
Priority::Control,
CongestionControl::DEFAULT_INTEREST,
false,
);
pub const PUSH: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT_PUSH, false);
pub const REQUEST: Self =
Self::new(Priority::DEFAULT, CongestionControl::DEFAULT_REQUEST, false);
pub const OAM: Self = Self::new(Priority::Control, CongestionControl::DEFAULT_OAM, false);
pub const fn new(
priority: Priority,
congestion_control: CongestionControl,
is_express: bool,
) -> Self {
let mut inner = priority as u8;
match congestion_control {
CongestionControl::Block => inner |= Self::D_FLAG,
#[cfg(feature = "unstable")]
CongestionControl::BlockFirst => inner |= Self::F_FLAG,
_ => {}
}
if is_express {
inner |= Self::E_FLAG;
}
Self { inner }
}
pub fn set_priority(&mut self, priority: Priority) {
self.inner = imsg::set_bitfield(self.inner, priority as u8, Self::P_MASK);
}
pub const fn get_priority(&self) -> Priority {
unsafe { core::mem::transmute(self.inner & Self::P_MASK) }
}
pub fn set_congestion_control(&mut self, cctrl: CongestionControl) {
match cctrl {
CongestionControl::Block => {
self.inner = imsg::set_flag(self.inner, Self::D_FLAG);
self.inner = imsg::unset_flag(self.inner, Self::F_FLAG);
}
CongestionControl::Drop => {
self.inner = imsg::unset_flag(self.inner, Self::D_FLAG);
self.inner = imsg::unset_flag(self.inner, Self::F_FLAG);
}
#[cfg(feature = "unstable")]
CongestionControl::BlockFirst => {
self.inner = imsg::unset_flag(self.inner, Self::D_FLAG);
self.inner = imsg::set_flag(self.inner, Self::F_FLAG);
}
}
}
pub const fn get_congestion_control(&self) -> CongestionControl {
match (
imsg::has_flag(self.inner, Self::D_FLAG),
imsg::has_flag(self.inner, Self::F_FLAG),
) {
(false, false) => CongestionControl::Drop,
#[cfg(feature = "unstable")]
(false, true) => CongestionControl::BlockFirst,
#[cfg(not(feature = "unstable"))]
(false, true) => CongestionControl::Drop,
(true, _) => CongestionControl::Block,
}
}
pub fn set_is_express(&mut self, is_express: bool) {
match is_express {
true => self.inner = imsg::set_flag(self.inner, Self::E_FLAG),
false => self.inner = imsg::unset_flag(self.inner, Self::E_FLAG),
}
}
pub const fn is_express(&self) -> bool {
imsg::has_flag(self.inner, Self::E_FLAG)
}
#[cfg(feature = "test")]
#[doc(hidden)]
pub fn rand() -> Self {
use rand::Rng;
let mut rng = rand::thread_rng();
let inner: u8 = rng.gen();
Self { inner }
}
}
impl<const ID: u8> Default for QoSType<{ ID }> {
fn default() -> Self {
Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false)
}
}
impl<const ID: u8> From<ZExtZ64<{ ID }>> for QoSType<{ ID }> {
fn from(ext: ZExtZ64<{ ID }>) -> Self {
Self {
inner: ext.value as u8,
}
}
}
impl<const ID: u8> From<QoSType<{ ID }>> for ZExtZ64<{ ID }> {
fn from(ext: QoSType<{ ID }>) -> Self {
ZExtZ64::new(ext.inner as u64)
}
}
impl<const ID: u8> fmt::Debug for QoSType<{ ID }> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("QoS")
.field("priority", &self.get_priority())
.field("congestion", &self.get_congestion_control())
.field("express", &self.is_express())
.finish()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TimestampType<const ID: u8> {
pub timestamp: uhlc::Timestamp,
}
impl<const ID: u8> TimestampType<{ ID }> {
#[cfg(feature = "test")]
#[doc(hidden)]
pub fn rand() -> Self {
use rand::Rng;
let mut rng = rand::thread_rng();
let time = uhlc::NTP64(rng.gen());
let id = uhlc::ID::try_from(ZenohIdProto::rand().to_le_bytes()).unwrap();
let timestamp = uhlc::Timestamp::new(time, id);
Self { timestamp }
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NodeIdType<const ID: u8> {
pub node_id: u16,
}
impl<const ID: u8> NodeIdType<{ ID }> {
pub const DEFAULT: Self = Self { node_id: 0 };
#[cfg(feature = "test")]
#[doc(hidden)]
pub fn rand() -> Self {
use rand::Rng;
let mut rng = rand::thread_rng();
let node_id = rng.gen();
Self { node_id }
}
}
impl<const ID: u8> Default for NodeIdType<{ ID }> {
fn default() -> Self {
Self::DEFAULT
}
}
impl<const ID: u8> From<ZExtZ64<{ ID }>> for NodeIdType<{ ID }> {
fn from(ext: ZExtZ64<{ ID }>) -> Self {
Self {
node_id: ext.value as u16,
}
}
}
impl<const ID: u8> From<NodeIdType<{ ID }>> for ZExtZ64<{ ID }> {
fn from(ext: NodeIdType<{ ID }>) -> Self {
ZExtZ64::new(ext.node_id as u64)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EntityGlobalIdType<const ID: u8> {
pub zid: ZenohIdProto,
pub eid: EntityId,
}
impl<const ID: u8> EntityGlobalIdType<{ ID }> {
#[cfg(feature = "test")]
#[doc(hidden)]
pub fn rand() -> Self {
use rand::Rng;
let mut rng = rand::thread_rng();
let zid = ZenohIdProto::rand();
let eid: EntityId = rng.gen();
Self { zid, eid }
}
}
}