use core::{
fmt,
ops::{Deref, DerefMut},
};
use std::io;
use tracing::trace;
use xitca_io::bytes::{Buf, BytesMut};
use xitca_unsafe_collection::{
bytes::{read_buf, BufList, ChunkVectoredUninit},
uninit::uninit_array,
};
pub use xitca_io::bytes::{BufInterest, BufRead, BufWrite};
/// Read buffer backed by [`BytesMut`].
///
/// `LIMIT` is a soft cap in bytes: once the buffer holds `LIMIT` or more
/// bytes, `want_write_buf` turns false and reading backs off (backpressure).
#[derive(Debug)]
pub struct ReadBuf<const LIMIT: usize>(BytesMut);
impl<const LIMIT: usize> ReadBuf<LIMIT> {
#[inline(always)]
pub fn new() -> Self {
Self(BytesMut::new())
}
#[inline(always)]
pub fn into_inner(self) -> BytesMut {
self.0
}
}
impl<const LIMIT: usize> From<BytesMut> for ReadBuf<LIMIT> {
    /// Wrap an existing [`BytesMut`], which may already contain bytes.
    fn from(bytes: BytesMut) -> Self {
        Self(bytes)
    }
}
impl<const LIMIT: usize> Default for ReadBuf<LIMIT> {
fn default() -> Self {
Self::new()
}
}
// Expose the inner BytesMut API (len, chunks, etc.) directly on ReadBuf.
impl<const LIMIT: usize> Deref for ReadBuf<LIMIT> {
    type Target = BytesMut;
    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
// Mutable counterpart of the Deref impl above: allows consuming/advancing
// the buffered bytes in place.
impl<const LIMIT: usize> DerefMut for ReadBuf<LIMIT> {
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
impl<const LIMIT: usize> BufInterest for ReadBuf<LIMIT> {
    /// True while the buffer holds fewer than `LIMIT` bytes, i.e. more data
    /// may still be read into it before backpressure kicks in.
    #[inline]
    fn want_write_buf(&self) -> bool {
        self.0.remaining() < LIMIT
    }
    // NOTE(review): a read buffer never writes to the io and nothing in this
    // module calls this; presumably callers are expected to query write
    // interest on a write buffer instead — confirm before relying on it.
    fn want_write_io(&self) -> bool {
        unimplemented!()
    }
}
impl<const LIMIT: usize> BufRead for ReadBuf<LIMIT> {
    /// Read from `io` into the buffer until the read would block, EOF is
    /// reached, or the buffer reaches the `LIMIT` byte threshold.
    ///
    /// EOF and hard I/O errors are surfaced as `Err` only when this call made
    /// no progress at all; if some bytes were read first, they are kept and
    /// `Ok(())` is returned so the caller can consume them before the
    /// EOF/error is observed on a later call.
    fn do_io<Io>(&mut self, io: &mut Io) -> io::Result<()>
    where
        Io: io::Read,
    {
        // Snapshot the length so progress made within this call is detectable.
        let len = self.0.len();
        loop {
            match read_buf(io, &mut self.0) {
                Ok(0) => {
                    // EOF: only an error when nothing was read by this call.
                    if self.0.len() == len {
                        return Err(io::ErrorKind::UnexpectedEof.into());
                    }
                    break;
                }
                Ok(_) => {
                    // Stop reading once the LIMIT threshold is reached.
                    if !self.want_write_buf() {
                        trace!(
                            "READ_BUF_LIMIT: {LIMIT} bytes reached. Entering backpressure(no log event for recovery)."
                        );
                        break;
                    }
                }
                // Nothing more to read right now; keep what we have.
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
                Err(e) => {
                    // Surface the error only if no bytes were read by this
                    // call; otherwise let the buffered bytes be processed first.
                    if self.0.len() == len {
                        return Err(e);
                    }
                    break;
                }
            }
        }
        Ok(())
    }
}
/// Flat write buffer delegating to [`xitca_io::bytes::WriteBuf`].
///
/// `LIMIT` caps how many bytes may be buffered before `want_write_buf`
/// signals backpressure.
#[derive(Default)]
pub struct WriteBuf<const LIMIT: usize>(xitca_io::bytes::WriteBuf);
impl<const LIMIT: usize> WriteBuf<LIMIT> {
    /// Construct an empty write buffer.
    #[inline]
    pub fn new() -> Self {
        Self(xitca_io::bytes::WriteBuf::new())
    }
    // Test-only view of the currently buffered bytes.
    #[cfg(test)]
    pub fn buf(&self) -> &[u8] {
        self.0.buf()
    }
}
impl<const LIMIT: usize> BufInterest for WriteBuf<LIMIT> {
    /// True while fewer than `LIMIT` bytes are buffered, i.e. more data may
    /// still be accepted before backpressure.
    #[inline]
    fn want_write_buf(&self) -> bool {
        self.0.len() < LIMIT
    }
    /// Delegate io write interest to the inner buffer.
    #[inline]
    fn want_write_io(&self) -> bool {
        self.0.want_write_io()
    }
}
impl<const LIMIT: usize> BufWrite for WriteBuf<LIMIT> {
    /// Run `func` against the inner buffer; error handling is delegated to
    /// [`xitca_io::bytes::WriteBuf::write_buf`].
    #[inline]
    fn write_buf<F, T, E>(&mut self, func: F) -> Result<T, E>
    where
        F: FnOnce(&mut BytesMut) -> Result<T, E>,
    {
        self.0.write_buf(func)
    }
    /// Delegate the io write loop to the inner buffer.
    #[inline]
    fn do_io<Io: io::Write>(&mut self, io: &mut Io) -> io::Result<()> {
        self.0.do_io(io)
    }
}
/// Vectored write buffer: queues up to `BUF_LIST_CNT` owned buffers and
/// drains them with `write_vectored`.
pub struct ListWriteBuf<B, const LIMIT: usize> {
    // Staging area: taken out via `split_buf` and filled through `write_buf`.
    buf: BytesMut,
    // Queue of finished buffers awaiting a vectored write.
    list: BufList<B, BUF_LIST_CNT>,
    // Set when the queue has fully drained and the io still needs a flush.
    want_flush: bool,
}
impl<B: Buf, const LIMIT: usize> Default for ListWriteBuf<B, LIMIT> {
fn default() -> Self {
Self {
buf: BytesMut::new(),
list: BufList::new(),
want_flush: false,
}
}
}
impl<B: Buf, const LIMIT: usize> ListWriteBuf<B, LIMIT> {
    /// Take the currently staged bytes, leaving the staging buffer empty
    /// (but its allocation reusable).
    pub fn split_buf(&mut self) -> BytesMut {
        self.buf.split()
    }

