use std::borrow::BorrowMut;
use std::io;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use tokio::io::AsyncRead;
use super::{ChannelMsg, ChannelReadHalf};
#[derive(Debug)]
pub struct ChannelRx<R> {
channel: R,
buffer: Option<(ChannelMsg, usize)>,
ext: Option<u32>,
}
impl<R> ChannelRx<R> {
pub fn new(channel: R, ext: Option<u32>) -> Self {
Self {
channel,
buffer: None,
ext,
}
}
}
impl<R> AsyncRead for ChannelRx<R>
where
R: BorrowMut<ChannelReadHalf> + Unpin,
{
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let (msg, mut idx) = match self.buffer.take() {
Some(msg) => msg,
None => match ready!(self.channel.borrow_mut().receiver.poll_recv(cx)) {
Some(msg) => (msg, 0),
None => return Poll::Ready(Ok(())),
},
};
match (&msg, self.ext) {
(ChannelMsg::Data { data }, None) => {
let readable = buf.remaining().min(data.len() - idx);
#[allow(clippy::indexing_slicing)]
buf.put_slice(&data[idx..idx + readable]);
idx += readable;
if idx != data.len() {
self.buffer = Some((msg, idx));
}
Poll::Ready(Ok(()))
}
(ChannelMsg::ExtendedData { data, ext }, Some(target)) if *ext == target => {
let readable = buf.remaining().min(data.len() - idx);
#[allow(clippy::indexing_slicing)]
buf.put_slice(&data[idx..idx + readable]);
idx += readable;
if idx != data.len() {
self.buffer = Some((msg, idx));
}
Poll::Ready(Ok(()))
}
(ChannelMsg::Eof, _) => {
self.channel.borrow_mut().receiver.close();
Poll::Ready(Ok(()))
}
_ => {
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
}