// turn_server/statistics.rs

use std::sync::{
    Arc,
    atomic::{AtomicUsize, Ordering},
};

use ahash::AHashMap;
use parking_lot::RwLock;

use crate::{stun::Transport, turn::SessionAddr};

11/// [issue](https://github.com/mycrl/turn-rs/issues/101)
12///
13/// Integrated Prometheus Metrics Exporter
14pub mod prometheus {
15    use std::sync::LazyLock;
16
17    use anyhow::Result;
18    use prometheus::{Encoder, IntCounter, IntGauge, TextEncoder, register_int_counter, register_int_gauge};
19
20    use super::{Counts, Number, Stats};
21    use crate::stun::Transport;
22
23    // The `register_int_counter` macro would be too long if written out in full,
24    // with too many line breaks after formatting, and this is wrapped directly into
25    // a macro again.
26    macro_rules! counter {
27        ($prefix:expr, $operation:expr, $dst:expr) => {
28            register_int_counter!(
29                format!("{}_{}_{}", $prefix, $operation, $dst),
30                format!("The {} amount of {} {}", $prefix, $dst, $operation)
31            )
32        };
33    }
34
35    pub static METRICS: LazyLock<Metrics> = LazyLock::new(|| Metrics::default());
36
37    /// # Example
38    ///
39    /// ```
40    /// use prometheus::register_int_counter;
41    /// use turn_server::statistics::{Number, prometheus::*};
42    ///
43    /// let count = register_int_counter!("test", "test").unwrap();
44    ///
45    /// count.add(1);
46    /// assert_eq!(count.get(), 1);
47    ///
48    /// count.add(1);
49    /// assert_eq!(count.get(), 2);
50    /// ```
51    impl Number for IntCounter {
52        fn add(&self, value: usize) {
53            self.inc_by(value as u64);
54        }
55
56        fn get(&self) -> usize {
57            self.get() as usize
58        }
59    }
60
61    impl Counts<IntCounter> {
62        fn new(prefix: &str) -> Result<Self> {
63            Ok(Self {
64                received_bytes: counter!(prefix, "received", "bytes")?,
65                send_bytes: counter!(prefix, "sent", "bytes")?,
66                received_pkts: counter!(prefix, "received", "packets")?,
67                send_pkts: counter!(prefix, "sent", "packets")?,
68                error_pkts: counter!(prefix, "error", "packets")?,
69            })
70        }
71    }
72
73    /// Summarized metrics data for Global/TCP/UDP.
74    pub struct Metrics {
75        pub allocated: IntGauge,
76        pub total: Counts<IntCounter>,
77        pub tcp: Counts<IntCounter>,
78        pub udp: Counts<IntCounter>,
79    }
80
81    impl Default for Metrics {
82        fn default() -> Self {
83            Self::new().expect("Unable to initialize Prometheus metrics data!")
84        }
85    }
86
87    impl Metrics {
88        pub fn new() -> Result<Self> {
89            Ok(Self {
90                total: Counts::new("total")?,
91                tcp: Counts::new("tcp")?,
92                udp: Counts::new("udp")?,
93                allocated: register_int_gauge!("allocated", "The number of allocated ports, count = 16383")?,
94            })
95        }
96
97        /// # Example
98        ///
99        /// ```
100        /// use turn_server::statistics::{prometheus::*, *};
101        /// use turn_server::stun::Transport;
102        ///
103        /// METRICS.add(Transport::TCP, &Stats::ReceivedBytes(1));
104        /// assert_eq!(METRICS.tcp.received_bytes.get(), 1);
105        /// assert_eq!(METRICS.total.received_bytes.get(), 1);
106        /// assert_eq!(METRICS.udp.received_bytes.get(), 0);
107        /// ```
108        pub fn add(&self, transport: Transport, payload: &Stats) {
109            self.total.add(payload);
110
111            if transport == Transport::TCP {
112                self.tcp.add(payload);
113            } else {
114                self.udp.add(payload);
115            }
116        }
117    }
118
119    /// Generate prometheus metrics data that externally needs to be exposed to
120    /// the `/metrics` route.
121    pub fn generate_metrics(buf: &mut Vec<u8>) -> Result<()> {
122        TextEncoder::new().encode(&prometheus::gather(), buf)?;
123        Ok(())
124    }
125}
126
/// A single statistics report; each variant carries the amount to add to
/// the counter of the same name.
#[derive(Debug, Clone, Copy)]
pub enum Stats {
    /// Amount added to the received-bytes counter.
    ReceivedBytes(usize),
    /// Amount added to the sent-bytes counter.
    SendBytes(usize),
    /// Amount added to the received-packets counter.
    ReceivedPkts(usize),
    /// Amount added to the sent-packets counter.
    SendPkts(usize),
    /// Amount added to the error-packets counter.
    ErrorPkts(usize),
}

