torrust_tracker/core/
statistics.rs1use std::sync::Arc;
21
22use futures::future::BoxFuture;
23use futures::FutureExt;
24#[cfg(test)]
25use mockall::{automock, predicate::str};
26use tokio::sync::mpsc::error::SendError;
27use tokio::sync::{mpsc, RwLock, RwLockReadGuard};
28
/// Capacity passed to `mpsc::channel` when `Keeper::run_event_listener`
/// creates the bounded channel that carries statistics events.
const CHANNEL_BUFFER_SIZE: usize = 65_535;
30
/// A statistics event that can be sent through an `EventSender`.
///
/// The event handler in this file turns every variant into one or two counter
/// increments on `Metrics`; the per-variant notes below list which counters
/// are affected.
#[derive(Debug, PartialEq, Eq)]
pub enum Event {
    /// Increments `tcp4_announces_handled` and `tcp4_connections_handled`.
    Tcp4Announce,
    /// Increments `tcp4_scrapes_handled` and `tcp4_connections_handled`.
    Tcp4Scrape,
    /// Increments `tcp6_announces_handled` and `tcp6_connections_handled`.
    Tcp6Announce,
    /// Increments `tcp6_scrapes_handled` and `tcp6_connections_handled`.
    Tcp6Scrape,
    /// Increments `udp4_connections_handled`.
    Udp4Connect,
    /// Increments `udp4_announces_handled`.
    Udp4Announce,
    /// Increments `udp4_scrapes_handled`.
    Udp4Scrape,
    /// Increments `udp6_connections_handled`.
    Udp6Connect,
    /// Increments `udp6_announces_handled`.
    Udp6Announce,
    /// Increments `udp6_scrapes_handled`.
    Udp6Scrape,
}
54
/// Counters for the requests handled by the tracker.
///
/// `Default` yields a value with every counter set to zero. Each field is
/// incremented by the `Repo::increase_*` method of the same name.
///
/// `Eq` is derived alongside `PartialEq` because every field is a `u64`, so
/// equality is total; this also matches the derives on `Event`.
#[derive(Debug, PartialEq, Eq, Default)]
pub struct Metrics {
    /// Incremented for every `Tcp4Announce` and `Tcp4Scrape` event.
    pub tcp4_connections_handled: u64,
    /// Incremented for every `Tcp4Announce` event.
    pub tcp4_announces_handled: u64,
    /// Incremented for every `Tcp4Scrape` event.
    pub tcp4_scrapes_handled: u64,
    /// Incremented for every `Tcp6Announce` and `Tcp6Scrape` event.
    pub tcp6_connections_handled: u64,
    /// Incremented for every `Tcp6Announce` event.
    pub tcp6_announces_handled: u64,
    /// Incremented for every `Tcp6Scrape` event.
    pub tcp6_scrapes_handled: u64,
    /// Incremented for every `Udp4Connect` event.
    pub udp4_connections_handled: u64,
    /// Incremented for every `Udp4Announce` event.
    pub udp4_announces_handled: u64,
    /// Incremented for every `Udp4Scrape` event.
    pub udp4_scrapes_handled: u64,
    /// Incremented for every `Udp6Connect` event.
    pub udp6_connections_handled: u64,
    /// Incremented for every `Udp6Announce` event.
    pub udp6_announces_handled: u64,
    /// Incremented for every `Udp6Scrape` event.
    pub udp6_scrapes_handled: u64,
}
92
93pub struct Keeper {
98 pub repository: Repo,
99}
100
101impl Default for Keeper {
102 fn default() -> Self {
103 Self::new()
104 }
105}
106
107impl Keeper {
108 #[must_use]
109 pub fn new() -> Self {
110 Self { repository: Repo::new() }
111 }
112
113 #[must_use]
114 pub fn new_active_instance() -> (Box<dyn EventSender>, Repo) {
115 let mut stats_tracker = Self::new();
116
117 let stats_event_sender = stats_tracker.run_event_listener();
118
119 (stats_event_sender, stats_tracker.repository)
120 }
121
122 pub fn run_event_listener(&mut self) -> Box<dyn EventSender> {
123 let (sender, receiver) = mpsc::channel::<Event>(CHANNEL_BUFFER_SIZE);
124
125 let stats_repository = self.repository.clone();
126
127 tokio::spawn(async move { event_listener(receiver, stats_repository).await });
128
129 Box::new(Sender { sender })
130 }
131}
132
133async fn event_listener(mut receiver: mpsc::Receiver<Event>, stats_repository: Repo) {
134 while let Some(event) = receiver.recv().await {
135 event_handler(event, &stats_repository).await;
136 }
137}
138
139async fn event_handler(event: Event, stats_repository: &Repo) {
140 match event {
141 Event::Tcp4Announce => {
143 stats_repository.increase_tcp4_announces().await;
144 stats_repository.increase_tcp4_connections().await;
145 }
146 Event::Tcp4Scrape => {
147 stats_repository.increase_tcp4_scrapes().await;
148 stats_repository.increase_tcp4_connections().await;
149 }
150
151 Event::Tcp6Announce => {
153 stats_repository.increase_tcp6_announces().await;
154 stats_repository.increase_tcp6_connections().await;
155 }
156 Event::Tcp6Scrape => {
157 stats_repository.increase_tcp6_scrapes().await;
158 stats_repository.increase_tcp6_connections().await;
159 }
160
161 Event::Udp4Connect => {
163 stats_repository.increase_udp4_connections().await;
164 }
165 Event::Udp4Announce => {
166 stats_repository.increase_udp4_announces().await;
167 }
168 Event::Udp4Scrape => {
169 stats_repository.increase_udp4_scrapes().await;
170 }
171
172 Event::Udp6Connect => {
174 stats_repository.increase_udp6_connections().await;
175 }
176 Event::Udp6Announce => {
177 stats_repository.increase_udp6_announces().await;
178 }
179 Event::Udp6Scrape => {
180 stats_repository.increase_udp6_scrapes().await;
181 }
182 }
183
184 tracing::debug!("stats: {:?}", stats_repository.get_stats().await);
185}
186
187#[cfg_attr(test, automock)]
189pub trait EventSender: Sync + Send {
190 fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<(), SendError<Event>>>>;
191}
192
193pub struct Sender {
198 sender: mpsc::Sender<Event>,
199}
200
201impl EventSender for Sender {
202 fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<(), SendError<Event>>>> {
203 async move { Some(self.sender.send(event).await) }.boxed()
204 }
205}
206
207#[derive(Clone)]
209pub struct Repo {
210 pub stats: Arc<RwLock<Metrics>>,
211}
212
213impl Default for Repo {
214 fn default() -> Self {
215 Self::new()
216 }
217}
218
219impl Repo {
220 #[must_use]
221 pub fn new() -> Self {
222 Self {
223 stats: Arc::new(RwLock::new(Metrics::default())),
224 }
225 }
226
227 pub async fn get_stats(&self) -> RwLockReadGuard<'_, Metrics> {
228 self.stats.read().await
229 }
230
231 pub async fn increase_tcp4_announces(&self) {
232 let mut stats_lock = self.stats.write().await;
233 stats_lock.tcp4_announces_handled += 1;
234 drop(stats_lock);
235 }
236
237 pub async fn increase_tcp4_connections(&self) {
238 let mut stats_lock = self.stats.write().await;
239 stats_lock.tcp4_connections_handled += 1;
240 drop(stats_lock);
241 }
242
243 pub async fn increase_tcp4_scrapes(&self) {
244 let mut stats_lock = self.stats.write().await;
245 stats_lock.tcp4_scrapes_handled += 1;
246 drop(stats_lock);
247 }
248
249 pub async fn increase_tcp6_announces(&self) {
250 let mut stats_lock = self.stats.write().await;
251 stats_lock.tcp6_announces_handled += 1;
252 drop(stats_lock);
253 }
254
255 pub async fn increase_tcp6_connections(&self) {
256 let mut stats_lock = self.stats.write().await;
257 stats_lock.tcp6_connections_handled += 1;
258 drop(stats_lock);
259 }
260
261 pub async fn increase_tcp6_scrapes(&self) {
262 let mut stats_lock = self.stats.write().await;
263 stats_lock.tcp6_scrapes_handled += 1;
264 drop(stats_lock);
265 }
266
267 pub async fn increase_udp4_connections(&self) {
268 let mut stats_lock = self.stats.write().await;
269 stats_lock.udp4_connections_handled += 1;
270 drop(stats_lock);
271 }
272
273 pub async fn increase_udp4_announces(&self) {
274 let mut stats_lock = self.stats.write().await;
275 stats_lock.udp4_announces_handled += 1;
276 drop(stats_lock);
277 }
278
279 pub async fn increase_udp4_scrapes(&self) {
280 let mut stats_lock = self.stats.write().await;
281 stats_lock.udp4_scrapes_handled += 1;
282 drop(stats_lock);
283 }
284
285 pub async fn increase_udp6_connections(&self) {
286 let mut stats_lock = self.stats.write().await;
287 stats_lock.udp6_connections_handled += 1;
288 drop(stats_lock);
289 }
290
291 pub async fn increase_udp6_announces(&self) {
292 let mut stats_lock = self.stats.write().await;
293 stats_lock.udp6_announces_handled += 1;
294 drop(stats_lock);
295 }
296
297 pub async fn increase_udp6_scrapes(&self) {
298 let mut stats_lock = self.stats.write().await;
299 stats_lock.udp6_scrapes_handled += 1;
300 drop(stats_lock);
301 }
302}
303
#[cfg(test)]
mod tests {

