use std::sync::{Arc, OnceLock};
use zenoh_protocol::{
core::{Locator, Priority, WhatAmI, ZenohIdProto},
network::{NetworkMessageExt, NetworkMessageRef},
};
use crate::{
histogram::Histogram,
keys::HistogramPerKey,
labels::{
MessageLabel, NetworkMessageDroppedPayloadLabels, NetworkMessagePayloadLabels,
ProtocolLabel, SpaceLabel, TransportLabels, SHM_NUM,
},
LinkStats, ReasonLabel, StatsDirection, StatsKeys, StatsRegistry, Tx,
};
/// Handle to the statistics state of a single transport.
///
/// The handle wraps an [`Arc`] around [`TransportStatsInner`], so cloning it
/// only bumps a reference count and every clone observes into the same
/// histograms.
#[derive(Debug, Clone)]
pub struct TransportStats(Arc<TransportStatsInner>);
impl TransportStats {
pub(crate) fn new(
registry: StatsRegistry,
zid: Option<ZenohIdProto>,
whatami: Option<WhatAmI>,
cn: Option<String>,
group: Option<String>,
) -> Self {
let transport = TransportLabels {
remote_zid: zid.map(Into::into),
remote_whatami: whatami.map(Into::into),
remote_group: group,
remote_cn: cn,
};
let tx_no_link = DropStats::new(
registry.clone(),
transport.clone(),
ReasonLabel::NoLink,
None,
);
Self(Arc::new(TransportStatsInner {
registry,
transport,
network_message_payload: Default::default(),
tx_no_link,
}))
}
pub(crate) fn registry(&self) -> &StatsRegistry {
&self.0.registry
}
pub(crate) fn transport(&self) -> &TransportLabels {
&self.0.transport
}
pub fn link_stats(&self, src: &Locator, dst: &Locator) -> LinkStats {
let link = (src, dst).into();
self.registry().add_link(&link);
LinkStats::new(self.clone(), link)
}
pub fn peer_link_stats(
&self,
peer_zid: ZenohIdProto,
peer_whatami: WhatAmI,
link_stats: &LinkStats,
) -> LinkStats {
assert!(self.transport().remote_group.is_some());
let stats = Self::new(
self.registry().clone(),
Some(peer_zid),
Some(peer_whatami),
None,
self.transport().remote_group.clone(),
);
LinkStats::new(stats, link_stats.link().clone())
}
pub fn drop_stats(&self, reason: ReasonLabel) -> DropStats {
DropStats::new(
self.0.registry.clone(),
self.0.transport.clone(),
reason,
None,
)
}
#[allow(clippy::too_many_arguments)]
pub fn observe_network_message_payload(
&self,
direction: StatsDirection,
message: MessageLabel,
priority: Priority,
payload_size: usize,
space: SpaceLabel,
keys: &StatsKeys,
shm: bool,
) {
let (histogram, histogram_per_key) = self.0.network_message_payload[direction as usize]
[priority as usize][message as usize][shm as usize][space as usize]
.get_or_init(|| {
let labels = NetworkMessagePayloadLabels {
space,
message,
priority: priority.into(),
shm,
};
(
self.registry()
.network_message_payload(direction)
.get_or_create_owned(self.transport(), None, &labels),
self.registry()
.network_message_payload_per_key(direction)
.get_or_create_owned(self.transport(), None, &labels),
)
});
histogram.observe(payload_size as u64);
histogram_per_key.observe(keys, payload_size as u64);
}
pub fn tx_observe_no_link(&self, msg: NetworkMessageRef) {
self.0
.tx_no_link
.observe_network_message_dropped_payload(Tx, msg);
}
}
/// State shared by every clone of a [`TransportStats`] handle.
///
/// Dropping this value calls `remove_transport` on the registry (see the
/// `Drop` impl below).
#[derive(Debug)]
pub struct TransportStatsInner {
/// Registry the histograms are obtained from; also the target of
/// `remove_transport` on drop.
registry: StatsRegistry,
/// Labels (`remote_zid`, `remote_whatami`, `remote_group`, `remote_cn`)
/// passed to the registry for every metric of this transport.
transport: TransportLabels,
/// Lazily filled table of `(Histogram, HistogramPerKey)` pairs, indexed as
/// `[direction][priority][message][shm][space]`. Each slot is initialised
/// on the first observation for that combination.
// The nested array type is inherently verbose, so the clippy lint is silenced.
#[allow(clippy::type_complexity)]
network_message_payload: [[[[[OnceLock<(Histogram, HistogramPerKey)>; SpaceLabel::NUM]; SHM_NUM];
MessageLabel::NUM]; Priority::NUM]; StatsDirection::NUM],
/// Drop statistics created with `ReasonLabel::NoLink`, used by
/// `TransportStats::tx_observe_no_link`.
tx_no_link: DropStats,
}
impl Drop for TransportStatsInner {
    /// Calls `remove_transport` on the registry with this transport's labels.
    ///
    /// The value lives inside the `Arc` held by `TransportStats`, so this runs
    /// once the last handle to the transport has been released.
    fn drop(&mut self) {
        let labels = &self.transport;
        self.registry.remove_transport(labels);
    }
}
/// Handle used to record the payload size of dropped network messages for a
/// fixed transport, drop reason and optional protocol.
///
/// The handle wraps an [`Arc`], so clones are cheap and share the same
/// lazily created histograms.
#[derive(Clone, Debug)]
pub struct DropStats(Arc<DropStatsInner>);
impl DropStats {
    /// Creates drop statistics bound to `registry`, the given transport labels,
    /// a drop `reason` and an optional `protocol` label.
    ///
    /// No histogram is created here; they are created on first observation.
    pub(crate) fn new(
        registry: StatsRegistry,
        transport: TransportLabels,
        reason: ReasonLabel,
        protocol: Option<ProtocolLabel>,
    ) -> Self {
        let inner = DropStatsInner {
            registry,
            transport,
            reason,
            protocol,
            histograms: Default::default(),
        };
        Self(Arc::new(inner))
    }

