1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
//! Module to handle the UDP server instances.
//!
//! There are two main types in this module:
//!
//! - [`UdpServer`](crate::servers::udp::server::UdpServer): a controller to
//! start and stop the server.
//! - [`Udp`](crate::servers::udp::server::Udp): the server launcher.
//!
//! The `UdpServer` is a state machine for a given configuration. This struct
//! represents a concrete configuration and state. It allows starting and
//! stopping the server while always keeping the same configuration.
//!
//! The `Udp` is the server launcher. It's responsible for launching the UDP
//! server, but without keeping any state.
//!
//! For the time being, the `UdpServer` is only used for testing purposes,
//! because we want to be able to start and stop the server multiple times, and
//! we want to know the bound address and the current state of the server.
//! In production, the `Udp` launcher is used directly.
use std::future::Future;
use std::io::Cursor;
use std::net::SocketAddr;
use std::sync::Arc;

use aquatic_udp_protocol::Response;
use futures::pin_mut;
use log::{debug, error, info};
use tokio::net::UdpSocket;
use tokio::task::JoinHandle;

use crate::servers::signals::shutdown_signal;
use crate::servers::udp::handlers::handle_packet;
use crate::servers::udp::MAX_PACKET_SIZE;
use crate::tracker::Tracker;

/// Error that can occur when starting or stopping the UDP server.
///
/// Some errors triggered while starting the server are:
///
/// - The server cannot bind to the given address.
/// - It cannot get the bound address.
///
/// Some errors triggered while stopping the server are:
///
/// - The [`UdpServer`](crate::servers::udp::server::UdpServer) cannot send the
///   shutdown signal to the spawned UDP service task.
///
/// The type implements [`std::fmt::Display`] and [`std::error::Error`], so it
/// can be printed directly and converted into `Box<dyn std::error::Error>`.
#[derive(Debug)]
pub enum Error {
    /// Any kind of error starting or stopping the server.
    ///
    /// The payload is a human-readable description of the failure.
    Error(String), // todo: refactor to use thiserror and add more variants for specific errors.
}

impl std::fmt::Display for Error {
    /// Writes the human-readable message carried by the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Error(message) => f.write_str(message),
        }
    }
}

// No `source()` override: the variant only stores a message, not the
// underlying error value.
impl std::error::Error for Error {}

/// A UDP server instance controller with no UDP instance running.
///
/// Alias for [`UdpServer`] parameterized with the [`Stopped`] state.
#[allow(clippy::module_name_repetitions)]
pub type StoppedUdpServer = UdpServer<Stopped>;

/// A UDP server instance controller with a running UDP instance.
///
/// Alias for [`UdpServer`] parameterized with the [`Running`] state.
#[allow(clippy::module_name_repetitions)]
pub type RunningUdpServer = UdpServer<Running>;

/// A UDP server instance controller.
///
/// It's responsible for:
///
/// - Keeping the initial configuration of the server.
/// - Starting and stopping the server.
/// - Keeping the state of the server: `running` or `stopped`.
///
/// It's a state machine: the state is encoded in the type parameter `S`
/// ([`Stopped`] or [`Running`]), and starting or stopping consumes the
/// controller and returns a new one in the other state. The configuration
/// is carried over unchanged on every transition.
///
/// > **NOTICE**: if the configuration changes after running the server it will
/// reset to the initial value after stopping the server. This struct is not
/// intended to persist configurations between runs.
#[allow(clippy::module_name_repetitions)]
pub struct UdpServer<S> {
    /// The configuration of the server that will be used every time the server
    /// is started.
    pub cfg: torrust_tracker_configuration::UdpTracker,
    /// The state of the server: `running` or `stopped`.
    pub state: S,
}

/// A stopped UDP server state.
///
/// Marker type without data: a stopped server holds no socket and no task.
pub struct Stopped;

/// A running UDP server state.
///
/// Holds everything needed to address and later stop the spawned server task.
pub struct Running {
    /// The address where the server is bound.
    pub bind_address: SocketAddr,
    /// Sending half of the oneshot channel whose receiver is handed to the
    /// spawned server task's shutdown signal; a value is sent on it to stop
    /// the server.
    stop_job_sender: tokio::sync::oneshot::Sender<u8>,
    /// Handle of the spawned task that runs the server loop; awaited when
    /// stopping so the controller only returns once the task has ended.
    job: JoinHandle<()>,
}

impl UdpServer<Stopped> {
    /// Creates a new `UdpServer` instance in `stopped`state.
    #[must_use]
    pub fn new(cfg: torrust_tracker_configuration::UdpTracker) -> Self {
        Self { cfg, state: Stopped {} }
    }

    /// It starts the server and returns a `UdpServer` controller in `running`
    /// state.
    ///
    /// # Errors
    ///
    /// Will return `Err` if UDP can't bind to given bind address.
    pub async fn start(self, tracker: Arc<Tracker>) -> Result<UdpServer<Running>, Error> {
        let udp = Udp::new(&self.cfg.bind_address)
            .await
            .map_err(|e| Error::Error(e.to_string()))?;

        let bind_address = udp.socket.local_addr().map_err(|e| Error::Error(e.to_string()))?;

        let (sender, receiver) = tokio::sync::oneshot::channel::<u8>();

        let job = tokio::spawn(async move {
            udp.start_with_graceful_shutdown(tracker, shutdown_signal(receiver)).await;
        });

        let running_udp_server: UdpServer<Running> = UdpServer {
            cfg: self.cfg,
            state: Running {
                bind_address,
                stop_job_sender: sender,
                job,
            },
        };

        Ok(running_udp_server)
    }
}

