avila_parallel/pipeline.rs

//! Parallel pipeline for chaining operations efficiently
//!
//! This module provides building blocks for composing parallel work: a staged
//! [`Pipeline`], a [`MapReduce`] helper with a parallel map phase, and a
//! [`BatchProcessor`] for chunked bulk operations.

use std::marker::PhantomData;

/// A parallel pipeline stage
pub trait PipelineStage<T> {
    /// The output type of this pipeline stage
    type Output;
    /// Execute this pipeline stage on the given input
    fn execute(&self, input: T) -> Self::Output;
}

/// Pipeline builder for composing operations
///
/// Stages are applied sequentially, in the order they were added.
pub struct Pipeline<T, F>
where
    F: Fn(T) -> T,
{
    stages: Vec<F>,
    _phantom: PhantomData<T>,
}

impl<T> Pipeline<T, fn(T) -> T> {
    /// Create a new empty pipeline
    pub fn new() -> Self {
        Self {
            stages: Vec::new(),
            _phantom: PhantomData,
        }
    }
}

impl<T, F> Pipeline<T, F>
where
    F: Fn(T) -> T,
{
    /// Add a stage to the pipeline
    pub fn add_stage(mut self, stage: F) -> Self {
        self.stages.push(stage);
        self
    }

    /// Execute the pipeline
    pub fn execute(&self, mut input: T) -> T {
        for stage in &self.stages {
            input = stage(input);
        }
        input
    }
}

/// Map-reduce pipeline with a parallel map phase and a sequential reduce
pub struct MapReduce<T, M, R>
where
    M: Fn(&T) -> T,
    R: Fn(T, T) -> T,
{
    mapper: M,
    reducer: R,
    _phantom: PhantomData<T>,
}

impl<T, M, R> MapReduce<T, M, R>
where
    T: Send + Sync + Clone + 'static,
    M: Fn(&T) -> T + Send + Sync,
    R: Fn(T, T) -> T + Send + Sync,
{
    /// Create a new map-reduce pipeline
    pub fn new(mapper: M, reducer: R) -> Self {
        Self {
            mapper,
            reducer,
            _phantom: PhantomData,
        }
    }

    /// Execute map-reduce on data, returning `None` if `data` is empty
    pub fn execute(&self, data: &[T]) -> Option<T> {
        use crate::executor::parallel_map;

        if data.is_empty() {
            return None;
        }

        let mapped = parallel_map(data, &self.mapper);

        // Reduce sequentially over the results of the parallel map
        let mut mapped_iter = mapped.into_iter();
        let first = mapped_iter.next()?;

        Some(mapped_iter.fold(first, |acc, item| (self.reducer)(acc, item)))
    }
}

/// Batch processor for efficient bulk operations
pub struct BatchProcessor<T> {
    batch_size: usize,
    _phantom: PhantomData<T>,
}

impl<T> BatchProcessor<T> {
    /// Create a new batch processor
    pub fn new(batch_size: usize) -> Self {
        Self {
            batch_size,
            _phantom: PhantomData,
        }
    }

    /// Process `data` in parallel batches of at most `batch_size` items, flattening the per-batch results
    pub fn process<F, R>(&self, data: &[T], processor: F) -> Vec<R>
    where
        T: Sync,
        R: Send,
        F: Fn(&[T]) -> Vec<R> + Send + Sync,
    {
        use crate::advanced::parallel_chunks;

        parallel_chunks(data, self.batch_size, processor)
            .into_iter()
            .flatten()
            .collect()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

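    // Minimal usage sketch for `Pipeline`: stages run sequentially, in the
    // order they were added, so 3 -> (3 + 1) -> (4 * 2) = 8.
    #[test]
    fn test_pipeline_stages_run_in_order() {
        let pipeline: Pipeline<i32, fn(i32) -> i32> = Pipeline::new()
            .add_stage(|x: i32| x + 1)
            .add_stage(|x| x * 2);
        assert_eq!(pipeline.execute(3), 8);
    }
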
    #[test]
    fn test_map_reduce() {
        let data: Vec<i32> = vec![1, 2, 3, 4, 5];
        let mr = MapReduce::new(|x| x * 2, |a, b| a + b);
        let result = mr.execute(&data);
        assert_eq!(result, Some(30)); // (1+2+3+4+5)*2 = 30
    }
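
    // `MapReduce::execute` returns `None` when there is nothing to map or reduce.
    #[test]
    fn test_map_reduce_empty_input() {
        let data: Vec<i32> = vec![];
        let mr = MapReduce::new(|x| x * 2, |a, b| a + b);
        assert_eq!(mr.execute(&data), None);
    }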

    #[test]
    fn test_batch_processor() {
        let data: Vec<i32> = (1..=100).collect();
        let processor = BatchProcessor::new(10);
        let results: Vec<i32> = processor.process(&data, |batch| {
            vec![batch.iter().copied().sum::<i32>()]
        });
        assert_eq!(results.len(), 10);
    }
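
    // Sketch of a custom stage implementing `PipelineStage`; `Doubler` is a
    // hypothetical stage defined only for this test.
    #[test]
    fn test_pipeline_stage_trait() {
        struct Doubler;

        impl PipelineStage<i32> for Doubler {
            type Output = i32;

            fn execute(&self, input: i32) -> i32 {
                input * 2
            }
        }

        assert_eq!(Doubler.execute(21), 42);
    }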
}