futures_util/stream/
and_then.rs1use futures_core::{IntoFuture, Future, Poll, Async, Stream};
2use futures_core::task;
3use futures_sink::{Sink};
4
5#[derive(Debug)]
10#[must_use = "streams do nothing unless polled"]
11pub struct AndThen<S, U, F>
12 where U: IntoFuture,
13{
14 stream: S,
15 future: Option<U::Future>,
16 f: F,
17}
18
19pub fn new<S, U, F>(s: S, f: F) -> AndThen<S, U, F>
20 where S: Stream,
21 F: FnMut(S::Item) -> U,
22 U: IntoFuture<Error=S::Error>,
23{
24 AndThen {
25 stream: s,
26 future: None,
27 f: f,
28 }
29}
30
31impl<S, U, F> AndThen<S, U, F>
32 where U: IntoFuture,
33{
34 pub fn get_ref(&self) -> &S {
37 &self.stream
38 }
39
40 pub fn get_mut(&mut self) -> &mut S {
46 &mut self.stream
47 }
48
49 pub fn into_inner(self) -> S {
54 self.stream
55 }
56}
57
58impl<S, U: IntoFuture, F> Sink for AndThen<S, U, F>
60 where S: Sink
61{
62 type SinkItem = S::SinkItem;
63 type SinkError = S::SinkError;
64
65 delegate_sink!(stream);
66}
67
68impl<S, U, F> Stream for AndThen<S, U, F>
69 where S: Stream,
70 F: FnMut(S::Item) -> U,
71 U: IntoFuture<Error=S::Error>,
72{
73 type Item = U::Item;
74 type Error = S::Error;
75
76 fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<U::Item>, S::Error> {
77 if self.future.is_none() {
78 let item = match try_ready!(self.stream.poll_next(cx)) {
79 None => return Ok(Async::Ready(None)),
80 Some(e) => e,
81 };
82 self.future = Some((self.f)(item).into_future());
83 }
84 assert!(self.future.is_some());
85 match self.future.as_mut().unwrap().poll(cx) {
86 Ok(Async::Ready(e)) => {
87 self.future = None;
88 Ok(Async::Ready(Some(e)))
89 }
90 Err(e) => {
91 self.future = None;
92 Err(e)
93 }
94 Ok(Async::Pending) => Ok(Async::Pending)
95 }
96 }
97}