mod copy;
use std::io;
use std::io::ErrorKind;
use async_trait::async_trait;
use futures::io::{ReadHalf, WriteHalf};
use futures::prelude::*;
use futures::{AsyncReadExt, AsyncWriteExt};
pub use copy::copy;
#[async_trait]
pub trait ReadEx: Send {
async fn read2(&mut self, buf: &mut [u8]) -> Result<usize, io::Error>;
async fn read_exact2<'a>(&'a mut self, buf: &'a mut [u8]) -> Result<(), io::Error> {
let mut buf_piece = buf;
while !buf_piece.is_empty() {
let n = self.read2(buf_piece).await?;
if n == 0 {
return Err(ErrorKind::UnexpectedEof.into());
}
let (_, rest) = buf_piece.split_at_mut(n);
buf_piece = rest;
}
Ok(())
}
async fn read_fixed_u32(&mut self) -> Result<usize, io::Error> {
let mut len = [0; 4];
self.read_exact2(&mut len).await?;
let n = u32::from_be_bytes(len) as usize;
Ok(n)
}
async fn read_varint(&mut self) -> Result<usize, io::Error> {
let mut buffer = unsigned_varint::encode::usize_buffer();
let mut buffer_len = 0;
loop {
match self.read2(&mut buffer[buffer_len..=buffer_len]).await? {
0 => {
if buffer_len == 0 {
return Ok(0);
} else {
return Err(io::ErrorKind::UnexpectedEof.into());
}
}
n => debug_assert_eq!(n, 1),
}
buffer_len += 1;
match unsigned_varint::decode::usize(&buffer[..buffer_len]) {
Ok((len, _)) => return Ok(len),
Err(unsigned_varint::decode::Error::Overflow) => {
return Err(io::Error::new(io::ErrorKind::InvalidData, "overflow in variable-length integer"));
}
Err(_) => {}
}
}
}
async fn read_one_fixed(&mut self, max_size: usize) -> Result<Vec<u8>, io::Error> {
let len = self.read_fixed_u32().await?;
if len > max_size {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("Received data size over maximum frame length: {}>{}", len, max_size),
));
}
let mut buf = vec![0; len];
self.read_exact2(&mut buf).await?;
Ok(buf)
}
async fn read_one(&mut self, max_size: usize) -> Result<Vec<u8>, io::Error> {
let len = self.read_varint().await?;
if len > max_size {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("Received data size over maximum frame length: {}>{}", len, max_size),
));
}
let mut buf = vec![0; len];
self.read_exact2(&mut buf).await?;
Ok(buf)
}
}
/// Asynchronous write helpers layered on [`WriteEx::write2`],
/// [`WriteEx::flush2`] and [`WriteEx::close2`].
///
/// The provided methods add complete writes, length-prefix encoding (fixed
/// 32-bit big-endian or unsigned varint) and length-prefixed frame writing.
#[async_trait]
pub trait WriteEx: Send {
    /// Writes some bytes from `buf` and returns how many bytes were written.
    async fn write2(&mut self, buf: &[u8]) -> Result<usize, io::Error>;

    /// Writes the whole of `buf`, calling `write2` repeatedly as needed.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::WriteZero` if `write2` reports `0` bytes written
    /// while data remains, and propagates any error returned by `write2`.
    async fn write_all2(&mut self, buf: &[u8]) -> Result<(), io::Error> {
        let mut buf_piece = buf;
        while !buf_piece.is_empty() {
            let n = self.write2(buf_piece).await?;
            if n == 0 {
                return Err(io::ErrorKind::WriteZero.into());
            }
            // Drop the bytes that were just written.
            let (_, rest) = buf_piece.split_at(n);
            buf_piece = rest;
        }
        Ok(())
    }

    /// Writes `len` encoded as an unsigned varint.
    async fn write_varint(&mut self, len: usize) -> Result<(), io::Error> {
        let mut len_data = unsigned_varint::encode::usize_buffer();
        let encoded_len = unsigned_varint::encode::usize(len, &mut len_data).len();
        self.write_all2(&len_data[..encoded_len]).await?;
        Ok(())
    }

