mini_redis/server.rs
//! Minimal Redis server implementation
//!
//! Provides an async `run` function that listens for inbound connections,
//! spawning a task per connection.

use crate::{Command, Connection, Db, Shutdown};

use std::future::Future;
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{broadcast, mpsc, Semaphore};
use tokio::time::{self, Duration};
use tracing::{debug, error, info, instrument};

/// Server listener state. Created in the `run` call. It includes a `run` method
/// which performs the TCP listening and initialization of per-connection state.
#[derive(Debug)]
struct Listener {
    /// Shared database handle.
    ///
    /// Contains the key / value store as well as the broadcast channels for
    /// pub/sub.
    ///
    /// This is a wrapper around an `Arc`. This enables `db` to be cloned and
    /// passed into the per-connection state (`Handler`).
    db: Db,

    /// TCP listener supplied by the `run` caller.
    listener: TcpListener,

    /// Limit the max number of connections.
    ///
    /// A `Semaphore` is used to limit the max number of connections. Before
    /// attempting to accept a new connection, a permit is acquired from the
    /// semaphore. If none are available, the listener waits for one.
    ///
    /// When handlers complete processing a connection, the permit is returned
    /// to the semaphore.
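    ///
    /// A standalone sketch of the acquire/forget/add_permits pattern used here
    /// (illustrative only; in this file the acquire happens in `Listener::run`
    /// and the permit is returned from `Handler`'s `Drop` impl):
    ///
    /// ```ignore
    /// let sem = Arc::new(Semaphore::new(2));
    /// // Take a permit but skip the automatic release on drop.
    /// sem.acquire().await.unwrap().forget();
    /// // ... hand the work off to another task ...
    /// // Later, manually return the permit.
    /// sem.add_permits(1);
    /// ```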
    limit_connections: Arc<Semaphore>,

    /// Broadcasts a shutdown signal to all active connections.
    ///
    /// The initial `shutdown` trigger is provided by the `run` caller. The
    /// server is responsible for gracefully shutting down active connections.
    /// When a connection task is spawned, it is passed a broadcast receiver
    /// handle. When a graceful shutdown is initiated, a `()` value is sent via
    /// the `broadcast::Sender`. Each active connection receives it, reaches a
    /// safe terminal state, and completes the task.
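    ///
    /// A sketch of the broadcast mechanics relied on here (illustrative only;
    /// in this file the signal is ultimately delivered by dropping the sender
    /// rather than sending a value):
    ///
    /// ```ignore
    /// let (tx, _) = broadcast::channel::<()>(1);
    /// let mut rx = tx.subscribe(); // one receiver per connection handler
    /// drop(tx);                    // initiate shutdown
    /// // `rx.recv().await` now returns `Err(RecvError::Closed)`, which the
    /// // `Shutdown` wrapper treats as the shutdown signal.
    /// ```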
    notify_shutdown: broadcast::Sender<()>,

    /// Used as part of the graceful shutdown process to wait for client
    /// connections to complete processing.
    ///
    /// Tokio channels are closed once all `Sender` handles go out of scope.
    /// When a channel is closed, the receiver receives `None`. This is
    /// leveraged to detect all connection handlers completing. When a
    /// connection handler is initialized, it is assigned a clone of
    /// `shutdown_complete_tx`. When the listener shuts down, it drops the
    /// sender held by this `shutdown_complete_tx` field. Once all handler tasks
    /// complete, all clones of the `Sender` are also dropped. This results in
    /// `shutdown_complete_rx.recv()` completing with `None`. At this point, it
    /// is safe to exit the server process.
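    ///
    /// A rough sketch of the channel behavior this relies on (illustrative
    /// only):
    ///
    /// ```ignore
    /// let (tx, mut rx) = mpsc::channel::<()>(1);
    /// let tx2 = tx.clone();  // one clone per connection handler
    /// drop(tx);              // listener drops its sender on shutdown
    /// drop(tx2);             // last handler finishes and drops its clone
    /// assert!(rx.recv().await.is_none()); // channel closed: safe to exit
    /// ```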
    shutdown_complete_rx: mpsc::Receiver<()>,
    shutdown_complete_tx: mpsc::Sender<()>,
}

/// Per-connection handler. Reads requests from `connection` and applies the
/// commands to `db`.
#[derive(Debug)]
struct Handler {
    /// Shared database handle.
    ///
    /// When a command is received from `connection`, it is applied with `db`.
    /// The implementation of the command is in the `cmd` module. Each command
    /// will need to interact with `db` in order to complete the work.
    db: Db,

    /// The TCP connection decorated with the redis protocol encoder / decoder
    /// implemented using a buffered `TcpStream`.
    ///
    /// When `Listener` receives an inbound connection, the `TcpStream` is
    /// passed to `Connection::new`, which initializes the associated buffers.
    /// `Connection` allows the handler to operate at the "frame" level and keep
    /// the byte level protocol parsing details encapsulated in `Connection`.
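    ///
    /// A hypothetical signature sketch, inferred only from how `Connection` is
    /// used in this file (the real type, and the `Frame` name, live in other
    /// modules):
    ///
    /// ```ignore
    /// impl Connection {
    ///     fn new(socket: TcpStream) -> Connection { /* ... */ }
    ///     // `Ok(None)` means the peer closed the socket.
    ///     async fn read_frame(&mut self) -> crate::Result<Option<Frame>> { /* ... */ }
    /// }
    /// ```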
    connection: Connection,

    /// Max connection semaphore.
    ///
    /// When the handler is dropped, a permit is returned to this semaphore. If
    /// the listener is waiting for connections to close, it will be notified of
    /// the newly available permit and resume accepting connections.
    limit_connections: Arc<Semaphore>,

    /// Listen for shutdown notifications.
    ///
    /// A wrapper around the `broadcast::Receiver` paired with the sender in
    /// `Listener`. The connection handler processes requests from the
    /// connection until the peer disconnects **or** a shutdown notification is
    /// received from `shutdown`. In the latter case, any in-flight work being
    /// processed for the peer is continued until it reaches a safe state, at
    /// which point the connection is terminated.
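    ///
    /// A hypothetical sketch of what such a wrapper could look like, inferred
    /// only from how it is used in this file (`Shutdown::new`, `is_shutdown`,
    /// `recv`); the real type is defined elsewhere in the crate:
    ///
    /// ```ignore
    /// struct Shutdown {
    ///     is_shutdown: bool,
    ///     notify: broadcast::Receiver<()>,
    /// }
    ///
    /// impl Shutdown {
    ///     fn is_shutdown(&self) -> bool {
    ///         self.is_shutdown
    ///     }
    ///
    ///     async fn recv(&mut self) {
    ///         // Any result (a value or `Closed`) counts as the signal.
    ///         let _ = self.notify.recv().await;
    ///         self.is_shutdown = true;
    ///     }
    /// }
    /// ```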
    shutdown: Shutdown,

    /// Not used directly. Instead, when `Handler` is dropped, this `Sender` is
    /// dropped with it. Once the listener and every handler have dropped their
    /// senders, the channel closes and `shutdown_complete_rx.recv()` in `run`
    /// returns `None`, signaling that shutdown is complete.
    _shutdown_complete: mpsc::Sender<()>,
}

/// Maximum number of concurrent connections the redis server will accept.
///
/// When this limit is reached, the server will stop accepting connections until
/// an active connection terminates.
///
/// A real application will want to make this value configurable, but for this
/// example, it is hard coded.
///
/// This is also set to a pretty low value to discourage using this in
/// production (you'd think that all the disclaimers would make it obvious that
/// this is not a serious project... but I thought that about mini-http as
/// well).
const MAX_CONNECTIONS: usize = 250;

/// Run the mini-redis server.
///
/// Accepts connections from the supplied listener. For each inbound connection,
/// a task is spawned to handle that connection. The server runs until the
/// `shutdown` future completes, at which point the server shuts down
/// gracefully.
///
/// `tokio::signal::ctrl_c()` can be used as the `shutdown` argument. This will
/// listen for a SIGINT signal.
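///
/// # Example
///
/// A minimal sketch of driving this from a binary's `main`. The bind address
/// and the `mini_redis::Result` path are illustrative assumptions, not
/// something this module prescribes:
///
/// ```ignore
/// #[tokio::main]
/// async fn main() -> mini_redis::Result<()> {
///     let listener = TcpListener::bind("127.0.0.1:6379").await?;
///     run(listener, tokio::signal::ctrl_c()).await
/// }
/// ```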
pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<()> {
    // When the provided `shutdown` future completes, we must send a shutdown
    // message to all active connections. We use a broadcast channel for this
    // purpose. The call below ignores the receiver of the broadcast pair, and
    // when a receiver is needed, the `subscribe()` method on the sender is used
    // to create one.
    let (notify_shutdown, _) = broadcast::channel(1);
    let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1);

    // Initialize the listener state
    let mut server = Listener {
        listener,
        db: Db::new(),
        limit_connections: Arc::new(Semaphore::new(MAX_CONNECTIONS)),
        notify_shutdown,
        shutdown_complete_tx,
        shutdown_complete_rx,
    };

    // Concurrently run the server and listen for the `shutdown` signal. The
    // server task runs until an error is encountered, so under normal
    // circumstances, this `select!` statement runs until the `shutdown` signal
    // is received.
    //
    // `select!` statements are written in the form of:
    //
    // ```
    // <result of async op> = <async op> => <step to perform with result>
    // ```
    //
    // All `<async op>` statements are executed concurrently. Once the **first**
    // op completes, its associated `<step to perform with result>` is
    // performed.
    //
    // The `select!` macro is a foundational building block for writing
    // asynchronous Rust. See the API docs for more details:
    //
    // https://docs.rs/tokio/*/tokio/macro.select.html
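    //
    // A concrete (illustrative) instance of that form, using a hypothetical
    // `some_async_op()`:
    //
    // ```
    // tokio::select! {
    //     val = some_async_op() => println!("op finished first: {:?}", val),
    //     _ = time::sleep(Duration::from_secs(1)) => println!("timed out"),
    // }
    // ```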
    tokio::select! {
        res = server.run() => {
            // If an error is received here, accepting connections from the TCP
            // listener failed multiple times and the server is giving up and
            // shutting down.
            //
            // Errors encountered when handling individual connections do not
            // bubble up to this point.
            if let Err(err) = res {
                error!(cause = %err, "failed to accept");
            }
        }
        _ = shutdown => {
            // The shutdown signal has been received.
            info!("shutting down");
        }
    }

    // Extract the `shutdown_complete` receiver and transmitter, and explicitly
    // drop `shutdown_complete_tx`. This is important, as the `.await` below
    // would otherwise never complete.
    let Listener {
        mut shutdown_complete_rx,
        shutdown_complete_tx,
        notify_shutdown,
        ..
    } = server;
    // When `notify_shutdown` is dropped, all tasks which have `subscribe`d will
    // receive the shutdown signal and can exit.
    drop(notify_shutdown);
    // Drop final `Sender` so the `Receiver` below can complete.
    drop(shutdown_complete_tx);

    // Wait for all active connections to finish processing. As the `Sender`
    // handle held by the listener has been dropped above, the only remaining
    // `Sender` instances are held by connection handler tasks. When those drop,
    // the `mpsc` channel will close and `recv()` will return `None`.
    let _ = shutdown_complete_rx.recv().await;

    Ok(())
}

impl Listener {
    /// Run the server.
    ///
    /// Listen for inbound connections. For each inbound connection, spawn a
    /// task to process that connection.
    ///
    /// # Errors
    ///
    /// Returns `Err` if accepting returns an error. This can happen for a
    /// number of reasons that resolve over time. For example, if the underlying
    /// operating system has reached an internal limit for the max number of
    /// sockets, accept will fail.
    ///
    /// The process is not able to detect when a transient error resolves
    /// itself. One strategy for handling this is to back off and retry, which
    /// is what we do here.
    async fn run(&mut self) -> crate::Result<()> {
        info!("accepting inbound connections");

        loop {
            // Wait for a permit to become available.
            //
            // `acquire` returns a permit that is bound via a lifetime to the
            // semaphore. When the permit value is dropped, it is automatically
            // returned to the semaphore. This is convenient in many cases.
            // However, in this case, the permit must be returned in a different
            // task than it is acquired in (the handler task). To do this, we
            // "forget" the permit, which drops the permit value **without**
            // incrementing the semaphore's permits. Then, in the handler task
            // we manually add a new permit when processing completes.
            //
            // `acquire()` returns `Err` when the semaphore has been closed. We
            // don't ever close the semaphore, so `unwrap()` is safe.
            self.limit_connections.acquire().await.unwrap().forget();

            // Accept a new socket. This will attempt to perform error handling.
            // The `accept` method internally attempts to recover errors, so an
            // error here is non-recoverable.
            let socket = self.accept().await?;

            // Create the necessary per-connection handler state.
            let mut handler = Handler {
                // Get a handle to the shared database. Internally, this is an
                // `Arc`, so a clone only increments the ref count.
                db: self.db.clone(),

                // Initialize the connection state. This allocates read/write
                // buffers to perform redis protocol frame parsing.
                connection: Connection::new(socket),

                // The connection state needs a handle to the max connections
                // semaphore. When the handler is done processing the
                // connection, a permit is added back to the semaphore.
                limit_connections: self.limit_connections.clone(),

                // Receive shutdown notifications.
                shutdown: Shutdown::new(self.notify_shutdown.subscribe()),

                // Notifies the receiver half once all clones are
                // dropped.
                _shutdown_complete: self.shutdown_complete_tx.clone(),
            };

            // Spawn a new task to process the connections. Tokio tasks are like
            // asynchronous green threads and are executed concurrently.
            tokio::spawn(async move {
                // Process the connection. If an error is encountered, log it.
                if let Err(err) = handler.run().await {
                    error!(cause = ?err, "connection error");
                }
            });
        }
    }

    /// Accept an inbound connection.
    ///
    /// Errors are handled by backing off and retrying. An exponential backoff
    /// strategy is used. After the first failure, the task waits for 1 second.
    /// After the second failure, the task waits for 2 seconds. Each subsequent
    /// failure doubles the wait time, up to a maximum of 64 seconds. If
    /// accepting still fails after the 64 second wait, this function returns
    /// with an error.
    async fn accept(&mut self) -> crate::Result<TcpStream> {
        let mut backoff = 1;

        // Try to accept a few times
        loop {
            // Perform the accept operation. If a socket is successfully
            // accepted, return it. Otherwise, check the backoff limit and
            // possibly retry.
            match self.listener.accept().await {
                Ok((socket, _)) => return Ok(socket),
                Err(err) => {
                    if backoff > 64 {
                        // Accept has failed too many times. Return the error.
                        return Err(err.into());
                    }
                }
            }

            // Pause execution until the back off period elapses.
            time::sleep(Duration::from_secs(backoff)).await;

            // Double the back off
            backoff *= 2;
        }
    }
}

impl Handler {
    /// Process a single connection.
    ///
    /// Request frames are read from the socket and processed. Responses are
    /// written back to the socket.
    ///
    /// Currently, pipelining is not implemented. Pipelining is the ability to
    /// process more than one request concurrently per connection without
    /// interleaving frames. See https://redis.io/topics/pipelining for more
    /// details.
    ///
    /// When the shutdown signal is received, the connection is processed until
    /// it reaches a safe state, at which point it is terminated.
    #[instrument(skip(self))]
    async fn run(&mut self) -> crate::Result<()> {
        // As long as the shutdown signal has not been received, try to read a
        // new request frame.
        while !self.shutdown.is_shutdown() {
            // While reading a request frame, also listen for the shutdown
            // signal.
            let maybe_frame = tokio::select! {
                res = self.connection.read_frame() => res?,
                _ = self.shutdown.recv() => {
                    // If a shutdown signal is received, return from `run`.
                    // This will result in the task terminating.
                    return Ok(());
                }
            };

            // If `None` is returned from `read_frame()` then the peer closed
            // the socket. There is no further work to do and the task can be
            // terminated.
            let frame = match maybe_frame {
                Some(frame) => frame,
                None => return Ok(()),
            };

            // Convert the redis frame into a command struct. This returns an
            // error if the frame is not a valid redis command or it is an
            // unsupported command.
            let cmd = Command::from_frame(frame)?;

            // Logs the `cmd` object. The syntax here is a shorthand provided by
            // the `tracing` crate. It can be thought of as similar to:
            //
            // ```
            // debug!(cmd = format!("{:?}", cmd));
            // ```
            //
            // `tracing` provides structured logging, so information is "logged"
            // as key-value pairs.
            debug!(?cmd);

            // Perform the work needed to apply the command. This may mutate the
            // database state as a result.
            //
            // The connection is passed into the apply function which allows the
            // command to write response frames directly to the connection. In
            // the case of pub/sub, multiple frames may be sent back to the
            // peer.
            cmd.apply(&self.db, &mut self.connection, &mut self.shutdown)
                .await?;
        }

        Ok(())
    }
}

impl Drop for Handler {
    fn drop(&mut self) {
        // Add a permit back to the semaphore.
        //
        // Doing so unblocks the listener if the max number of
        // connections has been reached.
        //
        // This is done in a `Drop` implementation in order to guarantee that
        // the permit is added even if the task handling the connection panics.
        // If `add_permits` were called at the end of the `run` function and a
        // bug caused a panic, the permit would never be returned to the
        // semaphore.
        self.limit_connections.add_permits(1);
    }
}