futures_util/stream/
filter.rs1use futures_core::{Async, Future, IntoFuture, Poll, Stream};
2use futures_core::task;
3use futures_sink::{Sink};
4
5#[derive(Debug)]
10#[must_use = "streams do nothing unless polled"]
11pub struct Filter<S, R, P>
12    where S: Stream,
13          P: FnMut(&S::Item) -> R,
14          R: IntoFuture<Item=bool, Error=S::Error>,
15{
16    stream: S,
17    pred: P,
18    pending: Option<(R::Future, S::Item)>,
19}
20
21pub fn new<S, R, P>(s: S, pred: P) -> Filter<S, R, P>
22    where S: Stream,
23          P: FnMut(&S::Item) -> R,
24          R: IntoFuture<Item=bool, Error=S::Error>,
25{
26    Filter {
27        stream: s,
28        pred: pred,
29        pending: None,
30    }
31}
32
33impl<S, R, P> Filter<S, R, P>
34    where S: Stream,
35          P: FnMut(&S::Item) -> R,
36          R: IntoFuture<Item=bool, Error=S::Error>,
37{
38    pub fn get_ref(&self) -> &S {
41        &self.stream
42    }
43
44    pub fn get_mut(&mut self) -> &mut S {
50        &mut self.stream
51    }
52
53    pub fn into_inner(self) -> S {
58        self.stream
59    }
60}
61
62impl<S, R, P> Sink for Filter<S, R, P>
64    where S: Stream,
65          P: FnMut(&S::Item) -> R,
66          R: IntoFuture<Item=bool, Error=S::Error>,
67          S: Sink,
68{
69    type SinkItem = S::SinkItem;
70    type SinkError = S::SinkError;
71
72    delegate_sink!(stream);
73}
74
75impl<S, R, P> Stream for Filter<S, R, P>
76    where S: Stream,
77          P: FnMut(&S::Item) -> R,
78          R: IntoFuture<Item=bool, Error=S::Error>,
79{
80    type Item = S::Item;
81    type Error = S::Error;
82
83    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<S::Item>, S::Error> {
84        loop {
85            if self.pending.is_none() {
86                let item = match try_ready!(self.stream.poll_next(cx)) {
87                    Some(e) => e,
88                    None => return Ok(Async::Ready(None)),
89                };
90                let fut = ((self.pred)(&item)).into_future();
91                self.pending = Some((fut, item));
92            }
93
94            match self.pending.as_mut().unwrap().0.poll(cx) {
95                Ok(Async::Ready(true)) => {
96                    let (_, item) = self.pending.take().unwrap();
97                    return Ok(Async::Ready(Some(item)));
98                }
99                Ok(Async::Ready(false)) => self.pending = None,
100                Ok(Async::Pending) => return Ok(Async::Pending),
101                Err(e) => {
102                    self.pending = None;
103                    return Err(e)
104                }
105            }
106        }
107    }
108}