#[cfg(feature = "stats")]
use super::protocol::proto::ZenohBody;
use super::protocol::proto::ZenohMessage;
use super::transport::TransportUnicastInner;
#[cfg(feature = "stats")]
use zenoh_buffers::SplitBuffer;
use zenoh_core::zread;
impl TransportUnicastInner {
    /// Hands `msg` to the pipeline of one of this transport's links.
    ///
    /// Link selection, in order of preference:
    /// 1. the first link that has a pipeline and whose reliability equals
    ///    the reliability of `msg`;
    /// 2. otherwise, the first link that has a pipeline at all.
    ///
    /// Returns whatever `push_zenoh_message` returns for the selected
    /// pipeline, or `false` when no link has a pipeline (the message is
    /// dropped and a trace line is logged).
    fn schedule_on_link(&self, msg: ZenohMessage) -> bool {
        let links_guard = zread!(self.links);

        // Best match: a link with the same reliability as the message.
        let best_match = links_guard.iter().find_map(|tl| {
            if msg.is_reliable() == tl.link.is_reliable() {
                tl.pipeline.as_ref()
            } else {
                None
            }
        });

        // Fallback: any link that has a pipeline.
        let selected =
            best_match.or_else(|| links_guard.iter().find_map(|tl| tl.pipeline.as_ref()));

        // Cloning the pipeline handle ends the borrow on the guard, so the
        // read lock can be released before the message is pushed.
        match selected.cloned() {
            Some(pipeline) => {
                drop(links_guard);
                pipeline.push_zenoh_message(msg)
            }
            None => {
                log::trace!(
                    "Message dropped because the transport has no links: {}",
                    msg
                );
                false
            }
        }
    }

    /// Schedules `msg` on the first fitting link (see `schedule_on_link`)
    /// and returns whether it was accepted.
    ///
    /// With the `stats` feature enabled, the per-body-kind TX counters are
    /// incremented before scheduling, and the sent/dropped counter is
    /// incremented afterwards according to the outcome.
    // Without the `stats` feature the body reduces to `let res = ..; res`,
    // which would otherwise trigger `clippy::let_and_return`.
    #[allow(clippy::let_and_return)]
    #[inline(always)]
    pub(super) fn schedule_first_fit(&self, msg: ZenohMessage) -> bool {
        #[cfg(feature = "stats")]
        {
            match &msg.body {
                ZenohBody::Data(data) => {
                    if data.reply_context.is_some() {
                        self.stats.inc_tx_z_data_reply_msgs(1);
                        self.stats
                            .inc_tx_z_data_reply_payload_bytes(data.payload.len());
                    } else {
                        self.stats.inc_tx_z_data_msgs(1);
                        self.stats.inc_tx_z_data_payload_bytes(data.payload.len());
                    }
                }
                ZenohBody::Unit(unit) => {
                    if unit.reply_context.is_some() {
                        self.stats.inc_tx_z_unit_reply_msgs(1);
                    } else {
                        self.stats.inc_tx_z_unit_msgs(1);
                    }
                }
                ZenohBody::Pull(_) => self.stats.inc_tx_z_pull_msgs(1),
                ZenohBody::Query(_) => self.stats.inc_tx_z_query_msgs(1),
                ZenohBody::Declare(_) => self.stats.inc_tx_z_declare_msgs(1),
                ZenohBody::LinkStateList(_) => self.stats.inc_tx_z_linkstate_msgs(1),
            }
        }

        let res = self.schedule_on_link(msg);

        #[cfg(feature = "stats")]
        {
            if res {
                self.stats.inc_tx_z_msgs(1);
            } else {
                self.stats.inc_tx_z_dropped(1);
            }
        }

        res
    }
}