1use super::Sink;
2use crate::unfold_state::UnfoldState;
3use core::{
4 fmt,
5 future::Future,
6 marker::PhantomPinned,
7 pin::Pin,
8 task::{Context, Poll},
9};
10
11#[must_use = "sinks do nothing unless polled"]
13pub struct Unfold<T, F, Fut> {
14 function: F,
15 state: UnfoldState<T, Fut>,
16}
17
18struct UnfoldProj<'pin, T, F, Fut> {
19 function: &'pin mut F,
20 state: Pin<&'pin mut UnfoldState<T, Fut>>,
21}
22
23impl<T, F, Fut> Unfold<T, F, Fut> {
24 unsafe fn project<'a>(self: Pin<&'a mut Self>) -> UnfoldProj<'a, T, F, Fut> {
33 let this = self.get_unchecked_mut();
34 UnfoldProj {
35 function: &mut this.function,
36 state: Pin::new_unchecked(&mut this.state),
37 }
38 }
39}
40
41impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut>
42where
43 T: fmt::Debug,
44 Fut: fmt::Debug,
45{
46 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47 f.debug_struct("Unfold")
48 .field("state", &self.state)
49 .finish()
50 }
51}
52
53pub fn unfold<T, F, Fut, Item, E>(init: T, function: F) -> Unfold<T, F, Fut>
84where
85 F: FnMut(T, Item) -> Fut,
86 Fut: Future<Output = Result<T, E>>,
87{
88 Unfold {
89 function,
90 state: UnfoldState::Value { value: init },
91 }
92}
93
94impl<T, F, Fut, Item, E> Sink<Item> for Unfold<T, F, Fut>
95where
96 F: FnMut(T, Item) -> Fut,
97 Fut: Future<Output = Result<T, E>>,
98{
99 type Error = E;
100
101 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
102 self.poll_flush(cx)
103 }
104
105 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
106 let mut this = unsafe { self.project() };
107 let future = match this.state.as_mut().take_value() {
108 Some(value) => (this.function)(value, item),
109 None => panic!("start_send called without poll_ready being called first"),
110 };
111 this.state.set(UnfoldState::Future {
112 future,
113 _pin: PhantomPinned,
114 });
115 Ok(())
116 }
117
118 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
119 let mut this = unsafe { self.project() };
120 if let Some(future) = this.state.as_mut().project_future() {
121 match future.poll(cx) {
122 Poll::Pending => Poll::Pending,
123 Poll::Ready(Ok(state)) => {
124 this.state.set(UnfoldState::Value { value: state });
125 Poll::Ready(Ok(()))
126 }
127 Poll::Ready(Err(err)) => {
128 this.state.set(UnfoldState::Empty);
129 Poll::Ready(Err(err))
130 }
131 }
132 } else {
133 Poll::Ready(Ok(()))
134 }
135 }
136
137 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
138 self.poll_flush(cx)
139 }
140}