Skip to main content

zenoh_stats/
link.rs

1use std::{
2    array,
3    sync::{Arc, OnceLock},
4};
5
6use prometheus_client::metrics::counter::Counter;
7use zenoh_protocol::{core::Priority, network::NetworkMessageExt};
8
9use crate::{
10    labels::{
11        BytesLabels, LinkLabels, MessageLabel, NetworkMessageLabels, ProtocolLabel, ReasonLabel,
12        TransportMessageLabels,
13    },
14    DropStats, StatsDirection, TransportStats, Tx,
15};
16
17#[derive(Debug, Clone)]
18pub struct LinkStats(Arc<LinkStatsInner>);
19
20impl LinkStats {
21    pub(crate) fn new(transport_stats: TransportStats, link: LinkLabels) -> Self {
22        let registry = transport_stats.registry();
23        let transport = transport_stats.transport();
24        let protocol = link.protocol();
25        let bytes = array::from_fn(|dir| {
26            let labels = BytesLabels {
27                protocol: protocol.clone(),
28            };
29            registry
30                .bytes(StatsDirection::from_index(dir))
31                .get_or_create_owned(transport, Some(&link), &labels)
32        });
33        let transport_message = array::from_fn(|dir| {
34            let labels = TransportMessageLabels {
35                protocol: protocol.clone(),
36            };
37            registry
38                .transport_message(StatsDirection::from_index(dir))
39                .get_or_create_owned(transport, Some(&link), &labels)
40        });
41        let tx_congestion = DropStats::new(
42            registry.clone(),
43            transport.clone(),
44            ReasonLabel::Congestion,
45            Some(protocol.clone()),
46        );
47        Self(Arc::new(LinkStatsInner {
48            transport_stats,
49            link,
50            protocol,
51            bytes,
52            transport_message,
53            network_message: Default::default(),
54            tx_congestion,
55        }))
56    }
57
58    pub(crate) fn link(&self) -> &LinkLabels {
59        &self.0.link
60    }
61
62    pub fn inc_bytes(&self, direction: StatsDirection, bytes: u64) {
63        self.0.bytes[direction as usize].inc_by(bytes);
64    }
65
66    pub fn inc_transport_message(&self, direction: StatsDirection, count: u64) {
67        self.0.transport_message[direction as usize].inc_by(count);
68    }
69
70    pub fn inc_network_message(&self, direction: StatsDirection, msg: impl NetworkMessageExt) {
71        let priority = msg.priority();
72        let message = msg.body().into();
73        #[cfg(feature = "shared-memory")]
74        let shm = msg.is_shm();
75        #[cfg(not(feature = "shared-memory"))]
76        let shm = false;
77        self.0.network_message[direction as usize][priority as usize][message as usize]
78            [shm as usize]
79            .get_or_init(|| {
80                let transport = self.0.transport_stats.transport();
81                let labels = NetworkMessageLabels {
82                    message,
83                    priority: priority.into(),
84                    shm,
85                    protocol: self.0.protocol.clone(),
86                };
87                self.0
88                    .transport_stats
89                    .registry()
90                    .network_message(direction)
91                    .get_or_create_owned(transport, Some(self.link()), &labels)
92            })
93            .inc();
94    }
95
96    pub fn tx_observe_congestion(&self, msg: impl NetworkMessageExt) {
97        self.0
98            .tx_congestion
99            .observe_network_message_dropped_payload(Tx, msg)
100    }
101}
102
103const SHM_NUM: usize = 2;
104
105#[derive(Debug)]
106struct LinkStatsInner {
107    transport_stats: TransportStats,
108    link: LinkLabels,
109    protocol: ProtocolLabel,
110    bytes: [Counter; StatsDirection::NUM],
111    transport_message: [Counter; StatsDirection::NUM],
112    #[allow(clippy::type_complexity)]
113    network_message:
114        [[[[OnceLock<Counter>; SHM_NUM]; MessageLabel::NUM]; Priority::NUM]; StatsDirection::NUM],
115    tx_congestion: DropStats,
116}
117
118impl Drop for LinkStatsInner {
119    fn drop(&mut self) {
120        self.transport_stats
121            .registry()
122            .remove_link(self.transport_stats.transport(), &self.link);
123    }
124}