Skip to main content

zenoh_stats/
registry.rs

1use std::{
2    array, fmt,
3    fmt::Write,
4    hash::Hash,
5    iter,
6    sync::{Arc, RwLock},
7};
8
9use prometheus_client::{
10    encoding::text::encode,
11    metrics::{
12        counter::Counter,
13        family::{Family, MetricConstructor},
14        gauge::Gauge,
15        info::Info,
16    },
17    registry::{Registry, Unit},
18};
19use zenoh_keyexpr::keyexpr;
20use zenoh_protocol::core::{WhatAmI, ZenohIdProto};
21
22use crate::{
23    family::{
24        TransportFamily, TransportFamilyCollector, TransportMetric, COLLECT_DISCONNECTED,
25        COLLECT_PER_KEY, COLLECT_PER_LINK, COLLECT_PER_TRANSPORT,
26    },
27    histogram::{Histogram, HistogramBuckets, PAYLOAD_SIZE_BUCKETS},
28    keys::{HistogramPerKey, StatsKeysRegistry},
29    labels::{
30        BytesLabels, LinkLabels, LocalityLabel, NetworkMessageDroppedPayloadLabels,
31        NetworkMessageLabels, NetworkMessagePayloadLabels, ProtocolLabels, ResourceDeclaredLabels,
32        ResourceLabel, TransportLabels, TransportMessageLabels,
33    },
34    stats::{init_stats, StatsPath},
35    Rx, StatsDirection, StatsKeysTree, TransportStats, Tx,
36};
37
38#[derive(Debug, Clone)]
39pub struct StatsRegistry(Arc<StatsRegistryInner>);
40
41impl StatsRegistry {
42    pub fn new(zid: ZenohIdProto, whatami: WhatAmI, build_version: impl Into<String>) -> Self {
43        let stats_keys = StatsKeysRegistry::default();
44        let mut registry = Registry::with_prefix_and_labels(
45            "zenoh",
46            [
47                ("local_id".into(), zid.to_string().into()),
48                ("local_whatami".into(), whatami.to_string().into()),
49            ]
50            .into_iter(),
51        );
52        registry.register(
53            "build",
54            "Zenoh build version",
55            Info::new([("version", build_version.into())]),
56        );
57        let transports_opened = Gauge::default();
58        registry.register(
59            "transports_opened",
60            "Count of transports currently opened",
61            transports_opened.clone(),
62        );
63        let links_opened = Family::default();
64        registry.register(
65            "links_opened",
66            "Count of transports currently opened",
67            links_opened.clone(),
68        );
69        let resources_declared = Family::default();
70        registry.register(
71            "resources_declared",
72            "Count of resources currently declared",
73            resources_declared.clone(),
74        );
75        let bytes = array::from_fn(|_dir| TransportFamily::default());
76        let transport_message = array::from_fn(|_dir| TransportFamily::default());
77        let network_message = array::from_fn(|_dir| TransportFamily::default());
78        let network_message_payload =
79            array::from_fn(|_dir| TransportFamily::new_with_constructor(PAYLOAD_SIZE_BUCKETS));
80        let network_message_dropped_payload =
81            array::from_fn(|_dir| TransportFamily::new_with_constructor(PAYLOAD_SIZE_BUCKETS));
82        let network_message_payload_per_key = array::from_fn(|_dir| {
83            TransportFamily::new_with_constructor((PAYLOAD_SIZE_BUCKETS, stats_keys.clone()))
84        });
85        for dir in [Tx, Rx] {
86            let action = match dir {
87                Tx => "sent",
88                Rx => "received",
89            };
90            registry.register_collector(Box::new(TransportFamilyCollector {
91                name: format!("{dir}"),
92                help: format!("Count of transport messages bytes {action}"),
93                unit: Some(Unit::Bytes),
94                family: bytes[dir as usize].clone(),
95            }));
96            registry.register_collector(Box::new(TransportFamilyCollector {
97                name: format!("{dir}_transport_message"),
98                help: format!("Count of transport messages {action}"),
99                unit: None,
100                family: transport_message[dir as usize].clone(),
101            }));
102            registry.register_collector(Box::new(TransportFamilyCollector {
103                name: format!("{dir}_network_message"),
104                help: format!("Count of network messages {action}"),
105                unit: None,
106                family: network_message[dir as usize].clone(),
107            }));
108            registry.register_collector(Box::new(TransportFamilyCollector {
109                name: format!("{dir}_network_message_payload"),
110                help: format!("Histogram of network messages payload {action}"),
111                unit: Some(Unit::Bytes),
112                family: network_message_payload[dir as usize].clone(),
113            }));
114            registry.register_collector(Box::new(TransportFamilyCollector {
115                name: format!("{dir}_network_message_dropped_payload"),
116                help: format!("Histogram of network messages payload dropped while {action}"),
117                unit: Some(Unit::Bytes),
118                family: network_message_dropped_payload[dir as usize].clone(),
119            }));
120            registry.register_collector(Box::new(TransportFamilyCollector {
121                name: format!("{dir}_network_message_payload_per_key"),
122                help: format!("Histogram of network messages payload {action} per key"),
123                unit: Some(Unit::Bytes),
124                family: network_message_payload_per_key[dir as usize].clone(),
125            }));
126        }
127        Self(Arc::new(StatsRegistryInner {
128            registry: RwLock::new(registry),
129            transports_opened,
130            links_opened,
131            resources_declared,
132            bytes,
133            transport_message,
134            network_message,
135            network_message_payload,
136            network_message_dropped_payload,
137            network_message_payload_per_key,
138            stats_keys,
139        }))
140    }
141
142    pub fn inc_resource_declared(&self, resource: ResourceLabel, locality: LocalityLabel) {
143        let labels = ResourceDeclaredLabels { resource, locality };
144        self.0.resources_declared.get_or_create(&labels).inc();
145    }
146
147    pub fn dec_resource_declared(&self, resource: ResourceLabel, locality: LocalityLabel) {
148        let labels = ResourceDeclaredLabels { resource, locality };
149        self.0.resources_declared.get_or_create(&labels).dec();
150    }
151
152    pub fn encode_metrics(
153        &self,
154        writer: &mut impl Write,
155        per_transport: bool,
156        per_link: bool,
157        disconnected: bool,
158        per_key: bool,
159    ) -> fmt::Result {
160        let registry = self.0.registry.read().unwrap();
161        COLLECT_PER_TRANSPORT.set(per_transport);
162        COLLECT_PER_LINK.set(per_link);
163        COLLECT_DISCONNECTED.set(disconnected);
164        COLLECT_PER_KEY.set(per_key);
165        encode(writer, &registry)?;
166        Ok(())
167    }
168
169    pub(crate) fn bytes(
170        &self,
171        direction: StatsDirection,
172    ) -> &TransportFamily<BytesLabels, Counter> {
173        &self.0.bytes[direction as usize]
174    }
175
176    pub(crate) fn transport_message(
177        &self,
178        direction: StatsDirection,
179    ) -> &TransportFamily<TransportMessageLabels, Counter> {
180        &self.0.transport_message[direction as usize]
181    }
182
183    pub(crate) fn network_message(
184        &self,
185        direction: StatsDirection,
186    ) -> &TransportFamily<NetworkMessageLabels, Counter> {
187        &self.0.network_message[direction as usize]
188    }
189
190    pub(crate) fn network_message_payload(
191        &self,
192        direction: StatsDirection,
193    ) -> &TransportFamily<NetworkMessagePayloadLabels, Histogram, HistogramBuckets> {
194        &self.0.network_message_payload[direction as usize]
195    }
196
197    pub(crate) fn network_message_dropped_payload(
198        &self,
199        direction: StatsDirection,
200    ) -> &TransportFamily<NetworkMessageDroppedPayloadLabels, Histogram, HistogramBuckets> {
201        &self.0.network_message_dropped_payload[direction as usize]
202    }
203
204    pub(crate) fn network_message_payload_per_key(
205        &self,
206        direction: StatsDirection,
207    ) -> &TransportFamily<
208        NetworkMessagePayloadLabels,
209        HistogramPerKey,
210        (HistogramBuckets, StatsKeysRegistry),
211    > {
212        &self.0.network_message_payload_per_key[direction as usize]
213    }
214
215    fn families(&self) -> impl Iterator<Item = (StatsDirection, &dyn TransportFamilyAny)> {
216        [Tx, Rx].into_iter().flat_map(|dir| {
217            iter::repeat(dir).zip([
218                &self.0.bytes[dir as usize] as &dyn TransportFamilyAny,
219                &self.0.transport_message[dir as usize],
220                &self.0.network_message[dir as usize],
221                &self.0.network_message_payload[dir as usize],
222                &self.0.network_message_dropped_payload[dir as usize],
223                &self.0.network_message_payload_per_key[dir as usize],
224            ])
225        })
226    }
227
228    pub fn merge_stats(&self, json: &mut serde_json::Value) {
229        init_stats(json, &self.0.stats_keys.keys());
230        for (dir, family) in self.families() {
231            family.merge_stats(dir, json);
232        }
233    }
234
235    pub fn unicast_transport_stats(
236        &self,
237        zid: ZenohIdProto,
238        whatami: WhatAmI,
239        cn: Option<String>,
240    ) -> TransportStats {
241        self.0.transports_opened.inc();
242        TransportStats::new(self.clone(), Some(zid), Some(whatami), cn, None)
243    }
244
245    pub fn multicast_transport_stats(&self, group: String) -> TransportStats {
246        self.0.transports_opened.inc();
247        TransportStats::new(self.clone(), None, None, None, Some(group))
248    }
249
250    pub(crate) fn add_link(&self, link: &LinkLabels) {
251        let protocol = ProtocolLabels {
252            protocol: link.protocol(),
253        };
254        self.0.links_opened.get_or_create(&protocol).inc();
255    }
256
257    pub(crate) fn remove_transport(&self, transport: &TransportLabels) {
258        for (_, family) in self.families() {
259            family.remove_transport(transport);
260        }
261        self.0.transports_opened.dec();
262    }
263
264    pub(crate) fn remove_link(&self, transport: &TransportLabels, link: &LinkLabels) {
265        for (_, family) in self.families() {
266            family.remove_link(transport, link);
267        }
268        if transport.remote_group.is_none() || transport.remote_zid.is_none() {
269            let protocol = ProtocolLabels {
270                protocol: link.protocol(),
271            };
272            self.0.links_opened.get_or_create(&protocol).dec();
273        }
274    }
275
276    pub fn update_keys<'a>(
277        &self,
278        tree: &mut StatsKeysTree,
279        keyexprs: impl IntoIterator<Item = &'a keyexpr>,
280    ) {
281        self.0.stats_keys.update_keys(tree, keyexprs)
282    }
283}
284
285#[derive(Debug)]
286struct StatsRegistryInner {
287    registry: RwLock<Registry>,
288    transports_opened: Gauge,
289    links_opened: Family<ProtocolLabels, Gauge>,
290    resources_declared: Family<ResourceDeclaredLabels, Gauge>,
291    bytes: [TransportFamily<BytesLabels, Counter>; StatsDirection::NUM],
292    transport_message: [TransportFamily<TransportMessageLabels, Counter>; StatsDirection::NUM],
293    network_message: [TransportFamily<NetworkMessageLabels, Counter>; StatsDirection::NUM],
294    network_message_payload:
295        [TransportFamily<NetworkMessagePayloadLabels, Histogram, HistogramBuckets>;
296            StatsDirection::NUM],
297    network_message_dropped_payload:
298        [TransportFamily<NetworkMessageDroppedPayloadLabels, Histogram, HistogramBuckets>;
299            StatsDirection::NUM],
300    #[allow(clippy::type_complexity)]
301    network_message_payload_per_key: [TransportFamily<
302        NetworkMessagePayloadLabels,
303        HistogramPerKey,
304        (HistogramBuckets, StatsKeysRegistry),
305    >; StatsDirection::NUM],
306    stats_keys: StatsKeysRegistry,
307}
308
309pub(crate) trait TransportFamilyAny {
310    fn remove_link(&self, transport: &TransportLabels, link: &LinkLabels);
311    fn remove_transport(&self, transport: &TransportLabels);
312    fn merge_stats(&self, direction: StatsDirection, json: &mut serde_json::Value);
313}
314
315impl<S: StatsPath<M> + Clone + Hash + Eq, M: TransportMetric + Clone, C: MetricConstructor<M>>
316    TransportFamilyAny for TransportFamily<S, M, C>
317{
318    fn remove_link(&self, transport: &TransportLabels, link: &LinkLabels) {
319        self.remove_link(transport, link);
320    }
321
322    fn remove_transport(&self, transport: &TransportLabels) {
323        self.remove_transport(transport);
324    }
325
326    fn merge_stats(&self, direction: StatsDirection, json: &mut serde_json::Value) {
327        self.merge_stats(direction, json);
328    }
329}