atomr_streams/
materializer.rs1use std::future::Future;
4
5use crate::runnable::RunnableGraph;
6use crate::sink::Sink;
7use crate::source::Source;
8
/// Handle used to run stream pipelines and graphs to completion.
///
/// This is a unit struct: it holds no fields, so every instance is
/// interchangeable and cloning it is trivial.
#[derive(Debug, Default, Clone)]
pub struct ActorMaterializer;
11
12impl ActorMaterializer {
13 pub fn new() -> Self {
14 Self
15 }
16
17 pub async fn run_collect<T: Send + 'static>(&self, source: Source<T>) -> Vec<T> {
19 Sink::collect(source).await
20 }
21
22 pub async fn run<M: Send + 'static>(&self, graph: RunnableGraph<M>) -> M {
24 graph.run().await
25 }
26
27 pub async fn run_with<T, F, Fut, Out>(&self, source: Source<T>, terminator: F) -> Out
29 where
30 T: Send + 'static,
31 F: FnOnce(Source<T>) -> Fut,
32 Fut: Future<Output = Out>,
33 {
34 terminator(source).await
35 }
36}
37
#[cfg(test)]
mod tests {
    use super::*;
    use crate::flow::Flow;

    /// A doubling flow applied to `1..=4` collects to the doubled values in order.
    #[tokio::test]
    async fn map_and_collect_pipeline() {
        let doubler: Flow<i32, i32> = Flow::from_fn(|n| n * 2);
        let pipeline = Source::from_iter(vec![1, 2, 3, 4]).via(doubler);

        let collected = ActorMaterializer::new().run_collect(pipeline).await;

        assert_eq!(collected, vec![2, 4, 6, 8]);
    }

    /// Folding `1..=5` with addition from a zero seed yields 15.
    #[tokio::test]
    async fn fold_via_sink() {
        let numbers = Source::from_iter(1..=5i32);

        let total = Sink::fold(numbers, 0, |running, item| running + item).await;

        assert_eq!(total, 15);
    }

    /// Running a `to_seq` graph returns the source elements unchanged.
    #[tokio::test]
    async fn runnable_graph_to_seq() {
        let materializer = ActorMaterializer::new();
        let elements = Source::from_iter(vec![10, 20, 30]);

        let output = materializer.run(RunnableGraph::to_seq(elements)).await;

        assert_eq!(output, vec![10, 20, 30]);
    }
}