marigold_impl/
marifold.rs1use async_trait::async_trait;
2use futures::stream::Stream;
3use futures::stream::StreamExt;
4use futures::Future;
5use tracing::instrument;
6
7#[async_trait]
8pub trait Marifold<SInput, State, F, Fut> {
9 async fn marifold(
10 self,
11 init: State,
12 f: F,
13 ) -> futures::stream::Once<futures::stream::Fold<SInput, Fut, State, F>>;
14}
15
16#[async_trait]
19impl<State, SInput, T, F, Fut> Marifold<SInput, State, F, Fut> for SInput
20where
21 SInput: Stream<Item = T> + Send + Sized,
22 T: Clone + Send + std::fmt::Debug,
23 F: FnMut(State, T) -> Fut + std::marker::Send + 'static,
24 State: std::fmt::Debug + std::marker::Send + 'static,
25 Fut: Future<Output = State>,
26{
27 #[instrument(skip(self, f))]
28 async fn marifold(
29 self,
30 init: State,
31 f: F,
32 ) -> futures::stream::Once<futures::stream::Fold<SInput, Fut, State, F>> {
33 futures::stream::once(self.fold(init, f))
34 }
35}
36
#[cfg(test)]
mod tests {
    use super::Marifold;
    use futures::stream::StreamExt;

    /// Summing `0..5` through `marifold` produces a stream containing only the total.
    #[tokio::test]
    async fn fold() {
        let summed = futures::stream::iter(0..5)
            .marifold(0, |acc, x| async move { acc + x })
            .await;
        let collected = summed.collect::<Vec<u8>>().await;

        assert_eq!(collected, vec![10]);
    }
}