turn_server/statistics.rs
1use std::sync::{
2 Arc,
3 atomic::{AtomicUsize, Ordering},
4};
5
6use ahash::HashMap;
7use parking_lot::RwLock;
8
9use crate::service::session::Identifier;
10
11/// The type of information passed in the statisticsing channel
12#[derive(Debug, Clone, Copy)]
13pub enum Stats {
14 ReceivedBytes(usize),
15 SendBytes(usize),
16 ReceivedPkts(usize),
17 SendPkts(usize),
18 ErrorPkts(usize),
19}
20
21pub trait Number {
22 fn add(&self, value: usize);
23 fn get(&self) -> usize;
24}
25
26#[derive(Default)]
27pub struct Count(AtomicUsize);
28
29impl Number for Count {
30 fn add(&self, value: usize) {
31 self.0.fetch_add(value, Ordering::Relaxed);
32 }
33
34 fn get(&self) -> usize {
35 self.0.load(Ordering::Relaxed)
36 }
37}
38
39/// Worker independent statisticsing statistics
40pub struct Counts<T> {
41 pub received_bytes: T,
42 pub send_bytes: T,
43 pub received_pkts: T,
44 pub send_pkts: T,
45 pub error_pkts: T,
46}
47
48impl<T: Number> Counts<T> {
49 /// # Example
50 ///
51 /// ```
52 /// use turn_server::statistics::*;
53 ///
54 /// let counts = Counts {
55 /// received_bytes: Count::default(),
56 /// send_bytes: Count::default(),
57 /// received_pkts: Count::default(),
58 /// send_pkts: Count::default(),
59 /// error_pkts: Count::default(),
60 /// };
61 ///
62 /// counts.add(&Stats::ReceivedBytes(1));
63 /// assert_eq!(counts.received_bytes.get(), 1);
64 ///
65 /// counts.add(&Stats::ReceivedPkts(1));
66 /// assert_eq!(counts.received_pkts.get(), 1);
67 ///
68 /// counts.add(&Stats::SendBytes(1));
69 /// assert_eq!(counts.send_bytes.get(), 1);
70 ///
71 /// counts.add(&Stats::SendPkts(1));
72 /// assert_eq!(counts.send_pkts.get(), 1);
73 /// ```
74 pub fn add(&self, payload: &Stats) {
75 match payload {
76 Stats::ReceivedBytes(v) => self.received_bytes.add(*v),
77 Stats::ReceivedPkts(v) => self.received_pkts.add(*v),
78 Stats::SendBytes(v) => self.send_bytes.add(*v),
79 Stats::SendPkts(v) => self.send_pkts.add(*v),
80 Stats::ErrorPkts(v) => self.error_pkts.add(*v),
81 }
82 }
83}
84
85/// worker cluster statistics
86#[derive(Clone)]
87pub struct Statistics(Arc<RwLock<HashMap<Identifier, Counts<Count>>>>);
88
89impl Default for Statistics {
90 #[cfg(feature = "grpc")]
91 fn default() -> Self {
92 use ahash::HashMapExt;
93
94 Self(Arc::new(RwLock::new(HashMap::with_capacity(1024))))
95 }
96
97 // There's no need to take up so much memory when you don't have stats enabled.
98 #[cfg(not(feature = "grpc"))]
99 fn default() -> Self {
100 Self(Default::default())
101 }
102}
103
104impl Statistics {
105 /// get signal sender
106 ///
107 /// The signal sender can notify the statisticsing instance to update
108 /// internal statistics.
109 ///
110 /// # Example
111 ///
112 /// ```
113 /// use turn_server::statistics::*;
114 /// use turn_server::service::session::Identifier;
115 ///
116 /// let statistics = Statistics::default();
117 /// let sender = statistics.get_reporter();
118 ///
119 /// let addr = Identifier {
120 /// source: "127.0.0.1:8080".parse().unwrap(),
121 /// interface: "127.0.0.1:3478".parse().unwrap(),
122 /// };
123 ///
124 /// sender.send(&addr, &[Stats::ReceivedBytes(100)]);
125 /// ```
126 pub fn get_reporter(&self) -> StatisticsReporter {
127 StatisticsReporter {
128 table: self.0.clone(),
129 }
130 }
131
132 /// Add an address to the watch list
133 ///
134 /// # Example
135 ///
136 /// ```
137 /// use turn_server::statistics::*;
138 /// use turn_server::service::session::Identifier;
139 ///
140 /// let statistics = Statistics::default();
141 ///
142 /// let addr = Identifier {
143 /// source: "127.0.0.1:8080".parse().unwrap(),
144 /// interface: "127.0.0.1:3478".parse().unwrap(),
145 /// };
146 ///
147 /// statistics.register(addr.clone());
148 /// assert_eq!(statistics.get(&addr).is_some(), true);
149 /// ```
150 pub fn register(&self, addr: Identifier) {
151 self.0.write().insert(
152 addr,
153 Counts {
154 received_bytes: Count::default(),
155 send_bytes: Count::default(),
156 received_pkts: Count::default(),
157 send_pkts: Count::default(),
158 error_pkts: Count::default(),
159 },
160 );
161 }
162
163 /// Remove an address from the watch list
164 ///
165 /// # Example
166 ///
167 /// ```
168 /// use turn_server::statistics::*;
169 /// use turn_server::service::session::Identifier;
170 ///
171 /// let statistics = Statistics::default();
172 ///
173 /// let addr = Identifier {
174 /// source: "127.0.0.1:8080".parse().unwrap(),
175 /// interface: "127.0.0.1:3478".parse().unwrap(),
176 /// };
177 ///
178 /// statistics.register(addr.clone());
179 /// assert_eq!(statistics.get(&addr).is_some(), true);
180 ///
181 /// statistics.unregister(&addr);
182 /// assert_eq!(statistics.get(&addr).is_some(), false);
183 /// ```
184 pub fn unregister(&self, addr: &Identifier) {
185 self.0.write().remove(addr);
186 }
187
188 /// Obtain a list of statistics from statisticsing
189 ///
190 /// The obtained list is in the same order as it was added.
191 ///
192 /// # Example
193 ///
194 /// ```
195 /// use turn_server::statistics::*;
196 /// use turn_server::service::session::Identifier;
197 ///
198 /// let statistics = Statistics::default();
199 ///
200 /// let addr = Identifier {
201 /// source: "127.0.0.1:8080".parse().unwrap(),
202 /// interface: "127.0.0.1:3478".parse().unwrap(),
203 /// };
204 ///
205 /// statistics.register(addr.clone());
206 /// assert_eq!(statistics.get(&addr).is_some(), true);
207 /// ```
208 pub fn get(&self, addr: &Identifier) -> Option<Counts<usize>> {
209 self.0.read().get(addr).map(|counts| Counts {
210 received_bytes: counts.received_bytes.get(),
211 received_pkts: counts.received_pkts.get(),
212 send_bytes: counts.send_bytes.get(),
213 send_pkts: counts.send_pkts.get(),
214 error_pkts: counts.error_pkts.get(),
215 })
216 }
217}
218
219/// statistics reporter
220///
221/// It is held by each worker, and status information can be sent to the
222/// statisticsing instance through this instance to update the internal
223/// statistical information of the statistics.
224#[derive(Clone)]
225#[allow(unused)]
226pub struct StatisticsReporter {
227 table: Arc<RwLock<HashMap<Identifier, Counts<Count>>>>,
228}
229
230impl StatisticsReporter {
231 #[allow(unused_variables)]
232 pub fn send(&self, addr: &Identifier, reports: &[Stats]) {
233 #[cfg(feature = "grpc")]
234 {
235 if let Some(counts) = self.table.read().get(addr) {
236 for item in reports {
237 counts.add(item);
238 }
239 }
240 }
241 }
242}