zenoh_protocol/transport/
mod.rspub mod close;
pub mod fragment;
pub mod frame;
pub mod init;
pub mod join;
pub mod keepalive;
pub mod oam;
pub mod open;
use core::fmt;
pub use close::Close;
pub use fragment::{Fragment, FragmentHeader};
pub use frame::{Frame, FrameHeader};
pub use init::{InitAck, InitSyn};
pub use join::Join;
pub use keepalive::KeepAlive;
pub use oam::Oam;
pub use open::{OpenAck, OpenSyn};
use crate::network::NetworkMessage;
pub type BatchSize = u16;
pub type AtomicBatchSize = core::sync::atomic::AtomicU16;
pub mod batch_size {
use super::BatchSize;
pub const UNICAST: BatchSize = BatchSize::MAX;
pub const MULTICAST: BatchSize = 8_192;
}
pub mod id {
pub const OAM: u8 = 0x00;
pub const INIT: u8 = 0x01; pub const OPEN: u8 = 0x02; pub const CLOSE: u8 = 0x03;
pub const KEEP_ALIVE: u8 = 0x04;
pub const FRAME: u8 = 0x05;
pub const FRAGMENT: u8 = 0x06;
pub const JOIN: u8 = 0x07; }
#[derive(Debug)]
pub struct TransportMessageLowLatency {
pub body: TransportBodyLowLatency,
}
impl TryFrom<NetworkMessage> for TransportMessageLowLatency {
type Error = zenoh_result::Error;
fn try_from(msg: NetworkMessage) -> Result<Self, Self::Error> {
Ok(Self {
body: TransportBodyLowLatency::Network(msg),
})
}
}
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum TransportBodyLowLatency {
Close(Close),
KeepAlive(KeepAlive),
Network(NetworkMessage),
}
pub type TransportSn = u32;
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct PrioritySn {
pub reliable: TransportSn,
pub best_effort: TransportSn,
}
impl PrioritySn {
pub const DEFAULT: Self = Self {
reliable: TransportSn::MIN,
best_effort: TransportSn::MIN,
};
#[cfg(feature = "test")]
pub fn rand() -> Self {
use rand::Rng;
let mut rng = rand::thread_rng();
Self {
reliable: rng.gen(),
best_effort: rng.gen(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TransportBody {
InitSyn(InitSyn),
InitAck(InitAck),
OpenSyn(OpenSyn),
OpenAck(OpenAck),
Close(Close),
KeepAlive(KeepAlive),
Frame(Frame),
Fragment(Fragment),
OAM(Oam),
Join(Join),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportMessage {
pub body: TransportBody,
#[cfg(feature = "stats")]
pub size: Option<core::num::NonZeroUsize>,
}
impl TransportMessage {
#[cfg(feature = "test")]
pub fn rand() -> Self {
use rand::Rng;
let mut rng = rand::thread_rng();
let body = match rng.gen_range(0..10) {
0 => TransportBody::InitSyn(InitSyn::rand()),
1 => TransportBody::InitAck(InitAck::rand()),
2 => TransportBody::OpenSyn(OpenSyn::rand()),
3 => TransportBody::OpenAck(OpenAck::rand()),
4 => TransportBody::Close(Close::rand()),
5 => TransportBody::KeepAlive(KeepAlive::rand()),
6 => TransportBody::Frame(Frame::rand()),
7 => TransportBody::Fragment(Fragment::rand()),
8 => TransportBody::OAM(Oam::rand()),
9 => TransportBody::Join(Join::rand()),
_ => unreachable!(),
};
Self {
body,
#[cfg(feature = "stats")]
size: None,
}
}
}
impl From<TransportBody> for TransportMessage {
fn from(body: TransportBody) -> Self {
Self {
body,
#[cfg(feature = "stats")]
size: None,
}
}
}
impl From<InitSyn> for TransportMessage {
fn from(init_syn: InitSyn) -> Self {
TransportBody::InitSyn(init_syn).into()
}
}
impl From<InitAck> for TransportMessage {
fn from(init_ack: InitAck) -> Self {
TransportBody::InitAck(init_ack).into()
}
}
impl From<OpenSyn> for TransportMessage {
fn from(open_syn: OpenSyn) -> Self {
TransportBody::OpenSyn(open_syn).into()
}
}
impl From<OpenAck> for TransportMessage {
fn from(open_ack: OpenAck) -> Self {
TransportBody::OpenAck(open_ack).into()
}
}
impl From<Close> for TransportMessage {
fn from(close: Close) -> Self {
TransportBody::Close(close).into()
}
}
impl From<KeepAlive> for TransportMessage {
fn from(keep_alive: KeepAlive) -> Self {
TransportBody::KeepAlive(keep_alive).into()
}
}
impl From<Frame> for TransportMessage {
fn from(frame: Frame) -> Self {
TransportBody::Frame(frame).into()
}
}
impl From<Fragment> for TransportMessage {
fn from(fragment: Fragment) -> Self {
TransportBody::Fragment(fragment).into()
}
}
impl From<Join> for TransportMessage {
fn from(join: Join) -> Self {
TransportBody::Join(join).into()
}
}
impl fmt::Display for TransportMessage {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
use TransportBody::*;
match &self.body {
OAM(_) => write!(f, "OAM"),
InitSyn(_) => write!(f, "InitSyn"),
InitAck(_) => write!(f, "InitAck"),
OpenSyn(_) => write!(f, "OpenSyn"),
OpenAck(_) => write!(f, "OpenAck"),
Close(_) => write!(f, "Close"),
KeepAlive(_) => write!(f, "KeepAlive"),
Frame(m) => {
write!(f, "Frame[")?;
let mut netmsgs = m.payload.iter().peekable();
while let Some(m) = netmsgs.next() {
m.fmt(f)?;
if netmsgs.peek().is_some() {
write!(f, ", ")?;
}
}
write!(f, "]")
}
Fragment(_) => write!(f, "Fragment"),
Join(_) => write!(f, "Join"),
}
}
}
pub mod ext {
use crate::{common::ZExtZ64, core::Priority};
#[repr(transparent)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct QoSType<const ID: u8> {
inner: u8,
}
impl<const ID: u8> QoSType<{ ID }> {
const P_MASK: u8 = 0b00000111;
pub const DEFAULT: Self = Self::new(Priority::DEFAULT);
pub const fn new(priority: Priority) -> Self {
Self {
inner: priority as u8,
}
}
pub const fn priority(&self) -> Priority {
unsafe { core::mem::transmute(self.inner & Self::P_MASK) }
}
#[cfg(feature = "test")]
pub fn rand() -> Self {
use rand::Rng;
let mut rng = rand::thread_rng();
let inner: u8 = rng.gen();
Self { inner }
}
}
impl<const ID: u8> Default for QoSType<{ ID }> {
fn default() -> Self {
Self::DEFAULT
}
}
impl<const ID: u8> From<ZExtZ64<{ ID }>> for QoSType<{ ID }> {
fn from(ext: ZExtZ64<{ ID }>) -> Self {
Self {
inner: ext.value as u8,
}
}
}
impl<const ID: u8> From<QoSType<{ ID }>> for ZExtZ64<{ ID }> {
fn from(ext: QoSType<{ ID }>) -> Self {
ZExtZ64::new(ext.inner as u64)
}
}
}