1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
use std::{future::Future, io, net::SocketAddr, pin::Pin, ptr, sync::Arc};

use tokio::{
    net::{TcpListener, TcpSocket},
    sync::{watch, Semaphore},
};

use crate::{
    config,
    service::Rxh,
    sync::notify::{Notification, Notifier},
};

/// The [`Server`] struct represents a particular `[[server]]` instance from the
/// config file. It is responsible for accepting new connections and spawning
/// Tokio tasks to handle them properly, as well as gracefully stopping the
/// running tasks. In order to perform graceful shutdowns, the [`Server`]
/// notifies all the running tasks about the shutdown event and waits for their
/// acknowledgements. See [`Notifier`] for further details. Here's a simple
/// diagram describing the process:
///
/// ```text
///                     +--------+
///                     | Server |
///                     +--------+
///                         |
///                         v
///                     +--------+
///                +--- | Select | ---+
///                |    +--------+    |
///                v                  v
///          +----------+       +----------+
///          |  Accept  |       | Shutdown |
///          +----------+       +----------+
///                |                  |
///                v                  v
///          +----------+       +----------+
///          |  Spawn   |       |  Notify  |
///          +----------+       +----------+
///                |                  |
///                v                  v
/// +--------+   +--------+   +--------+   +--------+
/// | Task 1 |   | Task 2 |   | Task 3 |   | Task 4 |
/// +--------+   +--------+   +--------+   +--------+
/// ```
pub struct Server {
    /// State updates channel. Subscribers can use this to check the current
    /// [`State`] of this server. See [`Server::subscribe`].
    state: watch::Sender<State>,

    /// TCP listener used to accept connections. It is already bound and
    /// listening once [`Server::init`] returns.
    listener: TcpListener,

    /// Configuration for this server.
    config: config::Server,

    /// Socket address used by this server to listen for incoming connections.
    /// This is the address reported by the bound listener, not necessarily the
    /// one written in the configuration (think of port 0).
    address: SocketAddr,

    /// [`Notifier`] object used to send notifications to tasks spawned by
    /// this server.
    notifier: Notifier,

    /// Shutdown future, this can be anything, which allows us to easily write
    /// integration tests. When this future completes, the server starts the
    /// shutdown process.
    shutdown: Pin<Box<dyn Future<Output = ()> + Send>>,

    /// Connections are limited to a maximum number. In order to allow a new
    /// connection we have to acquire a permit from this semaphore.
    connections: Arc<Semaphore>,
}

/// Represents the current state of the server.
///
/// Values of this type are published through the channel returned by
/// [`Server::subscribe`]. The type is `Copy` (every payload is a plain
/// integer or a `Copy` enum), so subscribers can cheaply take the current
/// value out of a channel borrow instead of holding the borrow.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    /// Server has started but is not accepting connections yet.
    Starting,

    /// Server is accepting incoming connections.
    Listening,

    /// Maximum number of connections reached. The payload is the configured
    /// connection limit.
    MaxConnectionsReached(usize),

    /// Server is gracefully shutting down. The payload tells how far the
    /// shutdown process has progressed.
    ShuttingDown(ShutdownState),
}

/// Represents a state in the graceful shutdown process.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ShutdownState {
    /// The server has received the shutdown signal and won't accept more
    /// connections, but it will still process data for currently connected
    /// sockets. The payload is the number of tasks that were notified about
    /// the shutdown and still have to acknowledge it.
    PendingConnections(usize),

    /// Shutdown process complete. Published right before [`Server::run`]
    /// returns.
    Done,
}

impl Server {
    /// Initializes a [`Server`] with the given `config`. This process makes
    /// sure that the listening address can be used and configures a socket
    /// for that address, but does not accept connections yet. In order to
    /// process incoming connections, [`Server::run`] must be called and
    /// `await`ed. We do it this way because we use the port 0 for integration
    /// tests, which allows the OS to pick any available port, but we still want
    /// to know which port the server is using.
    ///
    /// For the `replica` parameter see [`super::master::Master`], but basically
    /// it's a number that indicates which address should this server choose for
    /// listening, since the config file allows multiple addresses.
    pub fn init(config: config::Server, replica: usize) -> Result<Self, io::Error> {
        let (state, _) = watch::channel(State::Starting);

        let socket = if config.listen[replica].is_ipv4() {
            TcpSocket::new_v4()?
        } else {
            TcpSocket::new_v6()?
        };

        #[cfg(not(windows))]
        socket.set_reuseaddr(true)?;

        socket.bind(config.listen[replica])?;

        // TODO: Hardcoded backlog, maybe this should be configurable.
        let listener = socket.listen(1024)?;
        let address = listener.local_addr().unwrap();

        let notifier = Notifier::new();

        // Don't shutdown on anything by default. CTRL-C will forcefully kill
        // the process.
        let shutdown = Box::pin(std::future::pending());

        let connections = Arc::new(Semaphore::new(config.connections));

        Ok(Self {
            state,
            listener,
            config,
            address,
            notifier,
            shutdown,
            connections,
        })
    }

    /// The [`Server`] will poll the given `future` and whenever it completes,
    /// the graceful shutdown process starts. If only one server is
    /// instantiated, this could be called with [`tokio::signal::ctrl_c`], but
    /// it can be any [`Future`] since we need customization for integration
    /// tests and spawning multiple servers using [`super::master::Master`].
    pub fn shutdown_on(mut self, future: impl Future + Send + 'static) -> Self {
        self.shutdown = Box::pin(async move {
            future.await;
        });

        self
    }

