#![allow(unused)]
use bytes::BytesMut;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::io::{Cursor, Error};
use thiserror::Error;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
use tokio::net::{TcpStream, ToSocketAddrs};
static DEFAULT_BUFFER_SIZE: usize = 4 * 1024;
#[derive(Error, Debug)]
pub enum ConnectionError {
#[error("`{0}`")]
IoError(Error),
#[error("`{0}`")]
BincodeError(Box<bincode::Error>),
#[error("`{0}`")]
ConnectionReset(String),
}
pub struct Connection {
buffer: BytesMut,
stream: BufWriter<TcpStream>,
}
impl Connection {
pub async fn dial<A: ToSocketAddrs>(addr: A) -> Result<Connection, ConnectionError> {
let stream = TcpStream::connect(addr).await?;
Ok(Connection::new(stream))
}
pub async fn dial_with_capacity<A: ToSocketAddrs>(
addr: A,
capacity: usize,
) -> Result<Connection, ConnectionError> {
let stream = TcpStream::connect(addr).await?;
Ok(Connection::new_with_capacity(stream, capacity))
}
pub fn new(stream: TcpStream) -> Self {
Self::new_with_capacity(stream, DEFAULT_BUFFER_SIZE)
}
pub fn new_with_capacity(stream: TcpStream, capacity: usize) -> Self {
Self {
buffer: BytesMut::with_capacity(capacity),
stream: BufWriter::new(stream),
}
}
pub async fn write<T: Serialize>(&mut self, value: &T) -> Result<(), ConnectionError> {
let buf = bincode::serialize(value)?;
self.write_to_stream(&buf).await?;
Ok(())
}
pub async fn read<T: DeserializeOwned>(&mut self) -> Result<Option<T>, ConnectionError> {
loop {
if let Some(value) = self.parse_value()? {
self.buffer.clear();
return Ok(Some(value));
}
self.read_to_buffer().await?;
}
}
fn parse_value<T: DeserializeOwned>(&mut self) -> Result<Option<T>, ConnectionError> {
let mut buf = Cursor::new(&self.buffer[..]);
match bincode::deserialize_from(&mut buf) {
Ok(value) => Ok(Some(value)),
Err(_) => Ok(None),
}
}
async fn write_to_stream(&mut self, buf: &[u8]) -> Result<(), ConnectionError> {
self.stream.write_all(buf).await?;
self.stream.flush().await?;
Ok(())
}
async fn read_to_buffer(&mut self) -> Result<(), ConnectionError> {
if 0 == self.stream.read_buf(&mut self.buffer).await? {
return if self.buffer.is_empty() {
Ok(())
} else {
Err(ConnectionError::ConnectionReset(
"connection reset by peer".into(),
))
};
}
Ok(())
}
}
impl From<std::io::Error> for ConnectionError {
fn from(e: std::io::Error) -> Self {
ConnectionError::IoError(e)
}
}
impl From<Box<bincode::ErrorKind>> for ConnectionError {
fn from(e: Box<bincode::ErrorKind>) -> Self {
ConnectionError::BincodeError(Box::new(e))
}
}