futures_util/stream/
and_then.rs

1use futures_core::{IntoFuture, Future, Poll, Async, Stream};
2use futures_core::task;
3use futures_sink::{Sink};
4
5/// A stream combinator which chains a computation onto values produced by a
6/// stream.
7///
8/// This structure is produced by the `Stream::and_then` method.
9#[derive(Debug)]
10#[must_use = "streams do nothing unless polled"]
11pub struct AndThen<S, U, F>
12    where U: IntoFuture,
13{
14    stream: S,
15    future: Option<U::Future>,
16    f: F,
17}
18
19pub fn new<S, U, F>(s: S, f: F) -> AndThen<S, U, F>
20    where S: Stream,
21          F: FnMut(S::Item) -> U,
22          U: IntoFuture<Error=S::Error>,
23{
24    AndThen {
25        stream: s,
26        future: None,
27        f: f,
28    }
29}
30
31impl<S, U, F> AndThen<S, U, F>
32    where U: IntoFuture,
33{
34    /// Acquires a reference to the underlying stream that this combinator is
35    /// pulling from.
36    pub fn get_ref(&self) -> &S {
37        &self.stream
38    }
39
40    /// Acquires a mutable reference to the underlying stream that this
41    /// combinator is pulling from.
42    ///
43    /// Note that care must be taken to avoid tampering with the state of the
44    /// stream which may otherwise confuse this combinator.
45    pub fn get_mut(&mut self) -> &mut S {
46        &mut self.stream
47    }
48
49    /// Consumes this combinator, returning the underlying stream.
50    ///
51    /// Note that this may discard intermediate state of this combinator, so
52    /// care should be taken to avoid losing resources when this is called.
53    pub fn into_inner(self) -> S {
54        self.stream
55    }
56}
57
58// Forwarding impl of Sink from the underlying stream
59impl<S, U: IntoFuture, F> Sink for AndThen<S, U, F>
60    where S: Sink
61{
62    type SinkItem = S::SinkItem;
63    type SinkError = S::SinkError;
64
65    delegate_sink!(stream);
66}
67
68impl<S, U, F> Stream for AndThen<S, U, F>
69    where S: Stream,
70          F: FnMut(S::Item) -> U,
71          U: IntoFuture<Error=S::Error>,
72{
73    type Item = U::Item;
74    type Error = S::Error;
75
76    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<U::Item>, S::Error> {
77        if self.future.is_none() {
78            let item = match try_ready!(self.stream.poll_next(cx)) {
79                None => return Ok(Async::Ready(None)),
80                Some(e) => e,
81            };
82            self.future = Some((self.f)(item).into_future());
83        }
84        assert!(self.future.is_some());
85        match self.future.as_mut().unwrap().poll(cx) {
86            Ok(Async::Ready(e)) => {
87                self.future = None;
88                Ok(Async::Ready(Some(e)))
89            }
90            Err(e) => {
91                self.future = None;
92                Err(e)
93            }
94            Ok(Async::Pending) => Ok(Async::Pending)
95        }
96    }
97}