use crate::Frame;
use crate::RedisError;
use crate::Result;
use anyhow::anyhow;
use bytes::Buf;
use bytes::{Bytes, BytesMut};
use std::io::Cursor;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
use tokio::net::TcpStream;
const MAX_BUFFER_SIZE: usize = 512 * 1024 * 1024;
pub struct Connection {
stream: BufWriter<TcpStream>,
buffer: BytesMut,
}
impl Connection {
pub fn new(stream: TcpStream) -> Self {
Self {
stream: BufWriter::new(stream),
buffer: BytesMut::with_capacity(MAX_BUFFER_SIZE),
}
}
pub async fn read_frame(&mut self) -> Result<Option<Frame>> {
loop {
if let Some(frame) = self.try_parse_frame().await? {
return Ok(Some(frame));
}
if let Ok(0) = self.stream.read_buf(&mut self.buffer).await {
if self.buffer.is_empty() {
return Ok(None);
} else {
return Err(RedisError::Other(anyhow!("Stream closed")));
}
}
}
}
pub async fn write_frame(&mut self, frame: &Frame) -> Result<()> {
let bytes: Bytes = frame.serialize().await?;
self.stream.write_all(&bytes).await?;
self.stream.flush().await?;
Ok(())
}
async fn try_parse_frame(&mut self) -> Result<Option<Frame>> {
let mut cursor: Cursor<&[u8]> = Cursor::new(&self.buffer[..]);
match Frame::try_parse(&mut cursor) {
Ok(frame) => {
self.buffer.advance(cursor.position() as usize);
Ok(Some(frame))
}
Err(err) => {
if let RedisError::IncompleteFrame = err {
Ok(None)
} else {
Err(err)
}
}
}
}
}