avila_parallel/
pipeline.rs1use std::marker::PhantomData;
6
/// A single processing step that turns a value of type `T` into `Self::Output`.
pub trait PipelineStage<T> {
    /// The type produced by running this stage.
    type Output;

    /// Runs the stage on `input` and returns the produced value.
    ///
    /// The stage is only borrowed immutably, so one stage value can be
    /// executed any number of times.
    fn execute(&self, input: T) -> Self::Output;
}
14
/// A linear chain of transformations applied to a value in insertion order.
///
/// Every stage has the same concrete type `F` and maps a `T` to another `T`.
/// Pipelines created with [`Pipeline::new`] (or `Default`) store plain function
/// pointers (`fn(T) -> T`), so their stages can be function items or
/// non-capturing closures.
pub struct Pipeline<T, F> {
    /// Stages in the order they were added; `execute` runs them front to back.
    stages: Vec<F>,
    /// Ties the otherwise unused `T` parameter to the struct; no `T` is stored.
    _phantom: PhantomData<T>,
}

impl<T> Pipeline<T, fn(T) -> T> {
    /// Creates an empty pipeline whose stages are function pointers.
    ///
    /// Executing a pipeline with no stages returns the input unchanged.
    pub fn new() -> Self {
        Self {
            stages: Vec::new(),
            _phantom: PhantomData,
        }
    }
}

impl<T> Default for Pipeline<T, fn(T) -> T> {
    /// Equivalent to [`Pipeline::new`]: a pipeline with no stages.
    fn default() -> Self {
        Self::new()
    }
}

impl<T, F> Pipeline<T, F>
where
    F: Fn(T) -> T,
{
    /// Appends `stage` to the end of the pipeline and returns the extended pipeline.
    ///
    /// This is a by-value builder method: the pipeline is consumed and handed
    /// back, so the return value has to be kept.
    #[must_use = "add_stage consumes the pipeline and returns the extended one"]
    pub fn add_stage(mut self, stage: F) -> Self {
        self.stages.push(stage);
        self
    }

    /// Feeds `input` through every stage in insertion order and returns the
    /// final value.
    ///
    /// The pipeline is only borrowed, so it can be executed repeatedly. With
    /// no stages the input is returned as-is.
    pub fn execute(&self, input: T) -> T {
        // Each stage receives the previous stage's output as its input.
        self.stages.iter().fold(input, |value, stage| stage(value))
    }
}
52
53pub struct MapReduce<T, M, R>
55where
56 M: Fn(&T) -> T,
57 R: Fn(T, T) -> T,
58{
59 mapper: M,
60 reducer: R,
61 _phantom: PhantomData<T>,
62}
63
64impl<T, M, R> MapReduce<T, M, R>
65where
66 T: Send + Sync + Clone + 'static,
67 M: Fn(&T) -> T + Send + Sync,
68 R: Fn(T, T) -> T + Send + Sync,
69{
70 pub fn new(mapper: M, reducer: R) -> Self {
72 Self {
73 mapper,
74 reducer,
75 _phantom: PhantomData,
76 }
77 }
78
79 pub fn execute(&self, data: &[T]) -> Option<T> {
81 use crate::executor::parallel_map;
82
83 if data.is_empty() {
84 return None;
85 }
86
87 let mapped = parallel_map(data, &self.mapper);
88
89 let mut result = mapped.into_iter();
91 let first = result.next()?;
92
93 Some(result.fold(first, |acc, item| (self.reducer)(acc, item)))
94 }
95}
96
97pub struct BatchProcessor<T> {
99 batch_size: usize,
100 _phantom: PhantomData<T>,
101}
102
103impl<T> BatchProcessor<T> {
104 pub fn new(batch_size: usize) -> Self {
106 Self {
107 batch_size,
108 _phantom: PhantomData,
109 }
110 }
111
112 pub fn process<F, R>(&self, data: &[T], processor: F) -> Vec<R>
114 where
115 T: Sync,
116 R: Send,
117 F: Fn(&[T]) -> Vec<R> + Send + Sync,
118 {
119 use crate::advanced::parallel_chunks;
120
121 parallel_chunks(data, self.batch_size, processor)
122 .into_iter()
123 .flatten()
124 .collect()
125 }
126}
127
#[cfg(test)]
mod tests {
    use super::*;

    /// Doubling 1..=5 and summing the results gives 2 + 4 + 6 + 8 + 10 = 30.
    #[test]
    fn test_map_reduce() {
        let input: Vec<i32> = vec![1, 2, 3, 4, 5];
        let job = MapReduce::new(|x| x * 2, |a, b| a + b);
        let total = job.execute(&input);
        assert_eq!(total, Some(30));
    }

    /// 100 elements in batches of 10, one sum emitted per batch, gives 10 results.
    #[test]
    fn test_batch_processor() {
        let input: Vec<i32> = (1..=100).collect();
        let batcher = BatchProcessor::new(10);
        let batch_sums: Vec<i32> = batcher.process(&input, |batch| {
            vec![batch.iter().copied().sum::<i32>()]
        });
        assert_eq!(batch_sums.len(), 10);
    }
}