torrust_tracker/servers/apis/
server.rs1use std::net::SocketAddr;
27use std::sync::Arc;
28
29use axum_server::tls_rustls::RustlsConfig;
30use axum_server::Handle;
31use derive_more::derive::Display;
32use derive_more::Constructor;
33use futures::future::BoxFuture;
34use thiserror::Error;
35use tokio::sync::oneshot::{Receiver, Sender};
36use torrust_tracker_configuration::AccessTokens;
37use tracing::{instrument, Level};
38
39use super::routes::router;
40use crate::bootstrap::jobs::Started;
41use crate::core::Tracker;
42use crate::servers::apis::API_LOG_TARGET;
43use crate::servers::custom_axum_server::{self, TimeoutAcceptor};
44use crate::servers::logging::STARTED_ON;
45use crate::servers::registar::{ServiceHealthCheckJob, ServiceRegistration, ServiceRegistrationForm};
46use crate::servers::signals::{graceful_shutdown, Halted};
47
48#[derive(Debug, Error)]
50pub enum Error {
51 #[error("Error when starting or stopping the API server")]
52 FailedToStartOrStop(String),
53}
54
55#[allow(clippy::module_name_repetitions)]
57pub type StoppedApiServer = ApiServer<Stopped>;
58
59#[allow(clippy::module_name_repetitions)]
61pub type RunningApiServer = ApiServer<Running>;
62
63#[allow(clippy::module_name_repetitions)]
69#[derive(Debug, Display)]
70pub struct ApiServer<S>
71where
72 S: std::fmt::Debug + std::fmt::Display,
73{
74 pub state: S,
75}
76
77#[derive(Debug, Display)]
79#[display("Stopped: {launcher}")]
80pub struct Stopped {
81 launcher: Launcher,
82}
83
84#[derive(Debug, Display)]
86#[display("Running (with local address): {local_addr}")]
87pub struct Running {
88 pub local_addr: SocketAddr,
89 pub halt_task: tokio::sync::oneshot::Sender<Halted>,
90 pub task: tokio::task::JoinHandle<Launcher>,
91}
92
93impl Running {
94 #[must_use]
95 pub fn new(
96 local_addr: SocketAddr,
97 halt_task: tokio::sync::oneshot::Sender<Halted>,
98 task: tokio::task::JoinHandle<Launcher>,
99 ) -> Self {
100 Self {
101 local_addr,
102 halt_task,
103 task,
104 }
105 }
106}
107
108impl ApiServer<Stopped> {
109 #[must_use]
110 pub fn new(launcher: Launcher) -> Self {
111 Self {
112 state: Stopped { launcher },
113 }
114 }
115
116 #[instrument(skip(self, tracker, form, access_tokens), err, ret(Display, level = Level::INFO))]
126 pub async fn start(
127 self,
128 tracker: Arc<Tracker>,
129 form: ServiceRegistrationForm,
130 access_tokens: Arc<AccessTokens>,
131 ) -> Result<ApiServer<Running>, Error> {
132 let (tx_start, rx_start) = tokio::sync::oneshot::channel::<Started>();
133 let (tx_halt, rx_halt) = tokio::sync::oneshot::channel::<Halted>();
134
135 let launcher = self.state.launcher;
136
137 let task = tokio::spawn(async move {
138 tracing::debug!(target: API_LOG_TARGET, "Starting with launcher in spawned task ...");
139
140 let _task = launcher.start(tracker, access_tokens, tx_start, rx_halt).await;
141
142 tracing::debug!(target: API_LOG_TARGET, "Started with launcher in spawned task");
143
144 launcher
145 });
146
147 let api_server = match rx_start.await {
148 Ok(started) => {
149 form.send(ServiceRegistration::new(started.address, check_fn))
150 .expect("it should be able to send service registration");
151
152 ApiServer {
153 state: Running::new(started.address, tx_halt, task),
154 }
155 }
156 Err(err) => {
157 let msg = format!("Unable to start API server: {err}");
158 tracing::error!("{}", msg);
159 panic!("{}", msg);
160 }
161 };
162
163 Ok(api_server)
164 }
165}
166
167impl ApiServer<Running> {
168 #[instrument(skip(self), err, ret(Display, level = Level::INFO))]
174 pub async fn stop(self) -> Result<ApiServer<Stopped>, Error> {
175 self.state
176 .halt_task
177 .send(Halted::Normal)
178 .map_err(|_| Error::FailedToStartOrStop("Task killer channel was closed.".to_string()))?;
179
180 let launcher = self.state.task.await.map_err(|e| Error::FailedToStartOrStop(e.to_string()))?;
181
182 Ok(ApiServer {
183 state: Stopped { launcher },
184 })
185 }
186}
187
188#[must_use]
195#[instrument(skip())]
196pub fn check_fn(binding: &SocketAddr) -> ServiceHealthCheckJob {
197 let url = format!("http://{binding}/api/health_check"); let info = format!("checking api health check at: {url}");
200
201 let job = tokio::spawn(async move {
202 match reqwest::get(url).await {
203 Ok(response) => Ok(response.status().to_string()),
204 Err(err) => Err(err.to_string()),
205 }
206 });
207 ServiceHealthCheckJob::new(*binding, info, job)
208}
209
210#[derive(Constructor, Debug)]
212pub struct Launcher {
213 bind_to: SocketAddr,
214 tls: Option<RustlsConfig>,
215}
216
217impl std::fmt::Display for Launcher {
218 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
219 if self.tls.is_some() {
220 write!(f, "(with socket): {}, using TLS", self.bind_to,)
221 } else {
222 write!(f, "(with socket): {}, without TLS", self.bind_to,)
223 }
224 }
225}
226
227impl Launcher {
228 #[instrument(skip(self, tracker, access_tokens, tx_start, rx_halt))]
239 pub fn start(
240 &self,
241 tracker: Arc<Tracker>,
242 access_tokens: Arc<AccessTokens>,
243 tx_start: Sender<Started>,
244 rx_halt: Receiver<Halted>,
245 ) -> BoxFuture<'static, ()> {
246 let router = router(tracker, access_tokens);
247 let socket = std::net::TcpListener::bind(self.bind_to).expect("Could not bind tcp_listener to address.");
248 let address = socket.local_addr().expect("Could not get local_addr from tcp_listener.");
249
250 let handle = Handle::new();
251
252 tokio::task::spawn(graceful_shutdown(
253 handle.clone(),
254 rx_halt,
255 format!("Shutting down tracker API server on socket address: {address}"),
256 ));
257
258 let tls = self.tls.clone();
259 let protocol = if tls.is_some() { "https" } else { "http" };
260
261 tracing::info!(target: API_LOG_TARGET, "Starting on {protocol}://{}", address);
262
263 let running = Box::pin(async {
264 match tls {
265 Some(tls) => custom_axum_server::from_tcp_rustls_with_timeouts(socket, tls)
266 .handle(handle)
267 .serve(router.into_make_service_with_connect_info::<std::net::SocketAddr>())
271 .await
272 .expect("Axum server for tracker API crashed."),
273 None => custom_axum_server::from_tcp_with_timeouts(socket)
274 .handle(handle)
275 .acceptor(TimeoutAcceptor)
276 .serve(router.into_make_service_with_connect_info::<std::net::SocketAddr>())
277 .await
278 .expect("Axum server for tracker API crashed."),
279 }
280 });
281
282 tracing::info!(target: API_LOG_TARGET, "{STARTED_ON} {protocol}://{}", address);
283
284 tx_start
285 .send(Started { address })
286 .expect("the HTTP(s) Tracker API service should not be dropped");
287
288 running
289 }
290}
291
292#[cfg(test)]
293mod tests {
294 use std::sync::Arc;
295
296 use torrust_tracker_test_helpers::configuration::ephemeral_public;
297
298 use crate::bootstrap::app::initialize_with_configuration;
299 use crate::bootstrap::jobs::make_rust_tls;
300 use crate::servers::apis::server::{ApiServer, Launcher};
301 use crate::servers::registar::Registar;
302
303 #[tokio::test]
304 async fn it_should_be_able_to_start_and_stop() {
305 let cfg = Arc::new(ephemeral_public());
306 let config = &cfg.http_api.clone().unwrap();
307
308 let tracker = initialize_with_configuration(&cfg);
309
310 let bind_to = config.bind_address;
311
312 let tls = make_rust_tls(&config.tsl_config)
313 .await
314 .map(|tls| tls.expect("tls config failed"));
315
316 let access_tokens = Arc::new(config.access_tokens.clone());
317
318 let stopped = ApiServer::new(Launcher::new(bind_to, tls));
319
320 let register = &Registar::default();
321
322 let started = stopped
323 .start(tracker, register.give_form(), access_tokens)
324 .await
325 .expect("it should start the server");
326 let stopped = started.stop().await.expect("it should stop the server");
327
328 assert_eq!(stopped.state.launcher.bind_to, bind_to);
329 }
330}