    /// Records the payload size of a dropped message.
    ///
    /// The histogram is selected by `[direction][priority][message]`, where the
    /// priority and message label are read from `msg`. A missing slot is filled
    /// from the registry's dropped-payload family on first use. A message
    /// without a payload size is recorded as `0`.
    pub fn observe_network_message_dropped_payload(
        &self,
        direction: StatsDirection,
        msg: impl NetworkMessageExt,
    ) {
        let inner = &*self.0;
        // Read once and reused both for indexing and for the labels below.
        let priority = msg.priority();
        let message = MessageLabel::from(msg.body());
        let slot = &inner.histograms[direction as usize][priority as usize][message as usize];
        let histogram = slot.get_or_init(|| {
            let labels = NetworkMessageDroppedPayloadLabels {
                message,
                priority: priority.into(),
                protocol: inner.protocol.clone(),
                reason: inner.reason,
            };
            inner
                .registry
                .network_message_dropped_payload(direction)
                .get_or_create_owned(&inner.transport, None, &labels)
        });
        let size = msg.payload_size().unwrap_or_default() as u64;
        histogram.observe(size);
    }
}
/// State shared by every clone of a [`DropStats`] handle.
#[derive(Debug)]
struct DropStatsInner {
/// Registry the dropped-payload histograms are obtained from.
registry: StatsRegistry,
/// Transport labels passed to the registry when a histogram is created.
transport: TransportLabels,
/// Drop reason written into the labels of every histogram created here.
reason: ReasonLabel,
/// Optional protocol label written into the labels of every histogram
/// created here.
protocol: Option<ProtocolLabel>,
/// Lazily filled table of histograms, indexed as
/// `[direction][priority][message]`.
histograms: [[[OnceLock<Histogram>; MessageLabel::NUM]; Priority::NUM]; StatsDirection::NUM],
}