turn_server/statistics.rs
1use std::sync::{
2 Arc,
3 atomic::{AtomicUsize, Ordering},
4};
5
6use ahash::HashMap;
7use parking_lot::RwLock;
8
9use crate::{server::transport::Transport, service::session::Identifier};
10
/// A single increment reported through the statistics channel.
///
/// Each variant carries the amount to add to the counter of the same name
/// in [`Counts`].
#[derive(Debug, Clone, Copy)]
pub enum Stats {
    /// Amount to add to the `received_bytes` counter.
    ReceivedBytes(usize),
    /// Amount to add to the `send_bytes` counter.
    SendBytes(usize),
    /// Amount to add to the `received_pkts` counter.
    ReceivedPkts(usize),
    /// Amount to add to the `send_pkts` counter.
    SendPkts(usize),
    /// Amount to add to the `error_pkts` counter.
    ErrorPkts(usize),
}
20
/// A counter that can be incremented and read through a shared reference.
///
/// Both methods take `&self`, so implementors need some form of interior
/// mutability to record the added value.
pub trait Number {
    /// Increase the counter by `value`.
    fn add(&self, value: usize);

    /// Read the current value of the counter.
    fn get(&self) -> usize;
}
25
/// An atomically updated `usize` counter.
///
/// `Default` yields a counter starting at zero. `Debug` prints the value
/// currently stored in the underlying atomic.
#[derive(Debug, Default)]
pub struct Count(AtomicUsize);
28
29impl Number for Count {
30 fn add(&self, value: usize) {
31 self.0.fetch_add(value, Ordering::Relaxed);
32 }
33
34 fn get(&self) -> usize {
35 self.0.load(Ordering::Relaxed)
36 }
37}
38
/// Worker independent statistics.
///
/// One value of type `T` is kept per tracked quantity. The file uses two
/// instantiations: `Counts<Count>` for live, atomically updated counters and
/// `Counts<usize>` for plain snapshots of them.
#[derive(Debug)]
pub struct Counts<T> {
    /// Total of all `Stats::ReceivedBytes` increments.
    pub received_bytes: T,
    /// Total of all `Stats::SendBytes` increments.
    pub send_bytes: T,
    /// Total of all `Stats::ReceivedPkts` increments.
    pub received_pkts: T,
    /// Total of all `Stats::SendPkts` increments.
    pub send_pkts: T,
    /// Total of all `Stats::ErrorPkts` increments.
    pub error_pkts: T,
}
47
48impl<T: Number> Counts<T> {
49 /// # Example
50 ///
51 /// ```
52 /// use turn_server::statistics::*;
53 ///
54 /// let counts = Counts {
55 /// received_bytes: Count::default(),
56 /// send_bytes: Count::default(),
57 /// received_pkts: Count::default(),
58 /// send_pkts: Count::default(),
59 /// error_pkts: Count::default(),
60 /// };
61 ///
62 /// counts.add(&Stats::ReceivedBytes(1));
63 /// assert_eq!(counts.received_bytes.get(), 1);
64 ///
65 /// counts.add(&Stats::ReceivedPkts(1));
66 /// assert_eq!(counts.received_pkts.get(), 1);
67 ///
68 /// counts.add(&Stats::SendBytes(1));
69 /// assert_eq!(counts.send_bytes.get(), 1);
70 ///
71 /// counts.add(&Stats::SendPkts(1));
72 /// assert_eq!(counts.send_pkts.get(), 1);
73 /// ```
74 pub fn add(&self, payload: &Stats) {
75 match payload {
76 Stats::ReceivedBytes(v) => self.received_bytes.add(*v),
77 Stats::ReceivedPkts(v) => self.received_pkts.add(*v),
78 Stats::SendBytes(v) => self.send_bytes.add(*v),
79 Stats::SendPkts(v) => self.send_pkts.add(*v),
80 Stats::ErrorPkts(v) => self.error_pkts.add(*v),
81 }
82 }
83}
84
85/// worker cluster statistics
86#[derive(Clone)]
87pub struct Statistics(Arc<RwLock<HashMap<Identifier, Counts<Count>>>>);
88
89impl Default for Statistics {
90 #[cfg(feature = "api")]
91 fn default() -> Self {
92 use ahash::HashMapExt;
93
94 Self(Arc::new(RwLock::new(HashMap::with_capacity(1024))))
95 }
96
97 // There's no need to take up so much memory when you don't have stats enabled.
98 #[cfg(not(feature = "api"))]
99 fn default() -> Self {
100 Self(Default::default())
101 }
102}
103
104impl Statistics {
105 /// get signal sender
106 ///
107 /// The signal sender can notify the statistics instance to update
108 /// internal statistics.
109 ///
110 /// # Example
111 ///
112 /// ```
113 /// use turn_server::statistics::*;
114 /// use turn_server::service::session::Identifier;
115 /// use turn_server::server::transport::Transport;
116 ///
117 /// let statistics = Statistics::default();
118 /// let sender = statistics.get_reporter(Transport::Tcp);
119 ///
120 /// let identifier = Identifier::new(
121 /// "127.0.0.1:8080".parse().unwrap(),
122 /// "127.0.0.1:3478".parse().unwrap(),
123 /// );
124 ///
125 /// sender.send(&identifier, &[Stats::ReceivedBytes(100)]);
126 /// ```
127 pub fn get_reporter(&self, transport: Transport) -> StatisticsReporter {
128 StatisticsReporter {
129 table: self.0.clone(),
130 transport,
131 }
132 }
133
134 /// Add an address to the watch list
135 ///
136 /// # Example
137 ///
138 /// ```
139 /// use turn_server::statistics::*;
140 /// use turn_server::service::session::Identifier;
141 ///
142 /// let statistics = Statistics::default();
143 ///
144 /// let identifier = Identifier::new(
145 /// "127.0.0.1:8080".parse().unwrap(),
146 /// "127.0.0.1:3478".parse().unwrap(),
147 /// );
148 ///
149 /// statistics.register(identifier.clone());
150 /// assert_eq!(statistics.get(&identifier).is_some(), true);
151 /// ```
152 pub fn register(&self, identifier: Identifier) {
153 #[cfg(feature = "prometheus")]
154 {
155 crate::prometheus::METRICS.allocated.inc();
156 }
157
158 self.0.write().insert(
159 identifier,
160 Counts {
161 received_bytes: Count::default(),
162 send_bytes: Count::default(),
163 received_pkts: Count::default(),
164 send_pkts: Count::default(),
165 error_pkts: Count::default(),
166 },
167 );
168 }
169
170 /// Remove an address from the watch list
171 ///
172 /// # Example
173 ///
174 /// ```
175 /// use turn_server::statistics::*;
176 /// use turn_server::service::session::Identifier;
177 ///
178 /// let statistics = Statistics::default();
179 ///
180 /// let identifier = Identifier::new(
181 /// "127.0.0.1:8080".parse().unwrap(),
182 /// "127.0.0.1:3478".parse().unwrap(),
183 /// );
184 ///
185 /// statistics.register(identifier.clone());
186 /// assert_eq!(statistics.get(&identifier).is_some(), true);
187 ///
188 /// statistics.unregister(&identifier);
189 /// assert_eq!(statistics.get(&identifier).is_some(), false);
190 /// ```
191 pub fn unregister(&self, identifier: &Identifier) {
192 #[cfg(feature = "prometheus")]
193 {
194 crate::prometheus::METRICS.allocated.dec();
195 }
196
197 self.0.write().remove(identifier);
198 }
199
200 /// Obtain a list of statistics from statistics
201 ///
202 /// The obtained list is in the same order as it was added.
203 ///
204 /// # Example
205 ///
206 /// ```
207 /// use turn_server::statistics::*;
208 /// use turn_server::service::session::Identifier;
209 ///
210 /// let statistics = Statistics::default();
211 ///
212 /// let identifier = Identifier::new(
213 /// "127.0.0.1:8080".parse().unwrap(),
214 /// "127.0.0.1:3478".parse().unwrap(),
215 /// );
216 ///
217 /// statistics.register(identifier.clone());
218 /// assert_eq!(statistics.get(&identifier).is_some(), true);
219 /// ```
220 pub fn get(&self, identifier: &Identifier) -> Option<Counts<usize>> {
221 self.0.read().get(identifier).map(|counts| Counts {
222 received_bytes: counts.received_bytes.get(),
223 received_pkts: counts.received_pkts.get(),
224 send_bytes: counts.send_bytes.get(),
225 send_pkts: counts.send_pkts.get(),
226 error_pkts: counts.error_pkts.get(),
227 })
228 }
229}
230
231/// statistics reporter
232///
233/// It is held by each worker, and status information can be sent to the
234/// statistics instance through this instance to update the internal
235/// statistical information of the statistics.
236#[derive(Clone)]
237#[allow(unused)]
238pub struct StatisticsReporter {
239 table: Arc<RwLock<HashMap<Identifier, Counts<Count>>>>,
240 transport: Transport,
241}
242
243impl StatisticsReporter {
244 #[allow(unused_variables)]
245 pub fn send(&self, identifier: &Identifier, reports: &[Stats]) {
246 #[cfg(feature = "api")]
247 {
248 #[cfg(feature = "prometheus")]
249 {
250 for report in reports {
251 crate::prometheus::METRICS.add(self.transport, report);
252 }
253 }
254
255 if let Some(counts) = self.table.read().get(identifier) {
256 for item in reports {
257 counts.add(item);
258 }
259 }
260 }
261 }
262}