redis_asyncx/
connection.rs

1use crate::Frame;
2use crate::RedisError;
3use crate::Result;
4use anyhow::anyhow;
5use bytes::Buf;
6use bytes::{Bytes, BytesMut};
7use std::io::Cursor;
8use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
9use tokio::net::TcpStream;
10
11// 512 MB = 512 * 1024 * 1024 bytes
12const MAX_BUFFER_SIZE: usize = 512 * 1024 * 1024;
13
14/// Represents a connection bewteen the client and the Redis server.
15///
16/// The connecton wraps a TCP stream and a buffer for reading and writing Frames.
17///
18/// To read Frames, the connection waits asynchronously until there is enough data to parse a Frame.
19/// On success, it deserializes the bytes into a Frame and returns it to the client.
20///
21/// To write Frames, the connection serializes the Frame into bytes and writes it to the stream.
22/// It then flushes the stream to ensure the data is sent to the server.
23pub struct Connection {
24    stream: BufWriter<TcpStream>,
25    buffer: BytesMut,
26}
27
28impl Connection {
29    /// Creates a new connection from a TCP stream. The stream is wrapped in a write buffer.
30    /// It also initializes a read buffer for reading from the TCP stream. The read buffer is 4kb.
31    pub fn new(stream: TcpStream) -> Self {
32        Self {
33            stream: BufWriter::new(stream),
34            // 512MB buffer for each connection
35            buffer: BytesMut::with_capacity(MAX_BUFFER_SIZE),
36        }
37    }
38
39    /// Reads a single Redis Frame from the TCP stream.
40    ///
41    /// The method reads from the stream into the buffer until it has a complete Frame.
42    /// It then parses the Frame and returns it to the client.
43    ///
44    /// # Returns
45    ///
46    /// An Option containing the Frame if it was successfully read and parsed.
47    /// None if the Frame is incomplete and more data is needed.
48    pub async fn read_frame(&mut self) -> Result<Option<Frame>> {
49        loop {
50            if let Some(frame) = self.try_parse_frame().await? {
51                return Ok(Some(frame));
52            }
53
54            // read from the stream into the buffer until we have a frame
55            if let Ok(0) = self.stream.read_buf(&mut self.buffer).await {
56                if self.buffer.is_empty() {
57                    return Ok(None);
58                } else {
59                    return Err(RedisError::Other(anyhow!("Stream closed")));
60                }
61            }
62        }
63    }
64
65    /// Writes a single Redis Frame to the TCP stream.
66    ///
67    /// The method serializes the Frame into bytes and writes it to the stream.
68    /// It then flushes the stream to ensure the data is sent to the server.
69    ///
70    /// # Arguments
71    ///
72    /// * `frame` - A reference to the Frame to be written to the stream
73    ///
74    /// # Returns
75    ///
76    /// A Result indicating success or failure
77    pub async fn write_frame(&mut self, frame: &Frame) -> Result<()> {
78        let bytes: Bytes = frame.serialize().await?;
79
80        self.stream.write_all(&bytes).await?;
81        self.stream.flush().await?;
82
83        Ok(())
84    }
85
86    /// Tries to parse a single Redis Frame from the buffer.
87    ///
88    /// The method checks if the buffer contains a complete Frame.
89    /// If it does, it deserializes the bytes into a Frame and returns it to the client.
90    /// If the Frame is incomplete, it returns None.
91    ///
92    /// # Returns
93    ///
94    /// An Option containing the Frame if it was successfully read and parsed.
95    /// None if the Frame is incomplete and more data is needed.
96    /// An error if the Frame is invalid.
97    async fn try_parse_frame(&mut self) -> Result<Option<Frame>> {
98        let mut cursor: Cursor<&[u8]> = Cursor::new(&self.buffer[..]);
99
100        match Frame::try_parse(&mut cursor) {
101            Ok(frame) => {
102                self.buffer.advance(cursor.position() as usize);
103                Ok(Some(frame))
104            }
105            Err(err) => {
106                if let RedisError::IncompleteFrame = err {
107                    Ok(None)
108                } else {
109                    Err(err)
110                }
111            }
112        }
113    }
114}