use super::InStream;
use crate::consts::MAX_LOOPS;
use bytes::BytesMut;
use err_derive::Error;
use futures::prelude::*;
use std::collections::VecDeque;
use std::io;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
/// Errors produced by `NonblockingInStream` and the machinery that feeds it.
#[derive(Debug, Error)]
pub enum NonblockingInStreamError {
/// The bounded queue overflowed. The payload is the number of messages that
/// were evicted since the last time this error was reported to the reader.
#[error(display = "Lagged and dropped {} messages", _0)]
Lagged(usize),
/// Polling the underlying input stream yielded an I/O error.
#[error(display = "Stream poll: {:?}", _0)]
StreamPoll(#[error(source, no_from)] io::Error),
/// An I/O error attributed to the codec. This variant is not constructed
/// anywhere in the visible part of this file.
#[error(display = "Codec error: {:?}", _0)]
Codec(#[source] io::Error),
}
// NOTE(review): `def_into_error!` is defined elsewhere; presumably it generates a
// conversion from this error type into a more general one — confirm at the macro.
def_into_error!(NonblockingInStreamError);
/// Bounded FIFO of messages that makes room for new entries by evicting the
/// oldest one.
struct BytesMutQueue {
    /// Buffered messages, oldest at the front.
    queue: VecDeque<BytesMut>,
    /// Length at which a push starts evicting from the front.
    buflen: usize,
}

impl BytesMutQueue {
    /// Creates an empty queue with storage preallocated for `buflen` messages.
    fn new(buflen: usize) -> Self {
        Self {
            queue: VecDeque::with_capacity(buflen),
            buflen,
        }
    }

    /// Returns `true` when no messages are buffered.
    fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Appends `msg` at the back of the queue.
    ///
    /// If the queue already holds at least `buflen` messages, the oldest one
    /// is removed first and returned; otherwise `None` is returned. Note that
    /// with `buflen == 0` the queue still holds one message at a time.
    fn push_back(&mut self, msg: BytesMut) -> Option<BytesMut> {
        // Fast path: there is still room, nothing gets evicted.
        if self.queue.len() < self.buflen {
            self.queue.push_back(msg);
            return None;
        }
        let evicted = self.queue.pop_front();
        self.queue.push_back(msg);
        evicted
    }

    /// Removes and returns the oldest buffered message, if any.
    fn pop_front(&mut self) -> Option<BytesMut> {
        self.queue.pop_front()
    }
}
/// State shared (behind a mutex) between the reader-facing stream and the code
/// that drains the underlying input stream.
pub(super) struct NblkInStreamInner {
/// Underlying input stream that `drive_recv` polls for messages.
recv: InStream,
/// Bounded buffer of messages received but not yet handed to the reader.
queue: BytesMutQueue,
/// Initialised to 0 and not touched by the visible code.
/// NOTE(review): presumably maintained by the `def_ref!`-generated handle — confirm.
ref_count: usize,
/// Initialised to `None` and not touched by the visible code.
/// NOTE(review): presumably the driver task's waker, managed by the `def_*!` macros — confirm.
driver: Option<Waker>,
/// Number of messages evicted from `queue` that have not yet been reported
/// to the reader as a `Lagged` error.
lagged: usize,
/// Set once `recv` has yielded end-of-stream; `recv` is not polled afterwards.
closed: bool,
/// Waker of the task that last returned `Pending` from `poll_next`, if any.
reader: Option<Waker>,
/// When `true`, empty messages are discarded instead of being queued.
is_bcast: bool,
}
impl NblkInStreamInner {
    /// Moves every message that is currently ready on `recv` into the queue.
    ///
    /// Returns
    /// * `Ok(None)` when nothing happened: the stream was already closed, or
    ///   no message was ready;
    /// * `Ok(Some(false))` when progress was made (messages arrived or the
    ///   stream ended) and `recv` has nothing more for now;
    /// * `Ok(Some(true))` when `MAX_LOOPS` messages were taken in one pass and
    ///   the caller should call again after notifying the reader.
    ///
    /// In broadcast mode empty messages are discarded, though they still count
    /// as received. A message evicted by a full queue increments `lagged`.
    ///
    /// # Errors
    /// Returns `StreamPoll` when `recv` yields an error. The stream is marked
    /// closed first, so `recv` is not polled again and the reader observes
    /// end-of-stream once the queue is drained instead of waiting forever.
    fn drive_recv(&mut self, cx: &mut Context) -> Result<Option<bool>, NonblockingInStreamError> {
        if self.closed {
            return Ok(None);
        }

        let mut recvd = 0;
        loop {
            let msg = match self.recv.poll_next_unpin(cx) {
                Poll::Pending => break,
                Poll::Ready(None) => {
                    self.closed = true;
                    // Count end-of-stream as progress so the reader is woken to see it.
                    recvd = 1;
                    break;
                }
                Poll::Ready(Some(Err(e))) => {
                    // Treat a failed stream as finished: without this the reader
                    // could stay parked on a stream nobody is draining any more.
                    self.closed = true;
                    return Err(NonblockingInStreamError::StreamPoll(e));
                }
                Poll::Ready(Some(Ok(msg))) => msg,
            };

            let discard = self.is_bcast && msg.is_empty();
            if !discard && self.queue.push_back(msg).is_some() {
                self.lagged += 1;
            }

            recvd += 1;
            if recvd >= MAX_LOOPS {
                return Ok(Some(true));
            }
        }

        if recvd == 0 {
            Ok(None)
        } else {
            Ok(Some(false))
        }
    }

    /// Drains `recv` until it has nothing more to offer, waking the reader
    /// after every pass that made progress.
    ///
    /// # Errors
    /// Propagates the error from `drive_recv`. The reader is woken before the
    /// error is returned, so that messages queued earlier in the failing pass,
    /// and the resulting end-of-stream, are not missed.
    ///
    /// NOTE(review): the future that calls this is generated by `def_driver!`,
    /// which is not visible here; presumably it resolves with this error and is
    /// not polled again — confirm.
    fn run_driver(&mut self, cx: &mut Context) -> Result<(), NonblockingInStreamError> {
        loop {
            match self.drive_recv(cx) {
                Ok(None) => return Ok(()),
                Ok(Some(keep_going)) => {
                    self.wake_reader();
                    if !keep_going {
                        return Ok(());
                    }
                }
                Err(e) => {
                    self.wake_reader();
                    return Err(e);
                }
            }
        }
    }

    /// Wakes the task parked in `poll_next`, if one is registered.
    fn wake_reader(&mut self) {
        if let Some(task) = self.reader.take() {
            task.wake();
        }
    }
}
// NOTE(review): `def_ref!` is defined elsewhere; judging from how the type is used in
// this file it declares `NblkInStreamRef` as a clonable tuple wrapper around
// `Arc<Mutex<NblkInStreamInner>>` — confirm at the macro definition.
def_ref!(NblkInStreamInner, NblkInStreamRef, pub(self));

impl NblkInStreamRef {
    /// Builds the shared state for a fresh stream.
    ///
    /// The state starts open, with an empty queue bounded by `buflen`, no
    /// recorded lag and no registered wakers. `is_bcast` selects whether empty
    /// messages are discarded on receipt.
    fn new(recv: InStream, buflen: usize, is_bcast: bool) -> Self {
        let state = NblkInStreamInner {
            recv,
            queue: BytesMutQueue::new(buflen),
            ref_count: 0,
            driver: None,
            lagged: 0,
            closed: false,
            reader: None,
            is_bcast,
        };
        let shared = Arc::new(Mutex::new(state));
        Self(shared)
    }
}
// NOTE(review): `def_driver!` is defined elsewhere; from its use in `new_x` it appears to
// declare `NblkInStreamDriver` as an awaitable tuple struct wrapping an `NblkInStreamRef`
// — confirm at the macro definition.
def_driver!(pub(self), NblkInStreamRef; pub(self), NblkInStreamDriver; NonblockingInStreamError);
/// Reader-facing handle to a bounded queue of messages taken from an
/// `InStream`. Implements `Stream`, yielding queued messages and `Lagged`
/// errors when messages were evicted from the full queue.
pub struct NonblockingInStream(NblkInStreamRef);
impl NonblockingInStream {
    /// Wraps `recv` in broadcast mode, in which empty messages are discarded
    /// on receipt. At most `buflen` messages are buffered before the oldest
    /// are dropped.
    ///
    /// Spawns a driver with `tokio::spawn`, so this must be called from within
    /// a Tokio runtime.
    pub fn new(recv: InStream, buflen: usize) -> Self {
        Self::new_x(recv, buflen, true)
    }

    /// Wraps `recv` in unicast mode, in which empty messages are delivered
    /// like any other. At most `buflen` messages are buffered before the
    /// oldest are dropped.
    ///
    /// Spawns a driver with `tokio::spawn`, so this must be called from within
    /// a Tokio runtime.
    pub fn new_unicast(recv: InStream, buflen: usize) -> Self {
        Self::new_x(recv, buflen, false)
    }

    /// Shared constructor: creates the shared state, spawns a driver holding
    /// a clone of it, and returns the reader-side handle.
    fn new_x(recv: InStream, buflen: usize, is_bcast: bool) -> Self {
        let shared = NblkInStreamRef::new(recv, buflen, is_bcast);
        let driver = NblkInStreamDriver(shared.clone());
        // The join handle is dropped on purpose; the driver's result is not observed here.
        tokio::spawn(async move { driver.await });
        Self(shared)
    }
}
impl futures::stream::FusedStream for NonblockingInStream {
    /// Returns `true` once the underlying stream is closed and nothing is left
    /// to deliver: no queued message and no pending lag report.
    ///
    /// Panics if the shared mutex is poisoned.
    fn is_terminated(&self) -> bool {
        let state = self.0.lock().unwrap();
        let drained = state.queue.is_empty() && state.lagged == 0;
        state.closed && drained
    }
}
impl Stream for NonblockingInStream {
    type Item = Result<BytesMut, NonblockingInStreamError>;

    /// Yields, in order of priority:
    /// 1. a `Lagged` error carrying the number of messages dropped since the
    ///    last report (the counter is reset);
    /// 2. the oldest queued message;
    /// 3. `None` if the queue is empty and the underlying stream is closed;
    /// 4. otherwise `Pending`, after registering the caller's waker.
    ///
    /// Panics if the shared mutex is poisoned.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let mut state = self.0.lock().unwrap();
        // Clear the stored registration up front; it is only put back when we park again.
        let previous = state.reader.take();

        if state.lagged != 0 {
            let dropped = std::mem::replace(&mut state.lagged, 0);
            return Poll::Ready(Some(Err(NonblockingInStreamError::Lagged(dropped))));
        }

        if let Some(item) = state.queue.pop_front() {
            return Poll::Ready(Some(Ok(item)));
        }

        if state.closed {
            return Poll::Ready(None);
        }

        // Reuse the previous waker when it targets the same task, avoiding a clone.
        let waker = match previous {
            Some(w) if w.will_wake(cx.waker()) => w,
            _ => cx.waker().clone(),
        };
        state.reader = Some(waker);
        Poll::Pending
    }
}