Skip to main content

turn_server/
statistics.rs

1use std::sync::{
2    Arc,
3    atomic::{AtomicUsize, Ordering},
4};
5
6use ahash::HashMap;
7use parking_lot::RwLock;
8
9use crate::{server::transport::Transport, service::session::Identifier};
10
/// A single statistics event: the counter it targets and the amount to add.
///
/// [`Counts::add`] adds the carried value to the counter matching the
/// variant.
#[derive(Clone, Copy, Debug)]
pub enum Stats {
    /// Amount to add to the received-bytes counter.
    ReceivedBytes(usize),
    /// Amount to add to the sent-bytes counter.
    SendBytes(usize),
    /// Amount to add to the received-packets counter.
    ReceivedPkts(usize),
    /// Amount to add to the sent-packets counter.
    SendPkts(usize),
    /// Amount to add to the error-packets counter.
    ErrorPkts(usize),
}
20
/// Minimal interface of a counter that can be bumped through a shared
/// reference and read back.
pub trait Number {
    /// Increase the counter by `value`.
    fn add(&self, value: usize);

    /// Read the current value of the counter.
    fn get(&self) -> usize;
}

/// Counter backed by an [`AtomicUsize`], starting at zero by default.
#[derive(Default)]
pub struct Count(AtomicUsize);

impl Number for Count {
    fn add(&self, value: usize) {
        let Self(inner) = self;

        // Relaxed is used: only the atomicity of the addition itself is
        // requested, no ordering with respect to other memory operations.
        // `fetch_add` wraps around on overflow.
        inner.fetch_add(value, Ordering::Relaxed);
    }

