torrust_tracker/servers/udp/server/
mod.rs1use std::fmt::Debug;
3
4use derive_more::derive::Display;
5use thiserror::Error;
6
7use super::RawRequest;
8
9pub mod bound_socket;
10pub mod launcher;
11pub mod processor;
12pub mod receiver;
13pub mod request_buffer;
14pub mod spawner;
15pub mod states;
16
17#[derive(Debug, Error)]
28pub enum UdpError {
29 #[error("Any error to do with the socket")]
30 FailedToBindSocket(std::io::Error),
31
32 #[error("Any error to do with starting or stopping the sever")]
33 FailedToStartOrStopServer(String),
34}
35
36#[allow(clippy::module_name_repetitions)]
46#[derive(Debug, Display)]
47pub struct Server<S>
48where
49 S: std::fmt::Debug + std::fmt::Display,
50{
51 pub state: S,
53}
54
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use torrust_tracker_test_helpers::configuration::ephemeral_public;

    use super::spawner::Spawner;
    use super::Server;
    use crate::bootstrap::app::initialize_with_configuration;
    use crate::servers::registar::Registar;

    /// Starts a server on the first configured UDP tracker address and stops
    /// it immediately afterwards.
    #[tokio::test]
    async fn it_should_be_able_to_start_and_stop() {
        let cfg = Arc::new(ephemeral_public());
        let tracker = initialize_with_configuration(&cfg);
        // Borrow the first UDP tracker entry instead of cloning the whole
        // list: only the (copyable) bind address is needed.
        let config = cfg
            .udp_trackers
            .as_ref()
            .expect("missing UDP trackers configuration")
            .first()
            .expect("missing first UDP tracker configuration");
        let bind_to = config.bind_address;
        let register = &Registar::default();

        let stopped = Server::new(Spawner::new(bind_to));

        let started = stopped
            .start(tracker, register.give_form())
            .await
            .expect("it should start the server");

        let stopped = started.stop().await.expect("it should stop the server");

        // NOTE(review): presumably gives the spawned server task time to wind
        // down before the test ends; confirm this pause is still required.
        tokio::time::sleep(Duration::from_secs(1)).await;

        // The stopped server must still carry the address it was created with.
        assert_eq!(stopped.state.spawner.bind_to, bind_to);
    }

    /// Same as the test above, but lets the server run for a second before
    /// stopping it.
    #[tokio::test]
    async fn it_should_be_able_to_start_and_stop_with_wait() {
        let cfg = Arc::new(ephemeral_public());
        let tracker = initialize_with_configuration(&cfg);
        // Same lookup as in the sibling test, with descriptive panic messages
        // instead of bare `unwrap()` calls.
        let config = cfg
            .udp_trackers
            .as_ref()
            .expect("missing UDP trackers configuration")
            .first()
            .expect("missing first UDP tracker configuration");
        let bind_to = config.bind_address;
        let register = &Registar::default();

        let stopped = Server::new(Spawner::new(bind_to));

        let started = stopped
            .start(tracker, register.give_form())
            .await
            .expect("it should start the server");

        // Keep the server running for a moment before asking it to stop.
        tokio::time::sleep(Duration::from_secs(1)).await;

        let stopped = started.stop().await.expect("it should stop the server");

        // NOTE(review): presumably gives the spawned server task time to wind
        // down before the test ends; confirm this pause is still required.
        tokio::time::sleep(Duration::from_secs(1)).await;

        // The stopped server must still carry the address it was created with.
        assert_eq!(stopped.state.spawner.bind_to, bind_to);
    }
}
114
#[cfg(test)]
mod test_tokio {
    use std::sync::Arc;
    use std::time::Duration;

    use tokio::sync::Barrier;
    use tokio::task::JoinSet;

    /// Checks how a ten-party `Barrier` behaves when some of the tasks that
    /// were spawned to wait on it are aborted and later replaced.
    #[tokio::test]
    async fn test_barrier_with_aborted_tasks() {
        let barrier = Arc::new(Barrier::new(10));
        let mut tasks = JoinSet::default();
        let mut handles = Vec::default();

        // Spawns one more task that waits on the shared barrier and returns
        // the handle that `JoinSet::spawn` hands back.
        let mut spawn_waiter = || {
            let waiter = barrier.clone();
            tasks.spawn(async move {
                waiter.wait().await;
            })
        };

        // Nine waiters: one short of what the barrier needs.
        handles.extend((0..9).map(|_| spawn_waiter()));

        // Abort the two most recently spawned waiters and forget their handles.
        (0..2).filter_map(|_| handles.pop()).for_each(|aborted| aborted.abort());

        // One replacement waiter; eight tracked handles remain.
        handles.push(spawn_waiter());

        tokio::time::sleep(Duration::from_millis(50)).await;

        // None of the tracked waiters may have completed yet.
        handles.iter().for_each(|h| {
            assert!(!h.is_finished());
        });

        // Two further waiters bring the tracked total to ten.
        handles.extend((0..2).map(|_| spawn_waiter()));

        tokio::time::sleep(Duration::from_millis(50)).await;

        // Every tracked waiter must have completed by now.
        handles.iter().for_each(|h| {
            assert!(h.is_finished());
        });

        tasks.shutdown().await;
    }
}