1use std::{
2 array,
3 sync::{Arc, OnceLock},
4};
5
6use prometheus_client::metrics::counter::Counter;
7use zenoh_protocol::{core::Priority, network::NetworkMessageExt};
8
9use crate::{
10 labels::{
11 BytesLabels, LinkLabels, MessageLabel, NetworkMessageLabels, ProtocolLabel, ReasonLabel,
12 TransportMessageLabels,
13 },
14 DropStats, StatsDirection, TransportStats, Tx,
15};
16
17#[derive(Debug, Clone)]
18pub struct LinkStats(Arc<LinkStatsInner>);
19
20impl LinkStats {
21 pub(crate) fn new(transport_stats: TransportStats, link: LinkLabels) -> Self {
22 let registry = transport_stats.registry();
23 let transport = transport_stats.transport();
24 let protocol = link.protocol();
25 let bytes = array::from_fn(|dir| {
26 let labels = BytesLabels {
27 protocol: protocol.clone(),
28 };
29 registry
30 .bytes(StatsDirection::from_index(dir))
31 .get_or_create_owned(transport, Some(&link), &labels)
32 });
33 let transport_message = array::from_fn(|dir| {
34 let labels = TransportMessageLabels {
35 protocol: protocol.clone(),
36 };
37 registry
38 .transport_message(StatsDirection::from_index(dir))
39 .get_or_create_owned(transport, Some(&link), &labels)
40 });
41 let tx_congestion = DropStats::new(
42 registry.clone(),
43 transport.clone(),
44 ReasonLabel::Congestion,
45 Some(protocol.clone()),
46 );
47 Self(Arc::new(LinkStatsInner {
48 transport_stats,
49 link,
50 protocol,
51 bytes,
52 transport_message,
53 network_message: Default::default(),
54 tx_congestion,
55 }))
56 }
57
58 pub(crate) fn link(&self) -> &LinkLabels {
59 &self.0.link
60 }
61
62 pub fn inc_bytes(&self, direction: StatsDirection, bytes: u64) {
63 self.0.bytes[direction as usize].inc_by(bytes);
64 }
65
66 pub fn inc_transport_message(&self, direction: StatsDirection, count: u64) {
67 self.0.transport_message[direction as usize].inc_by(count);
68 }
69
70 pub fn inc_network_message(&self, direction: StatsDirection, msg: impl NetworkMessageExt) {
71 let priority = msg.priority();
72 let message = msg.body().into();
73 #[cfg(feature = "shared-memory")]
74 let shm = msg.is_shm();
75 #[cfg(not(feature = "shared-memory"))]
76 let shm = false;
77 self.0.network_message[direction as usize][priority as usize][message as usize]
78 [shm as usize]
79 .get_or_init(|| {
80 let transport = self.0.transport_stats.transport();
81 let labels = NetworkMessageLabels {
82 message,
83 priority: priority.into(),
84 shm,
85 protocol: self.0.protocol.clone(),
86 };
87 self.0
88 .transport_stats
89 .registry()
90 .network_message(direction)
91 .get_or_create_owned(transport, Some(self.link()), &labels)
92 })
93 .inc();
94 }
95
96 pub fn tx_observe_congestion(&self, msg: impl NetworkMessageExt) {
97 self.0
98 .tx_congestion
99 .observe_network_message_dropped_payload(Tx, msg)
100 }
101}
102
/// Length of the array dimension indexed by a `bool as usize` shared-memory flag:
/// the largest such index (`true as usize`) plus one.
const SHM_NUM: usize = (true as usize) + 1;
104
105#[derive(Debug)]
106struct LinkStatsInner {
107 transport_stats: TransportStats,
108 link: LinkLabels,
109 protocol: ProtocolLabel,
110 bytes: [Counter; StatsDirection::NUM],
111 transport_message: [Counter; StatsDirection::NUM],
112 #[allow(clippy::type_complexity)]
113 network_message:
114 [[[[OnceLock<Counter>; SHM_NUM]; MessageLabel::NUM]; Priority::NUM]; StatsDirection::NUM],
115 tx_congestion: DropStats,
116}
117
118impl Drop for LinkStatsInner {
119 fn drop(&mut self) {
120 self.transport_stats
121 .registry()
122 .remove_link(self.transport_stats.transport(), &self.link);
123 }
124}