Skip to main content

zenoh_stats/
transport.rs

1use std::sync::{Arc, OnceLock};
2
3use zenoh_protocol::{
4    core::{Locator, Priority, WhatAmI, ZenohIdProto},
5    network::{NetworkMessageExt, NetworkMessageRef},
6};
7
8use crate::{
9    histogram::Histogram,
10    keys::HistogramPerKey,
11    labels::{
12        MessageLabel, NetworkMessageDroppedPayloadLabels, NetworkMessagePayloadLabels,
13        ProtocolLabel, SpaceLabel, TransportLabels, SHM_NUM,
14    },
15    LinkStats, ReasonLabel, StatsDirection, StatsKeys, StatsRegistry, Tx,
16};
17
18#[derive(Debug, Clone)]
19pub struct TransportStats(Arc<TransportStatsInner>);
20
21impl TransportStats {
22    pub(crate) fn new(
23        registry: StatsRegistry,
24        zid: Option<ZenohIdProto>,
25        whatami: Option<WhatAmI>,
26        cn: Option<String>,
27        group: Option<String>,
28    ) -> Self {
29        let transport = TransportLabels {
30            remote_zid: zid.map(Into::into),
31            remote_whatami: whatami.map(Into::into),
32            remote_group: group,
33            remote_cn: cn,
34        };
35        let tx_no_link = DropStats::new(
36            registry.clone(),
37            transport.clone(),
38            ReasonLabel::NoLink,
39            None,
40        );
41        Self(Arc::new(TransportStatsInner {
42            registry,
43            transport,
44            network_message_payload: Default::default(),
45            tx_no_link,
46        }))
47    }
48
49    pub(crate) fn registry(&self) -> &StatsRegistry {
50        &self.0.registry
51    }
52
53    pub(crate) fn transport(&self) -> &TransportLabels {
54        &self.0.transport
55    }
56
57    pub fn link_stats(&self, src: &Locator, dst: &Locator) -> LinkStats {
58        let link = (src, dst).into();
59        self.registry().add_link(&link);
60        LinkStats::new(self.clone(), link)
61    }
62
63    pub fn peer_link_stats(
64        &self,
65        peer_zid: ZenohIdProto,
66        peer_whatami: WhatAmI,
67        link_stats: &LinkStats,
68    ) -> LinkStats {
69        assert!(self.transport().remote_group.is_some());
70        let stats = Self::new(
71            self.registry().clone(),
72            Some(peer_zid),
73            Some(peer_whatami),
74            None,
75            self.transport().remote_group.clone(),
76        );
77        LinkStats::new(stats, link_stats.link().clone())
78    }
79
80    pub fn drop_stats(&self, reason: ReasonLabel) -> DropStats {
81        DropStats::new(
82            self.0.registry.clone(),
83            self.0.transport.clone(),
84            reason,
85            None,
86        )
87    }
88
89    #[allow(clippy::too_many_arguments)]
90    pub fn observe_network_message_payload(
91        &self,
92        direction: StatsDirection,
93        message: MessageLabel,
94        priority: Priority,
95        payload_size: usize,
96        space: SpaceLabel,
97        keys: &StatsKeys,
98        shm: bool,
99    ) {
100        let (histogram, histogram_per_key) = self.0.network_message_payload[direction as usize]
101            [priority as usize][message as usize][shm as usize][space as usize]
102            .get_or_init(|| {
103                let labels = NetworkMessagePayloadLabels {
104                    space,
105                    message,
106                    priority: priority.into(),
107                    shm,
108                };
109                (
110                    self.registry()
111                        .network_message_payload(direction)
112                        .get_or_create_owned(self.transport(), None, &labels),
113                    self.registry()
114                        .network_message_payload_per_key(direction)
115                        .get_or_create_owned(self.transport(), None, &labels),
116                )
117            });
118        histogram.observe(payload_size as u64);
119        histogram_per_key.observe(keys, payload_size as u64);
120    }
121
122    pub fn tx_observe_no_link(&self, msg: NetworkMessageRef) {
123        self.0
124            .tx_no_link
125            .observe_network_message_dropped_payload(Tx, msg);
126    }
127}
128
129#[derive(Debug)]
130pub struct TransportStatsInner {
131    registry: StatsRegistry,
132    transport: TransportLabels,
133    #[allow(clippy::type_complexity)]
134    network_message_payload: [[[[[OnceLock<(Histogram, HistogramPerKey)>; SpaceLabel::NUM]; SHM_NUM];
135        MessageLabel::NUM]; Priority::NUM]; StatsDirection::NUM],
136    tx_no_link: DropStats,
137}
138
139impl Drop for TransportStatsInner {
140    fn drop(&mut self) {
141        self.registry.remove_transport(&self.transport)
142    }
143}
144
145#[derive(Clone, Debug)]
146pub struct DropStats(Arc<DropStatsInner>);
147
148impl DropStats {
149    pub(crate) fn new(
150        registry: StatsRegistry,
151        transport: TransportLabels,
152        reason: ReasonLabel,
153        protocol: Option<ProtocolLabel>,
154    ) -> Self {
155        Self(Arc::new(DropStatsInner {
156            registry,
157            transport,
158            reason,
159            protocol,
160            histograms: Default::default(),
161        }))
162    }
163
164    pub fn observe_network_message_dropped_payload(
165        &self,
166        direction: StatsDirection,
167        msg: impl NetworkMessageExt,
168    ) {
169        self.0.histograms[direction as usize][msg.priority() as usize]
170            [MessageLabel::from(msg.body()) as usize]
171            .get_or_init(|| {
172                let labels = NetworkMessageDroppedPayloadLabels {
173                    message: MessageLabel::from(msg.body()),
174                    priority: msg.priority().into(),
175                    protocol: self.0.protocol.clone(),
176                    reason: self.0.reason,
177                };
178                self.0
179                    .registry
180                    .network_message_dropped_payload(direction)
181                    .get_or_create_owned(&self.0.transport, None, &labels)
182            })
183            .observe(msg.payload_size().unwrap_or_default() as u64)
184    }
185}
186
187#[derive(Debug)]
188struct DropStatsInner {
189    registry: StatsRegistry,
190    transport: TransportLabels,
191    reason: ReasonLabel,
192    protocol: Option<ProtocolLabel>,
193    histograms: [[[OnceLock<Histogram>; MessageLabel::NUM]; Priority::NUM]; StatsDirection::NUM],
194}