async_sink/ext/
unfold.rs

1use super::Sink;
2use crate::unfold_state::UnfoldState;
3use core::{
4    fmt,
5    future::Future,
6    marker::PhantomPinned,
7    pin::Pin,
8    task::{Context, Poll},
9};
10
11/// Sink for the [`unfold`] function.
12#[must_use = "sinks do nothing unless polled"]
13pub struct Unfold<T, F, Fut> {
14    function: F,
15    state: UnfoldState<T, Fut>,
16}
17
18struct UnfoldProj<'pin, T, F, Fut> {
19    function: &'pin mut F,
20    state: Pin<&'pin mut UnfoldState<T, Fut>>,
21}
22
23impl<T, F, Fut> Unfold<T, F, Fut> {
24    // Helper to get a mutable reference to the `function` field and a
25    // pinned mutable reference to the `state` field.
26    //
27    // # Safety
28    //
29    // This is `unsafe` because it returns a `Pin` to one of the fields of the
30    // struct. The caller must ensure that they don't move the struct while this
31    // `Pin` is in use.
32    unsafe fn project<'a>(self: Pin<&'a mut Self>) -> UnfoldProj<'a, T, F, Fut> {
33        let this = self.get_unchecked_mut();
34        UnfoldProj {
35            function: &mut this.function,
36            state: Pin::new_unchecked(&mut this.state),
37        }
38    }
39}
40
41impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut>
42where
43    T: fmt::Debug,
44    Fut: fmt::Debug,
45{
46    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47        f.debug_struct("Unfold")
48            .field("state", &self.state)
49            .finish()
50    }
51}
52
53/// Create a sink from a function which processes one item at a time.
54///
55/// # Examples
56///
57/// ```
58/// use core::pin::pin;
59/// use async_sink::SinkExt;
60/// use tokio::sync::Mutex;
61/// use std::sync::Arc;
62///
63/// #[tokio::main]
64/// async fn main() {
65/// let output: Arc<Mutex<Vec<usize>>> = Arc::new(tokio::sync::Mutex::new(Vec::new()));
66///
67/// let unfold = async_sink::unfold(0, |mut sum, i: usize| {
68///    let cb_output = output.clone();
69///    async move {
70///         sum += i;
71///         cb_output.clone().lock().await.push(sum);
72///         Ok::<_, core::convert::Infallible>(sum)
73///    }
74/// });
75/// let mut unfold = pin!(unfold);
76/// let input: [usize; 3] = [5, 15, 35];
77/// assert!(unfold.send_all(&mut tokio_stream::iter(input.iter().copied().map(|i| Ok(i)))).await.is_ok());
78/// assert_eq!(output.lock().await.as_slice(),input.iter().scan(0, |state, &x|
79///   { *state += x; Some(*state) }).collect::<Vec<usize>>().as_slice()
80/// );
81/// }
82/// ```
83pub fn unfold<T, F, Fut, Item, E>(init: T, function: F) -> Unfold<T, F, Fut>
84where
85    F: FnMut(T, Item) -> Fut,
86    Fut: Future<Output = Result<T, E>>,
87{
88    Unfold {
89        function,
90        state: UnfoldState::Value { value: init },
91    }
92}
93
94impl<T, F, Fut, Item, E> Sink<Item> for Unfold<T, F, Fut>
95where
96    F: FnMut(T, Item) -> Fut,
97    Fut: Future<Output = Result<T, E>>,
98{
99    type Error = E;
100
101    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
102        self.poll_flush(cx)
103    }
104
105    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
106        let mut this = unsafe { self.project() };
107        let future = match this.state.as_mut().take_value() {
108            Some(value) => (this.function)(value, item),
109            None => panic!("start_send called without poll_ready being called first"),
110        };
111        this.state.set(UnfoldState::Future {
112            future,
113            _pin: PhantomPinned,
114        });
115        Ok(())
116    }
117
118    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
119        let mut this = unsafe { self.project() };
120        if let Some(future) = this.state.as_mut().project_future() {
121            match future.poll(cx) {
122                Poll::Pending => Poll::Pending,
123                Poll::Ready(Ok(state)) => {
124                    this.state.set(UnfoldState::Value { value: state });
125                    Poll::Ready(Ok(()))
126                }
127                Poll::Ready(Err(err)) => {
128                    this.state.set(UnfoldState::Empty);
129                    Poll::Ready(Err(err))
130                }
131            }
132        } else {
133            Poll::Ready(Ok(()))
134        }
135    }
136
137    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
138        self.poll_flush(cx)
139    }
140}