use std::{
io,
pin::Pin,
task::{ready, Context, Poll},
};
use algorithm::buf::{BinaryMut, Bt};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
#[derive(Debug)]
pub struct FramedWrite<T> {
inner: T,
binary: BinaryMut,
}
impl<T> FramedWrite<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
pub fn new(io: T) -> Self {
Self {
inner: io,
binary: BinaryMut::new(),
}
}
pub fn into_io(self) -> T {
self.inner
}
pub fn get_mut(&mut self) -> &mut T {
&mut self.inner
}
pub fn get_mut_bytes(&mut self) -> &mut BinaryMut {
&mut self.binary
}
pub fn get_bytes(&self) -> &BinaryMut {
&self.binary
}
pub fn has_capacity(&self) -> bool {
true
}
pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
if !self.has_capacity() {
ready!(self.flush(cx))?;
if !self.has_capacity() {
return Poll::Pending;
}
}
Poll::Ready(Ok(()))
}
pub fn flush(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
let span = tracing::trace_span!("FramedWrite::flush");
let _e = span.enter();
if !self.binary.has_remaining() {
return Poll::Ready(Ok(()));
}
let n = ready!(Pin::new(&mut self.inner).poll_write(cx, self.binary.chunk()))?;
self.binary.advance(n);
Poll::Ready(Ok(()))
}
pub fn shutdown(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
ready!(self.flush(cx))?;
Pin::new(&mut self.inner).poll_shutdown(cx)
}
pub fn set_cache_buf(&mut self, write_buf: BinaryMut) {
self.binary.put_slice(write_buf.chunk());
}
pub fn is_write_end(&self) -> bool {
self.binary.is_empty()
}
}
impl<T: AsyncRead + Unpin> AsyncRead for FramedWrite<T> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_read(cx, buf)
}
}