1use std::{
2 array, fmt,
3 fmt::Write,
4 hash::Hash,
5 iter,
6 sync::{Arc, RwLock},
7};
8
9use prometheus_client::{
10 encoding::text::encode,
11 metrics::{
12 counter::Counter,
13 family::{Family, MetricConstructor},
14 gauge::Gauge,
15 info::Info,
16 },
17 registry::{Registry, Unit},
18};
19use zenoh_keyexpr::keyexpr;
20use zenoh_protocol::core::{WhatAmI, ZenohIdProto};
21
22use crate::{
23 family::{
24 TransportFamily, TransportFamilyCollector, TransportMetric, COLLECT_DISCONNECTED,
25 COLLECT_PER_KEY, COLLECT_PER_LINK, COLLECT_PER_TRANSPORT,
26 },
27 histogram::{Histogram, HistogramBuckets, PAYLOAD_SIZE_BUCKETS},
28 keys::{HistogramPerKey, StatsKeysRegistry},
29 labels::{
30 BytesLabels, LinkLabels, LocalityLabel, NetworkMessageDroppedPayloadLabels,
31 NetworkMessageLabels, NetworkMessagePayloadLabels, ProtocolLabels, ResourceDeclaredLabels,
32 ResourceLabel, TransportLabels, TransportMessageLabels,
33 },
34 stats::{init_stats, StatsPath},
35 Rx, StatsDirection, StatsKeysTree, TransportStats, Tx,
36};
37
38#[derive(Debug, Clone)]
39pub struct StatsRegistry(Arc<StatsRegistryInner>);
40
41impl StatsRegistry {
42 pub fn new(zid: ZenohIdProto, whatami: WhatAmI, build_version: impl Into<String>) -> Self {
43 let stats_keys = StatsKeysRegistry::default();
44 let mut registry = Registry::with_prefix_and_labels(
45 "zenoh",
46 [
47 ("local_id".into(), zid.to_string().into()),
48 ("local_whatami".into(), whatami.to_string().into()),
49 ]
50 .into_iter(),
51 );
52 registry.register(
53 "build",
54 "Zenoh build version",
55 Info::new([("version", build_version.into())]),
56 );
57 let transports_opened = Gauge::default();
58 registry.register(
59 "transports_opened",
60 "Count of transports currently opened",
61 transports_opened.clone(),
62 );
63 let links_opened = Family::default();
64 registry.register(
65 "links_opened",
66 "Count of transports currently opened",
67 links_opened.clone(),
68 );
69 let resources_declared = Family::default();
70 registry.register(
71 "resources_declared",
72 "Count of resources currently declared",
73 resources_declared.clone(),
74 );
75 let bytes = array::from_fn(|_dir| TransportFamily::default());
76 let transport_message = array::from_fn(|_dir| TransportFamily::default());
77 let network_message = array::from_fn(|_dir| TransportFamily::default());
78 let network_message_payload =
79 array::from_fn(|_dir| TransportFamily::new_with_constructor(PAYLOAD_SIZE_BUCKETS));
80 let network_message_dropped_payload =
81 array::from_fn(|_dir| TransportFamily::new_with_constructor(PAYLOAD_SIZE_BUCKETS));
82 let network_message_payload_per_key = array::from_fn(|_dir| {
83 TransportFamily::new_with_constructor((PAYLOAD_SIZE_BUCKETS, stats_keys.clone()))
84 });
85 for dir in [Tx, Rx] {
86 let action = match dir {
87 Tx => "sent",
88 Rx => "received",
89 };
90 registry.register_collector(Box::new(TransportFamilyCollector {
91 name: format!("{dir}"),
92 help: format!("Count of transport messages bytes {action}"),
93 unit: Some(Unit::Bytes),
94 family: bytes[dir as usize].clone(),
95 }));
96 registry.register_collector(Box::new(TransportFamilyCollector {
97 name: format!("{dir}_transport_message"),
98 help: format!("Count of transport messages {action}"),
99 unit: None,
100 family: transport_message[dir as usize].clone(),
101 }));
102 registry.register_collector(Box::new(TransportFamilyCollector {
103 name: format!("{dir}_network_message"),
104 help: format!("Count of network messages {action}"),
105 unit: None,
106 family: network_message[dir as usize].clone(),
107 }));
108 registry.register_collector(Box::new(TransportFamilyCollector {
109 name: format!("{dir}_network_message_payload"),
110 help: format!("Histogram of network messages payload {action}"),
111 unit: Some(Unit::Bytes),
112 family: network_message_payload[dir as usize].clone(),
113 }));
114 registry.register_collector(Box::new(TransportFamilyCollector {
115 name: format!("{dir}_network_message_dropped_payload"),
116 help: format!("Histogram of network messages payload dropped while {action}"),
117 unit: Some(Unit::Bytes),
118 family: network_message_dropped_payload[dir as usize].clone(),
119 }));
120 registry.register_collector(Box::new(TransportFamilyCollector {
121 name: format!("{dir}_network_message_payload_per_key"),
122 help: format!("Histogram of network messages payload {action} per key"),
123 unit: Some(Unit::Bytes),
124 family: network_message_payload_per_key[dir as usize].clone(),
125 }));
126 }
127 Self(Arc::new(StatsRegistryInner {
128 registry: RwLock::new(registry),
129 transports_opened,
130 links_opened,
131 resources_declared,
132 bytes,
133 transport_message,
134 network_message,
135 network_message_payload,
136 network_message_dropped_payload,
137 network_message_payload_per_key,
138 stats_keys,
139 }))
140 }
141
142 pub fn inc_resource_declared(&self, resource: ResourceLabel, locality: LocalityLabel) {
143 let labels = ResourceDeclaredLabels { resource, locality };
144 self.0.resources_declared.get_or_create(&labels).inc();
145 }
146
147 pub fn dec_resource_declared(&self, resource: ResourceLabel, locality: LocalityLabel) {
148 let labels = ResourceDeclaredLabels { resource, locality };
149 self.0.resources_declared.get_or_create(&labels).dec();
150 }
151
152 pub fn encode_metrics(
153 &self,
154 writer: &mut impl Write,
155 per_transport: bool,
156 per_link: bool,
157 disconnected: bool,
158 per_key: bool,
159 ) -> fmt::Result {
160 let registry = self.0.registry.read().unwrap();
161 COLLECT_PER_TRANSPORT.set(per_transport);
162 COLLECT_PER_LINK.set(per_link);
163 COLLECT_DISCONNECTED.set(disconnected);
164 COLLECT_PER_KEY.set(per_key);
165 encode(writer, ®istry)?;
166 Ok(())
167 }
168
169 pub(crate) fn bytes(
170 &self,
171 direction: StatsDirection,
172 ) -> &TransportFamily<BytesLabels, Counter> {
173 &self.0.bytes[direction as usize]
174 }
175
176 pub(crate) fn transport_message(
177 &self,
178 direction: StatsDirection,
179 ) -> &TransportFamily<TransportMessageLabels, Counter> {
180 &self.0.transport_message[direction as usize]
181 }
182
183 pub(crate) fn network_message(
184 &self,
185 direction: StatsDirection,
186 ) -> &TransportFamily<NetworkMessageLabels, Counter> {
187 &self.0.network_message[direction as usize]
188 }
189
190 pub(crate) fn network_message_payload(
191 &self,
192 direction: StatsDirection,
193 ) -> &TransportFamily<NetworkMessagePayloadLabels, Histogram, HistogramBuckets> {
194 &self.0.network_message_payload[direction as usize]
195 }
196
197 pub(crate) fn network_message_dropped_payload(
198 &self,
199 direction: StatsDirection,
200 ) -> &TransportFamily<NetworkMessageDroppedPayloadLabels, Histogram, HistogramBuckets> {
201 &self.0.network_message_dropped_payload[direction as usize]
202 }
203
204 pub(crate) fn network_message_payload_per_key(
205 &self,
206 direction: StatsDirection,
207 ) -> &TransportFamily<
208 NetworkMessagePayloadLabels,
209 HistogramPerKey,
210 (HistogramBuckets, StatsKeysRegistry),
211 > {
212 &self.0.network_message_payload_per_key[direction as usize]
213 }
214
215 fn families(&self) -> impl Iterator<Item = (StatsDirection, &dyn TransportFamilyAny)> {
216 [Tx, Rx].into_iter().flat_map(|dir| {
217 iter::repeat(dir).zip([
218 &self.0.bytes[dir as usize] as &dyn TransportFamilyAny,
219 &self.0.transport_message[dir as usize],
220 &self.0.network_message[dir as usize],
221 &self.0.network_message_payload[dir as usize],
222 &self.0.network_message_dropped_payload[dir as usize],
223 &self.0.network_message_payload_per_key[dir as usize],
224 ])
225 })
226 }
227
228 pub fn merge_stats(&self, json: &mut serde_json::Value) {
229 init_stats(json, &self.0.stats_keys.keys());
230 for (dir, family) in self.families() {
231 family.merge_stats(dir, json);
232 }
233 }
234
235 pub fn unicast_transport_stats(
236 &self,
237 zid: ZenohIdProto,
238 whatami: WhatAmI,
239 cn: Option<String>,
240 ) -> TransportStats {
241 self.0.transports_opened.inc();
242 TransportStats::new(self.clone(), Some(zid), Some(whatami), cn, None)
243 }
244
245 pub fn multicast_transport_stats(&self, group: String) -> TransportStats {
246 self.0.transports_opened.inc();
247 TransportStats::new(self.clone(), None, None, None, Some(group))
248 }
249
250 pub(crate) fn add_link(&self, link: &LinkLabels) {
251 let protocol = ProtocolLabels {
252 protocol: link.protocol(),
253 };
254 self.0.links_opened.get_or_create(&protocol).inc();
255 }
256
257 pub(crate) fn remove_transport(&self, transport: &TransportLabels) {
258 for (_, family) in self.families() {
259 family.remove_transport(transport);
260 }
261 self.0.transports_opened.dec();
262 }
263
264 pub(crate) fn remove_link(&self, transport: &TransportLabels, link: &LinkLabels) {
265 for (_, family) in self.families() {
266 family.remove_link(transport, link);
267 }
268 if transport.remote_group.is_none() || transport.remote_zid.is_none() {
269 let protocol = ProtocolLabels {
270 protocol: link.protocol(),
271 };
272 self.0.links_opened.get_or_create(&protocol).dec();
273 }
274 }
275
276 pub fn update_keys<'a>(
277 &self,
278 tree: &mut StatsKeysTree,
279 keyexprs: impl IntoIterator<Item = &'a keyexpr>,
280 ) {
281 self.0.stats_keys.update_keys(tree, keyexprs)
282 }
283}
284
285#[derive(Debug)]
286struct StatsRegistryInner {
287 registry: RwLock<Registry>,
288 transports_opened: Gauge,
289 links_opened: Family<ProtocolLabels, Gauge>,
290 resources_declared: Family<ResourceDeclaredLabels, Gauge>,
291 bytes: [TransportFamily<BytesLabels, Counter>; StatsDirection::NUM],
292 transport_message: [TransportFamily<TransportMessageLabels, Counter>; StatsDirection::NUM],
293 network_message: [TransportFamily<NetworkMessageLabels, Counter>; StatsDirection::NUM],
294 network_message_payload:
295 [TransportFamily<NetworkMessagePayloadLabels, Histogram, HistogramBuckets>;
296 StatsDirection::NUM],
297 network_message_dropped_payload:
298 [TransportFamily<NetworkMessageDroppedPayloadLabels, Histogram, HistogramBuckets>;
299 StatsDirection::NUM],
300 #[allow(clippy::type_complexity)]
301 network_message_payload_per_key: [TransportFamily<
302 NetworkMessagePayloadLabels,
303 HistogramPerKey,
304 (HistogramBuckets, StatsKeysRegistry),
305 >; StatsDirection::NUM],
306 stats_keys: StatsKeysRegistry,
307}
308
309pub(crate) trait TransportFamilyAny {
310 fn remove_link(&self, transport: &TransportLabels, link: &LinkLabels);
311 fn remove_transport(&self, transport: &TransportLabels);
312 fn merge_stats(&self, direction: StatsDirection, json: &mut serde_json::Value);
313}
314
315impl<S: StatsPath<M> + Clone + Hash + Eq, M: TransportMetric + Clone, C: MetricConstructor<M>>
316 TransportFamilyAny for TransportFamily<S, M, C>
317{
318 fn remove_link(&self, transport: &TransportLabels, link: &LinkLabels) {
319 self.remove_link(transport, link);
320 }
321
322 fn remove_transport(&self, transport: &TransportLabels) {
323 self.remove_transport(transport);
324 }
325
326 fn merge_stats(&self, direction: StatsDirection, json: &mut serde_json::Value) {
327 self.merge_stats(direction, json);
328 }
329}