    /// Address of the listening socket. This is necessary for obtaining the
    /// actual address in cases port 0 was used.
    pub fn socket_address(&self) -> SocketAddr {
        self.address
    }

    /// By subscribing to this server the caller obtains a channel where the
    /// current state of the server can be read. This allows the server and
    /// caller to run on separate Tokio tasks while still allowing the caller
    /// to read the state.
    pub fn subscribe(&self) -> watch::Receiver<State> {
        self.state.subscribe()
    }

    /// This is the entry point, by calling and `await`ing this function the
    /// server starts to process connections.
    pub async fn run(self) -> Result<(), crate::Error> {
        let Self {
            mut config,
            state,
            listener,
            notifier,
            shutdown,
            address,
            connections,
        } = self;

        let log_name = if let Some(ref id) = config.name {
            format!("{address} ({id})")
        } else {
            address.to_string()
        };

        config.log_name = log_name.clone();

        state.send_replace(State::Listening);
        println!("{log_name} => Listening for requests");

        // Leak the configuration to get a 'static lifetime, which we need to
        // spawn tokio tasks. Later when all tasks have finished, we'll drop this
        // value to avoid actual memory leaks.
        let config = Box::leak(Box::new(config));

        let listener = Listener {
            config,
            connections,
            listener,
            notifier: &notifier,
            state: &state,
        };

        tokio::select! {
            result = listener.listen() => {
                if let Err(err) = result {
                    println!("{log_name} => Error while accepting connections: {err}");
                }
            }
            _ = shutdown => {
                println!("{log_name} => Received shutdown signal");
            }
        }

        // Drop the listener to stop accepting new connections. This will cause
        // a "Connection Refused" error on any new client socket that attempts
        // to connect. Already connected sockets will still be able to send and
        // receive data.
        drop(listener);

        if let Ok(num_tasks) = notifier.send(Notification::Shutdown) {
            println!("{log_name} => Can't shutdown yet, {num_tasks} pending connections");
            state.send_replace(State::ShuttingDown(ShutdownState::PendingConnections(
                num_tasks,
            )));
            notifier.collect_acknowledgements().await;
        }

        // SAFETY: Nobody is reading this configuration anymore because all
        // tasks have ended at this point, so there are no more references to
        // this address. It's an ugly hack, but we don't have to use Arc if we
        // do this, we can simply skip the reference counting and avoid atomic
        // operations.
        unsafe {
            drop(Box::from_raw(ptr::from_ref(config).cast_mut()));
        }

        state.send_replace(State::ShuttingDown(ShutdownState::Done));
        println!("{log_name} => Shutdown complete");

        Ok(())
    }
}

/// Listens for incoming connections and spawns tasks to handle them if permits
/// are available.
struct Listener<'a> {
    /// Underlying TCP listener. We take ownership of this so that when this
    /// struct is dropped the socket is also dropped and we stop accepting
    /// connections.
    listener: TcpListener,

    /// Reference to the configuration of this server. It is `'static` so that
    /// it can be handed to the spawned tasks.
    config: &'static config::Server,

    /// Needed to obtain subscriptions and pass them down to request handler
    /// tasks.
    notifier: &'a Notifier,

    /// Used to update the state when max connections are reached and when
    /// connections are accepted again.
    state: &'a watch::Sender<State>,

    /// Connection permits. One permit is acquired before accepting each
    /// connection and it is held by the task that serves it.
    connections: Arc<Semaphore>,
}

impl<'a> Listener<'a> {
    pub async fn listen(&self) -> Result<(), crate::Error> {
        loop {
            // Move out of config to get a static lifetime that we can pass down
            // to the new Tokio task.
            let config = self.config;

            let mut notify_listening_again = false;

            if self.connections.available_permits() == 0 {
                println!(
                    "{} => Reached max connections: {}",
                    config.log_name, config.connections
                );
                self.state
                    .send_replace(State::MaxConnectionsReached(config.connections));
                notify_listening_again = true;
            }

            // We don't close the semaphore so unwrapping is OK.
            let permit = self.connections.clone().acquire_owned().await.unwrap();

            // Once we've obtainied a permite we can start listening again if
            // we stopped before.
            if notify_listening_again {
                println!("{} => Accepting connections again", config.log_name);
                self.state.send_replace(State::Listening);
            }

            let (stream, client_addr) = self.listener.accept().await?;
            let mut subscription = self.notifier.subscribe();
            let server_addr = stream.local_addr()?;

            tokio::task::spawn(async move {
                if let Err(err) = hyper::server::conn::http1::Builder::new()
                    .preserve_header_case(true)
                    .title_case_headers(true)
                    .serve_connection(stream, Rxh::new(config, client_addr, server_addr))
                    .with_upgrades()
                    .await
                {
                    println!("Failed to serve connection: {:?}", err);
                }

                if let Some(Notification::Shutdown) = subscription.receive_notification() {
                    subscription.acknowledge_notification().await;
                }

                // Permit is dropped only when the accepted socket is done
                // sending and receiving data.
                drop(permit);
            });
        }
    }
}