use core::pin::Pin;
use core::task::Context;
use async_compat::Compat;
use futures::task::Poll;
use futures::{io, AsyncRead, AsyncWrite, Sink, Stream};
use pin_project_lite::pin_project;
use tokio_util::bytes::{Bytes, BytesMut};
use tokio_util::codec::length_delimited::Builder;
use tokio_util::codec::{self, LengthDelimitedCodec};
pin_project! {
/// Reading half of a length-delimited framing adapter.
///
/// Holds a `tokio_util` `codec::FramedRead` driven by a
/// `LengthDelimitedCodec`, with the underlying reader `R` wrapped in
/// `async_compat::Compat`.
pub struct FramedRead<R> {
// Structurally pinned so that polling can be forwarded to the inner framed
// reader through the generated projection.
#[pin]
inner: codec::FramedRead<Compat<R>, LengthDelimitedCodec>,
}
}
impl<R> FramedRead<R>
where
    R: AsyncRead,
{
    /// Creates a framed reader on top of `reader`.
    ///
    /// The reader is wrapped in [`Compat`] and handed to a `tokio_util`
    /// framed reader configured by `length_delimited_codec()`.
    #[inline]
    pub fn new(reader: R) -> Self {
        let codec_builder = length_delimited_codec();
        let inner = codec_builder.new_read(Compat::new(reader));
        Self { inner }
    }
}
impl<R> Stream for FramedRead<R>
where
    R: AsyncRead,
{
    /// One decoded frame, or the I/O error reported by the inner framed reader.
    type Item = io::Result<BytesMut>;

    /// Forwards the poll to the pinned inner framed reader and returns its
    /// result unchanged.
    #[inline]
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        this.inner.poll_next(cx)
    }
}
pin_project! {
/// Writing half of a length-delimited framing adapter.
///
/// Holds a `tokio_util` `codec::FramedWrite` driven by a
/// `LengthDelimitedCodec`, with the underlying writer `W` wrapped in
/// `async_compat::Compat`.
pub struct FramedWrite<W> {
// Structurally pinned so that sink operations can be forwarded to the inner
// framed writer through the generated projection.
#[pin]
inner: codec::FramedWrite<Compat<W>, LengthDelimitedCodec>,
}
}
impl<W> FramedWrite<W>
where
    W: AsyncWrite,
{
    /// Creates a framed writer on top of `writer`.
    ///
    /// The writer is wrapped in [`Compat`] and handed to a `tokio_util`
    /// framed writer configured by `length_delimited_codec()`.
    #[inline]
    pub fn new(writer: W) -> Self {
        let codec_builder = length_delimited_codec();
        let inner = codec_builder.new_write(Compat::new(writer));
        Self { inner }
    }
}
impl<W: AsyncWrite> Sink<Vec<u8>> for FramedWrite<W> {
type Error = io::Error;
#[inline]
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_ready(ctx)
}
#[inline]
fn start_send(
self: Pin<&mut Self>,
item: Vec<u8>,
) -> Result<(), Self::Error> {
self.project().inner.start_send(Bytes::from(item))
}
#[inline]
fn poll_flush(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_flush(ctx)
}
#[inline]
fn poll_close(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_close(ctx)
}
}
/// Returns the codec [`Builder`] used by both `FramedRead::new` and
/// `FramedWrite::new`, so the two halves agree on the frame layout.
///
/// The length field is sized as a `u64`, and the maximum frame length is
/// raised to `u64::MAX as usize`. This definition is only compiled for
/// targets whose pointer width is 64 or 128 bits, where that cast does not
/// truncate.
///
/// NOTE(review): this effectively removes the frame-size cap. If the peer is
/// untrusted, confirm that an oversized length header cannot lead to an
/// excessive buffer reservation on the read side.
#[cfg(any(target_pointer_width = "64", target_pointer_width = "128"))]
#[inline]
fn length_delimited_codec() -> Builder {
    let mut config = LengthDelimitedCodec::builder();
    config.length_field_type::<u64>();
    config.max_frame_length(u64::MAX as usize);
    config
}