use async_std::io::{
prelude::{ReadExt, WriteExt},
Read, Write,
};
use std::io;
pub struct BufStream<S> {
pub(crate) stream: S,
stream_eof: bool,
wbuf: Vec<u8>,
rbuf: Vec<u8>,
rbuf_rindex: usize,
rbuf_windex: usize,
}
impl<S> BufStream<S>
where
S: Read + Write + Unpin,
{
pub fn new(stream: S) -> Self {
Self {
stream,
stream_eof: false,
wbuf: Vec::with_capacity(1024),
rbuf: vec![0; 8 * 1024],
rbuf_rindex: 0,
rbuf_windex: 0,
}
}
#[inline]
pub fn buffer_mut(&mut self) -> &mut Vec<u8> {
&mut self.wbuf
}
#[inline]
pub async fn flush(&mut self) -> io::Result<()> {
if !self.wbuf.is_empty() {
self.stream.write_all(&self.wbuf).await?;
self.wbuf.clear();
}
Ok(())
}
#[inline]
pub fn consume(&mut self, cnt: usize) {
self.rbuf_rindex += cnt;
}
pub async fn peek(&mut self, cnt: usize) -> io::Result<Option<&[u8]>> {
loop {
if self.stream_eof {
return Ok(None);
}
if self.rbuf_windex >= (self.rbuf_rindex + cnt) {
let buf = &self.rbuf[self.rbuf_rindex..(self.rbuf_rindex + cnt)];
return Ok(Some(buf));
}
if self.rbuf.len() < (self.rbuf_windex + cnt) {
debug_assert_eq!(self.rbuf_rindex, self.rbuf_windex);
self.rbuf_rindex = 0;
self.rbuf_windex = 0;
}
let n = self.stream.read(&mut self.rbuf[self.rbuf_windex..]).await?;
self.rbuf_windex += n;
if n == 0 {
self.stream_eof = true;
}
}
}
}
#[allow(unused)]
macro_rules! ret_if_none {
($val:expr) => {
match $val {
Some(val) => val,
None => {
return Ok(None);
}
}
};
}