futures_util/stream/
then.rs

1use futures_core::{Async, IntoFuture, Future, Poll, Stream};
2use futures_core::task;
3use futures_sink::{ Sink};
4
5/// A stream combinator which chains a computation onto each item produced by a
6/// stream.
7///
8/// This structure is produced by the `Stream::then` method.
9#[derive(Debug)]
10#[must_use = "streams do nothing unless polled"]
11pub struct Then<S, U, F>
12    where U: IntoFuture,
13{
14    stream: S,
15    future: Option<U::Future>,
16    f: F,
17}
18
19pub fn new<S, U, F>(s: S, f: F) -> Then<S, U, F>
20    where S: Stream,
21          F: FnMut(Result<S::Item, S::Error>) -> U,
22          U: IntoFuture,
23{
24    Then {
25        stream: s,
26        future: None,
27        f: f,
28    }
29}
30
31// Forwarding impl of Sink from the underlying stream
32impl<S, U, F> Sink for Then<S, U, F>
33    where S: Sink, U: IntoFuture,
34{
35    type SinkItem = S::SinkItem;
36    type SinkError = S::SinkError;
37
38    delegate_sink!(stream);
39}
40
41impl<S, U, F> Stream for Then<S, U, F>
42    where S: Stream,
43          F: FnMut(Result<S::Item, S::Error>) -> U,
44          U: IntoFuture,
45{
46    type Item = U::Item;
47    type Error = U::Error;
48
49    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<U::Item>, U::Error> {
50        if self.future.is_none() {
51            let item = match self.stream.poll_next(cx) {
52                Ok(Async::Pending) => return Ok(Async::Pending),
53                Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
54                Ok(Async::Ready(Some(e))) => Ok(e),
55                Err(e) => Err(e),
56            };
57            self.future = Some((self.f)(item).into_future());
58        }
59        assert!(self.future.is_some());
60        match self.future.as_mut().unwrap().poll(cx) {
61            Ok(Async::Ready(e)) => {
62                self.future = None;
63                Ok(Async::Ready(Some(e)))
64            }
65            Err(e) => {
66                self.future = None;
67                Err(e)
68            }
69            Ok(Async::Pending) => Ok(Async::Pending)
70        }
71    }
72}