use futures::{Async, Poll, Stream};
use std::fmt;
pub struct GreedyFold<S, F, A> {
stream: S,
fold_fn: F,
acc: A,
exhausted: bool,
}
impl<S, F, A> fmt::Debug for GreedyFold<S, F, A> {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.debug_struct("GreedyFold")
.field("exhausted", &self.exhausted)
.finish()
}
}
pub trait GreedilyFoldable: Stream {
fn greedy_fold<F, A>(self, acc: A, fold_fn: F) -> GreedyFold<Self, F, A>
where
F: FnMut(A, Self::Item) -> A,
A: Clone,
Self: Sized;
}
impl<T: Stream> GreedilyFoldable for T {
fn greedy_fold<F, A>(self, acc: A, fold_fn: F) -> GreedyFold<Self, F, A>
where
F: FnMut(A, Self::Item) -> A,
A: Clone,
Self: Sized,
{
GreedyFold::new(self, acc, fold_fn)
}
}
impl<S, F, A> GreedyFold<S, F, A>
where
S: Stream,
F: FnMut(A, S::Item) -> A,
A: Clone,
{
fn new(stream: S, acc: A, fold_fn: F) -> Self {
GreedyFold {
stream,
fold_fn,
acc,
exhausted: false,
}
}
}
impl<S, F, A> Stream for GreedyFold<S, F, A>
where
S: Stream,
F: FnMut(A, S::Item) -> A,
A: Clone,
{
type Item = A;
type Error = S::Error;
fn poll(&mut self) -> Poll<Option<A>, S::Error> {
if self.exhausted {
return Ok(Async::Ready(None));
}
let mut acc = self.acc.clone();
let mut some_items = false;
loop {
match self.stream.poll()? {
Async::Ready(None) => {
self.exhausted = true;
return Ok(if some_items {
Async::Ready(Some(acc))
} else {
Async::Ready(None)
});
}
Async::NotReady => {
return Ok(Async::Ready(Some(acc)));
}
Async::Ready(Some(item)) => {
some_items = true;
acc = (self.fold_fn)(acc, item);
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::executor;
use futures::sync::mpsc;
#[test]
fn test_fold_empty_stream() {
let (mut sender, receiver) = mpsc::channel(1_024);
let stream = GreedyFold::new(receiver, 0, |acc, i| acc + i);
sender.try_send(1).unwrap();
sender.try_send(2).unwrap();
sender.try_send(3).unwrap();
sender.try_send(4).unwrap();
let folded: Vec<_> = stream.take(4).wait().into_iter().collect();
assert_eq!(folded, vec![Ok(10), Ok(0), Ok(0), Ok(0)]);
}
#[test]
fn test_iterative_fold() {
let (mut sender, receiver) = mpsc::channel(1_024);
let stream = GreedyFold::new(receiver, 0, |acc, i| acc + i);
let mut exec = executor::spawn(stream);
sender.try_send(1).unwrap();
sender.try_send(2).unwrap();
let result = exec.wait_stream();
assert_eq!(result, Some(Ok(3)));
sender.try_send(3).unwrap();
sender.try_send(4).unwrap();
sender.try_send(5).unwrap();
let result = exec.wait_stream();
assert_eq!(result, Some(Ok(12)));
}
#[test]
fn test_iterative_fold_side_effects() {
use std::cell::RefCell;
let (mut sender, receiver) = mpsc::channel(1_024);
let values = RefCell::new(Vec::new());
let stream = {
let stream = GreedyFold::new(receiver, (), |_, i| { values.borrow_mut().push(i); });
stream
};
let mut exec = executor::spawn(stream);
sender.try_send(1).unwrap();
sender.try_send(2).unwrap();
let result = exec.wait_stream();
assert_eq!(result, Some(Ok(())));
assert_eq!(*values.borrow(), vec![1, 2]);
sender.try_send(3).unwrap();
sender.try_send(4).unwrap();
sender.try_send(5).unwrap();
let result = exec.wait_stream();
assert_eq!(result, Some(Ok(())));
assert_eq!(*values.borrow(), vec![1, 2, 3, 4, 5]);
}
}