fluxus_transformers/operator/
builder.rs

1use super::{FilterOperator, MapOperator, WindowReduceOperator};
2use fluxus_utils::window::WindowConfig;
3
4/// Builder for creating stream operators
5pub struct OperatorBuilder;
6
7// Add type aliases at the module level
8type AveragePair<T> = (T, usize);
9type AverageReduceFn<T> =
10    Box<dyn Fn(AveragePair<T>, AveragePair<T>) -> AveragePair<T> + Send + Sync>;
11
12impl OperatorBuilder {
13    /// Create a new map operator
14    pub fn map<In, Out, F>(func: F) -> MapOperator<In, Out, F>
15    where
16        F: Fn(In) -> Out + Send + Sync,
17    {
18        MapOperator::new(func)
19    }
20
21    /// Create a new filter operator
22    pub fn filter<T, F>(predicate: F) -> FilterOperator<T, F>
23    where
24        F: Fn(&T) -> bool + Send + Sync,
25    {
26        FilterOperator::new(predicate)
27    }
28
29    /// Create a new window reduce operator
30    pub fn window_reduce<T, F>(func: F, window: WindowConfig) -> WindowReduceOperator<T, F>
31    where
32        F: Fn(T, T) -> T + Send + Sync,
33        T: Clone,
34    {
35        WindowReduceOperator::new(func, window)
36    }
37
38    /// Helper to create a sum operator with a window
39    pub fn sum_window<T>(window: WindowConfig) -> WindowReduceOperator<T, impl Fn(T, T) -> T>
40    where
41        T: std::ops::Add<Output = T> + Clone + Send,
42    {
43        Self::window_reduce(|a, b| a + b, window)
44    }
45
46    /// Helper to create a count operator with a window
47    pub fn count_window(
48        window: WindowConfig,
49    ) -> WindowReduceOperator<usize, impl Fn(usize, usize) -> usize> {
50        Self::window_reduce(|count, _| count + 1, window)
51    }
52
53    /// Helper to create an average operator with a window
54    pub fn avg_window<T>(
55        window: WindowConfig,
56    ) -> WindowReduceOperator<AveragePair<T>, AverageReduceFn<T>>
57    where
58        T: std::ops::Add<Output = T> + Clone + Send + 'static,
59    {
60        Self::window_reduce(
61            Box::new(|(sum1, count1), (sum2, count2)| (sum1 + sum2, count1 + count2)),
62            window,
63        )
64    }
65}