1use std::sync::{Arc, OnceLock};
2
3use zenoh_protocol::{
4 core::{Locator, Priority, WhatAmI, ZenohIdProto},
5 network::{NetworkMessageExt, NetworkMessageRef},
6};
7
8use crate::{
9 histogram::Histogram,
10 keys::HistogramPerKey,
11 labels::{
12 MessageLabel, NetworkMessageDroppedPayloadLabels, NetworkMessagePayloadLabels,
13 ProtocolLabel, SpaceLabel, TransportLabels, SHM_NUM,
14 },
15 LinkStats, ReasonLabel, StatsDirection, StatsKeys, StatsRegistry, Tx,
16};
17
18#[derive(Debug, Clone)]
19pub struct TransportStats(Arc<TransportStatsInner>);
20
21impl TransportStats {
22 pub(crate) fn new(
23 registry: StatsRegistry,
24 zid: Option<ZenohIdProto>,
25 whatami: Option<WhatAmI>,
26 cn: Option<String>,
27 group: Option<String>,
28 ) -> Self {
29 let transport = TransportLabels {
30 remote_zid: zid.map(Into::into),
31 remote_whatami: whatami.map(Into::into),
32 remote_group: group,
33 remote_cn: cn,
34 };
35 let tx_no_link = DropStats::new(
36 registry.clone(),
37 transport.clone(),
38 ReasonLabel::NoLink,
39 None,
40 );
41 Self(Arc::new(TransportStatsInner {
42 registry,
43 transport,
44 network_message_payload: Default::default(),
45 tx_no_link,
46 }))
47 }
48
49 pub(crate) fn registry(&self) -> &StatsRegistry {
50 &self.0.registry
51 }
52
53 pub(crate) fn transport(&self) -> &TransportLabels {
54 &self.0.transport
55 }
56
57 pub fn link_stats(&self, src: &Locator, dst: &Locator) -> LinkStats {
58 let link = (src, dst).into();
59 self.registry().add_link(&link);
60 LinkStats::new(self.clone(), link)
61 }
62
63 pub fn peer_link_stats(
64 &self,
65 peer_zid: ZenohIdProto,
66 peer_whatami: WhatAmI,
67 link_stats: &LinkStats,
68 ) -> LinkStats {
69 assert!(self.transport().remote_group.is_some());
70 let stats = Self::new(
71 self.registry().clone(),
72 Some(peer_zid),
73 Some(peer_whatami),
74 None,
75 self.transport().remote_group.clone(),
76 );
77 LinkStats::new(stats, link_stats.link().clone())
78 }
79
80 pub fn drop_stats(&self, reason: ReasonLabel) -> DropStats {
81 DropStats::new(
82 self.0.registry.clone(),
83 self.0.transport.clone(),
84 reason,
85 None,
86 )
87 }
88
89 #[allow(clippy::too_many_arguments)]
90 pub fn observe_network_message_payload(
91 &self,
92 direction: StatsDirection,
93 message: MessageLabel,
94 priority: Priority,
95 payload_size: usize,
96 space: SpaceLabel,
97 keys: &StatsKeys,
98 shm: bool,
99 ) {
100 let (histogram, histogram_per_key) = self.0.network_message_payload[direction as usize]
101 [priority as usize][message as usize][shm as usize][space as usize]
102 .get_or_init(|| {
103 let labels = NetworkMessagePayloadLabels {
104 space,
105 message,
106 priority: priority.into(),
107 shm,
108 };
109 (
110 self.registry()
111 .network_message_payload(direction)
112 .get_or_create_owned(self.transport(), None, &labels),
113 self.registry()
114 .network_message_payload_per_key(direction)
115 .get_or_create_owned(self.transport(), None, &labels),
116 )
117 });
118 histogram.observe(payload_size as u64);
119 histogram_per_key.observe(keys, payload_size as u64);
120 }
121
122 pub fn tx_observe_no_link(&self, msg: NetworkMessageRef) {
123 self.0
124 .tx_no_link
125 .observe_network_message_dropped_payload(Tx, msg);
126 }
127}
128
129#[derive(Debug)]
130pub struct TransportStatsInner {
131 registry: StatsRegistry,
132 transport: TransportLabels,
133 #[allow(clippy::type_complexity)]
134 network_message_payload: [[[[[OnceLock<(Histogram, HistogramPerKey)>; SpaceLabel::NUM]; SHM_NUM];
135 MessageLabel::NUM]; Priority::NUM]; StatsDirection::NUM],
136 tx_no_link: DropStats,
137}
138
139impl Drop for TransportStatsInner {
140 fn drop(&mut self) {
141 self.registry.remove_transport(&self.transport)
142 }
143}
144
145#[derive(Clone, Debug)]
146pub struct DropStats(Arc<DropStatsInner>);
147
148impl DropStats {
149 pub(crate) fn new(
150 registry: StatsRegistry,
151 transport: TransportLabels,
152 reason: ReasonLabel,
153 protocol: Option<ProtocolLabel>,
154 ) -> Self {
155 Self(Arc::new(DropStatsInner {
156 registry,
157 transport,
158 reason,
159 protocol,
160 histograms: Default::default(),
161 }))
162 }
163
164 pub fn observe_network_message_dropped_payload(
165 &self,
166 direction: StatsDirection,
167 msg: impl NetworkMessageExt,
168 ) {
169 self.0.histograms[direction as usize][msg.priority() as usize]
170 [MessageLabel::from(msg.body()) as usize]
171 .get_or_init(|| {
172 let labels = NetworkMessageDroppedPayloadLabels {
173 message: MessageLabel::from(msg.body()),
174 priority: msg.priority().into(),
175 protocol: self.0.protocol.clone(),
176 reason: self.0.reason,
177 };
178 self.0
179 .registry
180 .network_message_dropped_payload(direction)
181 .get_or_create_owned(&self.0.transport, None, &labels)
182 })
183 .observe(msg.payload_size().unwrap_or_default() as u64)
184 }
185}
186
187#[derive(Debug)]
188struct DropStatsInner {
189 registry: StatsRegistry,
190 transport: TransportLabels,
191 reason: ReasonLabel,
192 protocol: Option<ProtocolLabel>,
193 histograms: [[[OnceLock<Histogram>; MessageLabel::NUM]; Priority::NUM]; StatsDirection::NUM],
194}