use futures::{executor, Async, Poll, Stream};
use std::fmt;
struct TakeWhileReady<S> {
stream: S,
exhausted: bool,
}
impl<S> fmt::Debug for TakeWhileReady<S> {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.debug_struct("TakeWhileReady")
.field("exhausted", &self.exhausted)
.finish()
}
}
impl<S> TakeWhileReady<S>
where
S: Stream,
{
pub fn new(stream: S) -> Self {
TakeWhileReady {
stream,
exhausted: false,
}
}
}
impl<S> Stream for TakeWhileReady<S>
where
S: Stream,
{
type Item = S::Item;
type Error = S::Error;
fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
if self.exhausted {
return Ok(Async::Ready(None));
}
match self.stream.poll()? {
Async::Ready(None) | Async::NotReady => {
self.exhausted = true;
Ok(Async::Ready(None))
}
Async::Ready(Some(item)) => Ok(Async::Ready(Some(item))),
}
}
}
pub fn poll_events<S: Stream<Item = (), Error = ()>>(stream: &mut S) {
let events = TakeWhileReady::new(stream.by_ref()).for_each(|_| Ok(()));
let mut spawn = executor::spawn(events);
spawn.wait_future().expect("Error polling events");
}
#[cfg(test)]
mod tests {
use super::*;
use futures::executor;
use futures::sync::mpsc;
#[test]
fn test_take_while_ready() {
use futures::stream::{iter_ok, poll_fn};
use futures::Async;
let mut waiting = false;
let stream = iter_ok::<_, ()>(1..4).chain(poll_fn(move || {
if waiting {
Ok(Async::NotReady) } else {
waiting = true;
Ok(Async::Ready(Some(4)))
}
}));
let stream = TakeWhileReady::new(stream);
let collected: Vec<_> = stream.wait().into_iter().collect();
assert_eq!(collected, vec![Ok(1), Ok(2), Ok(3), Ok(4)]);
}
#[test]
fn test_take_while_ready_with_executor() {
let (mut sender, mut receiver) = mpsc::channel(16);
{
let folded = TakeWhileReady::new(receiver.by_ref()).fold(0, |acc, i| Ok(acc + i));
let mut exec = executor::spawn(folded);
sender.try_send(1).unwrap();
sender.try_send(2).unwrap();
let result = exec.wait_future();
assert_eq!(result, Ok(3));
}
{
let folded = TakeWhileReady::new(receiver.by_ref()).fold(0, |acc, i| Ok(acc + i));
let mut exec = executor::spawn(folded);
sender.try_send(3).unwrap();
sender.try_send(4).unwrap();
sender.try_send(5).unwrap();
let result = exec.wait_future();
assert_eq!(result, Ok(12));
}
}
}