impl UdpServer<Running> {
    /// It stops the server and returns a `UdpServer` controller in `stopped`
    /// state.
    ///     
    /// # Errors
    ///
    /// Will return `Err` if the oneshot channel to send the stop signal
    /// has already been called once.
    pub async fn stop(self) -> Result<UdpServer<Stopped>, Error> {
        self.state.stop_job_sender.send(1).map_err(|e| Error::Error(e.to_string()))?;

        let _ = self.state.job.await;

        let stopped_api_server: UdpServer<Stopped> = UdpServer {
            cfg: self.cfg,
            state: Stopped {},
        };

        Ok(stopped_api_server)
    }
}

/// A UDP server instance launcher.
///
/// It owns the bound socket and runs the receive/handle/respond loop.
pub struct Udp {
    /// The bound UDP socket. Kept behind an `Arc` because a clone of the
    /// handle is passed to the code that sends each response.
    socket: Arc<UdpSocket>,
}

impl Udp {
    /// Creates a new `Udp` instance with a socket bound to `bind_address`.
    ///
    /// # Errors
    ///
    /// Will return `Err` if unable to bind to the supplied `bind_address`.
    pub async fn new(bind_address: &str) -> tokio::io::Result<Udp> {
        let socket = UdpSocket::bind(bind_address).await?;

        Ok(Udp {
            socket: Arc::new(socket),
        })
    }

    /// It starts the UDP server instance and serves requests until the
    /// process receives a `ctrl-c` notification.
    ///
    /// This is the same loop as the graceful-shutdown variant, with the
    /// `ctrl-c` notification as the shutdown signal.
    pub async fn start(&self, tracker: Arc<Tracker>) {
        // Create the listener once and keep it alive for the whole run.
        // Recreating it on every loop iteration leaves windows (while a
        // packet is being handled) in which no listener exists, so a
        // notification delivered then could be missed.
        let ctrl_c = async {
            // Any outcome stops the server, including a failure to register
            // the signal handler.
            drop(tokio::signal::ctrl_c().await);
        };

        self.start_with_graceful_shutdown(tracker, ctrl_c).await;
    }

    /// It starts the UDP server instance with graceful shutdown.
    ///
    /// Loops receiving packets from the socket. Each packet is passed to
    /// `handle_packet` and the resulting response is sent back to the
    /// sender. The loop ends as soon as `shutdown_signal` completes.
    ///
    /// A failed receive is logged and the loop keeps serving; it does not
    /// stop the server.
    async fn start_with_graceful_shutdown<F>(&self, tracker: Arc<Tracker>, shutdown_signal: F)
    where
        F: Future<Output = ()>,
    {
        // Pin the future once so the very same instance is polled on every
        // loop iteration instead of being moved into the first one.
        pin_mut!(shutdown_signal);

        loop {
            let mut data = [0; MAX_PACKET_SIZE];
            let socket = Arc::clone(&self.socket);

            tokio::select! {
                _ = &mut shutdown_signal => {
                    // Only used for logging: never panic here.
                    match self.socket.local_addr() {
                        Ok(local_addr) => info!("Stopping UDP server: {}..", local_addr),
                        Err(e) => info!("Stopping UDP server: unknown local address ({})..", e),
                    }
                    break;
                }
                // Bind the whole result instead of matching `Ok(..)` in the
                // branch pattern: a non-matching pattern disables the branch,
                // which would leave only the shutdown branch to wait on and
                // stall packet processing after a single receive error.
                received = socket.recv_from(&mut data) => {
                    match received {
                        Ok((valid_bytes, remote_addr)) => {
                            let payload = data[..valid_bytes].to_vec();

                            info!("Received {} bytes", payload.len());
                            debug!("From: {}", &remote_addr);
                            debug!("Payload: {:?}", payload);

                            let response = handle_packet(remote_addr, payload, &tracker).await;

                            Udp::send_response(socket, remote_addr, response).await;
                        }
                        Err(e) => {
                            error!("could not receive UDP packet: {}", e);
                        }
                    }
                }
            }
        }
    }

    /// Serializes `response` and sends the resulting bytes to `remote_addr`.
    ///
    /// If the response cannot be serialized, an error is logged and nothing
    /// is sent.
    async fn send_response(socket: Arc<UdpSocket>, remote_addr: SocketAddr, response: Response) {
        let buffer = vec![0u8; MAX_PACKET_SIZE];
        let mut cursor = Cursor::new(buffer);

        match response.write(&mut cursor) {
            Ok(_) => {
                // The cursor position after writing is the number of bytes
                // that make up the serialized response.
                #[allow(clippy::cast_possible_truncation)]
                let position = cursor.position() as usize;
                let payload = &cursor.get_ref()[..position];

                info!("Sending {} bytes ...", payload.len());
                debug!("To: {:?}", &remote_addr);
                debug!("Payload: {:?}", payload);

                Udp::send_packet(socket, &remote_addr, payload).await;

                info!("{} bytes sent", payload.len());
            }
            Err(_) => {
                error!("could not write response to bytes.");
            }
        }
    }

    /// Sends `payload` to `remote_addr` on a best-effort basis: the result of
    /// the send is discarded.
    async fn send_packet(socket: Arc<UdpSocket>, remote_addr: &SocketAddr, payload: &[u8]) {
        // doesn't matter if it reaches or not
        drop(socket.send_to(payload, remote_addr).await);
    }
}