    // Tests for `Keeper`: initial metrics and creation of the event sender.
    mod stats_tracker {
        use crate::core::statistics::{Event, Keeper, Metrics};

        #[tokio::test]
        async fn should_contain_the_tracker_statistics() {
            let stats_tracker = Keeper::new();

            let stats = stats_tracker.repository.get_stats().await;

            assert_eq!(stats.tcp4_announces_handled, Metrics::default().tcp4_announces_handled);
        }

        #[tokio::test]
        async fn should_create_an_event_sender_to_send_statistical_events() {
            let mut stats_tracker = Keeper::new();

            let event_sender = stats_tracker.run_event_listener();

            let result = event_sender.send_event(Event::Udp4Connect).await;

            // `Sender::send_event` always yields `Some(..)`, so checking only
            // `is_some()` could never fail. Require the send itself to succeed.
            assert!(matches!(result, Some(Ok(()))));
        }
    }

    // Tests for `event_handler`: each event must bump the expected counter(s)
    // on a fresh repository.
    mod event_handler {
        use crate::core::statistics::{event_handler, Event, Repo};

        #[tokio::test]
        async fn should_increase_the_tcp4_announces_counter_when_it_receives_a_tcp4_announce_event() {
            let stats_repository = Repo::new();

            event_handler(Event::Tcp4Announce, &stats_repository).await;

            let stats = stats_repository.get_stats().await;

            assert_eq!(stats.tcp4_announces_handled, 1);
        }

