torrust_tracker/servers/udp/server/
launcher.rs1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::time::Duration;
4
5use derive_more::Constructor;
6use futures_util::StreamExt;
7use tokio::select;
8use tokio::sync::oneshot;
9use tracing::instrument;
10
11use super::request_buffer::ActiveRequests;
12use crate::bootstrap::jobs::Started;
13use crate::core::Tracker;
14use crate::servers::logging::STARTED_ON;
15use crate::servers::registar::ServiceHealthCheckJob;
16use crate::servers::signals::{shutdown_signal_with_message, Halted};
17use crate::servers::udp::server::bound_socket::BoundSocket;
18use crate::servers::udp::server::processor::Processor;
19use crate::servers::udp::server::receiver::Receiver;
20use crate::servers::udp::UDP_TRACKER_LOG_TARGET;
21use crate::shared::bit_torrent::tracker::udp::client::check;
22
23#[derive(Constructor)]
25pub struct Launcher;
26
27impl Launcher {
28 #[instrument(skip(tracker, bind_to, tx_start, rx_halt))]
35 pub async fn run_with_graceful_shutdown(
36 tracker: Arc<Tracker>,
37 bind_to: SocketAddr,
38 tx_start: oneshot::Sender<Started>,
39 rx_halt: oneshot::Receiver<Halted>,
40 ) {
41 tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Starting on: {bind_to}");
42
43 let socket = tokio::time::timeout(Duration::from_millis(5000), BoundSocket::new(bind_to))
44 .await
45 .expect("it should bind to the socket within five seconds");
46
47 let bound_socket = match socket {
48 Ok(socket) => socket,
49 Err(e) => {
50 tracing::error!(target: UDP_TRACKER_LOG_TARGET, addr = %bind_to, err = %e, "Udp::run_with_graceful_shutdown panic! (error when building socket)" );
51 panic!("could not bind to socket!");
52 }
53 };
54
55 let address = bound_socket.address();
56 let local_udp_url = bound_socket.url().to_string();
57
58 tracing::info!(target: UDP_TRACKER_LOG_TARGET, "{STARTED_ON}: {local_udp_url}");
59
60 let receiver = Receiver::new(bound_socket.into());
61
62 tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (spawning main loop)");
63
64 let running = {
65 let local_addr = local_udp_url.clone();
66 tokio::task::spawn(async move {
67 tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_with_graceful_shutdown::task (listening...)");
68 let () = Self::run_udp_server_main(receiver, tracker.clone()).await;
69 })
70 };
71
72 tx_start
73 .send(Started { address })
74 .expect("the UDP Tracker service should not be dropped");
75
76 tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (started)");
77
78 let stop = running.abort_handle();
79
80 let halt_task = tokio::task::spawn(shutdown_signal_with_message(
81 rx_halt,
82 format!("Halting UDP Service Bound to Socket: {address}"),
83 ));
84
85 select! {
86 _ = running => { tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (stopped)"); },
87 _ = halt_task => { tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (halting)"); }
88 }
89 stop.abort();
90
91 tokio::task::yield_now().await; }
93
94 #[must_use]
95 #[instrument(skip(binding))]
96 pub fn check(binding: &SocketAddr) -> ServiceHealthCheckJob {
97 let binding = *binding;
98 let info = format!("checking the udp tracker health check at: {binding}");
99
100 let job = tokio::spawn(async move { check(&binding).await });
101
102 ServiceHealthCheckJob::new(binding, info, job)
103 }
104
105 #[instrument(skip(receiver, tracker))]
106 async fn run_udp_server_main(mut receiver: Receiver, tracker: Arc<Tracker>) {
107 let active_requests = &mut ActiveRequests::default();
108
109 let addr = receiver.bound_socket_address();
110 let local_addr = format!("udp://{addr}");
111
112 loop {
113 let processor = Processor::new(receiver.socket.clone(), tracker.clone());
114
115 if let Some(req) = {
116 tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server (wait for request)");
117 receiver.next().await
118 } {
119 tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop (in)");
120
121 let req = match req {
122 Ok(req) => req,
123 Err(e) => {
124 if e.kind() == std::io::ErrorKind::Interrupted {
125 tracing::warn!(target: UDP_TRACKER_LOG_TARGET, local_addr, err = %e, "Udp::run_udp_server::loop (interrupted)");
126 return;
127 }
128 tracing::error!(target: UDP_TRACKER_LOG_TARGET, local_addr, err = %e, "Udp::run_udp_server::loop break: (got error)");
129 break;
130 }
131 };
132
133 let abort_handle: tokio::task::AbortHandle = tokio::task::spawn(processor.process_request(req)).abort_handle();
145
146 if abort_handle.is_finished() {
147 continue;
148 }
149
150 active_requests.force_push(abort_handle, &local_addr).await;
151 } else {
152 tokio::task::yield_now().await;
153
154 tracing::error!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server breaking: (ran dry, should not happen in production!)");
156 break;
157 }
158 }
159 }
160}