Skip to main content

turn_server/
statistics.rs

1use std::sync::{
2    Arc,
3    atomic::{AtomicUsize, Ordering},
4};
5
6use ahash::HashMap;
7use parking_lot::RwLock;
8
9use crate::service::{Transport, session::Identifier};
10
/// A single measurement reported through the statistics channel.
///
/// Every variant carries the amount that [`Counts::add`] adds to the
/// counter of the same name.
#[derive(Clone, Copy, Debug)]
pub enum Stats {
    /// Amount added to `Counts::received_bytes`.
    ReceivedBytes(usize),
    /// Amount added to `Counts::send_bytes`.
    SendBytes(usize),
    /// Amount added to `Counts::received_pkts`.
    ReceivedPkts(usize),
    /// Amount added to `Counts::send_pkts`.
    SendPkts(usize),
    /// Amount added to `Counts::error_pkts`.
    ErrorPkts(usize),
}
20
/// Minimal counter abstraction used as the field type of [`Counts`].
///
/// Both methods take `&self`, so implementors that need to mutate must rely
/// on interior mutability.
pub trait Number {
    /// Increase the counter by `value`.
    fn add(&self, value: usize);

    /// Read the current value of the counter.
    fn get(&self) -> usize;
}
25
26#[derive(Default)]
27pub struct Count(AtomicUsize);
28
29impl Number for Count {
30    fn add(&self, value: usize) {
31        self.0.fetch_add(value, Ordering::Relaxed);
32    }
33
34    fn get(&self) -> usize {
35        self.0.load(Ordering::Relaxed)
36    }
37}
38
/// Worker independent statistics.
///
/// A group of five counters, generic over the counter representation `T`
/// (for example an atomic counter for live updates, or a plain `usize` for a
/// snapshot).
pub struct Counts<T> {
    /// Counter fed by `Stats::ReceivedBytes`.
    pub received_bytes: T,
    /// Counter fed by `Stats::SendBytes`.
    pub send_bytes: T,
    /// Counter fed by `Stats::ReceivedPkts`.
    pub received_pkts: T,
    /// Counter fed by `Stats::SendPkts`.
    pub send_pkts: T,
    /// Counter fed by `Stats::ErrorPkts`.
    pub error_pkts: T,
}
47
48impl<T: Number> Counts<T> {
49    /// # Example
50    ///
51    /// ```
52    /// use turn_server::statistics::*;
53    ///
54    /// let counts = Counts {
55    ///     received_bytes: Count::default(),
56    ///     send_bytes: Count::default(),
57    ///     received_pkts: Count::default(),
58    ///     send_pkts: Count::default(),
59    ///     error_pkts: Count::default(),
60    /// };
61    ///
62    /// counts.add(&Stats::ReceivedBytes(1));
63    /// assert_eq!(counts.received_bytes.get(), 1);
64    ///
65    /// counts.add(&Stats::ReceivedPkts(1));
66    /// assert_eq!(counts.received_pkts.get(), 1);
67    ///
68    /// counts.add(&Stats::SendBytes(1));
69    /// assert_eq!(counts.send_bytes.get(), 1);
70    ///
71    /// counts.add(&Stats::SendPkts(1));
72    /// assert_eq!(counts.send_pkts.get(), 1);
73    /// ```
74    pub fn add(&self, payload: &Stats) {
75        match payload {
76            Stats::ReceivedBytes(v) => self.received_bytes.add(*v),
77            Stats::ReceivedPkts(v) => self.received_pkts.add(*v),
78            Stats::SendBytes(v) => self.send_bytes.add(*v),
79            Stats::SendPkts(v) => self.send_pkts.add(*v),
80            Stats::ErrorPkts(v) => self.error_pkts.add(*v),
81        }
82    }
83}
84
85/// worker cluster statistics
86#[derive(Clone)]
87pub struct Statistics(Arc<RwLock<HashMap<Identifier, Counts<Count>>>>);
88
89impl Default for Statistics {
90    #[cfg(feature = "api")]
91    fn default() -> Self {
92        use ahash::HashMapExt;
93
94        Self(Arc::new(RwLock::new(HashMap::with_capacity(1024))))
95    }
96
97    // There's no need to take up so much memory when you don't have stats enabled.
98    #[cfg(not(feature = "api"))]
99    fn default() -> Self {
100        Self(Default::default())
101    }
102}
103
104impl Statistics {
105    /// get signal sender
106    ///
107    /// The signal sender can notify the statistics instance to update
108    /// internal statistics.
109    ///
110    /// # Example
111    ///
112    /// ```
113    /// use turn_server::statistics::*;
114    /// use turn_server::service::session::Identifier;
115    /// use turn_server::service::Transport;
116    ///
117    /// let statistics = Statistics::default();
118    /// let sender = statistics.get_reporter(Transport::Tcp);
119    ///
120    /// let identifier = Identifier {
121    ///     source: "127.0.0.1:8080".parse().unwrap(),
122    ///     external: "127.0.0.1:3478".parse().unwrap(),
123    ///     interface: "127.0.0.1:3478".parse().unwrap(),
124    ///     transport: Transport::Tcp,
125    /// };
126    ///
127    /// sender.send(&identifier, &[Stats::ReceivedBytes(100)]);
128    /// ```
129    pub fn get_reporter(&self, transport: Transport) -> StatisticsReporter {
130        StatisticsReporter {
131            table: self.0.clone(),
132            transport,
133        }
134    }
135
136    /// Add an address to the watch list
137    ///
138    /// # Example
139    ///
140    /// ```
141    /// use turn_server::statistics::*;
142    /// use turn_server::service::session::Identifier;
143    /// use turn_server::service::Transport;
144    ///
145    /// let statistics = Statistics::default();
146    ///
147    /// let identifier = Identifier {
148    ///     source: "127.0.0.1:8080".parse().unwrap(),
149    ///     external: "127.0.0.1:3478".parse().unwrap(),
150    ///     interface: "127.0.0.1:3478".parse().unwrap(),
151    ///     transport: Transport::Udp,
152    /// };
153    ///
154    /// statistics.register(identifier.clone());
155    /// assert_eq!(statistics.get(&identifier).is_some(), true);
156    /// ```
157    pub fn register(&self, identifier: Identifier) {
158        #[cfg(feature = "prometheus")]
159        {
160            crate::prometheus::METRICS.allocated.inc();
161        }
162
163        self.0.write().insert(
164            identifier,
165            Counts {
166                received_bytes: Count::default(),
167                send_bytes: Count::default(),
168                received_pkts: Count::default(),
169                send_pkts: Count::default(),
170                error_pkts: Count::default(),
171            },
172        );
173    }
174
175    /// Remove an address from the watch list
176    ///
177    /// # Example
178    ///
179    /// ```
180    /// use turn_server::statistics::*;
181    /// use turn_server::service::session::Identifier;
182    /// use turn_server::service::Transport;
183    ///
184    /// let statistics = Statistics::default();
185    ///
186    /// let identifier = Identifier {
187    ///     source: "127.0.0.1:8080".parse().unwrap(),
188    ///     external: "127.0.0.1:3478".parse().unwrap(),
189    ///     interface: "127.0.0.1:3478".parse().unwrap(),
190    ///     transport: Transport::Udp,
191    /// };
192    ///
193    /// statistics.register(identifier.clone());
194    /// assert_eq!(statistics.get(&identifier).is_some(), true);
195    ///
196    /// statistics.unregister(&identifier);
197    /// assert_eq!(statistics.get(&identifier).is_some(), false);
198    /// ```
199    pub fn unregister(&self, identifier: &Identifier) {
200        #[cfg(feature = "prometheus")]
201        {
202            crate::prometheus::METRICS.allocated.dec();
203        }
204
205        self.0.write().remove(identifier);
206    }
207
208    /// Obtain a list of statistics from statistics
209    ///
210    /// The obtained list is in the same order as it was added.
211    ///
212    /// # Example
213    ///
214    /// ```
215    /// use turn_server::statistics::*;
216    /// use turn_server::service::session::Identifier;
217    /// use turn_server::service::Transport;
218    ///
219    /// let statistics = Statistics::default();
220    ///
221    /// let identifier = Identifier {
222    ///     source: "127.0.0.1:8080".parse().unwrap(),
223    ///     external: "127.0.0.1:3478".parse().unwrap(),
224    ///     interface: "127.0.0.1:3478".parse().unwrap(),
225    ///     transport: Transport::Udp,
226    /// };
227    ///
228    /// statistics.register(identifier.clone());
229    /// assert_eq!(statistics.get(&identifier).is_some(), true);
230    /// ```
231    pub fn get(&self, identifier: &Identifier) -> Option<Counts<usize>> {
232        self.0.read().get(identifier).map(|counts| Counts {
233            received_bytes: counts.received_bytes.get(),
234            received_pkts: counts.received_pkts.get(),
235            send_bytes: counts.send_bytes.get(),
236            send_pkts: counts.send_pkts.get(),
237            error_pkts: counts.error_pkts.get(),
238        })
239    }
240}
241
242/// statistics reporter
243///
244/// It is held by each worker, and status information can be sent to the
245/// statistics instance through this instance to update the internal
246/// statistical information of the statistics.
247#[derive(Clone)]
248#[allow(unused)]
249pub struct StatisticsReporter {
250    table: Arc<RwLock<HashMap<Identifier, Counts<Count>>>>,
251    transport: Transport,
252}
253
254impl StatisticsReporter {
255    #[allow(unused_variables)]
256    pub fn send(&self, identifier: &Identifier, reports: &[Stats]) {
257        #[cfg(feature = "api")]
258        {
259            #[cfg(feature = "prometheus")]
260            {
261                for report in reports {
262                    crate::prometheus::METRICS.add(self.transport, report);
263                }
264            }
265
266            if let Some(counts) = self.table.read().get(identifier) {
267                for item in reports {
268                    counts.add(item);
269                }
270            }
271        }
272    }
273}