use super::core::Encoding;
use super::core::*;
use super::defaults::SEQ_NUM_RES;
use super::io::{ZBuf, ZSlice};
use std::fmt;
use std::time::Duration;
use zenoh_protocol_core::whatami::WhatAmIMatcher;
pub(crate) mod imsg {
use super::ZInt;
pub(crate) mod id {
pub(crate) const JOIN: u8 = 0x00; pub(crate) const SCOUT: u8 = 0x01;
pub(crate) const HELLO: u8 = 0x02;
pub(crate) const INIT: u8 = 0x03; pub(crate) const OPEN: u8 = 0x04; pub(crate) const CLOSE: u8 = 0x05;
pub(crate) const SYNC: u8 = 0x06;
pub(crate) const ACK_NACK: u8 = 0x07;
pub(crate) const KEEP_ALIVE: u8 = 0x08;
pub(crate) const PING_PONG: u8 = 0x09;
pub(crate) const FRAME: u8 = 0x0a;
pub(crate) const DECLARE: u8 = 0x0b;
pub(crate) const DATA: u8 = 0x0c;
pub(crate) const QUERY: u8 = 0x0d;
pub(crate) const PULL: u8 = 0x0e;
pub(crate) const UNIT: u8 = 0x0f;
pub(crate) const LINK_STATE_LIST: u8 = 0x10;
pub(crate) const PRIORITY: u8 = 0x1c;
pub(crate) const ROUTING_CONTEXT: u8 = 0x1d;
pub(crate) const REPLY_CONTEXT: u8 = 0x1e;
pub(crate) const ATTACHMENT: u8 = 0x1f;
}
pub const HEADER_BITS: u8 = 5;
pub const HEADER_MASK: u8 = !(0xff << HEADER_BITS);
pub fn mid(header: u8) -> u8 {
header & HEADER_MASK
}
pub fn flags(header: u8) -> u8 {
header & !HEADER_MASK
}
pub fn has_flag(byte: u8, flag: u8) -> bool {
byte & flag != 0
}
pub fn has_option(options: ZInt, flag: ZInt) -> bool {
options & flag != 0
}
}
pub mod tmsg {
use super::{imsg, Priority, ZInt};
pub mod id {
use super::imsg;
pub const SCOUT: u8 = imsg::id::SCOUT;
pub const HELLO: u8 = imsg::id::HELLO;
pub const INIT: u8 = imsg::id::INIT;
pub const OPEN: u8 = imsg::id::OPEN;
pub const CLOSE: u8 = imsg::id::CLOSE;
pub const SYNC: u8 = imsg::id::SYNC;
pub const ACK_NACK: u8 = imsg::id::ACK_NACK;
pub const KEEP_ALIVE: u8 = imsg::id::KEEP_ALIVE;
pub const PING_PONG: u8 = imsg::id::PING_PONG;
pub const FRAME: u8 = imsg::id::FRAME;
pub const JOIN: u8 = imsg::id::JOIN;
pub const PRIORITY: u8 = imsg::id::PRIORITY;
pub const ATTACHMENT: u8 = imsg::id::ATTACHMENT;
}
pub mod flag {
pub const A: u8 = 1 << 5; pub const C: u8 = 1 << 6; pub const E: u8 = 1 << 7; pub const F: u8 = 1 << 6; pub const I: u8 = 1 << 5; pub const K: u8 = 1 << 6; pub const L: u8 = 1 << 7; pub const M: u8 = 1 << 5; pub const O: u8 = 1 << 7; pub const P: u8 = 1 << 5; pub const R: u8 = 1 << 5; pub const S: u8 = 1 << 6; pub const T1: u8 = 1 << 5; pub const T2: u8 = 1 << 6; pub const W: u8 = 1 << 6; pub const Z: u8 = 1 << 5;
pub const X: u8 = 0; }
pub mod init_options {
use super::ZInt;
pub const QOS: ZInt = 1 << 0; }
pub mod join_options {
use super::ZInt;
pub const QOS: ZInt = 1 << 0; }
pub mod close_reason {
pub const GENERIC: u8 = 0x00;
pub const UNSUPPORTED: u8 = 0x01;
pub const INVALID: u8 = 0x02;
pub const MAX_SESSIONS: u8 = 0x03;
pub const MAX_LINKS: u8 = 0x04;
pub const EXPIRED: u8 = 0x05;
}
pub fn close_reason_to_str(reason: u8) -> &'static str {
match reason {
close_reason::GENERIC => "GENERIC",
close_reason::UNSUPPORTED => "UNSUPPORTED",
close_reason::INVALID => "INVALID",
close_reason::MAX_SESSIONS => "MAX_SESSIONS",
close_reason::MAX_LINKS => "MAX_LINKS",
close_reason::EXPIRED => "EXPIRED",
_ => "UNKNOWN",
}
}
pub mod conduit {
use super::{imsg, Priority};
pub const CONTROL: u8 = (Priority::Control as u8) << imsg::HEADER_BITS;
pub const REAL_TIME: u8 = (Priority::RealTime as u8) << imsg::HEADER_BITS;
pub const INTERACTIVE_HIGH: u8 = (Priority::InteractiveHigh as u8) << imsg::HEADER_BITS;
pub const INTERACTIVE_LOW: u8 = (Priority::InteractiveLow as u8) << imsg::HEADER_BITS;
pub const DATA_HIGH: u8 = (Priority::DataHigh as u8) << imsg::HEADER_BITS;
pub const DATA: u8 = (Priority::Data as u8) << imsg::HEADER_BITS;
pub const DATA_LOW: u8 = (Priority::DataLow as u8) << imsg::HEADER_BITS;
pub const BACKGROUND: u8 = (Priority::Background as u8) << imsg::HEADER_BITS;
}
}
pub mod zmsg {
use super::{imsg, Channel, CongestionControl, Priority, Reliability, ZInt};
pub mod id {
use super::imsg;
pub const DECLARE: u8 = imsg::id::DECLARE;
pub const DATA: u8 = imsg::id::DATA;
pub const QUERY: u8 = imsg::id::QUERY;
pub const PULL: u8 = imsg::id::PULL;
pub const UNIT: u8 = imsg::id::UNIT;
pub const LINK_STATE_LIST: u8 = imsg::id::LINK_STATE_LIST;
pub const PRIORITY: u8 = imsg::id::PRIORITY;
pub const REPLY_CONTEXT: u8 = imsg::id::REPLY_CONTEXT;
pub const ATTACHMENT: u8 = imsg::id::ATTACHMENT;
pub const ROUTING_CONTEXT: u8 = imsg::id::ROUTING_CONTEXT;
}
pub mod flag {
pub const B: u8 = 1 << 6; pub const D: u8 = 1 << 5; pub const F: u8 = 1 << 5; pub const I: u8 = 1 << 6; pub const K: u8 = 1 << 7; pub const N: u8 = 1 << 6; pub const P: u8 = 1 << 0; pub const Q: u8 = 1 << 6; pub const R: u8 = 1 << 5; pub const S: u8 = 1 << 6; pub const T: u8 = 1 << 5;
pub const X: u8 = 0; }
pub mod data {
use super::ZInt;
pub mod info {
use super::ZInt;
#[cfg(feature = "shared-memory")]
pub const SLICED: ZInt = 1 << 0; pub const KIND: ZInt = 1 << 1; pub const ENCODING: ZInt = 1 << 2; pub const TIMESTAMP: ZInt = 1 << 3; pub const SRCID: ZInt = 1 << 7; pub const SRCSN: ZInt = 1 << 8; pub const RTRID: ZInt = 1 << 9; pub const RTRSN: ZInt = 1 << 10; }
}
pub mod declaration {
pub mod id {
pub const RESOURCE: u8 = 0x01;
pub const PUBLISHER: u8 = 0x02;
pub const SUBSCRIBER: u8 = 0x03;
pub const QUERYABLE: u8 = 0x04;
pub const FORGET_RESOURCE: u8 = 0x11;
pub const FORGET_PUBLISHER: u8 = 0x12;
pub const FORGET_SUBSCRIBER: u8 = 0x13;
pub const FORGET_QUERYABLE: u8 = 0x14;
pub const MODE_PUSH: u8 = 0x00;
pub const MODE_PULL: u8 = 0x01;
}
pub mod flag {
pub const PERIOD: u8 = 0x80;
}
}
pub mod link_state {
use super::ZInt;
pub const PID: ZInt = 1; pub const WAI: ZInt = 1 << 1; pub const LOC: ZInt = 1 << 2; }
pub mod conduit {
use super::{imsg, Priority};
pub const CONTROL: u8 = (Priority::Control as u8) << imsg::HEADER_BITS;
pub const REAL_TIME: u8 = (Priority::RealTime as u8) << imsg::HEADER_BITS;
pub const INTERACTIVE_HIGH: u8 = (Priority::InteractiveHigh as u8) << imsg::HEADER_BITS;
pub const INTERACTIVE_LOW: u8 = (Priority::InteractiveLow as u8) << imsg::HEADER_BITS;
pub const DATA_HIGH: u8 = (Priority::DataHigh as u8) << imsg::HEADER_BITS;
pub const DATA: u8 = (Priority::Data as u8) << imsg::HEADER_BITS;
pub const DATA_LOW: u8 = (Priority::DataLow as u8) << imsg::HEADER_BITS;
pub const BACKGROUND: u8 = (Priority::Background as u8) << imsg::HEADER_BITS;
}
pub mod default_channel {
use super::{Channel, Priority, Reliability};
pub const DECLARE: Channel = Channel {
priority: Priority::RealTime,
reliability: Reliability::Reliable,
};
pub const DATA: Channel = Channel {
priority: Priority::Data,
reliability: Reliability::BestEffort,
};
pub const QUERY: Channel = Channel {
priority: Priority::Data,
reliability: Reliability::Reliable,
};
pub const PULL: Channel = Channel {
priority: Priority::Data,
reliability: Reliability::Reliable,
};
pub const REPLY: Channel = Channel {
priority: Priority::Data,
reliability: Reliability::Reliable,
};
pub const UNIT: Channel = Channel {
priority: Priority::Data,
reliability: Reliability::BestEffort,
};
pub const LINK_STATE_LIST: Channel = Channel {
priority: Priority::Control,
reliability: Reliability::Reliable,
};
}
pub mod default_congestion_control {
use super::CongestionControl;
pub const DECLARE: CongestionControl = CongestionControl::Block;
pub const DATA: CongestionControl = CongestionControl::Drop;
pub const QUERY: CongestionControl = CongestionControl::Block;
pub const PULL: CongestionControl = CongestionControl::Block;
pub const REPLY: CongestionControl = CongestionControl::Block;
pub const UNIT: CongestionControl = CongestionControl::Block;
pub const LINK_STATE_LIST: CongestionControl = CongestionControl::Block;
}
}
pub trait Header {
fn header(&self) -> u8;
}
pub trait Options {
fn options(&self) -> ZInt;
fn has_options(&self) -> bool;
}
#[derive(Debug, Clone, PartialEq)]
pub struct Attachment {
pub buffer: ZBuf,
}
impl Header for Attachment {
#[inline(always)]
fn header(&self) -> u8 {
#[allow(unused_mut)]
let mut header = tmsg::id::ATTACHMENT;
#[cfg(feature = "shared-memory")]
if self.buffer.has_shminfo() {
header |= tmsg::flag::Z;
}
header
}
}
impl Attachment {
#[inline(always)]
pub fn new(buffer: ZBuf) -> Attachment {
Attachment { buffer }
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplierInfo {
pub id: ZenohId,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplyContext {
pub qid: ZInt,
pub replier: Option<ReplierInfo>,
}
impl Header for ReplyContext {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = zmsg::id::REPLY_CONTEXT;
if self.is_final() {
header |= zmsg::flag::F;
}
header
}
}
impl ReplyContext {
#[inline(always)]
pub fn new(qid: ZInt, replier: Option<ReplierInfo>) -> ReplyContext {
ReplyContext { qid, replier }
}
#[inline(always)]
pub fn is_final(&self) -> bool {
self.replier.is_none()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RoutingContext {
pub tree_id: ZInt,
}
impl Header for RoutingContext {
#[inline(always)]
fn header(&self) -> u8 {
zmsg::id::ROUTING_CONTEXT
}
}
impl RoutingContext {
#[inline(always)]
pub fn new(tree_id: ZInt) -> RoutingContext {
RoutingContext { tree_id }
}
}
impl Header for Priority {
fn header(&self) -> u8 {
tmsg::id::PRIORITY | ((*self as u8) << imsg::HEADER_BITS)
}
}
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct DataInfo {
#[cfg(feature = "shared-memory")]
pub sliced: bool,
pub kind: SampleKind,
pub encoding: Option<Encoding>,
pub timestamp: Option<Timestamp>,
pub source_id: Option<ZenohId>,
pub source_sn: Option<ZInt>,
}
impl DataInfo {
pub fn new() -> DataInfo {
DataInfo::default()
}
}
impl Options for DataInfo {
fn options(&self) -> ZInt {
let mut options = 0;
#[cfg(feature = "shared-memory")]
if self.sliced {
options |= zmsg::data::info::SLICED;
}
if self.kind != SampleKind::Put {
options |= zmsg::data::info::KIND;
}
if self.encoding.is_some() {
options |= zmsg::data::info::ENCODING;
}
if self.timestamp.is_some() {
options |= zmsg::data::info::TIMESTAMP;
}
if self.source_id.is_some() {
options |= zmsg::data::info::SRCID;
}
if self.source_sn.is_some() {
options |= zmsg::data::info::SRCSN;
}
options
}
fn has_options(&self) -> bool {
macro_rules! sliced {
($info:expr) => {{
#[cfg(feature = "shared-memory")]
{
$info.sliced
}
#[cfg(not(feature = "shared-memory"))]
{
false
}
}};
}
sliced!(self)
|| self.kind != SampleKind::Put
|| self.encoding.is_some()
|| self.timestamp.is_some()
|| self.source_id.is_some()
|| self.source_sn.is_some()
}
}
impl PartialOrd for DataInfo {
fn partial_cmp(&self, other: &DataInfo) -> Option<std::cmp::Ordering> {
self.timestamp.partial_cmp(&other.timestamp)
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct Data {
pub key: WireExpr<'static>,
pub data_info: Option<DataInfo>,
pub payload: ZBuf,
pub congestion_control: CongestionControl,
pub reply_context: Option<ReplyContext>,
}
impl Header for Data {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = zmsg::id::DATA;
if self.data_info.is_some() {
header |= zmsg::flag::I;
}
if self.key.has_suffix() {
header |= zmsg::flag::K;
}
if self.congestion_control == CongestionControl::Drop {
header |= zmsg::flag::D;
}
header
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Unit {
pub congestion_control: CongestionControl,
pub reply_context: Option<ReplyContext>,
}
impl Header for Unit {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = zmsg::id::UNIT;
if self.congestion_control == CongestionControl::Drop {
header |= zmsg::flag::D;
}
header
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Declaration {
Resource(Resource),
ForgetResource(ForgetResource),
Publisher(Publisher),
ForgetPublisher(ForgetPublisher),
Subscriber(Subscriber),
ForgetSubscriber(ForgetSubscriber),
Queryable(Queryable),
ForgetQueryable(ForgetQueryable),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Resource {
pub expr_id: ZInt,
pub key: WireExpr<'static>,
}
impl Header for Resource {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = zmsg::declaration::id::RESOURCE;
if self.key.has_suffix() {
header |= zmsg::flag::K;
}
header
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForgetResource {
pub expr_id: ZInt,
}
impl Header for ForgetResource {
#[inline(always)]
fn header(&self) -> u8 {
zmsg::declaration::id::FORGET_RESOURCE
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Publisher {
pub key: WireExpr<'static>,
}
impl Header for Publisher {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = zmsg::declaration::id::PUBLISHER;
if self.key.has_suffix() {
header |= zmsg::flag::K;
}
header
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForgetPublisher {
pub key: WireExpr<'static>,
}
impl Header for ForgetPublisher {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = zmsg::declaration::id::FORGET_PUBLISHER;
if self.key.has_suffix() {
header |= zmsg::flag::K;
}
header
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Subscriber {
pub key: WireExpr<'static>,
pub info: SubInfo,
}
impl Header for Subscriber {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = zmsg::declaration::id::SUBSCRIBER;
if self.info.reliability == Reliability::Reliable {
header |= zmsg::flag::R;
}
if self.info.mode != SubMode::Push {
header |= zmsg::flag::S;
}
if self.key.has_suffix() {
header |= zmsg::flag::K;
}
header
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForgetSubscriber {
pub key: WireExpr<'static>,
}
impl Header for ForgetSubscriber {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = zmsg::declaration::id::FORGET_SUBSCRIBER;
if self.key.has_suffix() {
header |= zmsg::flag::K;
}
header
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Queryable {
pub key: WireExpr<'static>,
pub info: QueryableInfo,
}
impl Header for Queryable {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = zmsg::declaration::id::QUERYABLE;
if self.info != QueryableInfo::default() {
header |= zmsg::flag::Q;
}
if self.key.has_suffix() {
header |= zmsg::flag::K;
}
header
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForgetQueryable {
pub key: WireExpr<'static>,
}
impl Header for ForgetQueryable {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = zmsg::declaration::id::FORGET_QUERYABLE;
if self.key.has_suffix() {
header |= zmsg::flag::K
}
header
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Declare {
pub declarations: Vec<Declaration>,
}
impl Header for Declare {
#[inline(always)]
fn header(&self) -> u8 {
zmsg::id::DECLARE
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Pull {
pub key: WireExpr<'static>,
pub pull_id: ZInt,
pub max_samples: Option<ZInt>,
pub is_final: bool,
}
impl Header for Pull {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = zmsg::id::PULL;
if self.is_final {
header |= zmsg::flag::F;
}
if self.max_samples.is_some() {
header |= zmsg::flag::N;
}
if self.key.has_suffix() {
header |= zmsg::flag::K;
}
header
}
}
#[derive(Debug, Clone, PartialEq, Default)]
pub struct QueryBody {
pub data_info: DataInfo,
pub payload: ZBuf,
}
#[derive(Debug, Clone, PartialEq)]
pub struct Query {
pub key: WireExpr<'static>,
pub parameters: String,
pub qid: ZInt,
pub target: Option<QueryTarget>,
pub consolidation: ConsolidationMode,
pub body: Option<QueryBody>,
}
impl Header for Query {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = zmsg::id::QUERY;
if self.target.is_some() {
header |= zmsg::flag::T;
}
if self.body.is_some() {
header |= zmsg::flag::B;
}
if self.key.has_suffix() {
header |= zmsg::flag::K;
}
header
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LinkState {
pub psid: ZInt,
pub sn: ZInt,
pub zid: Option<ZenohId>,
pub whatami: Option<WhatAmI>,
pub locators: Option<Vec<Locator>>,
pub links: Vec<ZInt>,
}
impl Options for LinkState {
fn options(&self) -> ZInt {
let mut opts = 0;
if self.zid.is_some() {
opts |= zmsg::link_state::PID;
}
if self.whatami.is_some() {
opts |= zmsg::link_state::WAI;
}
if self.locators.is_some() {
opts |= zmsg::link_state::LOC;
}
opts
}
fn has_options(&self) -> bool {
self.options() > 0
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LinkStateList {
pub link_states: Vec<LinkState>,
}
impl Header for LinkStateList {
#[inline(always)]
fn header(&self) -> u8 {
zmsg::id::LINK_STATE_LIST
}
}
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, PartialEq)]
pub enum ZenohBody {
Data(Data),
Declare(Declare),
Query(Query),
Pull(Pull),
Unit(Unit),
LinkStateList(LinkStateList),
}
#[derive(Clone, PartialEq)]
pub struct ZenohMessage {
pub body: ZenohBody,
pub channel: Channel,
pub routing_context: Option<RoutingContext>,
pub attachment: Option<Attachment>,
#[cfg(feature = "stats")]
pub size: Option<std::num::NonZeroUsize>,
}
impl fmt::Debug for ZenohMessage {
#[cfg(feature = "stats")]
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"{:?} {:?} {:?} {:?} {:?}",
self.body, self.channel, self.routing_context, self.attachment, self.size
)
}
#[cfg(not(feature = "stats"))]
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"{:?} {:?} {:?} {:?}",
self.body, self.channel, self.routing_context, self.attachment,
)
}
}
impl fmt::Display for ZenohMessage {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(self, f)
}
}
impl ZenohMessage {
pub fn make_declare(
declarations: Vec<Declaration>,
routing_context: Option<RoutingContext>,
attachment: Option<Attachment>,
) -> ZenohMessage {
ZenohMessage {
body: ZenohBody::Declare(Declare { declarations }),
channel: zmsg::default_channel::DECLARE,
routing_context,
attachment,
#[cfg(feature = "stats")]
size: None,
}
}
#[allow(clippy::too_many_arguments)]
#[inline(always)]
pub fn make_data(
key: WireExpr<'static>,
payload: ZBuf,
channel: Channel,
congestion_control: CongestionControl,
data_info: Option<DataInfo>,
routing_context: Option<RoutingContext>,
reply_context: Option<ReplyContext>,
attachment: Option<Attachment>,
) -> ZenohMessage {
ZenohMessage {
body: ZenohBody::Data(Data {
key,
data_info,
payload,
congestion_control,
reply_context,
}),
channel,
routing_context,
attachment,
#[cfg(feature = "stats")]
size: None,
}
}
pub fn make_unit(
channel: Channel,
congestion_control: CongestionControl,
reply_context: Option<ReplyContext>,
attachment: Option<Attachment>,
) -> ZenohMessage {
ZenohMessage {
body: ZenohBody::Unit(Unit {
congestion_control,
reply_context,
}),
channel,
routing_context: None,
attachment,
#[cfg(feature = "stats")]
size: None,
}
}
pub fn make_pull(
is_final: bool,
key: WireExpr<'static>,
pull_id: ZInt,
max_samples: Option<ZInt>,
attachment: Option<Attachment>,
) -> ZenohMessage {
ZenohMessage {
body: ZenohBody::Pull(Pull {
key,
pull_id,
max_samples,
is_final,
}),
channel: zmsg::default_channel::PULL,
routing_context: None,
attachment,
#[cfg(feature = "stats")]
size: None,
}
}
#[allow(clippy::too_many_arguments)]
#[inline(always)]
pub fn make_query(
key: WireExpr<'static>,
parameters: String,
qid: ZInt,
target: Option<QueryTarget>,
consolidation: ConsolidationMode,
body: Option<QueryBody>,
routing_context: Option<RoutingContext>,
attachment: Option<Attachment>,
) -> ZenohMessage {
ZenohMessage {
body: ZenohBody::Query(Query {
key,
parameters,
qid,
target,
consolidation,
body,
}),
channel: zmsg::default_channel::QUERY,
routing_context,
attachment,
#[cfg(feature = "stats")]
size: None,
}
}
pub fn make_link_state_list(
link_states: Vec<LinkState>,
attachment: Option<Attachment>,
) -> ZenohMessage {
ZenohMessage {
body: ZenohBody::LinkStateList(LinkStateList { link_states }),
channel: zmsg::default_channel::LINK_STATE_LIST,
routing_context: None,
attachment,
#[cfg(feature = "stats")]
size: None,
}
}
#[inline]
pub fn is_reliable(&self) -> bool {
self.channel.reliability == Reliability::Reliable
}
#[inline]
pub fn is_droppable(&self) -> bool {
if !self.is_reliable() {
return true;
}
let cc = match &self.body {
ZenohBody::Data(data) => data.congestion_control,
ZenohBody::Unit(unit) => unit.congestion_control,
ZenohBody::Declare(_) => zmsg::default_congestion_control::DECLARE,
ZenohBody::Pull(_) => zmsg::default_congestion_control::PULL,
ZenohBody::Query(_) => zmsg::default_congestion_control::QUERY,
ZenohBody::LinkStateList(_) => zmsg::default_congestion_control::LINK_STATE_LIST,
};
cc == CongestionControl::Drop
}
}
#[derive(Debug, Clone)]
pub enum TransportMode {
Push,
Pull,
PeriodicPush(u32),
PeriodicPull(u32),
PushPull,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Scout {
pub what: Option<WhatAmIMatcher>,
pub zid_request: bool,
}
impl Header for Scout {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = tmsg::id::SCOUT;
if self.zid_request {
header |= tmsg::flag::I;
}
if self.what.is_some() {
header |= tmsg::flag::W;
}
header
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Hello {
pub zid: Option<ZenohId>,
pub whatami: Option<WhatAmI>,
pub locators: Option<Vec<Locator>>,
}
impl Header for Hello {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = tmsg::id::HELLO;
if self.zid.is_some() {
header |= tmsg::flag::I
}
if self.whatami.is_some() && self.whatami.unwrap() != WhatAmI::Router {
header |= tmsg::flag::W;
}
if self.locators.is_some() {
header |= tmsg::flag::L;
}
header
}
}
impl fmt::Display for Hello {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let what = match self.whatami {
Some(what) => what.to_str(),
None => WhatAmI::Router.to_str(),
};
let locators = match &self.locators {
Some(locators) => locators
.iter()
.map(|locator| locator.to_string())
.collect::<Vec<String>>(),
None => vec![],
};
f.debug_struct("Hello")
.field("zid", &self.zid)
.field("whatami", &what)
.field("locators", &locators)
.finish()
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InitSyn {
pub version: u8,
pub whatami: WhatAmI,
pub zid: ZenohId,
pub sn_resolution: ZInt,
pub is_qos: bool,
}
impl Header for InitSyn {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = tmsg::id::INIT;
if self.sn_resolution != SEQ_NUM_RES {
header |= tmsg::flag::S;
}
if self.has_options() {
header |= tmsg::flag::O;
}
header
}
}
impl Options for InitSyn {
fn options(&self) -> ZInt {
let mut options = 0;
if self.is_qos {
options |= tmsg::init_options::QOS;
}
options
}
fn has_options(&self) -> bool {
self.is_qos
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InitAck {
pub whatami: WhatAmI,
pub zid: ZenohId,
pub sn_resolution: Option<ZInt>,
pub is_qos: bool,
pub cookie: ZSlice,
}
impl Header for InitAck {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = tmsg::id::INIT;
header |= tmsg::flag::A;
if self.sn_resolution.is_some() {
header |= tmsg::flag::S;
}
if self.has_options() {
header |= tmsg::flag::O;
}
header
}
}
impl Options for InitAck {
fn options(&self) -> ZInt {
let mut options = 0;
if self.is_qos {
options |= tmsg::init_options::QOS;
}
options
}
fn has_options(&self) -> bool {
self.is_qos
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OpenSyn {
pub lease: Duration,
pub initial_sn: ZInt,
pub cookie: ZSlice,
}
impl Header for OpenSyn {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = tmsg::id::OPEN;
if self.lease.as_millis() % 1_000 == 0 {
header |= tmsg::flag::T2;
}
header
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OpenAck {
pub lease: Duration,
pub initial_sn: ZInt,
}
impl Header for OpenAck {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = tmsg::id::OPEN;
header |= tmsg::flag::A;
if self.lease.as_millis() % 1_000 == 0 {
header |= tmsg::flag::T2;
}
header
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Join {
pub version: u8,
pub whatami: WhatAmI,
pub zid: ZenohId,
pub lease: Duration,
pub sn_resolution: ZInt,
pub next_sns: ConduitSnList,
}
impl Join {
pub fn is_qos(&self) -> bool {
match self.next_sns {
ConduitSnList::QoS(_) => true,
ConduitSnList::Plain(_) => false,
}
}
}
impl Header for Join {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = tmsg::id::JOIN;
if self.lease.as_millis() % 1_000 == 0 {
header |= tmsg::flag::T1;
}
if self.sn_resolution != SEQ_NUM_RES {
header |= tmsg::flag::S;
}
if self.has_options() {
header |= tmsg::flag::O;
}
header
}
}
impl Options for Join {
fn options(&self) -> ZInt {
let mut options = 0;
if self.is_qos() {
options |= tmsg::join_options::QOS;
}
options
}
fn has_options(&self) -> bool {
self.options() > 0
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Close {
pub zid: Option<ZenohId>,
pub reason: u8,
pub link_only: bool,
}
impl Header for Close {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = tmsg::id::CLOSE;
if self.zid.is_some() {
header |= tmsg::flag::I;
}
if self.link_only {
header |= tmsg::flag::K;
}
header
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Sync {
pub reliability: Reliability,
pub sn: ZInt,
pub count: Option<ZInt>,
}
impl Header for Sync {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = tmsg::id::SYNC;
if let Reliability::Reliable = self.reliability {
header |= tmsg::flag::R;
}
if self.count.is_some() {
header |= tmsg::flag::C;
}
header
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AckNack {
pub sn: ZInt,
pub mask: Option<ZInt>,
}
impl Header for AckNack {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = tmsg::id::ACK_NACK;
if self.mask.is_some() {
header |= tmsg::flag::M;
}
header
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeepAlive {
pub zid: Option<ZenohId>,
}
impl Header for KeepAlive {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = tmsg::id::KEEP_ALIVE;
if self.zid.is_some() {
header |= tmsg::flag::I;
}
header
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Ping {
pub hash: ZInt,
}
impl Header for Ping {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = tmsg::id::PING_PONG;
header |= tmsg::flag::P;
header
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Pong {
pub hash: ZInt,
}
impl Header for Pong {
#[inline(always)]
fn header(&self) -> u8 {
tmsg::id::PING_PONG
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct Frame {
pub channel: Channel,
pub sn: ZInt,
pub payload: FramePayload,
}
impl Header for Frame {
#[inline(always)]
fn header(&self) -> u8 {
let mut header = tmsg::id::FRAME;
if let Reliability::Reliable = self.channel.reliability {
header |= tmsg::flag::R;
}
if let FramePayload::Fragment { is_final, .. } = self.payload {
header |= tmsg::flag::F;
if is_final {
header |= tmsg::flag::E;
}
}
header
}
}
impl Frame {
pub fn make_header(reliability: Reliability, is_fragment: Option<bool>) -> u8 {
let mut header = tmsg::id::FRAME;
if let Reliability::Reliable = reliability {
header |= tmsg::flag::R;
}
if let Some(is_final) = is_fragment {
header |= tmsg::flag::F;
if is_final {
header |= tmsg::flag::E;
}
}
header
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum FramePayload {
Fragment { buffer: ZSlice, is_final: bool },
Messages { messages: Vec<ZenohMessage> },
}
#[derive(Debug, Clone, PartialEq)]
pub enum TransportBody {
Scout(Scout),
Hello(Hello),
InitSyn(InitSyn),
InitAck(InitAck),
OpenSyn(OpenSyn),
OpenAck(OpenAck),
Join(Join),
Close(Close),
Sync(Sync),
AckNack(AckNack),
KeepAlive(KeepAlive),
Ping(Ping),
Pong(Pong),
Frame(Frame),
}
#[derive(Debug, Clone)]
pub struct TransportMessage {
pub body: TransportBody,
pub attachment: Option<Attachment>,
#[cfg(feature = "stats")]
pub size: Option<std::num::NonZeroUsize>,
}
impl TransportMessage {
pub fn make_scout(
what: Option<WhatAmIMatcher>,
zid_request: bool,
attachment: Option<Attachment>,
) -> TransportMessage {
TransportMessage {
body: TransportBody::Scout(Scout { what, zid_request }),
attachment,
#[cfg(feature = "stats")]
size: None,
}
}
pub fn make_hello(
zid: Option<ZenohId>,
whatami: Option<WhatAmI>,
locators: Option<Vec<Locator>>,
attachment: Option<Attachment>,
) -> TransportMessage {
TransportMessage {
body: TransportBody::Hello(Hello {
zid,
whatami,
locators,
}),
attachment,
#[cfg(feature = "stats")]
size: None,
}
}
pub fn make_init_syn(
version: u8,
whatami: WhatAmI,
zid: ZenohId,
sn_resolution: ZInt,
is_qos: bool,
attachment: Option<Attachment>,
) -> TransportMessage {
TransportMessage {
body: TransportBody::InitSyn(InitSyn {
version,
whatami,
zid,
sn_resolution,
is_qos,
}),
attachment,
#[cfg(feature = "stats")]
size: None,
}
}
pub fn make_init_ack(
whatami: WhatAmI,
zid: ZenohId,
sn_resolution: Option<ZInt>,
is_qos: bool,
cookie: ZSlice,
attachment: Option<Attachment>,
) -> TransportMessage {
TransportMessage {
body: TransportBody::InitAck(InitAck {
whatami,
zid,
sn_resolution,
is_qos,
cookie,
}),
attachment,
#[cfg(feature = "stats")]
size: None,
}
}
pub fn make_open_syn(
lease: Duration,
initial_sn: ZInt,
cookie: ZSlice,
attachment: Option<Attachment>,
) -> TransportMessage {
TransportMessage {
body: TransportBody::OpenSyn(OpenSyn {
lease,
initial_sn,
cookie,
}),
attachment,
#[cfg(feature = "stats")]
size: None,
}
}
pub fn make_open_ack(
lease: Duration,
initial_sn: ZInt,
attachment: Option<Attachment>,
) -> TransportMessage {
TransportMessage {
body: TransportBody::OpenAck(OpenAck { lease, initial_sn }),
attachment,
#[cfg(feature = "stats")]
size: None,
}
}
pub fn make_join(
version: u8,
whatami: WhatAmI,
zid: ZenohId,
lease: Duration,
sn_resolution: ZInt,
next_sns: ConduitSnList,
attachment: Option<Attachment>,
) -> TransportMessage {
TransportMessage {
body: TransportBody::Join(Join {
version,
whatami,
zid,
lease,
sn_resolution,
next_sns,
}),
attachment,
#[cfg(feature = "stats")]
size: None,
}
}
pub fn make_close(
zid: Option<ZenohId>,
reason: u8,
link_only: bool,
attachment: Option<Attachment>,
) -> TransportMessage {
TransportMessage {
body: TransportBody::Close(Close {
zid,
reason,
link_only,
}),
attachment,
#[cfg(feature = "stats")]
size: None,
}
}
pub fn make_sync(
reliability: Reliability,
sn: ZInt,
count: Option<ZInt>,
attachment: Option<Attachment>,
) -> TransportMessage {
TransportMessage {
body: TransportBody::Sync(Sync {
reliability,
sn,
count,
}),
attachment,
#[cfg(feature = "stats")]
size: None,
}
}
pub fn make_ack_nack(
sn: ZInt,
mask: Option<ZInt>,
attachment: Option<Attachment>,
) -> TransportMessage {
TransportMessage {
body: TransportBody::AckNack(AckNack { sn, mask }),
attachment,
#[cfg(feature = "stats")]
size: None,
}
}
pub fn make_keep_alive(
zid: Option<ZenohId>,
attachment: Option<Attachment>,
) -> TransportMessage {
TransportMessage {
body: TransportBody::KeepAlive(KeepAlive { zid }),
attachment,
#[cfg(feature = "stats")]
size: None,
}
}
pub fn make_ping(hash: ZInt, attachment: Option<Attachment>) -> TransportMessage {
TransportMessage {
body: TransportBody::Ping(Ping { hash }),
attachment,
#[cfg(feature = "stats")]
size: None,
}
}
pub fn make_pong(hash: ZInt, attachment: Option<Attachment>) -> TransportMessage {
TransportMessage {
body: TransportBody::Pong(Pong { hash }),
attachment,
#[cfg(feature = "stats")]
size: None,
}
}
pub fn make_frame(
channel: Channel,
sn: ZInt,
payload: FramePayload,
attachment: Option<Attachment>,
) -> TransportMessage {
TransportMessage {
body: TransportBody::Frame(Frame {
channel,
sn,
payload,
}),
attachment,
#[cfg(feature = "stats")]
size: None,
}
}
}
impl PartialEq for TransportMessage {
fn eq(&self, other: &Self) -> bool {
self.body.eq(&other.body) && self.attachment.eq(&other.attachment)
}
}