use bytes::BytesMut;
use cynthia::future::swap::{AsyncRead, AsyncWrite};
use futures_core::{ready, stream::Stream};
use futures_sink::Sink;
use log::trace;
use pin_project_lite::pin_project;
use std::borrow::{Borrow, BorrowMut};
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::common::codec::decoder::Decoder;
use crate::common::codec::encoder::Encoder;
pin_project! {
/// Shared implementation backing the framed transport adapters.
///
/// Bundles an I/O object, a codec and the buffering state. The `Stream`
/// impl in this file applies when `T: AsyncRead`, `U: Decoder` and the
/// state borrows as a `ReadFrame`; the `Sink` impl applies when
/// `T: AsyncWrite`, `U: Encoder<I>` and the state borrows as a `WriteFrame`.
#[derive(Debug)]
pub(crate) struct FramedImpl<T, U, State> {
// Underlying I/O object; structurally pinned so it can be polled in place.
#[pin]
pub(crate) inner: T,
// Buffering state, accessed through `BorrowMut<ReadFrame>` and/or
// `BorrowMut<WriteFrame>` by the trait impls in this file.
pub(crate) state: State,
// Codec used to decode frames from, and encode items into, the buffers.
pub(crate) codec: U,
}
}
/// Capacity, in bytes (8 KiB), requested for freshly created read and write
/// buffers.
const INITIAL_CAPACITY: usize = 8 * 1024;
/// Once the write buffer holds at least this many bytes, `poll_ready` flushes
/// before reporting readiness.
const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY;
/// Read-side state of a framed transport.
pub(crate) struct ReadFrame {
/// Set once a read from the underlying I/O object returned zero bytes.
pub(crate) eof: bool,
/// Whether the decoder should be tried on `buffer` before reading more.
/// Cleared when `decode` yields no frame, set again after every read.
pub(crate) is_readable: bool,
/// Bytes received from the I/O object that are not yet decoded.
pub(crate) buffer: BytesMut,
}
/// Write-side state of a framed transport.
pub(crate) struct WriteFrame {
/// Encoded bytes waiting to be written to the underlying I/O object.
pub(crate) buffer: BytesMut,
}
/// Combined read and write state for a bidirectional framed transport.
///
/// Borrows as either a `ReadFrame` or a `WriteFrame`, so a single value can
/// serve as the state for both the `Stream` and the `Sink` impls.
#[derive(Default)]
pub(crate) struct RWFrames {
/// State used when decoding incoming frames.
pub(crate) read: ReadFrame,
/// State used when encoding outgoing items.
pub(crate) write: WriteFrame,
}
impl Default for ReadFrame {
fn default() -> Self {
Self {
eof: false,
is_readable: false,
buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
}
}
}
impl Default for WriteFrame {
fn default() -> Self {
Self {
buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
}
}
}
impl From<BytesMut> for ReadFrame {
fn from(mut buffer: BytesMut) -> Self {
let size = buffer.capacity();
if size < INITIAL_CAPACITY {
buffer.reserve(INITIAL_CAPACITY - size);
}
Self {
buffer,
is_readable: size > 0,
eof: false,
}
}
}
impl From<BytesMut> for WriteFrame {
fn from(mut buffer: BytesMut) -> Self {
let size = buffer.capacity();
if size < INITIAL_CAPACITY {
buffer.reserve(INITIAL_CAPACITY - size);
}
Self { buffer }
}
}
impl Borrow<ReadFrame> for RWFrames {
fn borrow(&self) -> &ReadFrame {
&self.read
}
}
impl BorrowMut<ReadFrame> for RWFrames {
fn borrow_mut(&mut self) -> &mut ReadFrame {
&mut self.read
}
}
impl Borrow<WriteFrame> for RWFrames {
fn borrow(&self) -> &WriteFrame {
&self.write
}
}
impl BorrowMut<WriteFrame> for RWFrames {
fn borrow_mut(&mut self) -> &mut WriteFrame {
&mut self.write
}
}
impl<T, U, R> Stream for FramedImpl<T, U, R>
where
T: AsyncRead,
U: Decoder,
R: BorrowMut<ReadFrame>,
{
type Item = Result<U::Item, U::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use crate::common::codec::util::poll_read_buf;
let mut pinned = self.project();
let state: &mut ReadFrame = pinned.state.borrow_mut();
loop {
if state.is_readable {
if state.eof {
let frame = pinned.codec.decode_eof(&mut state.buffer)?;
return Poll::Ready(frame.map(Ok));
}
trace!("attempting to decode a frame");
if let Some(frame) = pinned.codec.decode(&mut state.buffer)? {
trace!("frame decoded from buffer");
return Poll::Ready(Some(Ok(frame)));
}
state.is_readable = false;
}
assert!(!state.eof);
state.buffer.reserve(1);
let bytect = match poll_read_buf(pinned.inner.as_mut(), cx, &mut state.buffer)? {
Poll::Ready(ct) => ct,
Poll::Pending => return Poll::Pending,
};
if bytect == 0 {
state.eof = true;
}
state.is_readable = true;
}
}
}
impl<T, I, U, W> Sink<I> for FramedImpl<T, U, W>
where
    T: AsyncWrite,
    U: Encoder<I>,
    U::Error: From<io::Error>,
    W: BorrowMut<WriteFrame>,
{
    type Error = U::Error;

    /// Reports readiness to accept another item.
    ///
    /// Ready immediately while fewer than `BACKPRESSURE_BOUNDARY` bytes are
    /// buffered; otherwise readiness is whatever `poll_flush` reports.
    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let buffered = self.state.borrow().buffer.len();
        if buffered < BACKPRESSURE_BOUNDARY {
            return Poll::Ready(Ok(()));
        }
        self.as_mut().poll_flush(cx)
    }

    /// Encodes `item` into the write buffer; no I/O is performed here.
    fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
        let this = self.project();
        let frame: &mut WriteFrame = this.state.borrow_mut();
        this.codec.encode(item, &mut frame.buffer)?;
        Ok(())
    }

    /// Writes all buffered bytes to the underlying I/O object, then flushes it.
    ///
    /// A write that makes no progress is reported as an
    /// `io::ErrorKind::WriteZero` error converted into the codec's error type.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        use crate::common::codec::util::poll_write_buf;

        trace!("flushing framed transport");
        let mut this = self.project();

        loop {
            let WriteFrame { buffer } = this.state.borrow_mut();
            if buffer.is_empty() {
                break;
            }

            trace!("writing; remaining={}", buffer.len());
            let written = ready!(poll_write_buf(this.inner.as_mut(), cx, buffer))?;
            if written == 0 {
                // Nothing was accepted although bytes are pending: give up
                // rather than spin.
                let stalled = io::Error::new(
                    io::ErrorKind::WriteZero,
                    "failed to write frame to transport",
                );
                return Poll::Ready(Err(stalled.into()));
            }
        }

        // The buffer is drained; now push the bytes through the I/O object.
        ready!(this.inner.poll_flush(cx))?;
        trace!("framed transport flushed");
        Poll::Ready(Ok(()))
    }

    /// Flushes everything that is buffered, then closes the underlying I/O
    /// object.
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        ready!(self.as_mut().poll_flush(cx))?;

        let this = self.project();
        ready!(this.inner.poll_close(cx))?;
        Poll::Ready(Ok(()))
    }
}