use std::{
fmt::{self, Debug},
future::Future,
};
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, ReserveError, Slice};
use crate::{AsyncWrite, IoResult, util::MISSING_BUF};
pub struct Buffer<B = Vec<u8>>(Option<Slice<B>>);
impl Buffer<Vec<u8>> {
pub fn new() -> Self {
Self(Some(Vec::new().slice(..)))
}
#[inline]
pub fn with_capacity(cap: usize) -> Self {
Self(Some(Vec::with_capacity(cap).slice(..)))
}
#[allow(dead_code)]
pub fn compact_to(&mut self, capacity: usize, max_capacity: usize) {
let inner = self.take_inner();
let pos = inner.begin();
let mut buf = inner.into_inner();
if pos > 0 && pos < buf.len() {
let buf_len = buf.len();
let remaining = buf_len - pos;
buf.copy_within(pos..buf_len, 0);
unsafe {
buf.set_len(remaining);
}
} else if pos >= buf.len() {
buf.clear();
if buf.capacity() > max_capacity {
buf.shrink_to(capacity);
}
}
self.restore_inner(buf.slice(..));
}
}
impl<B> Buffer<B> {
#[inline]
pub(crate) fn take_inner(&mut self) -> Slice<B> {
self.0.take().expect(MISSING_BUF)
}
#[inline]
pub(crate) fn restore_inner(&mut self, buf: Slice<B>) {
debug_assert!(self.0.is_none());
self.0 = Some(buf);
}
#[inline]
pub(crate) fn inner(&self) -> &Slice<B> {
self.0.as_ref().expect(MISSING_BUF)
}
#[inline]
fn inner_mut(&mut self) -> &mut Slice<B> {
self.0.as_mut().expect(MISSING_BUF)
}
#[inline]
fn buf(&self) -> &B {
self.inner().as_inner()
}
#[inline]
fn buf_mut(&mut self) -> &mut B {
self.inner_mut().as_inner_mut()
}
}
impl<B: IoBufMut> Buffer<B> {
pub fn new_with(buf: B) -> Self {
Self(Some(buf.slice(..)))
}
#[inline]
pub fn buffer(&self) -> &[u8] {
self.inner()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.buf().is_empty()
}
#[inline]
pub fn all_done(&self) -> bool {
self.inner().is_empty()
}
#[inline]
pub fn need_fill(&self) -> bool {
self.is_empty()
}
#[inline]
pub fn need_flush(&mut self) -> bool {
let buf = self.buf_mut();
let cap = buf.buf_capacity();
let len = (*buf).buf_len();
len > cap * 2 / 3
}
#[inline]
pub fn reset(&mut self) {
let mut buf = self.take_inner().into_inner();
buf.clear();
self.restore_inner(buf.slice(..));
}
pub fn reserve(&mut self, additional: usize) {
if let Err(ReserveError::ReserveFailed(e)) = self.try_reserve(additional) {
panic!("Buffer reserve failed: {}", e)
}
}
pub fn try_reserve(&mut self, additional: usize) -> Result<(), ReserveError> {
self.inner_mut().reserve(additional)
}
pub async fn with<R, Fut, F>(&mut self, func: F) -> IoResult<R>
where
F: FnOnce(Slice<B>) -> Fut,
Fut: Future<Output = BufResult<R, Slice<B>>>,
{
let BufResult(res, buf) = func(self.take_inner()).await;
self.restore_inner(buf);
res
}
pub fn with_sync<R>(
&mut self,
func: impl FnOnce(Slice<B>) -> BufResult<R, Slice<B>>,
) -> std::io::Result<R> {
let BufResult(res, buf) = func(self.take_inner());
self.restore_inner(buf);
res
}
pub async fn flush_to(&mut self, writer: &mut impl AsyncWrite) -> IoResult<usize> {
if self.inner().is_empty() {
return Ok(0);
}
let mut total = 0;
loop {
let written = self.with(|inner| writer.write(inner)).await?;
if written == 0 {
return Err(std::io::Error::new(
std::io::ErrorKind::WriteZero,
"cannot flush all buffer data",
));
}
total += written;
if self.advance(written) {
break;
}
}
self.reset();
Ok(total)
}
#[inline]
pub fn advance(&mut self, amount: usize) -> bool {
assert!(self.inner().begin() + amount <= self.buf_mut().buf_capacity());
let inner = self.take_inner();
let pos = inner.begin() + amount;
self.restore_inner(inner.into_inner().slice(pos..));
self.all_done()
}
}
impl<B: IoBuf> Debug for Buffer<B> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Buffer")
.field("capacity", &"...") .field("init", &self.buf().buf_len())
.field("progress", &self.inner().begin())
.finish()
}
}