    /// Writes `len` as a 4-byte big-endian unsigned integer.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::InvalidInput`, without writing anything, if `len`
    /// does not fit in a `u32`. A truncated prefix would make the peer
    /// misparse everything that follows.
    async fn write_fixed_u32(&mut self, len: usize) -> Result<(), io::Error> {
        let len = <u32 as std::convert::TryFrom<usize>>::try_from(len).map_err(|_| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("length {} does not fit in a 32-bit length prefix", len),
            )
        })?;
        self.write_all2(len.to_be_bytes().as_ref()).await?;
        Ok(())
    }

    /// Writes `buf` as one frame prefixed by its length as a 4-byte
    /// big-endian integer, then flushes.
    ///
    /// # Errors
    ///
    /// Fails with `ErrorKind::InvalidInput` before writing anything if
    /// `buf.len()` does not fit in a `u32`; otherwise propagates write and
    /// flush errors.
    async fn write_one_fixed(&mut self, buf: &[u8]) -> Result<(), io::Error> {
        self.write_fixed_u32(buf.len()).await?;
        self.write_all2(buf).await?;
        self.flush2().await?;
        Ok(())
    }

    /// Writes `buf` as one frame prefixed by its length as an unsigned
    /// varint, then flushes.
    async fn write_one(&mut self, buf: &[u8]) -> Result<(), io::Error> {
        self.write_varint(buf.len()).await?;
        self.write_all2(buf).await?;
        self.flush2().await?;
        Ok(())
    }

    /// Flushes the writer.
    async fn flush2(&mut self) -> Result<(), io::Error>;

    /// Closes the writer.
    async fn close2(&mut self) -> Result<(), io::Error>;
}
/// Blanket implementation: every `AsyncRead + Unpin + Send` type is a [`ReadEx`].
#[async_trait]
impl<T> ReadEx for T
where
    T: AsyncRead + Unpin + Send,
{
    /// Delegates to `AsyncReadExt::read`.
    async fn read2(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
        AsyncReadExt::read(self, buf).await
    }
}
/// Blanket implementation: every `AsyncWrite + Unpin + Send` type is a [`WriteEx`].
#[async_trait]
impl<T> WriteEx for T
where
    T: AsyncWrite + Unpin + Send,
{
    /// Delegates to `AsyncWriteExt::write`.
    async fn write2(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
        let written = AsyncWriteExt::write(self, buf).await?;
        Ok(written)
    }

    /// Delegates to `AsyncWriteExt::flush`.
    async fn flush2(&mut self) -> Result<(), io::Error> {
        AsyncWriteExt::flush(self).await?;
        Ok(())
    }

    /// Delegates to `AsyncWriteExt::close`.
    async fn close2(&mut self) -> Result<(), io::Error> {
        AsyncWriteExt::close(self).await?;
        Ok(())
    }
}
/// A value that can be consumed and split into a separate reader and writer.
pub trait SplitEx {
/// The reading side produced by [`SplitEx::split`].
type Reader: ReadEx + Unpin;
/// The writing side produced by [`SplitEx::split`].
type Writer: WriteEx + Unpin;
/// Consumes `self` and returns its `(reader, writer)` pair.
fn split(self) -> (Self::Reader, Self::Writer);
}
impl<T: AsyncRead + AsyncWrite + Send + Unpin> SplitEx for T {
type Reader = ReadHalf<T>;
type Writer = WriteHalf<T>;
fn split(self) -> (Self::Reader, Self::Writer) {
futures::AsyncReadExt::split(self)
}
}
/// Shorthand bound for types that are readable, writable, splittable,
/// `Unpin` and `'static`.
pub trait SplittableReadWrite: ReadEx + WriteEx + SplitEx + Unpin + 'static {}

