1use crate::handler::Handler;
2use crate::Context;
3use std::future::Future;
4
/// A fan-out combinator that feeds one input to every handler in `H`.
///
/// `H` is the handler collection; this module implements `Handler` for sinks
/// wrapping tuples of handlers. Build one with [`sink`], then choose the
/// execution mode with [`Sink::concurrent`] or [`Sink::sequential`].
#[derive(Debug, Clone)]
pub struct Sink<H> {
    /// The wrapped handlers, invoked by the `Handler` implementation.
    handlers: H,
    /// `true` to drive all handlers together, `false` to await them one at a
    /// time in tuple order.
    pub(crate) concurrent: bool,
}
10
11pub fn sink<H>(handlers: H) -> Sink<H> {
15 Sink {
16 handlers,
17 concurrent: false,
18 }
19}
20
21impl<H> Sink<H> {
22 pub fn concurrent(mut self) -> Self {
24 self.concurrent = true;
25 self
26 }
27
28 pub fn sequential(mut self) -> Self {
30 self.concurrent = false;
31 self
32 }
33}
34
/// Implements `Handler` for a `Sink` that wraps a tuple of handlers.
///
/// Each `;`-separated group describes one tuple element as
/// `handler type, output type, argument type, tuple index`. The generated
/// `call` gives every element its own clone of the incoming context and
/// returns the outputs as a tuple in element order.
macro_rules! impl_sink_for_tuple {
    ($( $H:ident, $O:ident, $Args:ident, $idx:tt );+ $(;)?) => {
        impl<I, $( $H, $O, $Args ),+> Handler<I, ( $( $O, )+ ), ( $( $Args, )+ )>
            for Sink<( $( $H, )+ )>
        where
            I: Clone + Send + Sync + 'static,
            $(
                $H: Handler<I, $O, $Args>,
                $O: Send + 'static,
                $Args: Send + Sync + 'static,
            )+
        {
            fn call(&self, ctx: Context<I>) -> impl Future<Output = ( $( $O, )+ )> + Send {
                async move {
                    if self.concurrent {
                        // Every handler future is created, in tuple order, before
                        // any of them is polled; `join!` then drives them together.
                        futures::join!( $( self.handlers.$idx.call(ctx.clone()) ),+ )
                    } else {
                        // Tuple fields are evaluated left to right, so each handler
                        // runs to completion before the next one is started.
                        ( $( self.handlers.$idx.call(ctx.clone()).await, )+ )
                    }
                }
            }
        }
    };
}
73
74impl_sink_for_tuple!(H1, O1, A1, 0);
75impl_sink_for_tuple!(H1, O1, A1, 0; H2, O2, A2, 1);
76impl_sink_for_tuple!(H1, O1, A1, 0; H2, O2, A2, 1; H3, O3, A3, 2);
77impl_sink_for_tuple!(H1, O1, A1, 0; H2, O2, A2, 1; H3, O3, A3, 2; H4, O4, A4, 3);
78impl_sink_for_tuple!(H1, O1, A1, 0; H2, O2, A2, 1; H3, O3, A3, 2; H4, O4, A4, 3; H5, O5, A5, 4);
79impl_sink_for_tuple!(H1, O1, A1, 0; H2, O2, A2, 1; H3, O3, A3, 2; H4, O4, A4, 3; H5, O5, A5, 4; H6, O6, A6, 5);
80impl_sink_for_tuple!(H1, O1, A1, 0; H2, O2, A2, 1; H3, O3, A3, 2; H4, O4, A4, 3; H5, O5, A5, 4; H6, O6, A6, 5; H7, O7, A7, 6);
81impl_sink_for_tuple!(H1, O1, A1, 0; H2, O2, A2, 1; H3, O3, A3, 2; H4, O4, A4, 3; H5, O5, A5, 4; H6, O6, A6, 5; H7, O7, A7, 6; H8, O8, A8, 7);
82impl_sink_for_tuple!(H1, O1, A1, 0; H2, O2, A2, 1; H3, O3, A3, 2; H4, O4, A4, 3; H5, O5, A5, 4; H6, O6, A6, 5; H7, O7, A7, 6; H8, O8, A8, 7; H9, O9, A9, 8);
83impl_sink_for_tuple!(H1, O1, A1, 0; H2, O2, A2, 1; H3, O3, A3, 2; H4, O4, A4, 3; H5, O5, A5, 4; H6, O6, A6, 5; H7, O7, A7, 6; H8, O8, A8, 7; H9, O9, A9, 8; H10, O10, A10, 9);
84
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Context, Input};
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    /// The builder methods flip the mode flag, and `sink` starts out sequential.
    #[test]
    fn test_sink_mode_toggles() {
        let pipeline = sink(());
        assert!(!pipeline.concurrent);

        let pipeline = pipeline.concurrent();
        assert!(pipeline.concurrent);

        let pipeline = pipeline.sequential();
        assert!(!pipeline.concurrent);
    }

    /// In the default sequential mode, outputs come back in tuple order.
    #[tokio::test]
    async fn test_sink_sequential() {
        async fn task1(input: Input<i32>) -> i32 {
            *input + 1
        }
        async fn task2(input: Input<i32>) -> String {
            format!("val: {}", *input)
        }

        let pipeline = sink((task1, task2));
        let ctx = Context::empty(10);
        let result = pipeline.call(ctx).await;

        assert_eq!(result, (11, "val: 10".to_string()));
    }

    /// In concurrent mode, outputs still come back in tuple order.
    ///
    /// Neither handler actually delays, so this checks the results of the
    /// concurrent path, not that the handlers really overlap in time.
    #[tokio::test]
    async fn test_sink_concurrent() {
        async fn double(input: Input<i32>) -> i32 {
            *input * 2
        }
        async fn add_five(input: Input<i32>) -> i32 {
            *input + 5
        }

        let pipeline = sink((double, add_five)).concurrent();
        // Make sure the concurrent branch is the one being exercised.
        assert!(pipeline.concurrent);

        let ctx = Context::empty(10);
        let result = pipeline.call(ctx).await;

        assert_eq!(result, (20, 15));
    }

    /// Every handler in the tuple runs once, observed through a shared counter.
    #[tokio::test]
    async fn test_sink_side_effects() {
        let counter = Arc::new(AtomicUsize::new(0));

        async fn inc1(c: crate::Res<Arc<AtomicUsize>>) {
            c.fetch_add(1, Ordering::SeqCst);
        }

        let pipeline = sink((inc1, inc1));
        let ctx = Context::new((), crate::Shared::new().insert(Arc::clone(&counter)));

        pipeline.call(ctx).await;

        assert_eq!(counter.load(Ordering::SeqCst), 2);
    }
}
143