        #[tokio::test]
        async fn should_increase_the_tcp4_connections_counter_when_it_receives_a_tcp4_announce_event() {
            let stats_repository = Repo::new();

            event_handler(Event::Tcp4Announce, &stats_repository).await;

            let stats = stats_repository.get_stats().await;

            assert_eq!(stats.tcp4_connections_handled, 1);
        }

        #[tokio::test]
        async fn should_increase_the_tcp4_scrapes_counter_when_it_receives_a_tcp4_scrape_event() {
            let stats_repository = Repo::new();

            event_handler(Event::Tcp4Scrape, &stats_repository).await;

            let stats = stats_repository.get_stats().await;

            assert_eq!(stats.tcp4_scrapes_handled, 1);
        }

        #[tokio::test]
        async fn should_increase_the_tcp4_connections_counter_when_it_receives_a_tcp4_scrape_event() {
            let stats_repository = Repo::new();

            event_handler(Event::Tcp4Scrape, &stats_repository).await;

            let stats = stats_repository.get_stats().await;

            assert_eq!(stats.tcp4_connections_handled, 1);
        }

        #[tokio::test]
        async fn should_increase_the_tcp6_announces_counter_when_it_receives_a_tcp6_announce_event() {
            let stats_repository = Repo::new();

            event_handler(Event::Tcp6Announce, &stats_repository).await;

            let stats = stats_repository.get_stats().await;

            assert_eq!(stats.tcp6_announces_handled, 1);
        }

