futures_util/stream/unfold.rs
1use core::mem;
2
3use futures_core::{Future, IntoFuture, Async, Poll, Stream};
4use futures_core::task;
5
6/// Creates a `Stream` from a seed and a closure returning a `Future`.
7///
8/// This function is the dual for the `Stream::fold()` adapter: while
9/// `Stream::fold()` reduces a `Stream` to one single value, `unfold()` creates a
10/// `Stream` from a seed value.
11///
12/// `unfold()` will call the provided closure with the provided seed, then wait
13/// for the returned `Future` to complete with `(a, b)`. It will then yield the
14/// value `a`, and use `b` as the next internal state.
15///
16/// If the closure returns `None` instead of `Some(Future)`, then the `unfold()`
17/// will stop producing items and return `Ok(Async::Ready(None))` in future
18/// calls to `poll()`.
19///
20/// In case of error generated by the returned `Future`, the error will be
21/// returned by the `Stream`. The `Stream` will then yield
22/// `Ok(Async::Ready(None))` in future calls to `poll()`.
23///
24/// This function can typically be used when wanting to go from the "world of
25/// futures" to the "world of streams": the provided closure can build a
26/// `Future` using other library functions working on futures, and `unfold()`
27/// will turn it into a `Stream` by repeating the operation.
28///
29/// # Example
30///
31/// ```rust
32/// # extern crate futures;
33/// # extern crate futures_executor;
34///
35/// use futures::prelude::*;
36/// use futures::stream;
37/// use futures::future;
38/// use futures_executor::block_on;
39///
40/// # fn main() {
41/// let mut stream = stream::unfold(0, |state| {
42/// if state <= 2 {
43/// let next_state = state + 1;
44/// let yielded = state * 2;
45/// let fut = future::ok::<_, u32>((yielded, next_state));
46/// Some(fut)
47/// } else {
48/// None
49/// }
50/// });
51///
52/// let result = block_on(stream.collect());
53/// assert_eq!(result, Ok(vec![0, 2, 4]));
54/// # }
55/// ```
56pub fn unfold<T, F, Fut, It>(init: T, f: F) -> Unfold<T, F, Fut>
57 where F: FnMut(T) -> Fut,
58 Fut: IntoFuture<Item = Option<(It, T)>>,
59{
60 Unfold {
61 f: f,
62 state: State::Ready(init),
63 }
64}
65
66/// A stream which creates futures, polls them and return their result
67///
68/// This stream is returned by the `futures::stream::unfold` method
69#[derive(Debug)]
70#[must_use = "streams do nothing unless polled"]
71pub struct Unfold<T, F, Fut> where Fut: IntoFuture {
72 f: F,
73 state: State<T, Fut::Future>,
74}
75
76impl <T, F, Fut, It> Stream for Unfold<T, F, Fut>
77 where F: FnMut(T) -> Fut,
78 Fut: IntoFuture<Item = Option<(It, T)>>,
79{
80 type Item = It;
81 type Error = Fut::Error;
82
83 fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<It>, Fut::Error> {
84 loop {
85 match mem::replace(&mut self.state, State::Empty) {
86 // State::Empty may happen if the future returned an error
87 State::Empty => { return Ok(Async::Ready(None)); }
88 State::Ready(state) => {
89 self.state = State::Processing((self.f)(state).into_future());
90 }
91 State::Processing(mut fut) => {
92 match fut.poll(cx)? {
93 Async:: Ready(Some((item, next_state))) => {
94 self.state = State::Ready(next_state);
95 return Ok(Async::Ready(Some(item)));
96 }
97 Async:: Ready(None) => {
98 return Ok(Async::Ready(None))
99 }
100 Async::Pending => {
101 self.state = State::Processing(fut);
102 return Ok(Async::Pending);
103 }
104 }
105 }
106 }
107 }
108 }
109}
110
111#[derive(Debug)]
112enum State<T, F> where F: Future {
113 /// Placeholder state when doing work, or when the returned Future generated an error
114 Empty,
115
116 /// Ready to generate new future; current internal state is the `T`
117 Ready(T),
118
119 /// Working on a future generated previously
120 Processing(F),
121}