marigold_impl/
marifold.rs

1use async_trait::async_trait;
2use futures::stream::Stream;
3use futures::stream::StreamExt;
4use futures::Future;
5use tracing::instrument;
6
7#[async_trait]
8pub trait Marifold<SInput, State, F, Fut> {
9    async fn marifold(
10        self,
11        init: State,
12        f: F,
13    ) -> futures::stream::Once<futures::stream::Fold<SInput, Fut, State, F>>;
14}
15
16/// This is an adapter trait that allows fold from StreamExt to return a Stream
17/// with a single value (the state after the parent stream has been exhausted).
18#[async_trait]
19impl<State, SInput, T, F, Fut> Marifold<SInput, State, F, Fut> for SInput
20where
21    SInput: Stream<Item = T> + Send + Sized,
22    T: Clone + Send + std::fmt::Debug,
23    F: FnMut(State, T) -> Fut + std::marker::Send + 'static,
24    State: std::fmt::Debug + std::marker::Send + 'static,
25    Fut: Future<Output = State>,
26{
27    #[instrument(skip(self, f))]
28    async fn marifold(
29        self,
30        init: State,
31        f: F,
32    ) -> futures::stream::Once<futures::stream::Fold<SInput, Fut, State, F>> {
33        futures::stream::once(self.fold(init, f))
34    }
35}
36
#[cfg(test)]
mod tests {
    use super::Marifold;
    use futures::stream::StreamExt;

    /// Summing `0..5` through `marifold` must yield a stream whose only item
    /// is the total.
    #[tokio::test]
    async fn fold() {
        let source = futures::stream::iter(0..5);

        // Build the one-item stream that wraps the fold.
        let folded = source
            .marifold(0, |total, item| async move { total + item })
            .await;

        // Drain it so the single resulting state can be inspected.
        let collected = folded.collect::<Vec<u8>>().await;

        assert_eq!(collected, vec![10]);
    }
}
53}