redis_asyncx/connection.rs
1use crate::Frame;
2use crate::RedisError;
3use crate::Result;
4use anyhow::anyhow;
5use bytes::Buf;
6use bytes::{Bytes, BytesMut};
7use std::io::Cursor;
8use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
9use tokio::net::TcpStream;
10
11// 512 MB = 512 * 1024 * 1024 bytes
12const MAX_BUFFER_SIZE: usize = 512 * 1024 * 1024;
13
14/// Represents a connection bewteen the client and the Redis server.
15///
16/// The connecton wraps a TCP stream and a buffer for reading and writing Frames.
17///
18/// To read Frames, the connection waits asynchronously until there is enough data to parse a Frame.
19/// On success, it deserializes the bytes into a Frame and returns it to the client.
20///
21/// To write Frames, the connection serializes the Frame into bytes and writes it to the stream.
22/// It then flushes the stream to ensure the data is sent to the server.
23pub struct Connection {
24 stream: BufWriter<TcpStream>,
25 buffer: BytesMut,
26}
27
28impl Connection {
29 /// Creates a new connection from a TCP stream. The stream is wrapped in a write buffer.
30 /// It also initializes a read buffer for reading from the TCP stream. The read buffer is 4kb.
31 pub fn new(stream: TcpStream) -> Self {
32 Self {
33 stream: BufWriter::new(stream),
34 // 512MB buffer for each connection
35 buffer: BytesMut::with_capacity(MAX_BUFFER_SIZE),
36 }
37 }
38
39 /// Reads a single Redis Frame from the TCP stream.
40 ///
41 /// The method reads from the stream into the buffer until it has a complete Frame.
42 /// It then parses the Frame and returns it to the client.
43 ///
44 /// # Returns
45 ///
46 /// An Option containing the Frame if it was successfully read and parsed.
47 /// None if the Frame is incomplete and more data is needed.
48 pub async fn read_frame(&mut self) -> Result<Option<Frame>> {
49 loop {
50 if let Some(frame) = self.try_parse_frame().await? {
51 return Ok(Some(frame));
52 }
53
54 // read from the stream into the buffer until we have a frame
55 if let Ok(0) = self.stream.read_buf(&mut self.buffer).await {
56 if self.buffer.is_empty() {
57 return Ok(None);
58 } else {
59 return Err(RedisError::Other(anyhow!("Stream closed")));
60 }
61 }
62 }
63 }
64
65 /// Writes a single Redis Frame to the TCP stream.
66 ///
67 /// The method serializes the Frame into bytes and writes it to the stream.
68 /// It then flushes the stream to ensure the data is sent to the server.
69 ///
70 /// # Arguments
71 ///
72 /// * `frame` - A reference to the Frame to be written to the stream
73 ///
74 /// # Returns
75 ///
76 /// A Result indicating success or failure
77 pub async fn write_frame(&mut self, frame: &Frame) -> Result<()> {
78 let bytes: Bytes = frame.serialize().await?;
79
80 self.stream.write_all(&bytes).await?;
81 self.stream.flush().await?;
82
83 Ok(())
84 }
85
86 /// Tries to parse a single Redis Frame from the buffer.
87 ///
88 /// The method checks if the buffer contains a complete Frame.
89 /// If it does, it deserializes the bytes into a Frame and returns it to the client.
90 /// If the Frame is incomplete, it returns None.
91 ///
92 /// # Returns
93 ///
94 /// An Option containing the Frame if it was successfully read and parsed.
95 /// None if the Frame is incomplete and more data is needed.
96 /// An error if the Frame is invalid.
97 async fn try_parse_frame(&mut self) -> Result<Option<Frame>> {
98 let mut cursor: Cursor<&[u8]> = Cursor::new(&self.buffer[..]);
99
100 match Frame::try_parse(&mut cursor) {
101 Ok(frame) => {
102 self.buffer.advance(cursor.position() as usize);
103 Ok(Some(frame))
104 }
105 Err(err) => {
106 if let RedisError::IncompleteFrame = err {
107 Ok(None)
108 } else {
109 Err(err)
110 }
111 }
112 }
113 }
114}