/// A counter that can be incremented and read through a shared reference.
pub trait Number {
    /// Adds `value` to the counter.
    fn add(&self, value: usize);

    /// Returns the current value of the counter.
    fn get(&self) -> usize;
}

/// Counter backed by an [`AtomicUsize`]; the default value is zero.
#[derive(Default)]
pub struct Count(AtomicUsize);

145impl Number for Count {
146    fn add(&self, value: usize) {
147        self.0.fetch_add(value, Ordering::Relaxed);
148    }
149
150    fn get(&self) -> usize {
151        self.0.load(Ordering::Relaxed)
152    }
153}
154
/// One group of traffic counters, generic over the counter type `T`.
pub struct Counts<T> {
    /// Bytes received.
    pub received_bytes: T,
    /// Bytes sent.
    pub send_bytes: T,
    /// Packets received.
    pub received_pkts: T,
    /// Packets sent.
    pub send_pkts: T,
    /// Packets accounted as errors.
    pub error_pkts: T,
}

164impl<T: Number> Counts<T> {
165    /// # Example
166    ///
167    /// ```
168    /// use turn_server::statistics::*;
169    ///
170    /// let counts = Counts {
171    ///     received_bytes: Count::default(),
172    ///     send_bytes: Count::default(),
173    ///     received_pkts: Count::default(),
174    ///     send_pkts: Count::default(),
175    ///     error_pkts: Count::default(),
176    /// };
177    ///
178    /// counts.add(&Stats::ReceivedBytes(1));
179    /// assert_eq!(counts.received_bytes.get(), 1);
180    ///
181    /// counts.add(&Stats::ReceivedPkts(1));
182    /// assert_eq!(counts.received_pkts.get(), 1);
183    ///
184    /// counts.add(&Stats::SendBytes(1));
185    /// assert_eq!(counts.send_bytes.get(), 1);
186    ///
187    /// counts.add(&Stats::SendPkts(1));
188    /// assert_eq!(counts.send_pkts.get(), 1);
189    /// ```
190    pub fn add(&self, payload: &Stats) {
191        match payload {
192            Stats::ReceivedBytes(v) => self.received_bytes.add(*v),
193            Stats::ReceivedPkts(v) => self.received_pkts.add(*v),
194            Stats::SendBytes(v) => self.send_bytes.add(*v),
195            Stats::SendPkts(v) => self.send_pkts.add(*v),
196            Stats::ErrorPkts(v) => self.error_pkts.add(*v),
197        }
198    }
199}
200
201/// worker cluster statistics
202#[derive(Clone)]
203pub struct Statistics(Arc<RwLock<AHashMap<SessionAddr, Counts<Count>>>>);
204
205impl Default for Statistics {
206    #[cfg(feature = "api")]
207    fn default() -> Self {
208        Self(Arc::new(RwLock::new(AHashMap::with_capacity(1024))))
209    }
210
211    // There's no need to take up so much memory when you don't have stats enabled.
212    #[cfg(not(feature = "api"))]
213    fn default() -> Self {
214        Self(Default::default())
215    }
216}
217
218impl Statistics {
219    /// get signal sender
220    ///
221    /// The signal sender can notify the statisticsing instance to update
222    /// internal statistics.
223    ///
224    /// # Example
225    ///
226    /// ```
227    /// use std::net::SocketAddr;
228    /// use turn_server::statistics::*;
229    /// use turn_server::stun::Transport;
230    /// use turn_server::turn::*;
231    ///
232    /// let statistics = Statistics::default();
233    /// let sender = statistics.get_reporter(Transport::UDP);
234    ///
235    /// let addr = SessionAddr {
236    ///     address: "127.0.0.1:8080".parse().unwrap(),
237    ///     interface: "127.0.0.1:3478".parse().unwrap(),
238    /// };
239    ///
240    /// sender.send(&addr, &[Stats::ReceivedBytes(100)]);
241    /// ```
242    pub fn get_reporter(&self, transport: Transport) -> StatisticsReporter {
243        StatisticsReporter {
244            table: self.0.clone(),
245            transport,
246        }
247    }
248
249    /// Add an address to the watch list
250    ///
251    /// # Example
252    ///
253    /// ```
254    /// use std::net::SocketAddr;
255    /// use turn_server::statistics::*;
256    /// use turn_server::turn::*;
257    ///
258    /// let statistics = Statistics::default();
259    ///
260    /// let addr = SessionAddr {
261    ///     address: "127.0.0.1:8080".parse().unwrap(),
262    ///     interface: "127.0.0.1:3478".parse().unwrap(),
263    /// };
264    ///
265    /// statistics.register(addr.clone());
266    /// assert_eq!(statistics.get(&addr).is_some(), true);
267    /// ```
268    pub fn register(&self, addr: SessionAddr) {
269        #[cfg(feature = "prometheus")]
270        {
271            self::prometheus::METRICS.allocated.inc();
272        }
273
274        self.0.write().insert(
275            addr,
276            Counts {
277                received_bytes: Count::default(),
278                send_bytes: Count::default(),
279                received_pkts: Count::default(),
280                send_pkts: Count::default(),
281                error_pkts: Count::default(),
282            },
283        );
284    }
285
286    /// Remove an address from the watch list
287    ///
288    /// # Example
289    ///
290    /// ```
291    /// use std::net::SocketAddr;
292    /// use turn_server::statistics::*;
293    /// use turn_server::turn::*;
294    ///
295    /// let statistics = Statistics::default();
296    ///
297    /// let addr = SessionAddr {
298    ///     address: "127.0.0.1:8080".parse().unwrap(),
299    ///     interface: "127.0.0.1:3478".parse().unwrap(),
300    /// };
301    ///
302    /// statistics.register(addr.clone());
303    /// assert_eq!(statistics.get(&addr).is_some(), true);
304    ///
305    /// statistics.unregister(&addr);
306    /// assert_eq!(statistics.get(&addr).is_some(), false);
307    /// ```
308    pub fn unregister(&self, addr: &SessionAddr) {
309        #[cfg(feature = "prometheus")]
310        {
311            self::prometheus::METRICS.allocated.dec();
312        }
313
314        self.0.write().remove(addr);
315    }
316
317    /// Obtain a list of statistics from statisticsing
318    ///
319    /// The obtained list is in the same order as it was added.
320    ///
321    /// # Example
322    ///
323    /// ```
324    /// use std::net::SocketAddr;
325    /// use turn_server::statistics::*;
326    /// use turn_server::turn::*;
327    ///
328    /// let statistics = Statistics::default();
329    ///
330    /// let addr = SessionAddr {
331    ///     address: "127.0.0.1:8080".parse().unwrap(),
332    ///     interface: "127.0.0.1:3478".parse().unwrap(),
333    /// };
334    ///
335    /// statistics.register(addr.clone());
336    /// assert_eq!(statistics.get(&addr).is_some(), true);
337    /// ```
338    pub fn get(&self, addr: &SessionAddr) -> Option<Counts<usize>> {
339        self.0.read().get(addr).map(|counts| Counts {
340            received_bytes: counts.received_bytes.get(),
341            received_pkts: counts.received_pkts.get(),
342            send_bytes: counts.send_bytes.get(),
343            send_pkts: counts.send_pkts.get(),
344            error_pkts: counts.error_pkts.get(),
345        })
346    }
347}
348
349/// statistics reporter
350///
351/// It is held by each worker, and status information can be sent to the
352/// statisticsing instance through this instance to update the internal
353/// statistical information of the statistics.
354#[derive(Clone)]
355#[allow(unused)]
356pub struct StatisticsReporter {
357    table: Arc<RwLock<AHashMap<SessionAddr, Counts<Count>>>>,
358    transport: Transport,
359}
360
361impl StatisticsReporter {
362    #[allow(unused_variables)]
363    pub fn send(&self, addr: &SessionAddr, reports: &[Stats]) {
364        #[cfg(feature = "api")]
365        {
366            #[cfg(feature = "prometheus")]
367            {
368                for report in reports {
369                    self::prometheus::METRICS.add(self.transport, report);
370                }
371            }
372
373            if let Some(counts) = self.table.read().get(addr) {
374                for item in reports {
375                    counts.add(item);
376                }
377            }
378        }
379    }
380}