torrust_tracker/servers/http/
server.rs1use std::net::SocketAddr;
3use std::sync::Arc;
4
5use axum_server::tls_rustls::RustlsConfig;
6use axum_server::Handle;
7use derive_more::Constructor;
8use futures::future::BoxFuture;
9use tokio::sync::oneshot::{Receiver, Sender};
10use tracing::instrument;
11
12use super::v1::routes::router;
13use crate::bootstrap::jobs::Started;
14use crate::core::Tracker;
15use crate::servers::custom_axum_server::{self, TimeoutAcceptor};
16use crate::servers::http::HTTP_TRACKER_LOG_TARGET;
17use crate::servers::logging::STARTED_ON;
18use crate::servers::registar::{ServiceHealthCheckJob, ServiceRegistration, ServiceRegistrationForm};
19use crate::servers::signals::{graceful_shutdown, Halted};
20
/// Error returned when starting or stopping the HTTP server fails.
///
/// The single variant carries a human-readable description of the failure.
#[derive(Debug)]
pub enum Error {
    /// A failure described by the contained message.
    Error(String),
}

impl std::fmt::Display for Error {
    /// Writes the message carried by the error, without any decoration.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Error(message) => f.write_str(message),
        }
    }
}

// Implementing the standard error trait lets callers box this error, convert
// it with `?` into `Box<dyn std::error::Error>`, or attach it as a source.
impl std::error::Error for Error {}
37
38#[derive(Constructor, Debug)]
39pub struct Launcher {
40 pub bind_to: SocketAddr,
41 pub tls: Option<RustlsConfig>,
42}
43
44impl Launcher {
45 #[instrument(skip(self, tracker, tx_start, rx_halt))]
46 fn start(&self, tracker: Arc<Tracker>, tx_start: Sender<Started>, rx_halt: Receiver<Halted>) -> BoxFuture<'static, ()> {
47 let socket = std::net::TcpListener::bind(self.bind_to).expect("Could not bind tcp_listener to address.");
48 let address = socket.local_addr().expect("Could not get local_addr from tcp_listener.");
49
50 let handle = Handle::new();
51
52 tokio::task::spawn(graceful_shutdown(
53 handle.clone(),
54 rx_halt,
55 format!("Shutting down HTTP server on socket address: {address}"),
56 ));
57
58 let tls = self.tls.clone();
59 let protocol = if tls.is_some() { "https" } else { "http" };
60
61 tracing::info!(target: HTTP_TRACKER_LOG_TARGET, "Starting on: {protocol}://{}", address);
62
63 let app = router(tracker, address);
64
65 let running = Box::pin(async {
66 match tls {
67 Some(tls) => custom_axum_server::from_tcp_rustls_with_timeouts(socket, tls)
68 .handle(handle)
69 .serve(app.into_make_service_with_connect_info::<std::net::SocketAddr>())
73 .await
74 .expect("Axum server crashed."),
75 None => custom_axum_server::from_tcp_with_timeouts(socket)
76 .handle(handle)
77 .acceptor(TimeoutAcceptor)
78 .serve(app.into_make_service_with_connect_info::<std::net::SocketAddr>())
79 .await
80 .expect("Axum server crashed."),
81 }
82 });
83
84 tracing::info!(target: HTTP_TRACKER_LOG_TARGET, "{STARTED_ON}: {protocol}://{}", address);
85
86 tx_start
87 .send(Started { address })
88 .expect("the HTTP(s) Tracker service should not be dropped");
89
90 running
91 }
92}
93
94#[allow(clippy::module_name_repetitions)]
96pub type StoppedHttpServer = HttpServer<Stopped>;
97
98#[allow(clippy::module_name_repetitions)]
100pub type RunningHttpServer = HttpServer<Running>;
101
/// HTTP tracker server whose lifecycle stage is encoded in the type parameter.
///
/// In this file it is instantiated with [`Stopped`] and [`Running`]; the
/// transitions between both consume the server and return it in the other
/// state.
// The lint is silenced because the type name presumably repeats the name of
// the enclosing module.
#[allow(clippy::module_name_repetitions)]
pub struct HttpServer<S> {
    /// Data belonging to the current lifecycle stage.
    pub state: S,
}
122
123pub struct Stopped {
125 launcher: Launcher,
126}
127
128pub struct Running {
130 pub binding: SocketAddr,
132 pub halt_task: tokio::sync::oneshot::Sender<Halted>,
133 pub task: tokio::task::JoinHandle<Launcher>,
134}
135
136impl HttpServer<Stopped> {
137 #[must_use]
139 pub fn new(launcher: Launcher) -> Self {
140 Self {
141 state: Stopped { launcher },
142 }
143 }
144
145 pub async fn start(self, tracker: Arc<Tracker>, form: ServiceRegistrationForm) -> Result<HttpServer<Running>, Error> {
157 let (tx_start, rx_start) = tokio::sync::oneshot::channel::<Started>();
158 let (tx_halt, rx_halt) = tokio::sync::oneshot::channel::<Halted>();
159
160 let launcher = self.state.launcher;
161
162 let task = tokio::spawn(async move {
163 let server = launcher.start(tracker, tx_start, rx_halt);
164
165 server.await;
166
167 launcher
168 });
169
170 let binding = rx_start.await.expect("it should be able to start the service").address;
171
172 form.send(ServiceRegistration::new(binding, check_fn))
173 .expect("it should be able to send service registration");
174
175 Ok(HttpServer {
176 state: Running {
177 binding,
178 halt_task: tx_halt,
179 task,
180 },
181 })
182 }
183}
184
185impl HttpServer<Running> {
186 pub async fn stop(self) -> Result<HttpServer<Stopped>, Error> {
193 self.state
194 .halt_task
195 .send(Halted::Normal)
196 .map_err(|_| Error::Error("Task killer channel was closed.".to_string()))?;
197
198 let launcher = self.state.task.await.map_err(|e| Error::Error(e.to_string()))?;
199
200 Ok(HttpServer {
201 state: Stopped { launcher },
202 })
203 }
204}
205
206#[must_use]
213pub fn check_fn(binding: &SocketAddr) -> ServiceHealthCheckJob {
214 let url = format!("http://{binding}/health_check"); let info = format!("checking http tracker health check at: {url}");
217
218 let job = tokio::spawn(async move {
219 match reqwest::get(url).await {
220 Ok(response) => Ok(response.status().to_string()),
221 Err(err) => Err(err.to_string()),
222 }
223 });
224
225 ServiceHealthCheckJob::new(*binding, info, job)
226}
227
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use torrust_tracker_test_helpers::configuration::ephemeral_public;

    use crate::bootstrap::app::initialize_with_configuration;
    use crate::bootstrap::jobs::make_rust_tls;
    use crate::servers::http::server::{HttpServer, Launcher};
    use crate::servers::registar::Registar;

    /// Starts a server from the first configured HTTP tracker, stops it, and
    /// checks that the launcher handed back still has the configured address.
    #[tokio::test]
    async fn it_should_be_able_to_start_and_stop() {
        let configuration = Arc::new(ephemeral_public());
        let tracker = initialize_with_configuration(&configuration);

        let http_tracker_configs = configuration
            .http_trackers
            .clone()
            .expect("missing HTTP trackers configuration");
        let http_tracker_config = &http_tracker_configs[0];
        let bind_to = http_tracker_config.bind_address;

        let tls = make_rust_tls(&http_tracker_config.tsl_config)
            .await
            .map(|result| result.expect("tls config failed"));

        let registar = Registar::default();

        let running_server = HttpServer::new(Launcher::new(bind_to, tls))
            .start(tracker, registar.give_form())
            .await
            .expect("it should start the server");

        let stopped_server = running_server.stop().await.expect("it should stop the server");

        assert_eq!(stopped_server.state.launcher.bind_to, bind_to);
    }
}