contained_core/connect/connection.rs
1/*
2 Appellation: connection <module>
3 Contrib: FL03 <jo3mccain@icloud.com>
    Description: This module implements an explicit connection handler that reads and writes frames over a TCP stream, including the parsing of incoming frames. The connection handler is used by both the server and the client to handle their connections.
    The primary motivation for this was to support operations on a custom frame type.
6*/
7use super::{Frame, FrameError};
8use crate::Resultant;
9use bytes::{Buf, BytesMut};
10use std::io::{self, Cursor};
11use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
12use tokio::net::TcpStream;
13
/// Send and receive `Frame` values from a remote peer.
///
/// When implementing networking protocols, a message on that protocol is
/// often composed of several smaller messages known as frames. The purpose of
/// `Connection` is to read and write frames on the underlying `TcpStream`.
///
/// To read frames, the `Connection` uses an internal buffer, which is filled
/// up until there are enough bytes to create a full frame. Once this happens,
/// the `Connection` creates the frame and returns it to the caller.
///
/// When sending frames, the frame is first encoded into the write buffer.
/// The contents of the write buffer are then written to the socket when the
/// buffer is flushed.
#[derive(Debug)]
pub struct Connection {
    // The `TcpStream`, wrapped in a `BufWriter` so the many small writes made
    // while encoding a single frame are buffered instead of each becoming a
    // separate write on the socket. Incoming bytes are also read through this
    // field (`read_buf` in `read_frame`).
    stream: BufWriter<TcpStream>,

