Skip to main content

atomr_streams/
materializer.rs

1//! ActorMaterializer — runs graphs on a Tokio runtime.
2
3use std::future::Future;
4
5use crate::runnable::RunnableGraph;
6use crate::sink::Sink;
7use crate::source::Source;
8
/// Stateless handle used to run stream graphs.
///
/// This is a unit struct: it holds no data, so every instance is
/// interchangeable and cloning or default-constructing it is free.
/// `Debug` is derived so the type can appear in `{:?}` output and inside
/// other `#[derive(Debug)]` types.
#[derive(Debug, Default, Clone)]
pub struct ActorMaterializer;
11
12impl ActorMaterializer {
13    pub fn new() -> Self {
14        Self
15    }
16
17    /// Convenience: run a source into a collecting sink and return the result.
18    pub async fn run_collect<T: Send + 'static>(&self, source: Source<T>) -> Vec<T> {
19        Sink::collect(source).await
20    }
21
22    /// Run an arbitrary `RunnableGraph`.
23    pub async fn run<M: Send + 'static>(&self, graph: RunnableGraph<M>) -> M {
24        graph.run().await
25    }
26
27    /// Run a source against an arbitrary async terminator.
28    pub async fn run_with<T, F, Fut, Out>(&self, source: Source<T>, terminator: F) -> Out
29    where
30        T: Send + 'static,
31        F: FnOnce(Source<T>) -> Fut,
32        Fut: Future<Output = Out>,
33    {
34        terminator(source).await
35    }
36}
37
#[cfg(test)]
mod tests {
    use super::*;
    use crate::flow::Flow;

    /// A doubling flow attached to a four-element source collects to the
    /// doubled values in the original order.
    #[tokio::test]
    async fn map_and_collect_pipeline() {
        let doubler: Flow<i32, i32> = Flow::from_fn(|n| n * 2);
        let numbers = Source::from_iter(vec![1, 2, 3, 4]);
        let materializer = ActorMaterializer::new();

        let doubled = materializer.run_collect(numbers.via(doubler)).await;

        assert_eq!(doubled, vec![2, 4, 6, 8]);
    }

    /// Folding 1..=5 with addition from a zero seed yields 15.
    #[tokio::test]
    async fn fold_via_sink() {
        let numbers = Source::from_iter(1..=5i32);

        let total = Sink::fold(numbers, 0, |running, item| running + item).await;

        assert_eq!(total, 15);
    }

    /// A `to_seq` graph run through the materializer returns the source's
    /// elements as a sequence.
    #[tokio::test]
    async fn runnable_graph_to_seq() {
        let seq_graph = RunnableGraph::to_seq(Source::from_iter(vec![10, 20, 30]));
        let materializer = ActorMaterializer::new();

        let items = materializer.run(seq_graph).await;

        assert_eq!(items, vec![10, 20, 30]);
    }
}
65}