turn_server/statistics.rs
1use std::sync::{
2 Arc,
3 atomic::{AtomicUsize, Ordering},
4};
5
6use ahash::HashMap;
7use parking_lot::RwLock;
8
9use crate::service::{Transport, session::Identifier};
10
11/// The type of information passed in the statistics channel
12#[derive(Debug, Clone, Copy)]
13pub enum Stats {
14 ReceivedBytes(usize),
15 SendBytes(usize),
16 ReceivedPkts(usize),
17 SendPkts(usize),
18 ErrorPkts(usize),
19}
20
21pub trait Number {
22 fn add(&self, value: usize);
23 fn get(&self) -> usize;
24}
25
26#[derive(Default)]
27pub struct Count(AtomicUsize);
28
29impl Number for Count {
30 fn add(&self, value: usize) {
31 self.0.fetch_add(value, Ordering::Relaxed);
32 }
33
34 fn get(&self) -> usize {
35 self.0.load(Ordering::Relaxed)
36 }
37}
38
39/// Worker independent statistics
40pub struct Counts<T> {
41 pub received_bytes: T,
42 pub send_bytes: T,
43 pub received_pkts: T,
44 pub send_pkts: T,
45 pub error_pkts: T,
46}
47
48impl<T: Number> Counts<T> {
49 /// # Example
50 ///
51 /// ```
52 /// use turn_server::statistics::*;
53 ///
54 /// let counts = Counts {
55 /// received_bytes: Count::default(),
56 /// send_bytes: Count::default(),
57 /// received_pkts: Count::default(),
58 /// send_pkts: Count::default(),
59 /// error_pkts: Count::default(),
60 /// };
61 ///
62 /// counts.add(&Stats::ReceivedBytes(1));
63 /// assert_eq!(counts.received_bytes.get(), 1);
64 ///
65 /// counts.add(&Stats::ReceivedPkts(1));
66 /// assert_eq!(counts.received_pkts.get(), 1);
67 ///
68 /// counts.add(&Stats::SendBytes(1));
69 /// assert_eq!(counts.send_bytes.get(), 1);
70 ///
71 /// counts.add(&Stats::SendPkts(1));
72 /// assert_eq!(counts.send_pkts.get(), 1);
73 /// ```
74 pub fn add(&self, payload: &Stats) {
75 match payload {
76 Stats::ReceivedBytes(v) => self.received_bytes.add(*v),
77 Stats::ReceivedPkts(v) => self.received_pkts.add(*v),
78 Stats::SendBytes(v) => self.send_bytes.add(*v),
79 Stats::SendPkts(v) => self.send_pkts.add(*v),
80 Stats::ErrorPkts(v) => self.error_pkts.add(*v),
81 }
82 }
83}
84
85/// worker cluster statistics
86#[derive(Clone)]
87pub struct Statistics(Arc<RwLock<HashMap<Identifier, Counts<Count>>>>);
88
89impl Default for Statistics {
90 #[cfg(feature = "api")]
91 fn default() -> Self {
92 use ahash::HashMapExt;
93
94 Self(Arc::new(RwLock::new(HashMap::with_capacity(1024))))
95 }
96
97 // There's no need to take up so much memory when you don't have stats enabled.
98 #[cfg(not(feature = "api"))]
99 fn default() -> Self {
100 Self(Default::default())
101 }
102}
103
104impl Statistics {
105 /// get signal sender
106 ///
107 /// The signal sender can notify the statistics instance to update
108 /// internal statistics.
109 ///
110 /// # Example
111 ///
112 /// ```
113 /// use turn_server::statistics::*;
114 /// use turn_server::service::session::Identifier;
115 /// use turn_server::service::Transport;
116 ///
117 /// let statistics = Statistics::default();
118 /// let sender = statistics.get_reporter(Transport::Tcp);
119 ///
120 /// let identifier = Identifier {
121 /// source: "127.0.0.1:8080".parse().unwrap(),
122 /// external: "127.0.0.1:3478".parse().unwrap(),
123 /// interface: "127.0.0.1:3478".parse().unwrap(),
124 /// transport: Transport::Tcp,
125 /// };
126 ///
127 /// sender.send(&identifier, &[Stats::ReceivedBytes(100)]);
128 /// ```
129 pub fn get_reporter(&self, transport: Transport) -> StatisticsReporter {
130 StatisticsReporter {
131 table: self.0.clone(),
132 transport,
133 }
134 }
135
136 /// Add an address to the watch list
137 ///
138 /// # Example
139 ///
140 /// ```
141 /// use turn_server::statistics::*;
142 /// use turn_server::service::session::Identifier;
143 /// use turn_server::service::Transport;
144 ///
145 /// let statistics = Statistics::default();
146 ///
147 /// let identifier = Identifier {
148 /// source: "127.0.0.1:8080".parse().unwrap(),
149 /// external: "127.0.0.1:3478".parse().unwrap(),
150 /// interface: "127.0.0.1:3478".parse().unwrap(),
151 /// transport: Transport::Udp,
152 /// };
153 ///
154 /// statistics.register(identifier.clone());
155 /// assert_eq!(statistics.get(&identifier).is_some(), true);
156 /// ```
157 pub fn register(&self, identifier: Identifier) {
158 #[cfg(feature = "prometheus")]
159 {
160 crate::prometheus::METRICS.allocated.inc();
161 }
162
163 self.0.write().insert(
164 identifier,
165 Counts {
166 received_bytes: Count::default(),
167 send_bytes: Count::default(),
168 received_pkts: Count::default(),
169 send_pkts: Count::default(),
170 error_pkts: Count::default(),
171 },
172 );
173 }
174
175 /// Remove an address from the watch list
176 ///
177 /// # Example
178 ///
179 /// ```
180 /// use turn_server::statistics::*;
181 /// use turn_server::service::session::Identifier;
182 /// use turn_server::service::Transport;
183 ///
184 /// let statistics = Statistics::default();
185 ///
186 /// let identifier = Identifier {
187 /// source: "127.0.0.1:8080".parse().unwrap(),
188 /// external: "127.0.0.1:3478".parse().unwrap(),
189 /// interface: "127.0.0.1:3478".parse().unwrap(),
190 /// transport: Transport::Udp,
191 /// };
192 ///
193 /// statistics.register(identifier.clone());
194 /// assert_eq!(statistics.get(&identifier).is_some(), true);
195 ///
196 /// statistics.unregister(&identifier);
197 /// assert_eq!(statistics.get(&identifier).is_some(), false);
198 /// ```
199 pub fn unregister(&self, identifier: &Identifier) {
200 #[cfg(feature = "prometheus")]
201 {
202 crate::prometheus::METRICS.allocated.dec();
203 }
204
205 self.0.write().remove(identifier);
206 }
207
208 /// Obtain a list of statistics from statistics
209 ///
210 /// The obtained list is in the same order as it was added.
211 ///
212 /// # Example
213 ///
214 /// ```
215 /// use turn_server::statistics::*;
216 /// use turn_server::service::session::Identifier;
217 /// use turn_server::service::Transport;
218 ///
219 /// let statistics = Statistics::default();
220 ///
221 /// let identifier = Identifier {
222 /// source: "127.0.0.1:8080".parse().unwrap(),
223 /// external: "127.0.0.1:3478".parse().unwrap(),
224 /// interface: "127.0.0.1:3478".parse().unwrap(),
225 /// transport: Transport::Udp,
226 /// };
227 ///
228 /// statistics.register(identifier.clone());
229 /// assert_eq!(statistics.get(&identifier).is_some(), true);
230 /// ```
231 pub fn get(&self, identifier: &Identifier) -> Option<Counts<usize>> {
232 self.0.read().get(identifier).map(|counts| Counts {
233 received_bytes: counts.received_bytes.get(),
234 received_pkts: counts.received_pkts.get(),
235 send_bytes: counts.send_bytes.get(),
236 send_pkts: counts.send_pkts.get(),
237 error_pkts: counts.error_pkts.get(),
238 })
239 }
240}
241
242/// statistics reporter
243///
244/// It is held by each worker, and status information can be sent to the
245/// statistics instance through this instance to update the internal
246/// statistical information of the statistics.
247#[derive(Clone)]
248#[allow(unused)]
249pub struct StatisticsReporter {
250 table: Arc<RwLock<HashMap<Identifier, Counts<Count>>>>,
251 transport: Transport,
252}
253
254impl StatisticsReporter {
255 #[allow(unused_variables)]
256 pub fn send(&self, identifier: &Identifier, reports: &[Stats]) {
257 #[cfg(feature = "api")]
258 {
259 #[cfg(feature = "prometheus")]
260 {
261 for report in reports {
262 crate::prometheus::METRICS.add(self.transport, report);
263 }
264 }
265
266 if let Some(counts) = self.table.read().get(identifier) {
267 for item in reports {
268 counts.add(item);
269 }
270 }
271 }
272 }
273}