tokio_stream_util/try_stream/ext/
try_unfold.rs

1use core::fmt;
2use core::pin::Pin;
3use futures_core::future::TryFuture;
4use futures_core::task::{Context, Poll};
5use tokio_stream::Stream;
6
7use super::FusedStream;
8
9/// Creates a `TryStream` from a seed and a closure returning a `TryFuture`.
10///
11/// This function is the dual for the `TryStream::try_fold()` adapter: while
12/// `TryStream::try_fold()` reduces a `TryStream` to one single value,
13/// `try_unfold()` creates a `TryStream` from a seed value.
14///
15/// `try_unfold()` will call the provided closure with the provided seed, then
16/// wait for the returned `TryFuture` to complete with `(a, b)`. It will then
17/// yield the value `a`, and use `b` as the next internal state.
18///
19/// If the closure returns `None` instead of `Some(TryFuture)`, then the
20/// `try_unfold()` will stop producing items and return `Poll::Ready(None)` in
21/// future calls to `poll()`.
22///
23/// In case of error generated by the returned `TryFuture`, the error will be
24/// returned by the `TryStream`. The `TryStream` will then yield
25/// `Poll::Ready(None)` in future calls to `poll()`.
26///
27/// This function can typically be used when wanting to go from the "world of
28/// futures" to the "world of streams": the provided closure can build a
29/// `TryFuture` using other library functions working on futures, and
30/// `try_unfold()` will turn it into a `TryStream` by repeating the operation.
31///
32/// # Example
33///
34/// ```
35/// use tokio_stream::StreamExt;
36///
37/// #[derive(Debug, PartialEq)]
38/// struct SomeError;
39///
40///  #[tokio::main]
41///  async fn main() {
42///     let stream = tokio_stream_util::try_stream::try_unfold(0, |state| async move {
43///         if state < 0 {
44///             return Err(SomeError);
45///         }
46///
47///         if state <= 2 {
48///             let next_state = state + 1;
49///             let yielded = state * 2;
50///             Ok(Some((yielded, next_state)))
51///         } else {
52///             Ok(None)
53///         }
54///     });
55///
56///     assert_eq!(stream.collect::<Vec<_>>().await, vec![Ok(0), Ok(2), Ok(4)]);
57/// }
58/// ```
59pub fn try_unfold<T, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut>
60where
61    F: FnMut(T) -> Fut,
62    Fut: TryFuture<Ok = Option<(Item, T)>>,
63{
64    TryUnfold {
65        f,
66        state: Some(init),
67        fut: None,
68    }
69}
70
/// Stream returned by the [`try_unfold`] function.
///
/// At most one of `state` and `fut` is populated at a time; when both are
/// `None` the stream has finished.
#[must_use = "streams do nothing unless polled"]
pub struct TryUnfold<T, F, Fut> {
    /// Closure that turns the current state into the next future.
    f: F,
    /// State waiting to be passed to `f`; `None` while a future is in
    /// flight or once the stream has finished.
    state: Option<T>,
    /// Future currently being driven; `None` between steps or once the
    /// stream has finished.
    fut: Option<Fut>,
}
78
79impl<T, F, Fut> Unpin for TryUnfold<T, F, Fut> where Fut: Unpin {}
80
81impl<T, F, Fut, Item> FusedStream for TryUnfold<T, F, Fut>
82where
83    F: FnMut(T) -> Fut,
84    Fut: TryFuture<Ok = Option<(Item, T)>>,
85{
86    fn is_terminated(&self) -> bool {
87        self.state.is_none() && self.fut.is_none()
88    }
89}
90
91impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
92where
93    T: fmt::Debug,
94    Fut: fmt::Debug,
95{
96    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97        f.debug_struct("TryUnfold")
98            .field("state", &self.state)
99            .field("fut", &self.fut)
100            .finish()
101    }
102}
103
104impl<T, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
105where
106    F: FnMut(T) -> Fut,
107    Fut: TryFuture<Ok = Option<(Item, T)>>,
108{
109    type Item = Result<Item, Fut::Error>;
110
111    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
112        let this = unsafe { self.get_unchecked_mut() };
113
114        if let Some(state) = this.state.take() {
115            let fut = (this.f)(state);
116            unsafe { Pin::new_unchecked(&mut this.fut) }.set(Some(fut));
117        }
118
119        let mut fut_pinned = unsafe { Pin::new_unchecked(&mut this.fut) };
120
121        match fut_pinned.as_mut().as_pin_mut() {
122            None => {
123                // The future previously errored or stream is done.
124                Poll::Ready(None)
125            }
126            Some(future) => {
127                let step = match future.try_poll(cx) {
128                    Poll::Ready(step) => step,
129                    Poll::Pending => return Poll::Pending,
130                };
131                fut_pinned.set(None);
132
133                match step {
134                    Ok(Some((item, next_state))) => {
135                        this.state = Some(next_state);
136                        Poll::Ready(Some(Ok(item)))
137                    }
138                    Ok(None) => Poll::Ready(None),
139                    Err(e) => Poll::Ready(Some(Err(e))),
140                }
141            }
142        }
143    }
144}