// futures_util/stream/then.rs
use futures_core::{Async, IntoFuture, Future, Poll, Stream};
use futures_core::task;
use futures_sink::{ Sink};
4
5#[derive(Debug)]
10#[must_use = "streams do nothing unless polled"]
11pub struct Then<S, U, F>
12 where U: IntoFuture,
13{
14 stream: S,
15 future: Option<U::Future>,
16 f: F,
17}
18
19pub fn new<S, U, F>(s: S, f: F) -> Then<S, U, F>
20 where S: Stream,
21 F: FnMut(Result<S::Item, S::Error>) -> U,
22 U: IntoFuture,
23{
24 Then {
25 stream: s,
26 future: None,
27 f: f,
28 }
29}
30
31impl<S, U, F> Sink for Then<S, U, F>
33 where S: Sink, U: IntoFuture,
34{
35 type SinkItem = S::SinkItem;
36 type SinkError = S::SinkError;
37
38 delegate_sink!(stream);
39}
40
41impl<S, U, F> Stream for Then<S, U, F>
42 where S: Stream,
43 F: FnMut(Result<S::Item, S::Error>) -> U,
44 U: IntoFuture,
45{
46 type Item = U::Item;
47 type Error = U::Error;
48
49 fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<U::Item>, U::Error> {
50 if self.future.is_none() {
51 let item = match self.stream.poll_next(cx) {
52 Ok(Async::Pending) => return Ok(Async::Pending),
53 Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
54 Ok(Async::Ready(Some(e))) => Ok(e),
55 Err(e) => Err(e),
56 };
57 self.future = Some((self.f)(item).into_future());
58 }
59 assert!(self.future.is_some());
60 match self.future.as_mut().unwrap().poll(cx) {
61 Ok(Async::Ready(e)) => {
62 self.future = None;
63 Ok(Async::Ready(Some(e)))
64 }
65 Err(e) => {
66 self.future = None;
67 Err(e)
68 }
69 Ok(Async::Pending) => Ok(Async::Pending)
70 }
71 }
72}