torrust_tracker/servers/udp/server/
mod.rs

1//! Module to handle the UDP server instances.
2use std::fmt::Debug;
3
4use derive_more::derive::Display;
5use thiserror::Error;
6
7use super::RawRequest;
8
9pub mod bound_socket;
10pub mod launcher;
11pub mod processor;
12pub mod receiver;
13pub mod request_buffer;
14pub mod spawner;
15pub mod states;
16
17/// Error that can occur when starting or stopping the UDP server.
18///
19/// Some errors triggered while starting the server are:
20///
21/// - The server cannot bind to the given address.
22/// - It cannot get the bound address.
23///
24/// Some errors triggered while stopping the server are:
25///
26/// - The [`Server`] cannot send the shutdown signal to the spawned UDP service thread.
27#[derive(Debug, Error)]
28pub enum UdpError {
29    #[error("Any error to do with the socket")]
30    FailedToBindSocket(std::io::Error),
31
32    #[error("Any error to do with starting or stopping the sever")]
33    FailedToStartOrStopServer(String),
34}
35
36/// A UDP server.
37///
38/// It's an state machine. Configurations cannot be changed. This struct
39/// represents concrete configuration and state. It allows to start and stop the
40/// server but always keeping the same configuration.
41///
42/// > **NOTICE**: if the configurations changes after running the server it will
43/// > reset to the initial value after stopping the server. This struct is not
44/// > intended to persist configurations between runs.
45#[allow(clippy::module_name_repetitions)]
46#[derive(Debug, Display)]
47pub struct Server<S>
48where
49    S: std::fmt::Debug + std::fmt::Display,
50{
51    /// The state of the server: `running` or `stopped`.
52    pub state: S,
53}
54
55#[cfg(test)]
56mod tests {
57    use std::sync::Arc;
58    use std::time::Duration;
59
60    use torrust_tracker_test_helpers::configuration::ephemeral_public;
61
62    use super::spawner::Spawner;
63    use super::Server;
64    use crate::bootstrap::app::initialize_with_configuration;
65    use crate::servers::registar::Registar;
66
67    #[tokio::test]
68    async fn it_should_be_able_to_start_and_stop() {
69        let cfg = Arc::new(ephemeral_public());
70        let tracker = initialize_with_configuration(&cfg);
71        let udp_trackers = cfg.udp_trackers.clone().expect("missing UDP trackers configuration");
72        let config = &udp_trackers[0];
73        let bind_to = config.bind_address;
74        let register = &Registar::default();
75
76        let stopped = Server::new(Spawner::new(bind_to));
77
78        let started = stopped
79            .start(tracker, register.give_form())
80            .await
81            .expect("it should start the server");
82
83        let stopped = started.stop().await.expect("it should stop the server");
84
85        tokio::time::sleep(Duration::from_secs(1)).await;
86
87        assert_eq!(stopped.state.spawner.bind_to, bind_to);
88    }
89
90    #[tokio::test]
91    async fn it_should_be_able_to_start_and_stop_with_wait() {
92        let cfg = Arc::new(ephemeral_public());
93        let tracker = initialize_with_configuration(&cfg);
94        let config = &cfg.udp_trackers.as_ref().unwrap().first().unwrap();
95        let bind_to = config.bind_address;
96        let register = &Registar::default();
97
98        let stopped = Server::new(Spawner::new(bind_to));
99
100        let started = stopped
101            .start(tracker, register.give_form())
102            .await
103            .expect("it should start the server");
104
105        tokio::time::sleep(Duration::from_secs(1)).await;
106
107        let stopped = started.stop().await.expect("it should stop the server");
108
109        tokio::time::sleep(Duration::from_secs(1)).await;
110
111        assert_eq!(stopped.state.spawner.bind_to, bind_to);
112    }
113}
114
115/// Todo: submit test to tokio documentation.
116#[cfg(test)]
117mod test_tokio {
118    use std::sync::Arc;
119    use std::time::Duration;
120
121    use tokio::sync::Barrier;
122    use tokio::task::JoinSet;
123
124    #[tokio::test]
125    async fn test_barrier_with_aborted_tasks() {
126        // Create a barrier that requires 10 tasks to proceed.
127        let barrier = Arc::new(Barrier::new(10));
128        let mut tasks = JoinSet::default();
129        let mut handles = Vec::default();
130
131        // Set Barrier to 9/10.
132        for _ in 0..9 {
133            let c = barrier.clone();
134            handles.push(tasks.spawn(async move {
135                c.wait().await;
136            }));
137        }
138
139        // Abort two tasks: Barrier: 7/10.
140        for _ in 0..2 {
141            if let Some(handle) = handles.pop() {
142                handle.abort();
143            }
144        }
145
146        // Spawn a single task: Barrier 8/10.
147        let c = barrier.clone();
148        handles.push(tasks.spawn(async move {
149            c.wait().await;
150        }));
151
152        // give a chance fro the barrier to release.
153        tokio::time::sleep(Duration::from_millis(50)).await;
154
155        // assert that the barrier isn't removed, i.e. 8, not 10.
156        for h in &handles {
157            assert!(!h.is_finished());
158        }
159
160        // Spawn two more tasks to trigger the barrier release: Barrier 10/10.
161        for _ in 0..2 {
162            let c = barrier.clone();
163            handles.push(tasks.spawn(async move {
164                c.wait().await;
165            }));
166        }
167
168        // give a chance fro the barrier to release.
169        tokio::time::sleep(Duration::from_millis(50)).await;
170
171        // assert that the barrier has been triggered
172        for h in &handles {
173            assert!(h.is_finished());
174        }
175
176        tasks.shutdown().await;
177    }
178}