    // Read buffer, managed by hand: bytes from the socket accumulate here
    // until at least one complete frame is available. Parsed bytes are
    // dropped with `advance`, and any leftover bytes are kept for the next
    // `read_frame` call.
    // NOTE(review): the original rationale for not using Tokio's `BufReader`
    // cited a limitation expected to be "fixed in Tokio v0.3" — confirm
    // whether it still applies to the Tokio version this crate depends on.
    buffer: BytesMut,
}
39
40impl Connection {
41 /// Create a new `Connection`, backed by `socket`. Read and write buffers
42 /// are initialized.
43 pub fn new(socket: TcpStream) -> Connection {
44 Connection {
45 stream: BufWriter::new(socket),
46 // Default to a 4KB read buffer. For the use case of mini redis,
47 // this is fine. However, real applications will want to tune this
48 // value to their specific use case. There is a high likelihood that
49 // a larger read buffer will work better.
50 buffer: BytesMut::with_capacity(4 * 1024),
51 }
52 }
53
54 /// Read a single `Frame` value from the underlying stream.
55 ///
56 /// The function waits until it has retrieved enough data to parse a frame.
57 /// Any data remaining in the read buffer after the frame has been parsed is
58 /// kept there for the next call to `read_frame`.
59 ///
60 /// # Returns
61 ///
62 /// On success, the received frame is returned. If the `TcpStream`
63 /// is closed in a way that doesn't break a frame in half, it returns
64 /// `None`. Otherwise, an error is returned.
65 pub async fn read_frame(&mut self) -> Resultant<Option<Frame>> {
66 loop {
67 // Attempt to parse a frame from the buffered data. If enough data
68 // has been buffered, the frame is returned.
69 if let Some(frame) = self.parse_frame()? {
70 return Ok(Some(frame));
71 }
72
73 // There is not enough buffered data to read a frame. Attempt to
74 // read more data from the socket.
75 //
76 // On success, the number of bytes is returned. `0` indicates "end
77 // of stream".
78 if 0 == self.stream.read_buf(&mut self.buffer).await? {
79 // The remote closed the connection. For this to be a clean
80 // shutdown, there should be no data in the read buffer. If
81 // there is, this means that the peer closed the socket while
82 // sending a frame.
83 if self.buffer.is_empty() {
84 return Ok(None);
85 } else {
86 return Err(crate::Error::ConnectionError(
87 "connection closed before entire frame was received".to_string(),
88 ));
89 }
90 }
91 }
92 }
93
94 /// Tries to parse a frame from the buffer. If the buffer contains enough
95 /// data, the frame is returned and the data removed from the buffer. If not
96 /// enough data has been buffered yet, `Ok(None)` is returned. If the
97 /// buffered data does not represent a valid frame, `Err` is returned.
98 fn parse_frame(&mut self) -> Resultant<Option<Frame>> {
99 // Cursor is used to track the "current" location in the
100 // buffer. Cursor also implements `Buf` from the `bytes` crate
101 // which provides a number of helpful utilities for working
102 // with bytes.
103 let mut buf = Cursor::new(&self.buffer[..]);
104
105 // The first step is to check if enough data has been buffered to parse
106 // a single frame. This step is usually much faster than doing a full
107 // parse of the frame, and allows us to skip allocating data structures
108 // to hold the frame data unless we know the full frame has been
109 // received.
110 match Frame::check(&mut buf) {
111 Ok(_) => {
112 // The `check` function will have advanced the cursor until the
113 // end of the frame. Since the cursor had position set to zero
114 // before `Frame::check` was called, we obtain the length of the
115 // frame by checking the cursor position.
116 let len = buf.position() as usize;
117
118 // Reset the position to zero before passing the cursor to
119 // `Frame::parse`.
120 buf.set_position(0);
121
122 // Parse the frame from the buffer. This allocates the necessary
123 // structures to represent the frame and returns the frame
124 // value.
125 //
126 // If the encoded frame representation is invalid, an error is
127 // returned. This should terminate the **current** connection
128 // but should not impact any other connected client.
129 let frame = Frame::parse(&mut buf)?;
130
131 // Discard the parsed data from the read buffer.
132 //
133 // When `advance` is called on the read buffer, all of the data
134 // up to `len` is discarded. The details of how this works is
135 // left to `BytesMut`. This is often done by moving an internal
136 // cursor, but it may be done by reallocating and copying data.
137 self.buffer.advance(len);
138
139 // Return the parsed frame to the caller.
140 Ok(Some(frame))
141 }
142 // There is not enough data present in the read buffer to parse a
143 // single frame. We must wait for more data to be received from the
144 // socket. Reading from the socket will be done in the statement
145 // after this `match`.
146 //
147 // We do not want to return `Err` from here as this "error" is an
148 // expected runtime condition.
149 Err(err) => match err {
150 FrameError::Incomplete => Ok(None),
151 FrameError::Other(e) => {
152 // An actual error was encountered while parsing the frame.
153 Err(e)
154 }
155 },
156 }
157 }
158
159 /// Write a single `Frame` value to the underlying stream.
160 ///
161 /// The `Frame` value is written to the socket using the various `write_*`
162 /// functions provided by `AsyncWrite`. Calling these functions directly on
163 /// a `TcpStream` is **not** advised, as this will result in a large number of
164 /// syscalls. However, it is fine to call these functions on a *buffered*
165 /// write stream. The data will be written to the buffer. Once the buffer is
166 /// full, it is flushed to the underlying socket.
167 pub async fn write_frame(&mut self, frame: &Frame) -> io::Result<()> {
168 // Arrays are encoded by encoding each entry. All other frame types are
169 // considered literals. For now, mini-redis is not able to encode
170 // recursive frame structures. See below for more details.
171 match frame {
172 Frame::Array(val) => {
173 // Encode the frame type prefix. For an array, it is `*`.
174 self.stream.write_u8(b'*').await?;
175
176 // Encode the length of the array.
177 self.write_decimal(val.len() as u64).await?;
178
179 // Iterate and encode each entry in the array.
180 for entry in &**val {
181 self.write_value(entry).await?;
182 }
183 }
184 // The frame type is a literal. Encode the value directly.
185 _ => self.write_value(frame).await?,
186 }
187
188 // Ensure the encoded frame is written to the socket. The calls above
189 // are to the buffered stream and writes. Calling `flush` writes the
190 // remaining contents of the buffer to the socket.
191 self.stream.flush().await
192 }
193
194 /// Write a frame literal to the stream
195 async fn write_value(&mut self, frame: &Frame) -> io::Result<()> {
196 match frame {
197 Frame::Simple(val) => {
198 self.stream.write_u8(b'+').await?;
199 self.stream.write_all(val.as_bytes()).await?;
200 self.stream.write_all(b"\r\n").await?;
201 }
202 Frame::Error(val) => {
203 self.stream.write_u8(b'-').await?;
204 self.stream.write_all(val.as_bytes()).await?;
205 self.stream.write_all(b"\r\n").await?;
206 }
207 Frame::Integer(val) => {
208 self.stream.write_u8(b':').await?;
209 self.write_decimal(*val).await?;
210 }
211 Frame::Null => {
212 self.stream.write_all(b"$-1\r\n").await?;
213 }
214 Frame::Bulk(val) => {
215 let len = val.len();
216
217 self.stream.write_u8(b'$').await?;
218 self.write_decimal(len as u64).await?;
219 self.stream.write_all(val).await?;
220 self.stream.write_all(b"\r\n").await?;
221 }
222 // Encoding an `Array` from within a value cannot be done using a
223 // recursive strategy. In general, async fns do not support
224 // recursion. Mini-redis has not needed to encode nested arrays yet,
225 // so for now it is skipped.
226 Frame::Array(_val) => unreachable!(),
227 }
228
229 Ok(())
230 }
231
232 /// Write a decimal frame to the stream
233 async fn write_decimal(&mut self, val: u64) -> io::Result<()> {
234 use std::io::Write;
235
236 // Convert the value to a string
237 let mut buf = [0u8; 12];
238 let mut buf = Cursor::new(&mut buf[..]);
239 write!(&mut buf, "{}", val)?;
240
241 let pos = buf.position() as usize;
242 self.stream.write_all(&buf.get_ref()[..pos]).await?;
243 self.stream.write_all(b"\r\n").await?;
244
245 Ok(())
246 }
247}