    fn get(&self) -> usize {
        let Self(inner) = self;
        inner.load(Ordering::Relaxed)
    }
}
38
/// One set of traffic counters, generic over the counter representation.
///
/// In this file it is used as `Counts<Count>` for the live, shared counters
/// and as `Counts<usize>` for the plain-value copy returned by
/// `Statistics::get`.
pub struct Counts<T> {
    /// Received bytes.
    pub received_bytes: T,
    /// Sent bytes.
    pub send_bytes: T,
    /// Received packets.
    pub received_pkts: T,
    /// Sent packets.
    pub send_pkts: T,
    /// Error packets.
    pub error_pkts: T,
}
47
48impl<T: Number> Counts<T> {
49    /// # Example
50    ///
51    /// ```
52    /// use turn_server::statistics::*;
53    ///
54    /// let counts = Counts {
55    ///     received_bytes: Count::default(),
56    ///     send_bytes: Count::default(),
57    ///     received_pkts: Count::default(),
58    ///     send_pkts: Count::default(),
59    ///     error_pkts: Count::default(),
60    /// };
61    ///
62    /// counts.add(&Stats::ReceivedBytes(1));
63    /// assert_eq!(counts.received_bytes.get(), 1);
64    ///
65    /// counts.add(&Stats::ReceivedPkts(1));
66    /// assert_eq!(counts.received_pkts.get(), 1);
67    ///
68    /// counts.add(&Stats::SendBytes(1));
69    /// assert_eq!(counts.send_bytes.get(), 1);
70    ///
71    /// counts.add(&Stats::SendPkts(1));
72    /// assert_eq!(counts.send_pkts.get(), 1);
73    /// ```
74    pub fn add(&self, payload: &Stats) {
75        match payload {
76            Stats::ReceivedBytes(v) => self.received_bytes.add(*v),
77            Stats::ReceivedPkts(v) => self.received_pkts.add(*v),
78            Stats::SendBytes(v) => self.send_bytes.add(*v),
79            Stats::SendPkts(v) => self.send_pkts.add(*v),
80            Stats::ErrorPkts(v) => self.error_pkts.add(*v),
81        }
82    }
83}
84
85/// worker cluster statistics
86#[derive(Clone)]
87pub struct Statistics(Arc<RwLock<HashMap<Identifier, Counts<Count>>>>);
88
89impl Default for Statistics {
90    #[cfg(feature = "api")]
91    fn default() -> Self {
92        use ahash::HashMapExt;
93
94        Self(Arc::new(RwLock::new(HashMap::with_capacity(1024))))
95    }
96
97    // There's no need to take up so much memory when you don't have stats enabled.
98    #[cfg(not(feature = "api"))]
99    fn default() -> Self {
100        Self(Default::default())
101    }
102}
103
104impl Statistics {
105    /// get signal sender
106    ///
107    /// The signal sender can notify the statistics instance to update
108    /// internal statistics.
109    ///
110    /// # Example
111    ///
112    /// ```
113    /// use turn_server::statistics::*;
114    /// use turn_server::service::session::Identifier;
115    /// use turn_server::server::transport::Transport;
116    ///
117    /// let statistics = Statistics::default();
118    /// let sender = statistics.get_reporter(Transport::Tcp);
119    ///
120    /// let identifier = Identifier::new(
121    ///     "127.0.0.1:8080".parse().unwrap(),
122    ///     "127.0.0.1:3478".parse().unwrap(),
123    /// );
124    ///
125    /// sender.send(&identifier, &[Stats::ReceivedBytes(100)]);
126    /// ```
127    pub fn get_reporter(&self, transport: Transport) -> StatisticsReporter {
128        StatisticsReporter {
129            table: self.0.clone(),
130            transport,
131        }
132    }
133
134    /// Add an address to the watch list
135    ///
136    /// # Example
137    ///
138    /// ```
139    /// use turn_server::statistics::*;
140    /// use turn_server::service::session::Identifier;
141    ///
142    /// let statistics = Statistics::default();
143    ///
144    /// let identifier = Identifier::new(
145    ///     "127.0.0.1:8080".parse().unwrap(),
146    ///     "127.0.0.1:3478".parse().unwrap(),
147    /// );
148    ///
149    /// statistics.register(identifier.clone());
150    /// assert_eq!(statistics.get(&identifier).is_some(), true);
151    /// ```
152    pub fn register(&self, identifier: Identifier) {
153        #[cfg(feature = "prometheus")]
154        {
155            crate::prometheus::METRICS.allocated.inc();
156        }
157
158        self.0.write().insert(
159            identifier,
160            Counts {
161                received_bytes: Count::default(),
162                send_bytes: Count::default(),
163                received_pkts: Count::default(),
164                send_pkts: Count::default(),
165                error_pkts: Count::default(),
166            },
167        );
168    }
169
170    /// Remove an address from the watch list
171    ///
172    /// # Example
173    ///
174    /// ```
175    /// use turn_server::statistics::*;
176    /// use turn_server::service::session::Identifier;
177    ///
178    /// let statistics = Statistics::default();
179    ///
180    /// let identifier = Identifier::new(
181    ///     "127.0.0.1:8080".parse().unwrap(),
182    ///     "127.0.0.1:3478".parse().unwrap(),
183    /// );
184    ///
185    /// statistics.register(identifier.clone());
186    /// assert_eq!(statistics.get(&identifier).is_some(), true);
187    ///
188    /// statistics.unregister(&identifier);
189    /// assert_eq!(statistics.get(&identifier).is_some(), false);
190    /// ```
191    pub fn unregister(&self, identifier: &Identifier) {
192        #[cfg(feature = "prometheus")]
193        {
194            crate::prometheus::METRICS.allocated.dec();
195        }
196
197        self.0.write().remove(identifier);
198    }
199
200    /// Obtain a list of statistics from statistics
201    ///
202    /// The obtained list is in the same order as it was added.
203    ///
204    /// # Example
205    ///
206    /// ```
207    /// use turn_server::statistics::*;
208    /// use turn_server::service::session::Identifier;
209    ///
210    /// let statistics = Statistics::default();
211    ///
212    /// let identifier = Identifier::new(
213    ///     "127.0.0.1:8080".parse().unwrap(),
214    ///     "127.0.0.1:3478".parse().unwrap(),
215    /// );
216    ///
217    /// statistics.register(identifier.clone());
218    /// assert_eq!(statistics.get(&identifier).is_some(), true);
219    /// ```
220    pub fn get(&self, identifier: &Identifier) -> Option<Counts<usize>> {
221        self.0.read().get(identifier).map(|counts| Counts {
222            received_bytes: counts.received_bytes.get(),
223            received_pkts: counts.received_pkts.get(),
224            send_bytes: counts.send_bytes.get(),
225            send_pkts: counts.send_pkts.get(),
226            error_pkts: counts.error_pkts.get(),
227        })
228    }
229}
230
231/// statistics reporter
232///
233/// It is held by each worker, and status information can be sent to the
234/// statistics instance through this instance to update the internal
235/// statistical information of the statistics.
236#[derive(Clone)]
237#[allow(unused)]
238pub struct StatisticsReporter {
239    table: Arc<RwLock<HashMap<Identifier, Counts<Count>>>>,
240    transport: Transport,
241}
242
243impl StatisticsReporter {
244    #[allow(unused_variables)]
245    pub fn send(&self, identifier: &Identifier, reports: &[Stats]) {
246        #[cfg(feature = "api")]
247        {
248            #[cfg(feature = "prometheus")]
249            {
250                for report in reports {
251                    crate::prometheus::METRICS.add(self.transport, report);
252                }
253            }
254
255            if let Some(counts) = self.table.read().get(identifier) {
256                for item in reports {
257                    counts.add(item);
258                }
259            }
260        }
261    }
262}