use std::{
pin::Pin,
task::{Context, Poll},
};
pub use bytes::Bytes;
pub use bytes::{Buf, BytesMut};
use futures_util::{AsyncWrite, Stream};
pub struct FrameReader<'a> {
inner: &'a mut BytesMut,
read_offset: usize,
}
impl AsRef<[u8]> for FrameReader<'_> {
fn as_ref(&self) -> &[u8] {
self.chunk()
}
}
impl<'a> Buf for FrameReader<'a> {
fn remaining(&self) -> usize {
self.inner.len() - self.read_offset
}
fn chunk(&self) -> &[u8] {
&self.inner[self.read_offset..]
}
fn advance(&mut self, cnt: usize) {
self.read_offset += cnt;
assert!(self.read_offset <= self.inner.len());
}
}
impl<'a> FrameReader<'a> {
pub fn new(inner: &'a mut BytesMut) -> Self {
Self { inner, read_offset: 0 }
}
pub fn as_slice(&self) -> &[u8] {
self.chunk()
}
pub fn take(&mut self) -> BytesMut {
let read_offset = std::mem::take(&mut self.read_offset);
if self.inner.capacity() > self.inner.len() * 2 {
self.inner.split_off(read_offset)
} else {
std::mem::take(&mut self.inner)
}
}
pub fn advanced(&self) -> usize {
self.read_offset
}
pub fn advance(&mut self, cnt: usize) {
<Self as Buf>::advance(self, cnt);
}
pub fn is_empty(&self) -> bool {
self.read_offset == self.inner.len()
}
}
pub trait AsyncFrameWrite: Send + 'static {
fn begin_write_frame(self: Pin<&mut Self>, len: usize) -> std::io::Result<()> {
let _ = (len,);
Ok(())
}
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut FrameReader,
) -> Poll<std::io::Result<()>>;
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
let _ = (cx,);
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
let _ = (cx,);
Poll::Ready(Ok(()))
}
}
impl<T> AsyncFrameWrite for T
where
T: AsyncWrite + Send + 'static,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut FrameReader,
) -> Poll<std::io::Result<()>> {
match self.poll_write(cx, buf.as_ref())? {
Poll::Ready(x) => {
buf.advance(x);
Poll::Ready(Ok(()))
}
Poll::Pending => Poll::Pending,
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
self.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
self.poll_close(cx)
}
}
pub trait AsyncFrameRead: Send + Sync + 'static {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<Bytes>>;
}
impl<T: Stream<Item = std::io::Result<Bytes>> + Sync + Send + 'static> AsyncFrameRead for T {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<Bytes>> {
self.poll_next(cx).map(|x| x.unwrap_or_else(|| Err(std::io::ErrorKind::BrokenPipe.into())))
}
}