tokio_stream_util/try_stream/ext/try_unfold.rs
1use core::fmt;
2use core::pin::Pin;
3use futures_core::future::TryFuture;
4use futures_core::task::{Context, Poll};
5use tokio_stream::Stream;
6
7use super::FusedStream;
8
9/// Creates a `TryStream` from a seed and a closure returning a `TryFuture`.
10///
11/// This function is the dual for the `TryStream::try_fold()` adapter: while
12/// `TryStream::try_fold()` reduces a `TryStream` to one single value,
13/// `try_unfold()` creates a `TryStream` from a seed value.
14///
15/// `try_unfold()` will call the provided closure with the provided seed, then
16/// wait for the returned `TryFuture` to complete with `(a, b)`. It will then
17/// yield the value `a`, and use `b` as the next internal state.
18///
19/// If the closure returns `None` instead of `Some(TryFuture)`, then the
20/// `try_unfold()` will stop producing items and return `Poll::Ready(None)` in
21/// future calls to `poll()`.
22///
23/// In case of error generated by the returned `TryFuture`, the error will be
24/// returned by the `TryStream`. The `TryStream` will then yield
25/// `Poll::Ready(None)` in future calls to `poll()`.
26///
27/// This function can typically be used when wanting to go from the "world of
28/// futures" to the "world of streams": the provided closure can build a
29/// `TryFuture` using other library functions working on futures, and
30/// `try_unfold()` will turn it into a `TryStream` by repeating the operation.
31///
32/// # Example
33///
34/// ```
35/// use tokio_stream::StreamExt;
36///
37/// #[derive(Debug, PartialEq)]
38/// struct SomeError;
39///
40/// #[tokio::main]
41/// async fn main() {
42/// let stream = tokio_stream_util::try_stream::try_unfold(0, |state| async move {
43/// if state < 0 {
44/// return Err(SomeError);
45/// }
46///
47/// if state <= 2 {
48/// let next_state = state + 1;
49/// let yielded = state * 2;
50/// Ok(Some((yielded, next_state)))
51/// } else {
52/// Ok(None)
53/// }
54/// });
55///
56/// assert_eq!(stream.collect::<Vec<_>>().await, vec![Ok(0), Ok(2), Ok(4)]);
57/// }
58/// ```
59pub fn try_unfold<T, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut>
60where
61 F: FnMut(T) -> Fut,
62 Fut: TryFuture<Ok = Option<(Item, T)>>,
63{
64 TryUnfold {
65 f,
66 state: Some(init),
67 fut: None,
68 }
69}
70
/// Stream for the [`try_unfold`] function.
///
/// Holds the step closure together with either the state waiting to be fed to
/// it or the future currently computing the next step.
#[must_use = "streams do nothing unless polled"]
pub struct TryUnfold<T, F, Fut> {
    /// Closure called with the current state to create the future for the
    /// next step.
    f: F,
    /// State to pass to `f` on the next poll; `None` while a future is in
    /// flight or after the stream has finished.
    state: Option<T>,
    /// Future for the step in progress; `None` when no step is running.
    fut: Option<Fut>,
}
78
79impl<T, F, Fut> Unpin for TryUnfold<T, F, Fut> where Fut: Unpin {}
80
81impl<T, F, Fut, Item> FusedStream for TryUnfold<T, F, Fut>
82where
83 F: FnMut(T) -> Fut,
84 Fut: TryFuture<Ok = Option<(Item, T)>>,
85{
86 fn is_terminated(&self) -> bool {
87 self.state.is_none() && self.fut.is_none()
88 }
89}
90
91impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
92where
93 T: fmt::Debug,
94 Fut: fmt::Debug,
95{
96 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97 f.debug_struct("TryUnfold")
98 .field("state", &self.state)
99 .field("fut", &self.fut)
100 .finish()
101 }
102}
103
104impl<T, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
105where
106 F: FnMut(T) -> Fut,
107 Fut: TryFuture<Ok = Option<(Item, T)>>,
108{
109 type Item = Result<Item, Fut::Error>;
110
111 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
112 let this = unsafe { self.get_unchecked_mut() };
113
114 if let Some(state) = this.state.take() {
115 let fut = (this.f)(state);
116 unsafe { Pin::new_unchecked(&mut this.fut) }.set(Some(fut));
117 }
118
119 let mut fut_pinned = unsafe { Pin::new_unchecked(&mut this.fut) };
120
121 match fut_pinned.as_mut().as_pin_mut() {
122 None => {
123 // The future previously errored or stream is done.
124 Poll::Ready(None)
125 }
126 Some(future) => {
127 let step = match future.try_poll(cx) {
128 Poll::Ready(step) => step,
129 Poll::Pending => return Poll::Pending,
130 };
131 fut_pinned.set(None);
132
133 match step {
134 Ok(Some((item, next_state))) => {
135 this.state = Some(next_state);
136 Poll::Ready(Some(Ok(item)))
137 }
138 Ok(None) => Poll::Ready(None),
139 Err(e) => Poll::Ready(Some(Err(e))),
140 }
141 }
142 }
143 }
144}