        #[tokio::test]
        async fn should_increase_the_tcp6_connections_counter_when_it_receives_a_tcp6_announce_event() {
            let stats_repository = Repo::new();

            event_handler(Event::Tcp6Announce, &stats_repository).await;

            let stats = stats_repository.get_stats().await;

            assert_eq!(stats.tcp6_connections_handled, 1);
        }

        #[tokio::test]
        async fn should_increase_the_tcp6_scrapes_counter_when_it_receives_a_tcp6_scrape_event() {
            let stats_repository = Repo::new();

            event_handler(Event::Tcp6Scrape, &stats_repository).await;

            let stats = stats_repository.get_stats().await;

            assert_eq!(stats.tcp6_scrapes_handled, 1);
        }

        #[tokio::test]
        async fn should_increase_the_tcp6_connections_counter_when_it_receives_a_tcp6_scrape_event() {
            let stats_repository = Repo::new();

            event_handler(Event::Tcp6Scrape, &stats_repository).await;

            let stats = stats_repository.get_stats().await;

            assert_eq!(stats.tcp6_connections_handled, 1);
        }

        #[tokio::test]
        async fn should_increase_the_udp4_connections_counter_when_it_receives_a_udp4_connect_event() {
            let stats_repository = Repo::new();

            event_handler(Event::Udp4Connect, &stats_repository).await;

            let stats = stats_repository.get_stats().await;

            assert_eq!(stats.udp4_connections_handled, 1);
        }

        #[tokio::test]
        async fn should_increase_the_udp4_announces_counter_when_it_receives_a_udp4_announce_event() {
            let stats_repository = Repo::new();

            event_handler(Event::Udp4Announce, &stats_repository).await;

            let stats = stats_repository.get_stats().await;

            assert_eq!(stats.udp4_announces_handled, 1);
        }

        #[tokio::test]
        async fn should_increase_the_udp4_scrapes_counter_when_it_receives_a_udp4_scrape_event() {
            let stats_repository = Repo::new();

            event_handler(Event::Udp4Scrape, &stats_repository).await;

            let stats = stats_repository.get_stats().await;

            assert_eq!(stats.udp4_scrapes_handled, 1);
        }

        #[tokio::test]
        async fn should_increase_the_udp6_connections_counter_when_it_receives_a_udp6_connect_event() {
            let stats_repository = Repo::new();

            event_handler(Event::Udp6Connect, &stats_repository).await;

            let stats = stats_repository.get_stats().await;

            assert_eq!(stats.udp6_connections_handled, 1);
        }

        #[tokio::test]
        async fn should_increase_the_udp6_announces_counter_when_it_receives_a_udp6_announce_event() {
            let stats_repository = Repo::new();

            event_handler(Event::Udp6Announce, &stats_repository).await;

            let stats = stats_repository.get_stats().await;

            assert_eq!(stats.udp6_announces_handled, 1);
        }

        #[tokio::test]
        async fn should_increase_the_udp6_scrapes_counter_when_it_receives_a_udp6_scrape_event() {
            let stats_repository = Repo::new();

            event_handler(Event::Udp6Scrape, &stats_repository).await;

            let stats = stats_repository.get_stats().await;

            assert_eq!(stats.udp6_scrapes_handled, 1);
        }
    }
}