contained_core/connect/
connection.rs

1/*
2    Appellation: connection <module>
3    Contrib: FL03 <jo3mccain@icloud.com>
4    Description: This module implements an explicit connection handler that supports the parsing of frames. The connection handler is used by the server and client to handle incoming connections.
5        The primary motivation for this was to support operations on a custom frame
6*/
7use super::{Frame, FrameError};
8use crate::Resultant;
9use bytes::{Buf, BytesMut};
10use std::io::{self, Cursor};
11use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
12use tokio::net::TcpStream;
13
14/// Send and receive `Frame` values from a remote peer.
15///
16/// When implementing networking protocols, a message on that protocol is
17/// often composed of several smaller messages known as frames. The purpose of
18/// `Connection` is to read and write frames on the underlying `TcpStream`.
19///
20/// To read frames, the `Connection` uses an internal buffer, which is filled
21/// up until there are enough bytes to create a full frame. Once this happens,
22/// the `Connection` creates the frame and returns it to the caller.
23///
24/// When sending frames, the frame is first encoded into the write buffer.
25/// The contents of the write buffer are then written to the socket.
26#[derive(Debug)]
27pub struct Connection {
28    // The `TcpStream`. It is decorated with a `BufWriter`, which provides write
29    // level buffering. The `BufWriter` implementation provided by Tokio is
30    // sufficient for our needs.
31    stream: BufWriter<TcpStream>,
32
33    // The buffer for reading frames. Unfortunately, Tokio's `BufReader`
34    // currently requires you to empty its buffer before you can ask it to
35    // retrieve more data from the underlying stream, so we have to manually
36    // implement buffering. This should be fixed in Tokio v0.3.
37    buffer: BytesMut,
38}
39
40impl Connection {
41    /// Create a new `Connection`, backed by `socket`. Read and write buffers
42    /// are initialized.
43    pub fn new(socket: TcpStream) -> Connection {
44        Connection {
45            stream: BufWriter::new(socket),
46            // Default to a 4KB read buffer. For the use case of mini redis,
47            // this is fine. However, real applications will want to tune this
48            // value to their specific use case. There is a high likelihood that
49            // a larger read buffer will work better.
50            buffer: BytesMut::with_capacity(4 * 1024),
51        }
52    }
53
54    /// Read a single `Frame` value from the underlying stream.
55    ///
56    /// The function waits until it has retrieved enough data to parse a frame.
57    /// Any data remaining in the read buffer after the frame has been parsed is
58    /// kept there for the next call to `read_frame`.
59    ///
60    /// # Returns
61    ///
62    /// On success, the received frame is returned. If the `TcpStream`
63    /// is closed in a way that doesn't break a frame in half, it returns
64    /// `None`. Otherwise, an error is returned.
65    pub async fn read_frame(&mut self) -> Resultant<Option<Frame>> {
66        loop {
67            // Attempt to parse a frame from the buffered data. If enough data
68            // has been buffered, the frame is returned.
69            if let Some(frame) = self.parse_frame()? {
70                return Ok(Some(frame));
71            }
72
73            // There is not enough buffered data to read a frame. Attempt to
74            // read more data from the socket.
75            //
76            // On success, the number of bytes is returned. `0` indicates "end
77            // of stream".
78            if 0 == self.stream.read_buf(&mut self.buffer).await? {
79                // The remote closed the connection. For this to be a clean
80                // shutdown, there should be no data in the read buffer. If
81                // there is, this means that the peer closed the socket while
82                // sending a frame.
83                if self.buffer.is_empty() {
84                    return Ok(None);
85                } else {
86                    return Err(crate::Error::ConnectionError(
87                        "connection closed before entire frame was received".to_string(),
88                    ));
89                }
90            }
91        }
92    }
93
94    /// Tries to parse a frame from the buffer. If the buffer contains enough
95    /// data, the frame is returned and the data removed from the buffer. If not
96    /// enough data has been buffered yet, `Ok(None)` is returned. If the
97    /// buffered data does not represent a valid frame, `Err` is returned.
98    fn parse_frame(&mut self) -> Resultant<Option<Frame>> {
99        // Cursor is used to track the "current" location in the
100        // buffer. Cursor also implements `Buf` from the `bytes` crate
101        // which provides a number of helpful utilities for working
102        // with bytes.
103        let mut buf = Cursor::new(&self.buffer[..]);
104
105        // The first step is to check if enough data has been buffered to parse
106        // a single frame. This step is usually much faster than doing a full
107        // parse of the frame, and allows us to skip allocating data structures
108        // to hold the frame data unless we know the full frame has been
109        // received.
110        match Frame::check(&mut buf) {
111            Ok(_) => {
112                // The `check` function will have advanced the cursor until the
113                // end of the frame. Since the cursor had position set to zero
114                // before `Frame::check` was called, we obtain the length of the
115                // frame by checking the cursor position.
116                let len = buf.position() as usize;
117
118                // Reset the position to zero before passing the cursor to
119                // `Frame::parse`.
120                buf.set_position(0);
121
122                // Parse the frame from the buffer. This allocates the necessary
123                // structures to represent the frame and returns the frame
124                // value.
125                //
126                // If the encoded frame representation is invalid, an error is
127                // returned. This should terminate the **current** connection
128                // but should not impact any other connected client.
129                let frame = Frame::parse(&mut buf)?;
130
131                // Discard the parsed data from the read buffer.
132                //
133                // When `advance` is called on the read buffer, all of the data
134                // up to `len` is discarded. The details of how this works is
135                // left to `BytesMut`. This is often done by moving an internal
136                // cursor, but it may be done by reallocating and copying data.
137                self.buffer.advance(len);
138
139                // Return the parsed frame to the caller.
140                Ok(Some(frame))
141            }
142            // There is not enough data present in the read buffer to parse a
143            // single frame. We must wait for more data to be received from the
144            // socket. Reading from the socket will be done in the statement
145            // after this `match`.
146            //
147            // We do not want to return `Err` from here as this "error" is an
148            // expected runtime condition.
149            Err(err) => match err {
150                FrameError::Incomplete => Ok(None),
151                FrameError::Other(e) => {
152                    // An actual error was encountered while parsing the frame.
153                    Err(e)
154                }
155            },
156        }
157    }
158
159    /// Write a single `Frame` value to the underlying stream.
160    ///
161    /// The `Frame` value is written to the socket using the various `write_*`
162    /// functions provided by `AsyncWrite`. Calling these functions directly on
163    /// a `TcpStream` is **not** advised, as this will result in a large number of
164    /// syscalls. However, it is fine to call these functions on a *buffered*
165    /// write stream. The data will be written to the buffer. Once the buffer is
166    /// full, it is flushed to the underlying socket.
167    pub async fn write_frame(&mut self, frame: &Frame) -> io::Result<()> {
168        // Arrays are encoded by encoding each entry. All other frame types are
169        // considered literals. For now, mini-redis is not able to encode
170        // recursive frame structures. See below for more details.
171        match frame {
172            Frame::Array(val) => {
173                // Encode the frame type prefix. For an array, it is `*`.
174                self.stream.write_u8(b'*').await?;
175
176                // Encode the length of the array.
177                self.write_decimal(val.len() as u64).await?;
178
179                // Iterate and encode each entry in the array.
180                for entry in &**val {
181                    self.write_value(entry).await?;
182                }
183            }
184            // The frame type is a literal. Encode the value directly.
185            _ => self.write_value(frame).await?,
186        }
187
188        // Ensure the encoded frame is written to the socket. The calls above
189        // are to the buffered stream and writes. Calling `flush` writes the
190        // remaining contents of the buffer to the socket.
191        self.stream.flush().await
192    }
193
194    /// Write a frame literal to the stream
195    async fn write_value(&mut self, frame: &Frame) -> io::Result<()> {
196        match frame {
197            Frame::Simple(val) => {
198                self.stream.write_u8(b'+').await?;
199                self.stream.write_all(val.as_bytes()).await?;
200                self.stream.write_all(b"\r\n").await?;
201            }
202            Frame::Error(val) => {
203                self.stream.write_u8(b'-').await?;
204                self.stream.write_all(val.as_bytes()).await?;
205                self.stream.write_all(b"\r\n").await?;
206            }
207            Frame::Integer(val) => {
208                self.stream.write_u8(b':').await?;
209                self.write_decimal(*val).await?;
210            }
211            Frame::Null => {
212                self.stream.write_all(b"$-1\r\n").await?;
213            }
214            Frame::Bulk(val) => {
215                let len = val.len();
216
217                self.stream.write_u8(b'$').await?;
218                self.write_decimal(len as u64).await?;
219                self.stream.write_all(val).await?;
220                self.stream.write_all(b"\r\n").await?;
221            }
222            // Encoding an `Array` from within a value cannot be done using a
223            // recursive strategy. In general, async fns do not support
224            // recursion. Mini-redis has not needed to encode nested arrays yet,
225            // so for now it is skipped.
226            Frame::Array(_val) => unreachable!(),
227        }
228
229        Ok(())
230    }
231
232    /// Write a decimal frame to the stream
233    async fn write_decimal(&mut self, val: u64) -> io::Result<()> {
234        use std::io::Write;
235
236        // Convert the value to a string
237        let mut buf = [0u8; 12];
238        let mut buf = Cursor::new(&mut buf[..]);
239        write!(&mut buf, "{}", val)?;
240
241        let pos = buf.position() as usize;
242        self.stream.write_all(&buf.get_ref()[..pos]).await?;
243        self.stream.write_all(b"\r\n").await?;
244
245        Ok(())
246    }
247}