//! `pipe_it/sink.rs`
//!
//! Fan-out aggregation: distributes one input to multiple handlers and collects their outputs.

1use crate::handler::Handler;
2use crate::Context;
3use std::future::Future;
4
/// A pipeline aggregator that distributes input to multiple handlers and collects their outputs.
///
/// `H` is the handler collection. The `Handler` implementations in this file are provided for
/// `Sink` over tuples of one to ten handlers, and their output is the tuple of the individual
/// handler outputs, in the same order.
#[derive(Debug, Clone)]
pub struct Sink<H> {
    /// The wrapped handler collection, accessed by tuple index when the sink is called.
    handlers: H,
    /// Execution mode: `true` awaits all handler futures together, `false` awaits them one
    /// after another.
    pub(crate) concurrent: bool,
}
10
11/// Creates a new Sink with the given handlers.
12/// Defaults to sequential execution.
13/// Use `.concurrent()` to enable concurrent execution.
14pub fn sink<H>(handlers: H) -> Sink<H> {
15    Sink {
16        handlers,
17        concurrent: false,
18    }
19}
20
21impl<H> Sink<H> {
22    /// Enable concurrent execution of the handlers using `futures::join!`.
23    pub fn concurrent(mut self) -> Self {
24        self.concurrent = true;
25        self
26    }
27
28    /// Enable sequential execution of the handlers (default).
29    pub fn sequential(mut self) -> Self {
30        self.concurrent = false;
31        self
32    }
33}
34
// Implements `Handler` for `Sink<(H1, ..., Hn)>`.
//
// Each `;`-separated entry supplies, in order: the handler type parameter, its output type
// parameter, its argument-marker type parameter, and the tuple index of that handler inside
// `Sink::handlers`. The generated handler outputs the tuple of all handler outputs.
macro_rules! impl_sink_for_tuple {
    ($( $H:ident, $O:ident, $Args:ident, $idx:tt );+ $(;)?) => {
        impl<I, $( $H, $O, $Args ),+> Handler<I, ( $( $O, )+ ), ( $( $Args, )+ )>
            for Sink<( $( $H, )+ )>
        where
            I: Clone + Send + Sync + 'static,
            $(
                $H: Handler<I, $O, $Args>,
                $O: Send + 'static,
                $Args: Send + Sync + 'static,
            )+
        {
            fn call(&self, ctx: Context<I>) -> impl Future<Output = ( $( $O, )+ )> + Send {
                async move {
                    if !self.concurrent {
                        // Sequential mode: tuple fields are evaluated left to right, so each
                        // handler is awaited to completion before the next one is called.
                        ( $( self.handlers.$idx.call(ctx.clone()).await, )+ )
                    } else {
                        // Concurrent mode: first build every handler's future (the type
                        // parameter idents double as binding names, hence the lint allow)...
                        #[allow(non_snake_case)]
                        let ( $( $H, )+ ) = ( $( self.handlers.$idx.call(ctx.clone()), )+ );
                        // ...then await all of them together.
                        futures::join!( $( $H ),+ )
                    }
                }
            }
        }
    };
}
73
74impl_sink_for_tuple!(H1, O1, A1, 0);
75impl_sink_for_tuple!(H1, O1, A1, 0; H2, O2, A2, 1);
76impl_sink_for_tuple!(H1, O1, A1, 0; H2, O2, A2, 1; H3, O3, A3, 2);
77impl_sink_for_tuple!(H1, O1, A1, 0; H2, O2, A2, 1; H3, O3, A3, 2; H4, O4, A4, 3);
78impl_sink_for_tuple!(H1, O1, A1, 0; H2, O2, A2, 1; H3, O3, A3, 2; H4, O4, A4, 3; H5, O5, A5, 4);
79impl_sink_for_tuple!(H1, O1, A1, 0; H2, O2, A2, 1; H3, O3, A3, 2; H4, O4, A4, 3; H5, O5, A5, 4; H6, O6, A6, 5);
80impl_sink_for_tuple!(H1, O1, A1, 0; H2, O2, A2, 1; H3, O3, A3, 2; H4, O4, A4, 3; H5, O5, A5, 4; H6, O6, A6, 5; H7, O7, A7, 6);
81impl_sink_for_tuple!(H1, O1, A1, 0; H2, O2, A2, 1; H3, O3, A3, 2; H4, O4, A4, 3; H5, O5, A5, 4; H6, O6, A6, 5; H7, O7, A7, 6; H8, O8, A8, 7);
82impl_sink_for_tuple!(H1, O1, A1, 0; H2, O2, A2, 1; H3, O3, A3, 2; H4, O4, A4, 3; H5, O5, A5, 4; H6, O6, A6, 5; H7, O7, A7, 6; H8, O8, A8, 7; H9, O9, A9, 8);
83impl_sink_for_tuple!(H1, O1, A1, 0; H2, O2, A2, 1; H3, O3, A3, 2; H4, O4, A4, 3; H5, O5, A5, 4; H6, O6, A6, 5; H7, O7, A7, 6; H8, O8, A8, 7; H9, O9, A9, 8; H10, O10, A10, 9);
84
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Context, Input};
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    /// Handler shared by the side-effect tests: adds one to the counter stored in the
    /// context's shared resources. State is carried through shared resources because plain
    /// `async fn` handlers cannot capture local variables.
    async fn bump(counter: crate::Res<Arc<AtomicUsize>>) {
        counter.fetch_add(1, Ordering::SeqCst);
    }

    /// The builder methods only flip the mode flag, and the default is sequential.
    #[test]
    fn test_sink_mode_toggles() {
        assert!(!sink(()).concurrent);
        assert!(sink(()).concurrent().concurrent);
        assert!(!sink(()).concurrent().sequential().concurrent);
    }

    #[tokio::test]
    async fn test_sink_sequential() {
        async fn task1(input: Input<i32>) -> i32 {
            *input + 1
        }
        async fn task2(input: Input<i32>) -> String {
            format!("val: {}", *input)
        }

        // Sequential is the default mode.
        let pipeline = sink((task1, task2));
        let ctx = Context::empty(10);
        let result = pipeline.call(ctx).await;

        assert_eq!(result, (11, "val: 10".to_string()));
    }

    #[tokio::test]
    async fn test_sink_concurrent() {
        // The names are only labels: neither task sleeps, so this checks that the concurrent
        // path returns each output in handler order, not that execution overlaps.
        async fn slow_task(input: Input<i32>) -> i32 {
            *input * 2
        }
        async fn fast_task(input: Input<i32>) -> i32 {
            *input + 5
        }

        let pipeline = sink((slow_task, fast_task)).concurrent();
        let ctx = Context::empty(10);
        let result = pipeline.call(ctx).await;

        assert_eq!(result, (20, 15));
    }

    #[tokio::test]
    async fn test_sink_side_effects() {
        // Both handlers must actually run in sequential mode.
        let counter = Arc::new(AtomicUsize::new(0));

        let pipeline = sink((bump, bump));
        let ctx = Context::new((), crate::Shared::new().insert(counter.clone()));

        pipeline.call(ctx).await;

        assert_eq!(counter.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn test_sink_side_effects_concurrent() {
        // Both handlers must actually run in concurrent mode as well.
        let counter = Arc::new(AtomicUsize::new(0));

        let pipeline = sink((bump, bump)).concurrent();
        let ctx = Context::new((), crate::Shared::new().insert(counter.clone()));

        pipeline.call(ctx).await;

        assert_eq!(counter.load(Ordering::SeqCst), 2);
    }
}
143