use std::{
array, fmt,
fmt::Write,
hash::Hash,
iter,
sync::{Arc, RwLock},
};
use prometheus_client::{
encoding::text::encode,
metrics::{
counter::Counter,
family::{Family, MetricConstructor},
gauge::Gauge,
info::Info,
},
registry::{Registry, Unit},
};
use zenoh_keyexpr::keyexpr;
use zenoh_protocol::core::{WhatAmI, ZenohIdProto};
use crate::{
family::{
TransportFamily, TransportFamilyCollector, TransportMetric, COLLECT_DISCONNECTED,
COLLECT_PER_KEY, COLLECT_PER_LINK, COLLECT_PER_TRANSPORT,
},
histogram::{Histogram, HistogramBuckets, PAYLOAD_SIZE_BUCKETS},
keys::{HistogramPerKey, StatsKeysRegistry},
labels::{
BytesLabels, LinkLabels, LocalityLabel, NetworkMessageDroppedPayloadLabels,
NetworkMessageLabels, NetworkMessagePayloadLabels, ProtocolLabels, ResourceDeclaredLabels,
ResourceLabel, TransportLabels, TransportMessageLabels,
},
stats::{init_stats, StatsPath},
Rx, StatsDirection, StatsKeysTree, TransportStats, Tx,
};
#[derive(Debug, Clone)]
pub struct StatsRegistry(Arc<StatsRegistryInner>);
impl StatsRegistry {
pub fn new(zid: ZenohIdProto, whatami: WhatAmI, build_version: impl Into<String>) -> Self {
let stats_keys = StatsKeysRegistry::default();
let mut registry = Registry::with_prefix_and_labels(
"zenoh",
[
("local_id".into(), zid.to_string().into()),
("local_whatami".into(), whatami.to_string().into()),
]
.into_iter(),
);
registry.register(
"build",
"Zenoh build version",
Info::new([("version", build_version.into())]),
);
let transports_opened = Gauge::default();
registry.register(
"transports_opened",
"Count of transports currently opened",
transports_opened.clone(),
);
let links_opened = Family::default();
registry.register(
"links_opened",
"Count of transports currently opened",
links_opened.clone(),
);
let resources_declared = Family::default();
registry.register(
"resources_declared",
"Count of resources currently declared",
resources_declared.clone(),
);
let bytes = array::from_fn(|_dir| TransportFamily::default());
let transport_message = array::from_fn(|_dir| TransportFamily::default());
let network_message = array::from_fn(|_dir| TransportFamily::default());
let network_message_payload =
array::from_fn(|_dir| TransportFamily::new_with_constructor(PAYLOAD_SIZE_BUCKETS));
let network_message_dropped_payload =
array::from_fn(|_dir| TransportFamily::new_with_constructor(PAYLOAD_SIZE_BUCKETS));
let network_message_payload_per_key = array::from_fn(|_dir| {
TransportFamily::new_with_constructor((PAYLOAD_SIZE_BUCKETS, stats_keys.clone()))
});
for dir in [Tx, Rx] {
let action = match dir {
Tx => "sent",
Rx => "received",
};
registry.register_collector(Box::new(TransportFamilyCollector {
name: format!("{dir}"),
help: format!("Count of transport messages bytes {action}"),
unit: Some(Unit::Bytes),
family: bytes[dir as usize].clone(),
}));
registry.register_collector(Box::new(TransportFamilyCollector {
name: format!("{dir}_transport_message"),
help: format!("Count of transport messages {action}"),
unit: None,
family: transport_message[dir as usize].clone(),
}));
registry.register_collector(Box::new(TransportFamilyCollector {
name: format!("{dir}_network_message"),
help: format!("Count of network messages {action}"),
unit: None,
family: network_message[dir as usize].clone(),
}));
registry.register_collector(Box::new(TransportFamilyCollector {
name: format!("{dir}_network_message_payload"),
help: format!("Histogram of network messages payload {action}"),
unit: Some(Unit::Bytes),
family: network_message_payload[dir as usize].clone(),
}));
registry.register_collector(Box::new(TransportFamilyCollector {
name: format!("{dir}_network_message_dropped_payload"),
help: format!("Histogram of network messages payload dropped while {action}"),
unit: Some(Unit::Bytes),
family: network_message_dropped_payload[dir as usize].clone(),
}));
registry.register_collector(Box::new(TransportFamilyCollector {
name: format!("{dir}_network_message_payload_per_key"),
help: format!("Histogram of network messages payload {action} per key"),
unit: Some(Unit::Bytes),
family: network_message_payload_per_key[dir as usize].clone(),
}));
}
Self(Arc::new(StatsRegistryInner {
registry: RwLock::new(registry),
transports_opened,
links_opened,
resources_declared,
bytes,
transport_message,
network_message,
network_message_payload,
network_message_dropped_payload,
network_message_payload_per_key,
stats_keys,
}))
}
pub fn inc_resource_declared(&self, resource: ResourceLabel, locality: LocalityLabel) {
let labels = ResourceDeclaredLabels { resource, locality };
self.0.resources_declared.get_or_create(&labels).inc();
}
pub fn dec_resource_declared(&self, resource: ResourceLabel, locality: LocalityLabel) {
let labels = ResourceDeclaredLabels { resource, locality };
self.0.resources_declared.get_or_create(&labels).dec();
}
pub fn encode_metrics(
&self,
writer: &mut impl Write,
per_transport: bool,
per_link: bool,
disconnected: bool,
per_key: bool,
) -> fmt::Result {
let registry = self.0.registry.read().unwrap();
COLLECT_PER_TRANSPORT.set(per_transport);
COLLECT_PER_LINK.set(per_link);
COLLECT_DISCONNECTED.set(disconnected);
COLLECT_PER_KEY.set(per_key);
encode(writer, ®istry)?;
Ok(())
}
pub(crate) fn bytes(
&self,
direction: StatsDirection,
) -> &TransportFamily<BytesLabels, Counter> {
&self.0.bytes[direction as usize]
}
pub(crate) fn transport_message(
&self,
direction: StatsDirection,
) -> &TransportFamily<TransportMessageLabels, Counter> {
&self.0.transport_message[direction as usize]
}
pub(crate) fn network_message(
&self,
direction: StatsDirection,
) -> &TransportFamily<NetworkMessageLabels, Counter> {
&self.0.network_message[direction as usize]
}
pub(crate) fn network_message_payload(
&self,
direction: StatsDirection,
) -> &TransportFamily<NetworkMessagePayloadLabels, Histogram, HistogramBuckets> {
&self.0.network_message_payload[direction as usize]
}
pub(crate) fn network_message_dropped_payload(
&self,
direction: StatsDirection,
) -> &TransportFamily<NetworkMessageDroppedPayloadLabels, Histogram, HistogramBuckets> {
&self.0.network_message_dropped_payload[direction as usize]
}
pub(crate) fn network_message_payload_per_key(
&self,
direction: StatsDirection,
) -> &TransportFamily<
NetworkMessagePayloadLabels,
HistogramPerKey,
(HistogramBuckets, StatsKeysRegistry),
> {
&self.0.network_message_payload_per_key[direction as usize]
}
fn families(&self) -> impl Iterator<Item = (StatsDirection, &dyn TransportFamilyAny)> {
[Tx, Rx].into_iter().flat_map(|dir| {
iter::repeat(dir).zip([
&self.0.bytes[dir as usize] as &dyn TransportFamilyAny,
&self.0.transport_message[dir as usize],
&self.0.network_message[dir as usize],
&self.0.network_message_payload[dir as usize],
&self.0.network_message_dropped_payload[dir as usize],
&self.0.network_message_payload_per_key[dir as usize],
])
})
}
pub fn merge_stats(&self, json: &mut serde_json::Value) {
init_stats(json, &self.0.stats_keys.keys());
for (dir, family) in self.families() {
family.merge_stats(dir, json);
}
}
pub fn unicast_transport_stats(
&self,
zid: ZenohIdProto,
whatami: WhatAmI,
cn: Option<String>,
) -> TransportStats {
self.0.transports_opened.inc();
TransportStats::new(self.clone(), Some(zid), Some(whatami), cn, None)
}
pub fn multicast_transport_stats(&self, group: String) -> TransportStats {
self.0.transports_opened.inc();
TransportStats::new(self.clone(), None, None, None, Some(group))
}
pub(crate) fn add_link(&self, link: &LinkLabels) {
let protocol = ProtocolLabels {
protocol: link.protocol(),
};
self.0.links_opened.get_or_create(&protocol).inc();
}
pub(crate) fn remove_transport(&self, transport: &TransportLabels) {
for (_, family) in self.families() {
family.remove_transport(transport);
}
self.0.transports_opened.dec();
}
pub(crate) fn remove_link(&self, transport: &TransportLabels, link: &LinkLabels) {
for (_, family) in self.families() {
family.remove_link(transport, link);
}
if transport.remote_group.is_none() || transport.remote_zid.is_none() {
let protocol = ProtocolLabels {
protocol: link.protocol(),
};
self.0.links_opened.get_or_create(&protocol).dec();
}
}
pub fn update_keys<'a>(
&self,
tree: &mut StatsKeysTree,
keyexprs: impl IntoIterator<Item = &'a keyexpr>,
) {
self.0.stats_keys.update_keys(tree, keyexprs)
}
}
#[derive(Debug)]
struct StatsRegistryInner {
registry: RwLock<Registry>,
transports_opened: Gauge,
links_opened: Family<ProtocolLabels, Gauge>,
resources_declared: Family<ResourceDeclaredLabels, Gauge>,
bytes: [TransportFamily<BytesLabels, Counter>; StatsDirection::NUM],
transport_message: [TransportFamily<TransportMessageLabels, Counter>; StatsDirection::NUM],
network_message: [TransportFamily<NetworkMessageLabels, Counter>; StatsDirection::NUM],
network_message_payload:
[TransportFamily<NetworkMessagePayloadLabels, Histogram, HistogramBuckets>;
StatsDirection::NUM],
network_message_dropped_payload:
[TransportFamily<NetworkMessageDroppedPayloadLabels, Histogram, HistogramBuckets>;
StatsDirection::NUM],
#[allow(clippy::type_complexity)]
network_message_payload_per_key: [TransportFamily<
NetworkMessagePayloadLabels,
HistogramPerKey,
(HistogramBuckets, StatsKeysRegistry),
>; StatsDirection::NUM],
stats_keys: StatsKeysRegistry,
}
pub(crate) trait TransportFamilyAny {
fn remove_link(&self, transport: &TransportLabels, link: &LinkLabels);
fn remove_transport(&self, transport: &TransportLabels);
fn merge_stats(&self, direction: StatsDirection, json: &mut serde_json::Value);
}
impl<S: StatsPath<M> + Clone + Hash + Eq, M: TransportMetric + Clone, C: MetricConstructor<M>>
TransportFamilyAny for TransportFamily<S, M, C>
{
fn remove_link(&self, transport: &TransportLabels, link: &LinkLabels) {
self.remove_link(transport, link);
}
fn remove_transport(&self, transport: &TransportLabels) {
self.remove_transport(transport);
}
fn merge_stats(&self, direction: StatsDirection, json: &mut serde_json::Value) {
self.merge_stats(direction, json);
}
}