futures_util/stream/
filter_map.rs1use futures_core::{Async, Future, IntoFuture, Poll, Stream};
2use futures_core::task;
3use futures_sink::{Sink};
4
5#[derive(Debug)]
10#[must_use = "streams do nothing unless polled"]
11pub struct FilterMap<S, R, F>
12 where S: Stream,
13 F: FnMut(S::Item) -> R,
14 R: IntoFuture<Error=S::Error>,
15{
16 stream: S,
17 f: F,
18 pending: Option<R::Future>,
19}
20
21pub fn new<S, R, F>(s: S, f: F) -> FilterMap<S, R, F>
22 where S: Stream,
23 F: FnMut(S::Item) -> R,
24 R: IntoFuture<Error=S::Error>,
25{
26 FilterMap {
27 stream: s,
28 f: f,
29 pending: None,
30 }
31}
32
33impl<S, R, F> FilterMap<S, R, F>
34 where S: Stream,
35 F: FnMut(S::Item) -> R,
36 R: IntoFuture<Error=S::Error>,
37{
38 pub fn get_ref(&self) -> &S {
41 &self.stream
42 }
43
44 pub fn get_mut(&mut self) -> &mut S {
50 &mut self.stream
51 }
52
53 pub fn into_inner(self) -> S {
58 self.stream
59 }
60}
61
62impl<S, R, F> Sink for FilterMap<S, R, F>
64 where S: Stream + Sink,
65 F: FnMut(S::Item) -> R,
66 R: IntoFuture<Error=S::Error>,
67{
68 type SinkItem = S::SinkItem;
69 type SinkError = S::SinkError;
70
71 delegate_sink!(stream);
72}
73
74impl<S, R, F, B> Stream for FilterMap<S, R, F>
75 where S: Stream,
76 F: FnMut(S::Item) -> R,
77 R: IntoFuture<Item=Option<B>, Error=S::Error>,
78{
79 type Item = B;
80 type Error = S::Error;
81
82 fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<B>, S::Error> {
83 loop {
84 if self.pending.is_none() {
85 let item = match try_ready!(self.stream.poll_next(cx)) {
86 Some(e) => e,
87 None => return Ok(Async::Ready(None)),
88 };
89 let fut = ((self.f)(item)).into_future();
90 self.pending = Some(fut);
91 }
92
93 match self.pending.as_mut().unwrap().poll(cx) {
94 x @ Ok(Async::Ready(Some(_))) => {
95 self.pending = None;
96 return x
97 }
98 Ok(Async::Ready(None)) => self.pending = None,
99 Ok(Async::Pending) => return Ok(Async::Pending),
100 Err(e) => {
101 self.pending = None;
102 return Err(e)
103 }
104 }
105 }
106 }
107}