    /// Queue a finished buffer for vectored writing.
    pub fn buffer<BB: Buf + Into<B>>(&mut self, buf: BB) {
        let item: B = buf.into();
        self.list.push(item);
        // New data was queued, so any pending flush state is now stale.
        self.want_flush = false;
    }
}
impl<B: Buf, const LIMIT: usize> fmt::Debug for ListWriteBuf<B, LIMIT> {
    /// Debug output reporting the number of queued bytes remaining.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Fix: label the output with the type's actual name. The old
        // "ListBuf" label did not match `ListWriteBuf` and was misleading
        // in logs and debug dumps.
        f.debug_struct("ListWriteBuf")
            .field("remaining", &self.list.remaining())
            .finish()
    }
}
// Max number of buffers gathered into a single vectored write; also the
// fixed capacity of `ListWriteBuf`'s queue.
const BUF_LIST_CNT: usize = 32;
impl<B, const LIMIT: usize> BufInterest for ListWriteBuf<B, LIMIT>
where
    B: Buf + ChunkVectoredUninit,
{
    /// True while more buffers may be queued: total queued bytes are under
    /// `LIMIT` and the fixed-capacity queue still has a free slot.
    #[inline]
    fn want_write_buf(&self) -> bool {
        self.list.remaining() < LIMIT && !self.list.is_full()
    }
    /// True when there are queued bytes to write or a flush is pending.
    #[inline]
    fn want_write_io(&self) -> bool {
        self.list.remaining() != 0 || self.want_flush
    }
}
impl<B, const LIMIT: usize> BufWrite for ListWriteBuf<B, LIMIT>
where
    B: Buf + ChunkVectoredUninit,
{
    /// Run `func` against the staging buffer.
    ///
    /// On error the staging buffer is cleared so a partially written message
    /// is not left behind for the next call.
    fn write_buf<F, T, E>(&mut self, func: F) -> Result<T, E>
    where
        F: FnOnce(&mut BytesMut) -> Result<T, E>,
    {
        func(&mut self.buf).map_err(|e| {
            self.buf.clear();
            e
        })
    }
    /// Drain the queued buffers into `io` with vectored writes, flushing
    /// once the queue empties. Stops when the io would block or on error.
    fn do_io<Io: io::Write>(&mut self, io: &mut Io) -> io::Result<()> {
        let queue = &mut self.list;
        loop {
            if self.want_flush {
                match io::Write::flush(io) {
                    Ok(_) => self.want_flush = false,
                    // WouldBlock: leave want_flush set so the flush is
                    // retried on the next do_io call.
                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
                    Err(e) => return Err(e),
                }
                // Flush attempted; nothing left to write this call.
                break;
            }
            // Gather up to BUF_LIST_CNT chunks into one vectored write.
            let mut buf = uninit_array::<_, BUF_LIST_CNT>();
            let slice = queue.chunks_vectored_uninit_into_init(&mut buf);
            match io.write_vectored(slice) {
                // The io accepted nothing: report WriteZero (or panic if
                // there was nothing to write — a caller bug).
                Ok(0) => return write_zero(self.want_write_io()),
                Ok(n) => {
                    queue.advance(n);
                    if queue.is_empty() {
                        // Everything written; flush on the next iteration.
                        self.want_flush = true;
                    }
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
                Err(e) => return Err(e),
            }
        }
        Ok(())
    }
}
/// Cold path for a write call that returned `Ok(0)`.
///
/// Panics when the caller had no pending data to write (calling into the io
/// in that state is a contract violation); otherwise reports the zero-length
/// write as a `WriteZero` error.
#[cold]
#[inline(never)]
fn write_zero(want_write: bool) -> io::Result<()> {
    if !want_write {
        panic!("BufWrite::write must be called after BufInterest::want_write return true.");
    }
    Err(io::Error::from(io::ErrorKind::WriteZero))
}