torrust_tracker/servers/udp/server/
launcher.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::time::Duration;
4
5use derive_more::Constructor;
6use futures_util::StreamExt;
7use tokio::select;
8use tokio::sync::oneshot;
9use tracing::instrument;
10
11use super::request_buffer::ActiveRequests;
12use crate::bootstrap::jobs::Started;
13use crate::core::Tracker;
14use crate::servers::logging::STARTED_ON;
15use crate::servers::registar::ServiceHealthCheckJob;
16use crate::servers::signals::{shutdown_signal_with_message, Halted};
17use crate::servers::udp::server::bound_socket::BoundSocket;
18use crate::servers::udp::server::processor::Processor;
19use crate::servers::udp::server::receiver::Receiver;
20use crate::servers::udp::UDP_TRACKER_LOG_TARGET;
21use crate::shared::bit_torrent::tracker::udp::client::check;
22
23/// A UDP server instance launcher.
24#[derive(Constructor)]
25pub struct Launcher;
26
27impl Launcher {
28    /// It starts the UDP server instance with graceful shutdown.
29    ///
30    /// # Panics
31    ///
32    /// It panics if unable to bind to udp socket, and get the address from the udp socket.
33    /// It also panics if unable to send address of socket.
34    #[instrument(skip(tracker, bind_to, tx_start, rx_halt))]
35    pub async fn run_with_graceful_shutdown(
36        tracker: Arc<Tracker>,
37        bind_to: SocketAddr,
38        tx_start: oneshot::Sender<Started>,
39        rx_halt: oneshot::Receiver<Halted>,
40    ) {
41        tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Starting on: {bind_to}");
42
43        let socket = tokio::time::timeout(Duration::from_millis(5000), BoundSocket::new(bind_to))
44            .await
45            .expect("it should bind to the socket within five seconds");
46
47        let bound_socket = match socket {
48            Ok(socket) => socket,
49            Err(e) => {
50                tracing::error!(target: UDP_TRACKER_LOG_TARGET, addr = %bind_to, err = %e, "Udp::run_with_graceful_shutdown panic! (error when building socket)" );
51                panic!("could not bind to socket!");
52            }
53        };
54
55        let address = bound_socket.address();
56        let local_udp_url = bound_socket.url().to_string();
57
58        tracing::info!(target: UDP_TRACKER_LOG_TARGET, "{STARTED_ON}: {local_udp_url}");
59
60        let receiver = Receiver::new(bound_socket.into());
61
62        tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (spawning main loop)");
63
64        let running = {
65            let local_addr = local_udp_url.clone();
66            tokio::task::spawn(async move {
67                tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_with_graceful_shutdown::task (listening...)");
68                let () = Self::run_udp_server_main(receiver, tracker.clone()).await;
69            })
70        };
71
72        tx_start
73            .send(Started { address })
74            .expect("the UDP Tracker service should not be dropped");
75
76        tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (started)");
77
78        let stop = running.abort_handle();
79
80        let halt_task = tokio::task::spawn(shutdown_signal_with_message(
81            rx_halt,
82            format!("Halting UDP Service Bound to Socket: {address}"),
83        ));
84
85        select! {
86            _ = running => { tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (stopped)"); },
87            _ = halt_task => { tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (halting)"); }
88        }
89        stop.abort();
90
91        tokio::task::yield_now().await; // lets allow the other threads to complete.
92    }
93
94    #[must_use]
95    #[instrument(skip(binding))]
96    pub fn check(binding: &SocketAddr) -> ServiceHealthCheckJob {
97        let binding = *binding;
98        let info = format!("checking the udp tracker health check at: {binding}");
99
100        let job = tokio::spawn(async move { check(&binding).await });
101
102        ServiceHealthCheckJob::new(binding, info, job)
103    }
104
105    #[instrument(skip(receiver, tracker))]
106    async fn run_udp_server_main(mut receiver: Receiver, tracker: Arc<Tracker>) {
107        let active_requests = &mut ActiveRequests::default();
108
109        let addr = receiver.bound_socket_address();
110        let local_addr = format!("udp://{addr}");
111
112        loop {
113            let processor = Processor::new(receiver.socket.clone(), tracker.clone());
114
115            if let Some(req) = {
116                tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server (wait for request)");
117                receiver.next().await
118            } {
119                tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop (in)");
120
121                let req = match req {
122                    Ok(req) => req,
123                    Err(e) => {
124                        if e.kind() == std::io::ErrorKind::Interrupted {
125                            tracing::warn!(target: UDP_TRACKER_LOG_TARGET, local_addr, err = %e,  "Udp::run_udp_server::loop (interrupted)");
126                            return;
127                        }
128                        tracing::error!(target: UDP_TRACKER_LOG_TARGET, local_addr, err = %e,  "Udp::run_udp_server::loop break: (got error)");
129                        break;
130                    }
131                };
132
133                // We spawn the new task even if there active requests buffer is
134                // full. This could seem counterintuitive because we are accepting
135                // more request and consuming more memory even if the server is
136                // already busy. However, we "force_push" the new tasks in the
137                // buffer. That means, in the worst scenario we will abort a
138                // running task to make place for the new task.
139                //
140                // Once concern could be to reach an starvation point were we
141                // are only adding and removing tasks without given them the
142                // chance to finish. However, the buffer is yielding before
143                // aborting one tasks, giving it the chance to finish.
144                let abort_handle: tokio::task::AbortHandle = tokio::task::spawn(processor.process_request(req)).abort_handle();
145
146                if abort_handle.is_finished() {
147                    continue;
148                }
149
150                active_requests.force_push(abort_handle, &local_addr).await;
151            } else {
152                tokio::task::yield_now().await;
153
154                // the request iterator returned `None`.
155                tracing::error!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server breaking: (ran dry, should not happen in production!)");
156                break;
157            }
158        }
159    }
160}