fluxus_transformers/operator/
builder.rs1use super::{FilterOperator, MapOperator, WindowReduceOperator};
2use fluxus_utils::window::WindowConfig;
3
4pub struct OperatorBuilder;
6
7type AveragePair<T> = (T, usize);
9type AverageReduceFn<T> =
10 Box<dyn Fn(AveragePair<T>, AveragePair<T>) -> AveragePair<T> + Send + Sync>;
11
12impl OperatorBuilder {
13 pub fn map<In, Out, F>(func: F) -> MapOperator<In, Out, F>
15 where
16 F: Fn(In) -> Out + Send + Sync,
17 {
18 MapOperator::new(func)
19 }
20
21 pub fn filter<T, F>(predicate: F) -> FilterOperator<T, F>
23 where
24 F: Fn(&T) -> bool + Send + Sync,
25 {
26 FilterOperator::new(predicate)
27 }
28
29 pub fn window_reduce<T, F>(func: F, window: WindowConfig) -> WindowReduceOperator<T, F>
31 where
32 F: Fn(T, T) -> T + Send + Sync,
33 T: Clone,
34 {
35 WindowReduceOperator::new(func, window)
36 }
37
38 pub fn sum_window<T>(window: WindowConfig) -> WindowReduceOperator<T, impl Fn(T, T) -> T>
40 where
41 T: std::ops::Add<Output = T> + Clone + Send,
42 {
43 Self::window_reduce(|a, b| a + b, window)
44 }
45
46 pub fn count_window(
48 window: WindowConfig,
49 ) -> WindowReduceOperator<usize, impl Fn(usize, usize) -> usize> {
50 Self::window_reduce(|count, _| count + 1, window)
51 }
52
53 pub fn avg_window<T>(
55 window: WindowConfig,
56 ) -> WindowReduceOperator<AveragePair<T>, AverageReduceFn<T>>
57 where
58 T: std::ops::Add<Output = T> + Clone + Send + 'static,
59 {
60 Self::window_reduce(
61 Box::new(|(sum1, count1), (sum2, count2)| (sum1 + sum2, count1 + count2)),
62 window,
63 )
64 }
65}