/// Implemented automatically for every type that meets the bounds.
impl<T> SplittableReadWrite for T where T: ReadEx + WriteEx + SplitEx + Unpin + 'static {}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::io::{self, AsyncReadExt, Cursor};
    use libp2prs_runtime::task;

    /// In-memory stream that implements `ReadEx` / `WriteEx` by hand, so the
    /// provided trait methods are exercised through a manual implementation.
    struct Test(Cursor<Vec<u8>>);

    #[async_trait]
    impl ReadEx for Test {
        async fn read2(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
            self.0.read(buf).await
        }
    }

    #[async_trait]
    impl WriteEx for Test {
        async fn write2(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
            self.0.write(buf).await
        }

        async fn flush2(&mut self) -> Result<(), io::Error> {
            self.0.flush().await
        }

        async fn close2(&mut self) -> Result<(), io::Error> {
            self.0.close().await
        }
    }

    /// `read2` fills the destination buffer from the start of the stream.
    #[test]
    fn test_read() {
        task::block_on(async {
            let mut reader = Test(Cursor::new(vec![1, 2, 3, 4]));
            let mut output = [0u8; 3];
            let bytes = reader.read2(&mut output[..]).await.unwrap();

            assert_eq!(bytes, 3);
            assert_eq!(output, [1, 2, 3]);
        });
    }

    /// `read2` on textual data yields the raw bytes.
    #[test]
    fn test_read_string() {
        task::block_on(async {
            let mut reader = Test(Cursor::new(b"hello world".to_vec()));
            let mut output = [0u8; 3];
            let bytes = reader.read2(&mut output[..]).await.unwrap();

            assert_eq!(bytes, 3);
            assert_eq!(output, [104, 101, 108]);
        });
    }

    /// `read_exact2` succeeds and fills the whole buffer.
    #[test]
    fn test_read_exact() {
        task::block_on(async {
            let mut reader = Test(Cursor::new(vec![1, 2, 3, 4]));
            let mut output = [0u8; 3];
            // Unwrap so that a read failure is reported as such.
            reader.read_exact2(&mut output[..]).await.unwrap();

            assert_eq!(output, [1, 2, 3]);
        });
    }

    /// The first four bytes ("hell") are decoded as a big-endian `u32`.
    #[test]
    fn test_read_fixed_u32() {
        task::block_on(async {
            let mut reader = Test(Cursor::new(b"hello world".to_vec()));
            let size = reader.read_fixed_u32().await.unwrap();

            assert_eq!(size, 1751477356);
        });
    }

    /// A single byte below 0x80 is a complete varint.
    #[test]
    fn test_read_varint() {
        task::block_on(async {
            let mut reader = Test(Cursor::new(vec![1, 2, 3, 4, 5, 6]));
            let size = reader.read_varint().await.unwrap();

            assert_eq!(size, 1);
        });
    }

    /// A varint-prefixed frame is returned without its length prefix.
    #[test]
    fn test_read_one() {
        task::block_on(async {
            let mut reader = Test(Cursor::new(vec![11, 104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100]));
            // Unwrap rather than mapping an error to an empty frame, which
            // would hide the real cause of a failure.
            let output = reader.read_one(11).await.unwrap();

            assert_eq!(output, b"hello world");
        });
    }

    /// `write2` overwrites from the start and leaves the remaining byte untouched.
    #[test]
    fn test_write() {
        task::block_on(async {
            let mut writer = Test(Cursor::new(vec![0u8; 5]));
            let size = writer.write2(&[1, 2, 3, 4]).await.unwrap();

            assert_eq!(size, 4);
            assert_eq!(writer.0.get_mut(), &[1, 2, 3, 4, 0])
        });
    }

    /// `write_all2` writes every byte, growing the underlying vector.
    #[test]
    fn test_write_all2() {
        task::block_on(async {
            let mut writer = Test(Cursor::new(vec![0u8; 4]));
            let output = vec![1, 2, 3, 4, 5];
            writer.write_all2(&output[..]).await.unwrap();

            assert_eq!(writer.0.get_mut(), &[1, 2, 3, 4, 5]);
        });
    }

    /// A fixed-size length prefix occupies four bytes.
    #[test]
    fn test_write_fixed_u32() {
        task::block_on(async {
            let mut writer = Test(Cursor::new(b"hello world".to_vec()));
            writer.write_fixed_u32(1751477356).await.unwrap();

            assert_eq!(writer.0.position(), 4);
        });
    }

    /// The value 1 is encoded as a single varint byte.
    #[test]
    fn test_write_varint() {
        task::block_on(async {
            let mut writer = Test(Cursor::new(vec![2, 2, 3, 4, 5, 6]));
            writer.write_varint(1).await.unwrap();

            assert_eq!(writer.0.position(), 1);
        });
    }

    /// `write_one` emits the varint length followed by the payload.
    #[test]
    fn test_write_one() {
        task::block_on(async {
            let mut writer = Test(Cursor::new(vec![0u8; 0]));
            // Unwrap so that a write or flush failure is reported as such.
            writer.write_one("hello world".as_ref()).await.unwrap();

            assert_eq!(writer.0.get_mut(), &[11, 104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100]);
        });
    }
}