//! Minimal Redis server implementation
//!
//! Provides an async `run` function that listens for inbound connections,
//! spawning a task per connection.

use crate::{Command, Connection, Db, Shutdown};

use std::future::Future;
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{broadcast, mpsc, Semaphore};
use tokio::time::{self, Duration};
use tracing::{debug, error, info, instrument};

/// Server listener state. Created in the `run` call. It includes a `run`
/// method which performs the TCP listening and initialization of
/// per-connection state.
#[derive(Debug)]
struct Listener {
    /// Shared database handle.
    ///
    /// Contains the key / value store as well as the broadcast channels for
    /// pub/sub.
    ///
    /// This is a wrapper around an `Arc`. This enables `db` to be cloned and
    /// passed into the per-connection state (`Handler`).
    db: Db,

    /// TCP listener supplied by the `run` caller.
    listener: TcpListener,

    /// Limit the max number of connections.
    ///
    /// A `Semaphore` is used to limit the max number of connections. Before
    /// attempting to accept a new connection, a permit is acquired from the
    /// semaphore. If none are available, the listener waits for one.
    ///
    /// When handlers complete processing a connection, the permit is returned
    /// to the semaphore.
    limit_connections: Arc<Semaphore>,

    /// Broadcasts a shutdown signal to all active connections.
    ///
    /// The initial `shutdown` trigger is provided by the `run` caller. The
    /// server is responsible for gracefully shutting down active connections.
    /// When a connection task is spawned, it is passed a broadcast receiver
    /// handle. When a graceful shutdown is initiated, a `()` value is sent via
    /// the `broadcast::Sender`. Each active connection receives it, reaches a
    /// safe terminal state, and completes the task.
    notify_shutdown: broadcast::Sender<()>,

    /// Used as part of the graceful shutdown process to wait for client
    /// connections to complete processing.
    ///
    /// Tokio channels are closed once all `Sender` handles go out of scope.
    /// When a channel is closed, the receiver receives `None`. This is
    /// leveraged to detect all connection handlers completing. When a
    /// connection handler is initialized, it is assigned a clone of
    /// `shutdown_complete_tx`. When the listener shuts down, it drops the
    /// sender held by this `shutdown_complete_tx` field. Once all handler
    /// tasks complete, all clones of the `Sender` are also dropped. This
    /// results in `shutdown_complete_rx.recv()` completing with `None`. At
    /// that point, it is safe to exit the server process. A standalone sketch
    /// of this pattern follows the struct definition.
    shutdown_complete_rx: mpsc::Receiver<()>,
    shutdown_complete_tx: mpsc::Sender<()>,
}
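
// The shutdown bookkeeping above relies on a general Tokio pattern: an `mpsc`
// channel whose `Sender` clones are held only by in-flight tasks, so the
// receiver completes with `None` exactly when every task is done. A minimal,
// self-contained sketch of that pattern (the `done_tx` / `done_rx` names are
// illustrative, not part of this module):
//
// ```
// use tokio::sync::mpsc;
//
// #[tokio::main]
// async fn main() {
//     let (done_tx, mut done_rx) = mpsc::channel::<()>(1);
//
//     for i in 0..3 {
//         // Each task holds a clone of the sender for its lifetime.
//         let done_tx = done_tx.clone();
//         tokio::spawn(async move {
//             println!("task {} done", i);
//             // The clone is dropped here when the task completes.
//             drop(done_tx);
//         });
//     }
//
//     // Drop the original sender so only the tasks keep the channel open.
//     drop(done_tx);
//
//     // `recv()` returns `None` once every sender clone has been dropped,
//     // i.e., once all three tasks have completed.
//     assert!(done_rx.recv().await.is_none());
// }
// ```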

/// Per-connection handler. Reads requests from `connection` and applies the
/// commands to `db`.
#[derive(Debug)]
struct Handler {
    /// Shared database handle.
    ///
    /// When a command is received from `connection`, it is applied with `db`.
    /// The implementation of the command is in the `cmd` module. Each command
    /// will need to interact with `db` in order to complete the work.
    db: Db,

    /// The TCP connection decorated with the redis protocol encoder / decoder
    /// implemented using a buffered `TcpStream`.
    ///
    /// When `Listener` receives an inbound connection, the `TcpStream` is
    /// passed to `Connection::new`, which initializes the associated buffers.
    /// `Connection` allows the handler to operate at the "frame" level and
    /// keep the byte level protocol parsing details encapsulated in
    /// `Connection`.
    connection: Connection,

    /// Max connection semaphore.
    ///
    /// When the handler is dropped, a permit is returned to this semaphore.
    /// If the listener is waiting for connections to close, it will be
    /// notified of the newly available permit and resume accepting
    /// connections.
    limit_connections: Arc<Semaphore>,

    /// Listen for shutdown notifications.
    ///
    /// A wrapper around the `broadcast::Receiver` paired with the sender in
    /// `Listener`. The connection handler processes requests from the
    /// connection until the peer disconnects **or** a shutdown notification is
    /// received from `shutdown`. In the latter case, any in-flight work being
    /// processed for the peer is continued until it reaches a safe state, at
    /// which point the connection is terminated.
    shutdown: Shutdown,

    /// Not used directly. Instead, when `Handler` is dropped, this `Sender`
    /// is dropped with it. Once all handlers (and the listener's original
    /// `Sender`) are dropped, the paired `shutdown_complete_rx` receives
    /// `None`, signaling that every connection has finished processing.
    _shutdown_complete: mpsc::Sender<()>,
}

/// Maximum number of concurrent connections the redis server will accept.
///
/// When this limit is reached, the server will stop accepting connections
/// until an active connection terminates.
///
/// A real application will want to make this value configurable, but for this
/// example, it is hard coded.
///
/// This is also set to a pretty low value to discourage using this in
/// production (you'd think that all the disclaimers would make it obvious that
/// this is not a serious project... but I thought that about mini-http as
/// well).
const MAX_CONNECTIONS: usize = 250;

/// Run the mini-redis server.
///
/// Accepts connections from the supplied listener. For each inbound
/// connection, a task is spawned to handle that connection. The server runs
/// until the `shutdown` future completes, at which point the server shuts
/// down gracefully.
///
/// `tokio::signal::ctrl_c()` can be used as the `shutdown` argument. This
/// will listen for a SIGINT signal.
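///
/// A usage sketch (the bind address and the `mini_redis::Result` path are
/// illustrative, not part of this file):
///
/// ```ignore
/// use tokio::net::TcpListener;
/// use tokio::signal;
///
/// #[tokio::main]
/// async fn main() -> mini_redis::Result<()> {
///     let listener = TcpListener::bind("127.0.0.1:6379").await?;
///
///     // Serve until Ctrl-C (SIGINT) is received.
///     run(listener, signal::ctrl_c()).await
/// }
/// ```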
pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<()> {
    // When the provided `shutdown` future completes, we must send a shutdown
    // message to all active connections. We use a broadcast channel for this
    // purpose. The call below ignores the receiver of the broadcast pair, and
    // when a receiver is needed, the subscribe() method on the sender is used
    // to create one.
    let (notify_shutdown, _) = broadcast::channel(1);
    let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1);

    // Initialize the listener state
    let mut server = Listener {
        listener,
        db: Db::new(),
        limit_connections: Arc::new(Semaphore::new(MAX_CONNECTIONS)),
        notify_shutdown,
        shutdown_complete_tx,
        shutdown_complete_rx,
    };

    // Concurrently run the server and listen for the `shutdown` signal. The
    // server task runs until an error is encountered, so under normal
    // circumstances, this `select!` statement runs until the `shutdown`
    // signal is received.
    //
    // `select!` statements are written in the form of:
    //
    // ```
    // <result of async op> = <async op> => <step to perform with result>
    // ```
    //
    // All `<async op>` statements are executed concurrently. Once the
    // **first** op completes, its associated `<step to perform with result>`
    // is performed.
    //
    // The `select!` macro is a foundational building block for writing
    // asynchronous Rust. See the API docs for more details:
    //
    // https://docs.rs/tokio/*/tokio/macro.select.html
    tokio::select! {
        res = server.run() => {
            // If an error is received here, accepting connections from the
            // TCP listener failed multiple times and the server is giving up
            // and shutting down.
            //
            // Errors encountered when handling individual connections do not
            // bubble up to this point.
            if let Err(err) = res {
                error!(cause = %err, "failed to accept");
            }
        }
        _ = shutdown => {
            // The shutdown signal has been received.
            info!("shutting down");
        }
    }

    // Extract the `shutdown_complete` receiver and transmitter, and
    // explicitly drop the transmitter. This is important, as the `.await`
    // below would otherwise never complete.
    let Listener {
        mut shutdown_complete_rx,
        shutdown_complete_tx,
        notify_shutdown,
        ..
    } = server;

    // When `notify_shutdown` is dropped, all tasks which have `subscribe`d
    // will receive the shutdown signal and can exit
    drop(notify_shutdown);

    // Drop final `Sender` so the `Receiver` below can complete
    drop(shutdown_complete_tx);

    // Wait for all active connections to finish processing. As the `Sender`
    // handle held by the listener has been dropped above, the only remaining
    // `Sender` instances are held by connection handler tasks. When those
    // drop, the `mpsc` channel will close and `recv()` will return `None`.
    let _ = shutdown_complete_rx.recv().await;

    Ok(())
}

impl Listener {
    /// Run the server
    ///
    /// Listen for inbound connections. For each inbound connection, spawn a
    /// task to process that connection.
    ///
    /// # Errors
    ///
    /// Returns `Err` if accepting returns an error. This can happen for a
    /// number of reasons that resolve over time. For example, if the
    /// underlying operating system has reached an internal limit for the max
    /// number of sockets, accept will fail.
    ///
    /// The process is not able to detect when a transient error resolves
    /// itself. One strategy for handling this is to implement a back off
    /// strategy, which is what we do here.
    async fn run(&mut self) -> crate::Result<()> {
        info!("accepting inbound connections");

        loop {
            // Wait for a permit to become available.
            //
            // `acquire` returns a permit that is bound via a lifetime to the
            // semaphore. When the permit value is dropped, it is
            // automatically returned to the semaphore. This is convenient in
            // many cases. However, in this case, the permit must be returned
            // in a different task than the one it is acquired in (the handler
            // task). To do this, we "forget" the permit, which drops the
            // permit value **without** incrementing the semaphore's permits.
            // Then, in the handler task, we manually add a new permit when
            // processing completes. A standalone sketch of this pattern
            // follows this method.
            //
            // `acquire()` returns `Err` when the semaphore has been closed.
            // We don't ever close the semaphore, so `unwrap()` is safe.
            self.limit_connections.acquire().await.unwrap().forget();

            // Accept a new socket. The `accept` method internally attempts to
            // recover from transient errors, so an error here is
            // non-recoverable.
            let socket = self.accept().await?;

            // Create the necessary per-connection handler state.
            let mut handler = Handler {
                // Get a handle to the shared database. Internally, this is an
                // `Arc`, so a clone only increments the ref count.
                db: self.db.clone(),

                // Initialize the connection state. This allocates read/write
                // buffers to perform redis protocol frame parsing.
                connection: Connection::new(socket),

                // The connection state needs a handle to the max connections
                // semaphore. When the handler is done processing the
                // connection, a permit is added back to the semaphore.
                limit_connections: self.limit_connections.clone(),

                // Receive shutdown notifications.
                shutdown: Shutdown::new(self.notify_shutdown.subscribe()),

                // Notifies the receiver half once all clones are dropped.
                _shutdown_complete: self.shutdown_complete_tx.clone(),
            };

            // Spawn a new task to process the connection. Tokio tasks are
            // like asynchronous green threads and are executed concurrently.
            tokio::spawn(async move {
                // Process the connection. If an error is encountered, log it.
                if let Err(err) = handler.run().await {
                    error!(cause = ?err, "connection error");
                }
            });
        }
    }
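
    // The forget-then-add_permits dance above, isolated as a minimal sketch.
    // It assumes only the `tokio::sync::Semaphore` API; the structure is
    // illustrative, not this server's code:
    //
    // ```
    // use std::sync::Arc;
    // use tokio::sync::Semaphore;
    //
    // #[tokio::main]
    // async fn main() {
    //     let semaphore = Arc::new(Semaphore::new(2));
    //
    //     // Acquire a permit, then detach it from RAII tracking so dropping
    //     // it does NOT return it to the semaphore.
    //     semaphore.acquire().await.unwrap().forget();
    //
    //     let semaphore2 = Arc::clone(&semaphore);
    //     let task = tokio::spawn(async move {
    //         // ... handle the connection ...
    //         // Manually return the permit from the other task.
    //         semaphore2.add_permits(1);
    //     });
    //
    //     task.await.unwrap();
    //     assert_eq!(semaphore.available_permits(), 2);
    // }
    // ```
    //
    // Newer Tokio releases also offer `Semaphore::acquire_owned` on an
    // `Arc<Semaphore>`, which lets a permit be moved into the spawned task
    // and returned automatically on drop, avoiding the manual bookkeeping.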

    /// Accept an inbound connection.
    ///
    /// Errors are handled by backing off and retrying. An exponential backoff
    /// strategy is used. After the first failure, the task waits for 1
    /// second. After the second failure, the task waits for 2 seconds. Each
    /// subsequent failure doubles the wait time. If accepting fails on the
    /// 6th try after waiting for 64 seconds, then this function returns with
    /// an error.
    async fn accept(&mut self) -> crate::Result<TcpStream> {
        let mut backoff = 1;

        // Try to accept a few times
        loop {
            // Perform the accept operation. If a socket is successfully
            // accepted, return it. Otherwise, save the error.
            match self.listener.accept().await {
                Ok((socket, _)) => return Ok(socket),
                Err(err) => {
                    if backoff > 64 {
                        // Accept has failed too many times. Return the error.
                        return Err(err.into());
                    }
                }
            }

            // Pause execution until the back off period elapses.
            time::sleep(Duration::from_secs(backoff)).await;

            // Double the back off
            backoff *= 2;
        }
    }
}

impl Handler {
    /// Process a single connection.
    ///
    /// Request frames are read from the socket and processed. Responses are
    /// written back to the socket.
    ///
    /// Currently, pipelining is not implemented. Pipelining is the ability to
    /// process more than one request concurrently per connection without
    /// interleaving frames. See for more details:
    /// https://redis.io/topics/pipelining
    ///
    /// When the shutdown signal is received, the connection is processed
    /// until it reaches a safe state, at which point it is terminated.
    #[instrument(skip(self))]
    async fn run(&mut self) -> crate::Result<()> {
        // As long as the shutdown signal has not been received, try to read a
        // new request frame.
        while !self.shutdown.is_shutdown() {
            // While reading a request frame, also listen for the shutdown
            // signal.
            let maybe_frame = tokio::select! {
                res = self.connection.read_frame() => res?,
                _ = self.shutdown.recv() => {
                    // If a shutdown signal is received, return from `run`.
                    // This will result in the task terminating.
                    return Ok(());
                }
            };

            // If `None` is returned from `read_frame()` then the peer closed
            // the socket. There is no further work to do and the task can be
            // terminated.
            let frame = match maybe_frame {
                Some(frame) => frame,
                None => return Ok(()),
            };

            // Convert the redis frame into a command struct. This returns an
            // error if the frame is not a valid redis command or it is an
            // unsupported command.
            let cmd = Command::from_frame(frame)?;

            // Logs the `cmd` object. The syntax here is a shorthand provided
            // by the `tracing` crate. It can be thought of as similar to:
            //
            // ```
            // debug!(cmd = format!("{:?}", cmd));
            // ```
            //
            // `tracing` provides structured logging, so information is
            // "logged" as key-value pairs.
            debug!(?cmd);

            // Perform the work needed to apply the command. This may mutate
            // the database state as a result.
            //
            // The connection is passed into the apply function which allows
            // the command to write response frames directly to the
            // connection. In the case of pub/sub, multiple frames may be sent
            // back to the peer.
            cmd.apply(&self.db, &mut self.connection, &mut self.shutdown)
                .await?;
        }

        Ok(())
    }
}

impl Drop for Handler {
    fn drop(&mut self) {
        // Add a permit back to the semaphore.
        //
        // Doing so unblocks the listener if the max number of connections has
        // been reached.
        //
        // This is done in a `Drop` implementation in order to guarantee that
        // the permit is added even if the task handling the connection
        // panics. If `add_permits` were instead called at the end of the
        // `run` function and some bug caused a panic, the permit would never
        // be returned to the semaphore.
        self.limit_connections.add_permits(1);
    }
}
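
// The panic-safety claim above leans on Rust running destructors during
// unwinding. A minimal sketch, assuming the default `panic = "unwind"`
// profile (the `Guard` type is illustrative):
//
// ```
// struct Guard;
//
// impl Drop for Guard {
//     fn drop(&mut self) {
//         // With `Handler`, this is where the permit goes back.
//         println!("cleanup still ran");
//     }
// }
//
// #[tokio::main]
// async fn main() {
//     let handle = tokio::spawn(async {
//         let _guard = Guard;
//         panic!("connection handling bug");
//     });
//
//     // The task panicked, but `Guard::drop` executed during unwinding
//     // before the panic was reported here.
//     assert!(handle.await.is_err());
// }
// ```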