futures_util/stream/
filter_map.rs

1use futures_core::{Async, Future, IntoFuture, Poll, Stream};
2use futures_core::task;
3use futures_sink::{Sink};
4
5/// A combinator used to filter the results of a stream and simultaneously map
6/// them to a different type.
7///
8/// This structure is returned by the `Stream::filter_map` method.
9#[derive(Debug)]
10#[must_use = "streams do nothing unless polled"]
11pub struct FilterMap<S, R, F>
12    where S: Stream,
13          F: FnMut(S::Item) -> R,
14          R: IntoFuture<Error=S::Error>,
15{
16    stream: S,
17    f: F,
18    pending: Option<R::Future>,
19}
20
21pub fn new<S, R, F>(s: S, f: F) -> FilterMap<S, R, F>
22    where S: Stream,
23          F: FnMut(S::Item) -> R,
24          R: IntoFuture<Error=S::Error>,
25{
26    FilterMap {
27        stream: s,
28        f: f,
29        pending: None,
30    }
31}
32
33impl<S, R, F> FilterMap<S, R, F>
34    where S: Stream,
35          F: FnMut(S::Item) -> R,
36          R: IntoFuture<Error=S::Error>,
37{
38    /// Acquires a reference to the underlying stream that this combinator is
39    /// pulling from.
40    pub fn get_ref(&self) -> &S {
41        &self.stream
42    }
43
44    /// Acquires a mutable reference to the underlying stream that this
45    /// combinator is pulling from.
46    ///
47    /// Note that care must be taken to avoid tampering with the state of the
48    /// stream which may otherwise confuse this combinator.
49    pub fn get_mut(&mut self) -> &mut S {
50        &mut self.stream
51    }
52
53    /// Consumes this combinator, returning the underlying stream.
54    ///
55    /// Note that this may discard intermediate state of this combinator, so
56    /// care should be taken to avoid losing resources when this is called.
57    pub fn into_inner(self) -> S {
58        self.stream
59    }
60}
61
62// Forwarding impl of Sink from the underlying stream
63impl<S, R, F> Sink for FilterMap<S, R, F>
64    where S: Stream + Sink,
65          F: FnMut(S::Item) -> R,
66          R: IntoFuture<Error=S::Error>,
67{
68    type SinkItem = S::SinkItem;
69    type SinkError = S::SinkError;
70
71    delegate_sink!(stream);
72}
73
74impl<S, R, F, B> Stream for FilterMap<S, R, F>
75    where S: Stream,
76          F: FnMut(S::Item) -> R,
77          R: IntoFuture<Item=Option<B>, Error=S::Error>,
78{
79    type Item = B;
80    type Error = S::Error;
81
82    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<B>, S::Error> {
83        loop {
84            if self.pending.is_none() {
85                let item = match try_ready!(self.stream.poll_next(cx)) {
86                    Some(e) => e,
87                    None => return Ok(Async::Ready(None)),
88                };
89                let fut = ((self.f)(item)).into_future();
90                self.pending = Some(fut);
91            }
92
93            match self.pending.as_mut().unwrap().poll(cx) {
94                x @ Ok(Async::Ready(Some(_))) => {
95                    self.pending = None;
96                    return x
97                }
98                Ok(Async::Ready(None)) => self.pending = None,
99                Ok(Async::Pending) => return Ok(Async::Pending),
100                Err(e) => {
101                    self.pending = None;
102                    return Err(e)
103                }
104            }
105        }
106    }
107}