turn_server/
statistics.rs

1use std::sync::{
2    Arc,
3    atomic::{AtomicUsize, Ordering},
4};
5
6use ahash::HashMap;
7use parking_lot::RwLock;
8
9use crate::service::session::Identifier;
10
11/// The type of information passed in the statisticsing channel
12#[derive(Debug, Clone, Copy)]
13pub enum Stats {
14    ReceivedBytes(usize),
15    SendBytes(usize),
16    ReceivedPkts(usize),
17    SendPkts(usize),
18    ErrorPkts(usize),
19}
20
21pub trait Number {
22    fn add(&self, value: usize);
23    fn get(&self) -> usize;
24}
25
26#[derive(Default)]
27pub struct Count(AtomicUsize);
28
29impl Number for Count {
30    fn add(&self, value: usize) {
31        self.0.fetch_add(value, Ordering::Relaxed);
32    }
33
34    fn get(&self) -> usize {
35        self.0.load(Ordering::Relaxed)
36    }
37}
38
39/// Worker independent statisticsing statistics
40pub struct Counts<T> {
41    pub received_bytes: T,
42    pub send_bytes: T,
43    pub received_pkts: T,
44    pub send_pkts: T,
45    pub error_pkts: T,
46}
47
48impl<T: Number> Counts<T> {
49    /// # Example
50    ///
51    /// ```
52    /// use turn_server::statistics::*;
53    ///
54    /// let counts = Counts {
55    ///     received_bytes: Count::default(),
56    ///     send_bytes: Count::default(),
57    ///     received_pkts: Count::default(),
58    ///     send_pkts: Count::default(),
59    ///     error_pkts: Count::default(),
60    /// };
61    ///
62    /// counts.add(&Stats::ReceivedBytes(1));
63    /// assert_eq!(counts.received_bytes.get(), 1);
64    ///
65    /// counts.add(&Stats::ReceivedPkts(1));
66    /// assert_eq!(counts.received_pkts.get(), 1);
67    ///
68    /// counts.add(&Stats::SendBytes(1));
69    /// assert_eq!(counts.send_bytes.get(), 1);
70    ///
71    /// counts.add(&Stats::SendPkts(1));
72    /// assert_eq!(counts.send_pkts.get(), 1);
73    /// ```
74    pub fn add(&self, payload: &Stats) {
75        match payload {
76            Stats::ReceivedBytes(v) => self.received_bytes.add(*v),
77            Stats::ReceivedPkts(v) => self.received_pkts.add(*v),
78            Stats::SendBytes(v) => self.send_bytes.add(*v),
79            Stats::SendPkts(v) => self.send_pkts.add(*v),
80            Stats::ErrorPkts(v) => self.error_pkts.add(*v),
81        }
82    }
83}
84
85/// worker cluster statistics
86#[derive(Clone)]
87pub struct Statistics(Arc<RwLock<HashMap<Identifier, Counts<Count>>>>);
88
89impl Default for Statistics {
90    #[cfg(feature = "grpc")]
91    fn default() -> Self {
92        use ahash::HashMapExt;
93
94        Self(Arc::new(RwLock::new(HashMap::with_capacity(1024))))
95    }
96
97    // There's no need to take up so much memory when you don't have stats enabled.
98    #[cfg(not(feature = "grpc"))]
99    fn default() -> Self {
100        Self(Default::default())
101    }
102}
103
104impl Statistics {
105    /// get signal sender
106    ///
107    /// The signal sender can notify the statisticsing instance to update
108    /// internal statistics.
109    ///
110    /// # Example
111    ///
112    /// ```
113    /// use turn_server::statistics::*;
114    /// use turn_server::service::session::Identifier;
115    ///
116    /// let statistics = Statistics::default();
117    /// let sender = statistics.get_reporter();
118    ///
119    /// let addr = Identifier {
120    ///     source: "127.0.0.1:8080".parse().unwrap(),
121    ///     interface: "127.0.0.1:3478".parse().unwrap(),
122    /// };
123    ///
124    /// sender.send(&addr, &[Stats::ReceivedBytes(100)]);
125    /// ```
126    pub fn get_reporter(&self) -> StatisticsReporter {
127        StatisticsReporter {
128            table: self.0.clone(),
129        }
130    }
131
132    /// Add an address to the watch list
133    ///
134    /// # Example
135    ///
136    /// ```
137    /// use turn_server::statistics::*;
138    /// use turn_server::service::session::Identifier;
139    ///
140    /// let statistics = Statistics::default();
141    ///
142    /// let addr = Identifier {
143    ///     source: "127.0.0.1:8080".parse().unwrap(),
144    ///     interface: "127.0.0.1:3478".parse().unwrap(),
145    /// };
146    ///
147    /// statistics.register(addr.clone());
148    /// assert_eq!(statistics.get(&addr).is_some(), true);
149    /// ```
150    pub fn register(&self, addr: Identifier) {
151        self.0.write().insert(
152            addr,
153            Counts {
154                received_bytes: Count::default(),
155                send_bytes: Count::default(),
156                received_pkts: Count::default(),
157                send_pkts: Count::default(),
158                error_pkts: Count::default(),
159            },
160        );
161    }
162
163    /// Remove an address from the watch list
164    ///
165    /// # Example
166    ///
167    /// ```
168    /// use turn_server::statistics::*;
169    /// use turn_server::service::session::Identifier;
170    ///
171    /// let statistics = Statistics::default();
172    ///
173    /// let addr = Identifier {
174    ///     source: "127.0.0.1:8080".parse().unwrap(),
175    ///     interface: "127.0.0.1:3478".parse().unwrap(),
176    /// };
177    ///
178    /// statistics.register(addr.clone());
179    /// assert_eq!(statistics.get(&addr).is_some(), true);
180    ///
181    /// statistics.unregister(&addr);
182    /// assert_eq!(statistics.get(&addr).is_some(), false);
183    /// ```
184    pub fn unregister(&self, addr: &Identifier) {
185        self.0.write().remove(addr);
186    }
187
188    /// Obtain a list of statistics from statisticsing
189    ///
190    /// The obtained list is in the same order as it was added.
191    ///
192    /// # Example
193    ///
194    /// ```
195    /// use turn_server::statistics::*;
196    /// use turn_server::service::session::Identifier;
197    ///
198    /// let statistics = Statistics::default();
199    ///
200    /// let addr = Identifier {
201    ///     source: "127.0.0.1:8080".parse().unwrap(),
202    ///     interface: "127.0.0.1:3478".parse().unwrap(),
203    /// };
204    ///
205    /// statistics.register(addr.clone());
206    /// assert_eq!(statistics.get(&addr).is_some(), true);
207    /// ```
208    pub fn get(&self, addr: &Identifier) -> Option<Counts<usize>> {
209        self.0.read().get(addr).map(|counts| Counts {
210            received_bytes: counts.received_bytes.get(),
211            received_pkts: counts.received_pkts.get(),
212            send_bytes: counts.send_bytes.get(),
213            send_pkts: counts.send_pkts.get(),
214            error_pkts: counts.error_pkts.get(),
215        })
216    }
217}
218
219/// statistics reporter
220///
221/// It is held by each worker, and status information can be sent to the
222/// statisticsing instance through this instance to update the internal
223/// statistical information of the statistics.
224#[derive(Clone)]
225#[allow(unused)]
226pub struct StatisticsReporter {
227    table: Arc<RwLock<HashMap<Identifier, Counts<Count>>>>,
228}
229
230impl StatisticsReporter {
231    #[allow(unused_variables)]
232    pub fn send(&self, addr: &Identifier, reports: &[Stats]) {
233        #[cfg(feature = "grpc")]
234        {
235            if let Some(counts) = self.table.read().get(addr) {
236                for item in reports {
237                    counts.add(item);
238                }
239            }
240        }
241    }
242}