torrust_tracker/core/
statistics.rs

1//! Structs to collect and keep tracker metrics.
2//!
3//! The tracker collects metrics such as:
4//!
5//! - Number of connections handled
6//! - Number of `announce` requests handled
//! - Number of `scrape` requests handled
8//!
9//! These metrics are collected for each connection type: UDP and HTTP and
10//! also for each IP version used by the peers: IPv4 and IPv6.
11//!
//! > Note that the UDP tracker has a specific `connection` request. For the HTTP metrics, the counter counts one connection for each `announce` or `scrape` request.
13//!
14//! The data is collected by using an `event-sender -> event listener` model.
15//!
//! The tracker uses a [`statistics::EventSender`](crate::core::statistics::EventSender) instance to send an event.
//! The [`statistics::Keeper`](crate::core::statistics::Keeper) listens to new events and uses the [`statistics::Repo`](crate::core::statistics::Repo) to update and store metrics.
18//!
19//! See the [`statistics::Event`](crate::core::statistics::Event) enum to check which events are available.
20use std::sync::Arc;
21
22use futures::future::BoxFuture;
23use futures::FutureExt;
24#[cfg(test)]
25use mockall::{automock, predicate::str};
26use tokio::sync::mpsc::error::SendError;
27use tokio::sync::{mpsc, RwLock, RwLockReadGuard};
28
/// Capacity passed to `mpsc::channel` when the statistics event channel is
/// created in `Keeper::run_event_listener`.
const CHANNEL_BUFFER_SIZE: usize = 65_535;
30
/// A statistics event. It is used to collect tracker metrics.
///
/// Variant names are composed of three parts:
///
/// - The `Tcp` prefix means the event was triggered by the HTTP tracker
/// - The `Udp` prefix means the event was triggered by the UDP tracker
/// - The `4` or `6` that follows is the IP version used by the peer
/// - Finally the suffix is the type of request: `Announce`, `Scrape` or `Connect`
///
/// > NOTE: HTTP trackers do not use `connection` requests, which is why there
/// > is no `Tcp4Connect` or `Tcp6Connect` variant.
#[derive(Debug, PartialEq, Eq)]
pub enum Event {
    // code-review: consider one single event for request type with data: Event::Announce { scheme: HTTPorUDP, ip_version: V4orV6 }
    // Attributes are enums too.
    /// An `announce` request on the HTTP tracker from an IPv4 peer.
    Tcp4Announce,
    /// A `scrape` request on the HTTP tracker from an IPv4 peer.
    Tcp4Scrape,
    /// An `announce` request on the HTTP tracker from an IPv6 peer.
    Tcp6Announce,
    /// A `scrape` request on the HTTP tracker from an IPv6 peer.
    Tcp6Scrape,
    /// A `connection` request on the UDP tracker from an IPv4 peer.
    Udp4Connect,
    /// An `announce` request on the UDP tracker from an IPv4 peer.
    Udp4Announce,
    /// A `scrape` request on the UDP tracker from an IPv4 peer.
    Udp4Scrape,
    /// A `connection` request on the UDP tracker from an IPv6 peer.
    Udp6Connect,
    /// An `announce` request on the UDP tracker from an IPv6 peer.
    Udp6Announce,
    /// A `scrape` request on the UDP tracker from an IPv6 peer.
    Udp6Scrape,
}
54
/// Metrics collected by the tracker.
///
/// - Number of connections handled
/// - Number of `announce` requests handled
/// - Number of `scrape` requests handled
///
/// These metrics are collected for each connection type: UDP and HTTP
/// and also for each IP version used by the peers: IPv4 and IPv6.
///
/// Every counter is a `u64`, so the derived `Default` value has all of them
/// set to zero.
#[derive(Debug, PartialEq, Default)]
pub struct Metrics {
    /// Total number of TCP (HTTP tracker) connections from IPv4 peers.
    /// Since the HTTP tracker spec does not require a handshake, this metric
    /// increases for every HTTP request.
    pub tcp4_connections_handled: u64,
    /// Total number of TCP (HTTP tracker) `announce` requests from IPv4 peers.
    pub tcp4_announces_handled: u64,
    /// Total number of TCP (HTTP tracker) `scrape` requests from IPv4 peers.
    pub tcp4_scrapes_handled: u64,
    /// Total number of TCP (HTTP tracker) connections from IPv6 peers.
    /// As with IPv4, this metric increases for every HTTP request.
    pub tcp6_connections_handled: u64,
    /// Total number of TCP (HTTP tracker) `announce` requests from IPv6 peers.
    pub tcp6_announces_handled: u64,
    /// Total number of TCP (HTTP tracker) `scrape` requests from IPv6 peers.
    pub tcp6_scrapes_handled: u64,
    /// Total number of UDP (UDP tracker) `connection` requests from IPv4 peers.
    pub udp4_connections_handled: u64,
    /// Total number of UDP (UDP tracker) `announce` requests from IPv4 peers.
    pub udp4_announces_handled: u64,
    /// Total number of UDP (UDP tracker) `scrape` requests from IPv4 peers.
    pub udp4_scrapes_handled: u64,
    /// Total number of UDP (UDP tracker) `connection` requests from IPv6 peers.
    pub udp6_connections_handled: u64,
    /// Total number of UDP (UDP tracker) `announce` requests from IPv6 peers.
    pub udp6_announces_handled: u64,
    /// Total number of UDP (UDP tracker) `scrape` requests from IPv6 peers.
    pub udp6_scrapes_handled: u64,
}
92
/// The service responsible for keeping tracker metrics (listening to
/// statistics events and handling them).
///
/// It actively listens for new statistics events. When it receives a new
/// event it increases the corresponding counters.
pub struct Keeper {
    /// Repository holding the metrics. The listener task started by
    /// `run_event_listener` updates a clone of this repository.
    pub repository: Repo,
}
100
101impl Default for Keeper {
102    fn default() -> Self {
103        Self::new()
104    }
105}
106
107impl Keeper {
108    #[must_use]
109    pub fn new() -> Self {
110        Self { repository: Repo::new() }
111    }
112
113    #[must_use]
114    pub fn new_active_instance() -> (Box<dyn EventSender>, Repo) {
115        let mut stats_tracker = Self::new();
116
117        let stats_event_sender = stats_tracker.run_event_listener();
118
119        (stats_event_sender, stats_tracker.repository)
120    }
121
122    pub fn run_event_listener(&mut self) -> Box<dyn EventSender> {
123        let (sender, receiver) = mpsc::channel::<Event>(CHANNEL_BUFFER_SIZE);
124
125        let stats_repository = self.repository.clone();
126
127        tokio::spawn(async move { event_listener(receiver, stats_repository).await });
128
129        Box::new(Sender { sender })
130    }
131}
132
133async fn event_listener(mut receiver: mpsc::Receiver<Event>, stats_repository: Repo) {
134    while let Some(event) = receiver.recv().await {
135        event_handler(event, &stats_repository).await;
136    }
137}
138
139async fn event_handler(event: Event, stats_repository: &Repo) {
140    match event {
141        // TCP4
142        Event::Tcp4Announce => {
143            stats_repository.increase_tcp4_announces().await;
144            stats_repository.increase_tcp4_connections().await;
145        }
146        Event::Tcp4Scrape => {
147            stats_repository.increase_tcp4_scrapes().await;
148            stats_repository.increase_tcp4_connections().await;
149        }
150
151        // TCP6
152        Event::Tcp6Announce => {
153            stats_repository.increase_tcp6_announces().await;
154            stats_repository.increase_tcp6_connections().await;
155        }
156        Event::Tcp6Scrape => {
157            stats_repository.increase_tcp6_scrapes().await;
158            stats_repository.increase_tcp6_connections().await;
159        }
160
161        // UDP4
162        Event::Udp4Connect => {
163            stats_repository.increase_udp4_connections().await;
164        }
165        Event::Udp4Announce => {
166            stats_repository.increase_udp4_announces().await;
167        }
168        Event::Udp4Scrape => {
169            stats_repository.increase_udp4_scrapes().await;
170        }
171
172        // UDP6
173        Event::Udp6Connect => {
174            stats_repository.increase_udp6_connections().await;
175        }
176        Event::Udp6Announce => {
177            stats_repository.increase_udp6_announces().await;
178        }
179        Event::Udp6Scrape => {
180            stats_repository.increase_udp6_scrapes().await;
181        }
182    }
183
184    tracing::debug!("stats: {:?}", stats_repository.get_stats().await);
185}
186
/// A trait to allow sending statistics events.
///
/// Implementors must be `Sync + Send`. When compiled for tests a mock
/// implementation is generated with `automock`.
#[cfg_attr(test, automock)]
pub trait EventSender: Sync + Send {
    /// Sends `event` and returns a boxed future with the outcome.
    ///
    /// The future resolves to an `Option` wrapping the channel send result.
    /// The [`Sender`] implementation in this module always resolves to
    /// `Some(..)`; the inner `Err` carries the event that could not be sent.
    fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<(), SendError<Event>>>>;
}
192
/// A [`statistics::EventSender`](crate::core::statistics::EventSender) implementation.
///
/// It uses a channel sender to send the statistics events. The channel is created by a
/// [`statistics::Keeper`](crate::core::statistics::Keeper).
pub struct Sender {
    /// Sending half of the statistics event channel.
    sender: mpsc::Sender<Event>,
}
200
201impl EventSender for Sender {
202    fn send_event(&self, event: Event) -> BoxFuture<'_, Option<Result<(), SendError<Event>>>> {
203        async move { Some(self.sender.send(event).await) }.boxed()
204    }
205}
206
/// A repository for the tracker metrics.
///
/// The metrics live behind an `Arc<RwLock<..>>`, so clones of a `Repo` share
/// the same underlying [`Metrics`] value.
#[derive(Clone)]
pub struct Repo {
    /// Shared, lock-protected metrics.
    pub stats: Arc<RwLock<Metrics>>,
}
212
213impl Default for Repo {
214    fn default() -> Self {
215        Self::new()
216    }
217}
218
219impl Repo {
220    #[must_use]
221    pub fn new() -> Self {
222        Self {
223            stats: Arc::new(RwLock::new(Metrics::default())),
224        }
225    }
226
227    pub async fn get_stats(&self) -> RwLockReadGuard<'_, Metrics> {
228        self.stats.read().await
229    }
230
231    pub async fn increase_tcp4_announces(&self) {
232        let mut stats_lock = self.stats.write().await;
233        stats_lock.tcp4_announces_handled += 1;
234        drop(stats_lock);
235    }
236
237    pub async fn increase_tcp4_connections(&self) {
238        let mut stats_lock = self.stats.write().await;
239        stats_lock.tcp4_connections_handled += 1;
240        drop(stats_lock);
241    }
242
243    pub async fn increase_tcp4_scrapes(&self) {
244        let mut stats_lock = self.stats.write().await;
245        stats_lock.tcp4_scrapes_handled += 1;
246        drop(stats_lock);
247    }
248
249    pub async fn increase_tcp6_announces(&self) {
250        let mut stats_lock = self.stats.write().await;
251        stats_lock.tcp6_announces_handled += 1;
252        drop(stats_lock);
253    }
254
255    pub async fn increase_tcp6_connections(&self) {
256        let mut stats_lock = self.stats.write().await;
257        stats_lock.tcp6_connections_handled += 1;
258        drop(stats_lock);
259    }
260
261    pub async fn increase_tcp6_scrapes(&self) {
262        let mut stats_lock = self.stats.write().await;
263        stats_lock.tcp6_scrapes_handled += 1;
264        drop(stats_lock);
265    }
266
267    pub async fn increase_udp4_connections(&self) {
268        let mut stats_lock = self.stats.write().await;
269        stats_lock.udp4_connections_handled += 1;
270        drop(stats_lock);
271    }
272
273    pub async fn increase_udp4_announces(&self) {
274        let mut stats_lock = self.stats.write().await;
275        stats_lock.udp4_announces_handled += 1;
276        drop(stats_lock);
277    }
278
279    pub async fn increase_udp4_scrapes(&self) {
280        let mut stats_lock = self.stats.write().await;
281        stats_lock.udp4_scrapes_handled += 1;
282        drop(stats_lock);
283    }
284
285    pub async fn increase_udp6_connections(&self) {
286        let mut stats_lock = self.stats.write().await;
287        stats_lock.udp6_connections_handled += 1;
288        drop(stats_lock);
289    }
290
291    pub async fn increase_udp6_announces(&self) {
292        let mut stats_lock = self.stats.write().await;
293        stats_lock.udp6_announces_handled += 1;
294        drop(stats_lock);
295    }
296
297    pub async fn increase_udp6_scrapes(&self) {
298        let mut stats_lock = self.stats.write().await;
299        stats_lock.udp6_scrapes_handled += 1;
300        drop(stats_lock);
301    }
302}
303
304#[cfg(test)]
305mod tests {
306
307    mod stats_tracker {
308        use crate::core::statistics::{Event, Keeper, Metrics};
309
310        #[tokio::test]
311        async fn should_contain_the_tracker_statistics() {
312            let stats_tracker = Keeper::new();
313
314            let stats = stats_tracker.repository.get_stats().await;
315
316            assert_eq!(stats.tcp4_announces_handled, Metrics::default().tcp4_announces_handled);
317        }
318
319        #[tokio::test]
320        async fn should_create_an_event_sender_to_send_statistical_events() {
321            let mut stats_tracker = Keeper::new();
322
323            let event_sender = stats_tracker.run_event_listener();
324
325            let result = event_sender.send_event(Event::Udp4Connect).await;
326
327            assert!(result.is_some());
328        }
329    }
330
331    mod event_handler {
332        use crate::core::statistics::{event_handler, Event, Repo};
333
334        #[tokio::test]
335        async fn should_increase_the_tcp4_announces_counter_when_it_receives_a_tcp4_announce_event() {
336            let stats_repository = Repo::new();
337
338            event_handler(Event::Tcp4Announce, &stats_repository).await;
339
340            let stats = stats_repository.get_stats().await;
341
342            assert_eq!(stats.tcp4_announces_handled, 1);
343        }
344
345        #[tokio::test]
346        async fn should_increase_the_tcp4_connections_counter_when_it_receives_a_tcp4_announce_event() {
347            let stats_repository = Repo::new();
348
349            event_handler(Event::Tcp4Announce, &stats_repository).await;
350
351            let stats = stats_repository.get_stats().await;
352
353            assert_eq!(stats.tcp4_connections_handled, 1);
354        }
355
356        #[tokio::test]
357        async fn should_increase_the_tcp4_scrapes_counter_when_it_receives_a_tcp4_scrape_event() {
358            let stats_repository = Repo::new();
359
360            event_handler(Event::Tcp4Scrape, &stats_repository).await;
361
362            let stats = stats_repository.get_stats().await;
363
364            assert_eq!(stats.tcp4_scrapes_handled, 1);
365        }
366
367        #[tokio::test]
368        async fn should_increase_the_tcp4_connections_counter_when_it_receives_a_tcp4_scrape_event() {
369            let stats_repository = Repo::new();
370
371            event_handler(Event::Tcp4Scrape, &stats_repository).await;
372
373            let stats = stats_repository.get_stats().await;
374
375            assert_eq!(stats.tcp4_connections_handled, 1);
376        }
377
378        #[tokio::test]
379        async fn should_increase_the_tcp6_announces_counter_when_it_receives_a_tcp6_announce_event() {
380            let stats_repository = Repo::new();
381
382            event_handler(Event::Tcp6Announce, &stats_repository).await;
383
384            let stats = stats_repository.get_stats().await;
385
386            assert_eq!(stats.tcp6_announces_handled, 1);
387        }
388
389        #[tokio::test]
390        async fn should_increase_the_tcp6_connections_counter_when_it_receives_a_tcp6_announce_event() {
391            let stats_repository = Repo::new();
392
393            event_handler(Event::Tcp6Announce, &stats_repository).await;
394
395            let stats = stats_repository.get_stats().await;
396
397            assert_eq!(stats.tcp6_connections_handled, 1);
398        }
399
400        #[tokio::test]
401        async fn should_increase_the_tcp6_scrapes_counter_when_it_receives_a_tcp6_scrape_event() {
402            let stats_repository = Repo::new();
403
404            event_handler(Event::Tcp6Scrape, &stats_repository).await;
405
406            let stats = stats_repository.get_stats().await;
407
408            assert_eq!(stats.tcp6_scrapes_handled, 1);
409        }
410
411        #[tokio::test]
412        async fn should_increase_the_tcp6_connections_counter_when_it_receives_a_tcp6_scrape_event() {
413            let stats_repository = Repo::new();
414
415            event_handler(Event::Tcp6Scrape, &stats_repository).await;
416
417            let stats = stats_repository.get_stats().await;
418
419            assert_eq!(stats.tcp6_connections_handled, 1);
420        }
421
422        #[tokio::test]
423        async fn should_increase_the_udp4_connections_counter_when_it_receives_a_udp4_connect_event() {
424            let stats_repository = Repo::new();
425
426            event_handler(Event::Udp4Connect, &stats_repository).await;
427
428            let stats = stats_repository.get_stats().await;
429
430            assert_eq!(stats.udp4_connections_handled, 1);
431        }
432
433        #[tokio::test]
434        async fn should_increase_the_udp4_announces_counter_when_it_receives_a_udp4_announce_event() {
435            let stats_repository = Repo::new();
436
437            event_handler(Event::Udp4Announce, &stats_repository).await;
438
439            let stats = stats_repository.get_stats().await;
440
441            assert_eq!(stats.udp4_announces_handled, 1);
442        }
443
444        #[tokio::test]
445        async fn should_increase_the_udp4_scrapes_counter_when_it_receives_a_udp4_scrape_event() {
446            let stats_repository = Repo::new();
447
448            event_handler(Event::Udp4Scrape, &stats_repository).await;
449
450            let stats = stats_repository.get_stats().await;
451
452            assert_eq!(stats.udp4_scrapes_handled, 1);
453        }
454
455        #[tokio::test]
456        async fn should_increase_the_udp6_connections_counter_when_it_receives_a_udp6_connect_event() {
457            let stats_repository = Repo::new();
458
459            event_handler(Event::Udp6Connect, &stats_repository).await;
460
461            let stats = stats_repository.get_stats().await;
462
463            assert_eq!(stats.udp6_connections_handled, 1);
464        }
465
466        #[tokio::test]
467        async fn should_increase_the_udp6_announces_counter_when_it_receives_a_udp6_announce_event() {
468            let stats_repository = Repo::new();
469
470            event_handler(Event::Udp6Announce, &stats_repository).await;
471
472            let stats = stats_repository.get_stats().await;
473
474            assert_eq!(stats.udp6_announces_handled, 1);
475        }
476
477        #[tokio::test]
478        async fn should_increase_the_udp6_scrapes_counter_when_it_receives_a_udp6_scrape_event() {
479            let stats_repository = Repo::new();
480
481            event_handler(Event::Udp6Scrape, &stats_repository).await;
482
483            let stats = stats_repository.get_stats().await;
484
485            assert_eq!(stats.udp6_scrapes_handled, 1);
486        }
487    }
488}