futures_util/stream/
filter.rs1use futures_core::{Async, Future, IntoFuture, Poll, Stream};
2use futures_core::task;
3use futures_sink::{Sink};
4
5#[derive(Debug)]
10#[must_use = "streams do nothing unless polled"]
11pub struct Filter<S, R, P>
12 where S: Stream,
13 P: FnMut(&S::Item) -> R,
14 R: IntoFuture<Item=bool, Error=S::Error>,
15{
16 stream: S,
17 pred: P,
18 pending: Option<(R::Future, S::Item)>,
19}
20
21pub fn new<S, R, P>(s: S, pred: P) -> Filter<S, R, P>
22 where S: Stream,
23 P: FnMut(&S::Item) -> R,
24 R: IntoFuture<Item=bool, Error=S::Error>,
25{
26 Filter {
27 stream: s,
28 pred: pred,
29 pending: None,
30 }
31}
32
33impl<S, R, P> Filter<S, R, P>
34 where S: Stream,
35 P: FnMut(&S::Item) -> R,
36 R: IntoFuture<Item=bool, Error=S::Error>,
37{
38 pub fn get_ref(&self) -> &S {
41 &self.stream
42 }
43
44 pub fn get_mut(&mut self) -> &mut S {
50 &mut self.stream
51 }
52
53 pub fn into_inner(self) -> S {
58 self.stream
59 }
60}
61
62impl<S, R, P> Sink for Filter<S, R, P>
64 where S: Stream,
65 P: FnMut(&S::Item) -> R,
66 R: IntoFuture<Item=bool, Error=S::Error>,
67 S: Sink,
68{
69 type SinkItem = S::SinkItem;
70 type SinkError = S::SinkError;
71
72 delegate_sink!(stream);
73}
74
75impl<S, R, P> Stream for Filter<S, R, P>
76 where S: Stream,
77 P: FnMut(&S::Item) -> R,
78 R: IntoFuture<Item=bool, Error=S::Error>,
79{
80 type Item = S::Item;
81 type Error = S::Error;
82
83 fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<S::Item>, S::Error> {
84 loop {
85 if self.pending.is_none() {
86 let item = match try_ready!(self.stream.poll_next(cx)) {
87 Some(e) => e,
88 None => return Ok(Async::Ready(None)),
89 };
90 let fut = ((self.pred)(&item)).into_future();
91 self.pending = Some((fut, item));
92 }
93
94 match self.pending.as_mut().unwrap().0.poll(cx) {
95 Ok(Async::Ready(true)) => {
96 let (_, item) = self.pending.take().unwrap();
97 return Ok(Async::Ready(Some(item)));
98 }
99 Ok(Async::Ready(false)) => self.pending = None,
100 Ok(Async::Pending) => return Ok(Async::Pending),
101 Err(e) => {
102 self.pending = None;
103 return Err(e)
104 }
105 }
106 }
107 }
108}