use crate::deserializer::{self, ClientResult};
use crate::{Query, Response};
use bytes::{Buf, BytesMut};
pub use std::io::Result as IoResult;
use std::io::{Error, ErrorKind};
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
const BUF_CAP: usize = 4096;
pub struct Connection {
stream: TcpStream,
buffer: BytesMut,
}
impl Connection {
pub async fn new(host: &str, port: u16) -> IoResult<Self> {
let stream = TcpStream::connect((host, port)).await?;
Ok(Connection {
stream: stream,
buffer: BytesMut::with_capacity(BUF_CAP),
})
}
pub async fn run_simple_query(&mut self, mut query: Query) -> IoResult<Response> {
match query.write_query_to(&mut self.stream).await {
Ok(_) => (),
Err(e) => {
return Err(e);
}
};
loop {
match self.stream.read_buf(&mut self.buffer).await {
Ok(_) => (),
Err(e) => return Err(e),
}
match self.try_response().await {
ClientResult::Empty => break Err(Error::from(ErrorKind::ConnectionReset)),
ClientResult::Incomplete => {
continue;
}
ClientResult::SimpleResponse(r, f) => {
self.buffer.advance(f);
break Ok(Response::Array(r));
}
ClientResult::ResponseItem(r, f) => {
self.buffer.advance(f);
break Ok(Response::Item(r));
}
ClientResult::InvalidResponse => {
self.buffer.clear();
break Ok(Response::InvalidResponse);
}
ClientResult::ParseError => {
self.buffer.clear();
break Ok(Response::ParseError);
}
ClientResult::PipelinedResponse(_, _) => {
todo!("Pipelined queries haven't been implemented yet!")
}
}
}
}
async fn try_response(&mut self) -> ClientResult {
if self.buffer.is_empty() {
return ClientResult::Empty;
}
deserializer::parse(&self.buffer)
}
}