turn_server/statistics.rs
1use std::sync::{
2 Arc,
3 atomic::{AtomicUsize, Ordering},
4};
5
6use ahash::AHashMap;
7use parking_lot::RwLock;
8
9use crate::{stun::Transport, turn::SessionAddr};
10
11/// [issue](https://github.com/mycrl/turn-rs/issues/101)
12///
13/// Integrated Prometheus Metrics Exporter
14pub mod prometheus {
15 use std::sync::LazyLock;
16
17 use anyhow::Result;
18 use prometheus::{Encoder, IntCounter, IntGauge, TextEncoder, register_int_counter, register_int_gauge};
19
20 use super::{Counts, Number, Stats};
21 use crate::stun::Transport;
22
23 // The `register_int_counter` macro would be too long if written out in full,
24 // with too many line breaks after formatting, and this is wrapped directly into
25 // a macro again.
26 macro_rules! counter {
27 ($prefix:expr, $operation:expr, $dst:expr) => {
28 register_int_counter!(
29 format!("{}_{}_{}", $prefix, $operation, $dst),
30 format!("The {} amount of {} {}", $prefix, $dst, $operation)
31 )
32 };
33 }
34
35 pub static METRICS: LazyLock<Metrics> = LazyLock::new(|| Metrics::default());
36
37 /// # Example
38 ///
39 /// ```
40 /// use prometheus::register_int_counter;
41 /// use turn_server::statistics::{Number, prometheus::*};
42 ///
43 /// let count = register_int_counter!("test", "test").unwrap();
44 ///
45 /// count.add(1);
46 /// assert_eq!(count.get(), 1);
47 ///
48 /// count.add(1);
49 /// assert_eq!(count.get(), 2);
50 /// ```
51 impl Number for IntCounter {
52 fn add(&self, value: usize) {
53 self.inc_by(value as u64);
54 }
55
56 fn get(&self) -> usize {
57 self.get() as usize
58 }
59 }
60
61 impl Counts<IntCounter> {
62 fn new(prefix: &str) -> Result<Self> {
63 Ok(Self {
64 received_bytes: counter!(prefix, "received", "bytes")?,
65 send_bytes: counter!(prefix, "sent", "bytes")?,
66 received_pkts: counter!(prefix, "received", "packets")?,
67 send_pkts: counter!(prefix, "sent", "packets")?,
68 error_pkts: counter!(prefix, "error", "packets")?,
69 })
70 }
71 }
72
73 /// Summarized metrics data for Global/TCP/UDP.
74 pub struct Metrics {
75 pub allocated: IntGauge,
76 pub total: Counts<IntCounter>,
77 pub tcp: Counts<IntCounter>,
78 pub udp: Counts<IntCounter>,
79 }
80
81 impl Default for Metrics {
82 fn default() -> Self {
83 Self::new().expect("Unable to initialize Prometheus metrics data!")
84 }
85 }
86
87 impl Metrics {
88 pub fn new() -> Result<Self> {
89 Ok(Self {
90 total: Counts::new("total")?,
91 tcp: Counts::new("tcp")?,
92 udp: Counts::new("udp")?,
93 allocated: register_int_gauge!("allocated", "The number of allocated ports, count = 16383")?,
94 })
95 }
96
97 /// # Example
98 ///
99 /// ```
100 /// use turn_server::statistics::{prometheus::*, *};
101 /// use turn_server::stun::Transport;
102 ///
103 /// METRICS.add(Transport::TCP, &Stats::ReceivedBytes(1));
104 /// assert_eq!(METRICS.tcp.received_bytes.get(), 1);
105 /// assert_eq!(METRICS.total.received_bytes.get(), 1);
106 /// assert_eq!(METRICS.udp.received_bytes.get(), 0);
107 /// ```
108 pub fn add(&self, transport: Transport, payload: &Stats) {
109 self.total.add(payload);
110
111 if transport == Transport::TCP {
112 self.tcp.add(payload);
113 } else {
114 self.udp.add(payload);
115 }
116 }
117 }
118
119 /// Generate prometheus metrics data that externally needs to be exposed to
120 /// the `/metrics` route.
121 pub fn generate_metrics(buf: &mut Vec<u8>) -> Result<()> {
122 TextEncoder::new().encode(&prometheus::gather(), buf)?;
123 Ok(())
124 }
125}
126
127/// The type of information passed in the statisticsing channel
128#[derive(Debug, Clone, Copy)]
129pub enum Stats {
130 ReceivedBytes(usize),
131 SendBytes(usize),
132 ReceivedPkts(usize),
133 SendPkts(usize),
134 ErrorPkts(usize),
135}
136
137pub trait Number {
138 fn add(&self, value: usize);
139 fn get(&self) -> usize;
140}
141
142#[derive(Default)]
143pub struct Count(AtomicUsize);
144
145impl Number for Count {
146 fn add(&self, value: usize) {
147 self.0.fetch_add(value, Ordering::Relaxed);
148 }
149
150 fn get(&self) -> usize {
151 self.0.load(Ordering::Relaxed)
152 }
153}
154
155/// Worker independent statisticsing statistics
156pub struct Counts<T> {
157 pub received_bytes: T,
158 pub send_bytes: T,
159 pub received_pkts: T,
160 pub send_pkts: T,
161 pub error_pkts: T,
162}
163
164impl<T: Number> Counts<T> {
165 /// # Example
166 ///
167 /// ```
168 /// use turn_server::statistics::*;
169 ///
170 /// let counts = Counts {
171 /// received_bytes: Count::default(),
172 /// send_bytes: Count::default(),
173 /// received_pkts: Count::default(),
174 /// send_pkts: Count::default(),
175 /// error_pkts: Count::default(),
176 /// };
177 ///
178 /// counts.add(&Stats::ReceivedBytes(1));
179 /// assert_eq!(counts.received_bytes.get(), 1);
180 ///
181 /// counts.add(&Stats::ReceivedPkts(1));
182 /// assert_eq!(counts.received_pkts.get(), 1);
183 ///
184 /// counts.add(&Stats::SendBytes(1));
185 /// assert_eq!(counts.send_bytes.get(), 1);
186 ///
187 /// counts.add(&Stats::SendPkts(1));
188 /// assert_eq!(counts.send_pkts.get(), 1);
189 /// ```
190 pub fn add(&self, payload: &Stats) {
191 match payload {
192 Stats::ReceivedBytes(v) => self.received_bytes.add(*v),
193 Stats::ReceivedPkts(v) => self.received_pkts.add(*v),
194 Stats::SendBytes(v) => self.send_bytes.add(*v),
195 Stats::SendPkts(v) => self.send_pkts.add(*v),
196 Stats::ErrorPkts(v) => self.error_pkts.add(*v),
197 }
198 }
199}
200
201/// worker cluster statistics
202#[derive(Clone)]
203pub struct Statistics(Arc<RwLock<AHashMap<SessionAddr, Counts<Count>>>>);
204
205impl Default for Statistics {
206 #[cfg(feature = "api")]
207 fn default() -> Self {
208 Self(Arc::new(RwLock::new(AHashMap::with_capacity(1024))))
209 }
210
211 // There's no need to take up so much memory when you don't have stats enabled.
212 #[cfg(not(feature = "api"))]
213 fn default() -> Self {
214 Self(Default::default())
215 }
216}
217
218impl Statistics {
219 /// get signal sender
220 ///
221 /// The signal sender can notify the statisticsing instance to update
222 /// internal statistics.
223 ///
224 /// # Example
225 ///
226 /// ```
227 /// use std::net::SocketAddr;
228 /// use turn_server::statistics::*;
229 /// use turn_server::stun::Transport;
230 /// use turn_server::turn::*;
231 ///
232 /// let statistics = Statistics::default();
233 /// let sender = statistics.get_reporter(Transport::UDP);
234 ///
235 /// let addr = SessionAddr {
236 /// address: "127.0.0.1:8080".parse().unwrap(),
237 /// interface: "127.0.0.1:3478".parse().unwrap(),
238 /// };
239 ///
240 /// sender.send(&addr, &[Stats::ReceivedBytes(100)]);
241 /// ```
242 pub fn get_reporter(&self, transport: Transport) -> StatisticsReporter {
243 StatisticsReporter {
244 table: self.0.clone(),
245 transport,
246 }
247 }
248
249 /// Add an address to the watch list
250 ///
251 /// # Example
252 ///
253 /// ```
254 /// use std::net::SocketAddr;
255 /// use turn_server::statistics::*;
256 /// use turn_server::turn::*;
257 ///
258 /// let statistics = Statistics::default();
259 ///
260 /// let addr = SessionAddr {
261 /// address: "127.0.0.1:8080".parse().unwrap(),
262 /// interface: "127.0.0.1:3478".parse().unwrap(),
263 /// };
264 ///
265 /// statistics.register(addr.clone());
266 /// assert_eq!(statistics.get(&addr).is_some(), true);
267 /// ```
268 pub fn register(&self, addr: SessionAddr) {
269 #[cfg(feature = "prometheus")]
270 {
271 self::prometheus::METRICS.allocated.inc();
272 }
273
274 self.0.write().insert(
275 addr,
276 Counts {
277 received_bytes: Count::default(),
278 send_bytes: Count::default(),
279 received_pkts: Count::default(),
280 send_pkts: Count::default(),
281 error_pkts: Count::default(),
282 },
283 );
284 }
285
286 /// Remove an address from the watch list
287 ///
288 /// # Example
289 ///
290 /// ```
291 /// use std::net::SocketAddr;
292 /// use turn_server::statistics::*;
293 /// use turn_server::turn::*;
294 ///
295 /// let statistics = Statistics::default();
296 ///
297 /// let addr = SessionAddr {
298 /// address: "127.0.0.1:8080".parse().unwrap(),
299 /// interface: "127.0.0.1:3478".parse().unwrap(),
300 /// };
301 ///
302 /// statistics.register(addr.clone());
303 /// assert_eq!(statistics.get(&addr).is_some(), true);
304 ///
305 /// statistics.unregister(&addr);
306 /// assert_eq!(statistics.get(&addr).is_some(), false);
307 /// ```
308 pub fn unregister(&self, addr: &SessionAddr) {
309 #[cfg(feature = "prometheus")]
310 {
311 self::prometheus::METRICS.allocated.dec();
312 }
313
314 self.0.write().remove(addr);
315 }
316
317 /// Obtain a list of statistics from statisticsing
318 ///
319 /// The obtained list is in the same order as it was added.
320 ///
321 /// # Example
322 ///
323 /// ```
324 /// use std::net::SocketAddr;
325 /// use turn_server::statistics::*;
326 /// use turn_server::turn::*;
327 ///
328 /// let statistics = Statistics::default();
329 ///
330 /// let addr = SessionAddr {
331 /// address: "127.0.0.1:8080".parse().unwrap(),
332 /// interface: "127.0.0.1:3478".parse().unwrap(),
333 /// };
334 ///
335 /// statistics.register(addr.clone());
336 /// assert_eq!(statistics.get(&addr).is_some(), true);
337 /// ```
338 pub fn get(&self, addr: &SessionAddr) -> Option<Counts<usize>> {
339 self.0.read().get(addr).map(|counts| Counts {
340 received_bytes: counts.received_bytes.get(),
341 received_pkts: counts.received_pkts.get(),
342 send_bytes: counts.send_bytes.get(),
343 send_pkts: counts.send_pkts.get(),
344 error_pkts: counts.error_pkts.get(),
345 })
346 }
347}
348
349/// statistics reporter
350///
351/// It is held by each worker, and status information can be sent to the
352/// statisticsing instance through this instance to update the internal
353/// statistical information of the statistics.
354#[derive(Clone)]
355#[allow(unused)]
356pub struct StatisticsReporter {
357 table: Arc<RwLock<AHashMap<SessionAddr, Counts<Count>>>>,
358 transport: Transport,
359}
360
361impl StatisticsReporter {
362 #[allow(unused_variables)]
363 pub fn send(&self, addr: &SessionAddr, reports: &[Stats]) {
364 #[cfg(feature = "api")]
365 {
366 #[cfg(feature = "prometheus")]
367 {
368 for report in reports {
369 self::prometheus::METRICS.add(self.transport, report);
370 }
371 }
372
373 if let Some(counts) = self.table.read().get(addr) {
374 for item in reports {
375 counts.add(item);
376 }
377 }
378 }
379 }
380}