ruvector_scipix/optimize/parallel.rs

//! Parallel processing utilities for OCR pipeline
//!
//! Provides parallel image preprocessing, batch OCR, and pipelined execution.

use rayon::prelude::*;
use image::DynamicImage;
use std::sync::Arc;
use tokio::sync::Semaphore;

use super::parallel_enabled;

/// Parallel preprocessing of multiple images
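///
/// # Example
///
/// A minimal usage sketch (marked `ignore`; `load_page_images` is a
/// hypothetical loader and `grayscale` stands in for any preprocessing step):
///
/// ```ignore
/// let images: Vec<DynamicImage> = load_page_images();
/// let processed = parallel_preprocess(images, |img| img.grayscale());
/// ```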
pub fn parallel_preprocess<F>(images: Vec<DynamicImage>, preprocess_fn: F) -> Vec<DynamicImage>
where
    F: Fn(DynamicImage) -> DynamicImage + Sync + Send,
{
    if !parallel_enabled() {
        return images.into_iter().map(preprocess_fn).collect();
    }

    images.into_par_iter().map(preprocess_fn).collect()
}

/// Parallel preprocessing with per-image error handling
///
/// Results are returned in the same order as the input images.
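///
/// # Example
///
/// A minimal sketch (marked `ignore`; `images` is assumed to be loaded
/// elsewhere):
///
/// ```ignore
/// let results = parallel_preprocess_result(images, |img| {
///     if img.width() == 0 { Err("empty image") } else { Ok(img.thumbnail(1024, 1024)) }
/// });
/// ```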
pub fn parallel_preprocess_result<F, E>(
    images: Vec<DynamicImage>,
    preprocess_fn: F,
) -> Vec<std::result::Result<DynamicImage, E>>
where
    F: Fn(DynamicImage) -> std::result::Result<DynamicImage, E> + Sync + Send,
    E: Send,
{
    if !parallel_enabled() {
        return images.into_iter().map(preprocess_fn).collect();
    }

    images.into_par_iter().map(preprocess_fn).collect()
}

/// Two-stage executor for the OCR workflow (e.g. preprocess | recognize)
///
/// Each input is passed through both stages in order; when parallelism is
/// enabled, the inputs themselves are processed in parallel on the rayon
/// thread pool (stages do not overlap across items).
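///
/// # Example
///
/// A minimal sketch (marked `ignore`; `recognize_text` is a hypothetical
/// recognition function):
///
/// ```ignore
/// let pipeline = PipelineExecutor::new(
///     |img: DynamicImage| img.grayscale(),      // stage 1: preprocess
///     |img: DynamicImage| recognize_text(&img), // stage 2: recognize
/// );
/// let texts = pipeline.execute_batch(images);
/// ```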
pub struct PipelineExecutor<T, U, V> {
    stage1: Arc<dyn Fn(T) -> U + Send + Sync>,
    stage2: Arc<dyn Fn(U) -> V + Send + Sync>,
}

impl<T, U, V> PipelineExecutor<T, U, V>
where
    T: Send,
    U: Send,
    V: Send,
{
    pub fn new<F1, F2>(stage1: F1, stage2: F2) -> Self
    where
        F1: Fn(T) -> U + Send + Sync + 'static,
        F2: Fn(U) -> V + Send + Sync + 'static,
    {
        Self {
            stage1: Arc::new(stage1),
            stage2: Arc::new(stage2),
        }
    }

    /// Execute pipeline on multiple inputs
    pub fn execute_batch(&self, inputs: Vec<T>) -> Vec<V> {
        if !parallel_enabled() {
            return inputs.into_iter()
                .map(|input| {
                    let stage1_out = (self.stage1)(input);
                    (self.stage2)(stage1_out)
                })
                .collect();
        }

        inputs.into_par_iter()
            .map(|input| {
                let stage1_out = (self.stage1)(input);
                (self.stage2)(stage1_out)
            })
            .collect()
    }
}

/// Three-stage variant of [`PipelineExecutor`]: each input flows through all three stages in order
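///
/// # Example
///
/// A sketch of the intended preprocess -> detect -> recognize flow (marked
/// `ignore`; the three stage functions are hypothetical):
///
/// ```ignore
/// let pipeline = Pipeline3::new(preprocess, detect_regions, recognize_text);
/// let texts = pipeline.execute_batch(images);
/// ```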
pub struct Pipeline3<T, U, V, W> {
    stage1: Arc<dyn Fn(T) -> U + Send + Sync>,
    stage2: Arc<dyn Fn(U) -> V + Send + Sync>,
    stage3: Arc<dyn Fn(V) -> W + Send + Sync>,
}

impl<T, U, V, W> Pipeline3<T, U, V, W>
where
    T: Send,
    U: Send,
    V: Send,
    W: Send,
{
    pub fn new<F1, F2, F3>(stage1: F1, stage2: F2, stage3: F3) -> Self
    where
        F1: Fn(T) -> U + Send + Sync + 'static,
        F2: Fn(U) -> V + Send + Sync + 'static,
        F3: Fn(V) -> W + Send + Sync + 'static,
    {
        Self {
            stage1: Arc::new(stage1),
            stage2: Arc::new(stage2),
            stage3: Arc::new(stage3),
        }
    }

    pub fn execute_batch(&self, inputs: Vec<T>) -> Vec<W> {
        if !parallel_enabled() {
            return inputs.into_iter()
                .map(|input| {
                    let out1 = (self.stage1)(input);
                    let out2 = (self.stage2)(out1);
                    (self.stage3)(out2)
                })
                .collect();
        }

        inputs.into_par_iter()
            .map(|input| {
                let out1 = (self.stage1)(input);
                let out2 = (self.stage2)(out1);
                (self.stage3)(out2)
            })
            .collect()
    }
}

/// Parallel map with a configurable minimum chunk size
///
/// `chunk_size` is passed to rayon's `with_min_len`, bounding how finely the
/// input is split across worker threads.
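///
/// # Example
///
/// A minimal sketch; larger chunks reduce scheduling overhead for cheap
/// per-item work, smaller chunks balance load better:
///
/// ```ignore
/// let doubled = parallel_map_chunked((0..1_000).collect::<Vec<i32>>(), 64, |x| x * 2);
/// ```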
pub fn parallel_map_chunked<T, U, F>(
    items: Vec<T>,
    chunk_size: usize,
    map_fn: F,
) -> Vec<U>
where
    T: Send,
    U: Send,
    F: Fn(T) -> U + Sync + Send,
{
    if !parallel_enabled() {
        return items.into_iter().map(map_fn).collect();
    }

    items
        .into_par_iter()
        .with_min_len(chunk_size)
        .map(map_fn)
        .collect()
}

/// Async parallel executor with concurrency limit
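///
/// # Example
///
/// A minimal sketch (marked `ignore`; assumes a Tokio runtime and a
/// hypothetical async `run_ocr` function):
///
/// ```ignore
/// let executor = AsyncParallelExecutor::new(4);
/// let outputs = executor.execute(pages, |page| run_ocr(page)).await;
/// ```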
pub struct AsyncParallelExecutor {
    semaphore: Arc<Semaphore>,
}

impl AsyncParallelExecutor {
    /// Create executor with maximum concurrency limit
    pub fn new(max_concurrent: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(max_concurrent)),
        }
    }

    /// Execute async tasks with a concurrency limit
    ///
    /// Results are returned in task-submission order; tasks that panic are
    /// silently skipped.
    pub async fn execute<T, F, Fut>(&self, tasks: Vec<T>, executor: F) -> Vec<Fut::Output>
    where
        T: Send + 'static,
        F: Fn(T) -> Fut + Send + Sync + Clone + 'static,
        Fut: std::future::Future + Send + 'static,
        Fut::Output: Send + 'static,
    {
        let mut handles = Vec::new();

        for task in tasks {
            // The semaphore is never closed, so this acquire cannot fail.
            let permit = self.semaphore.clone().acquire_owned().await.unwrap();
            let executor = executor.clone();

            let handle = tokio::spawn(async move {
                let result = executor(task).await;
                drop(permit); // Release semaphore
                result
            });

            handles.push(handle);
        }

        // Wait for all tasks to complete
        let mut results = Vec::new();
        for handle in handles {
            if let Ok(result) = handle.await {
                results.push(result);
            }
        }

        results
    }

    /// Like [`Self::execute`], but for fallible tasks
    ///
    /// Returns one `Result` per task that completed without panicking, in
    /// submission order.
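    ///
    /// # Example
    ///
    /// A minimal sketch (marked `ignore`; `try_ocr` is a hypothetical
    /// fallible async function):
    ///
    /// ```ignore
    /// let results = executor.execute_result(pages, |page| try_ocr(page)).await;
    /// let succeeded = results.iter().filter(|r| r.is_ok()).count();
    /// ```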
    pub async fn execute_result<T, F, Fut, R, E>(
        &self,
        tasks: Vec<T>,
        executor: F,
    ) -> Vec<std::result::Result<R, E>>
    where
        T: Send + 'static,
        F: Fn(T) -> Fut + Send + Sync + Clone + 'static,
        Fut: std::future::Future<Output = std::result::Result<R, E>> + Send + 'static,
        R: Send + 'static,
        E: Send + 'static,
    {
        let mut handles = Vec::new();

        for task in tasks {
            let permit = self.semaphore.clone().acquire_owned().await.unwrap();
            let executor = executor.clone();

            let handle = tokio::spawn(async move {
                let result = executor(task).await;
                drop(permit);
                result
            });

            handles.push(handle);
        }

        let mut results = Vec::new();
        for handle in handles {
            match handle.await {
                Ok(result) => results.push(result),
                Err(_) => continue, // Task panicked
            }
        }

        results
    }
}

/// Parallel map tuned for unbalanced workloads
///
/// Uses the finest-grained rayon splitting so idle worker threads can steal
/// small units of work from long-running ones.
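///
/// # Example
///
/// A minimal sketch (marked `ignore`; `decode_page` is a hypothetical
/// function whose cost varies a lot between pages):
///
/// ```ignore
/// let decoded = parallel_unbalanced(pages, |page| decode_page(page));
/// ```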
pub fn parallel_unbalanced<T, U, F>(items: Vec<T>, map_fn: F) -> Vec<U>
where
    T: Send,
    U: Send,
    F: Fn(T) -> U + Sync + Send,
{
    if !parallel_enabled() {
        return items.into_iter().map(map_fn).collect();
    }

    // Use adaptive strategy for unbalanced work
    items
        .into_par_iter()
        .with_min_len(1) // Allow fine-grained work stealing
        .map(map_fn)
        .collect()
}

/// Get the thread count of the current rayon thread pool
///
/// This reflects the pool actually in use (by default one thread per
/// logical CPU), not a separate tuning heuristic.
pub fn optimal_thread_count() -> usize {
    rayon::current_num_threads()
}

/// Set the global rayon thread pool size
///
/// Only takes effect if the global pool has not been built yet; otherwise
/// the error from `build_global` is ignored.
pub fn set_thread_count(threads: usize) {
    rayon::ThreadPoolBuilder::new()
        .num_threads(threads)
        .build_global()
        .ok();
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parallel_map() {
        let data: Vec<i32> = (0..100).collect();
        let result = parallel_map_chunked(data, 10, |x| x * 2);

        assert_eq!(result.len(), 100);
        assert_eq!(result[0], 0);
        assert_eq!(result[50], 100);
        assert_eq!(result[99], 198);
    }

    #[test]
    fn test_pipeline_executor() {
        let pipeline = PipelineExecutor::new(
            |x: i32| x + 1,
            |x: i32| x * 2,
        );

        let inputs = vec![1, 2, 3, 4, 5];
        let results = pipeline.execute_batch(inputs);

        assert_eq!(results, vec![4, 6, 8, 10, 12]);
    }

    #[test]
    fn test_pipeline3() {
        let pipeline = Pipeline3::new(
            |x: i32| x + 1,
            |x: i32| x * 2,
            |x: i32| x - 1,
        );

        let inputs = vec![1, 2, 3];
        let results = pipeline.execute_batch(inputs);

        // (1+1)*2-1 = 3, (2+1)*2-1 = 5, (3+1)*2-1 = 7
        assert_eq!(results, vec![3, 5, 7]);
    }

    #[tokio::test]
    async fn test_async_executor() {
        let executor = AsyncParallelExecutor::new(2);

        let tasks = vec![1, 2, 3, 4, 5];
        let results = executor.execute(tasks, |x| async move {
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            x * 2
        }).await;

        assert_eq!(results.len(), 5);
        assert!(results.contains(&2));
        assert!(results.contains(&10));
    }

    #[test]
    fn test_optimal_threads() {
        let threads = optimal_thread_count();
        assert!(threads > 0);
        assert!(threads <= num_cpus::get());
    }
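
    // Additional sketch tests for the result-returning and unbalanced paths;
    // they assume only what the code above already provides (results in
    // input/submission order, panicking tasks skipped).
    #[tokio::test]
    async fn test_async_executor_result() {
        let executor = AsyncParallelExecutor::new(2);

        let tasks = vec![1, 2, 3, 4];
        let results = executor
            .execute_result(tasks, |x| async move {
                if x % 2 == 0 { Ok(x * 10) } else { Err("odd input") }
            })
            .await;

        assert_eq!(results.len(), 4);
        assert_eq!(results.iter().filter(|r| r.is_ok()).count(), 2);
    }

    #[test]
    fn test_parallel_unbalanced() {
        let data: Vec<u64> = (0..50).collect();
        let result = parallel_unbalanced(data, |x| x * x);

        assert_eq!(result.len(), 50);
        assert_eq!(result[7], 49);
    }

    #[test]
    fn test_parallel_preprocess_applies_fn() {
        // Tiny synthetic images; only checks that the closure is applied per image.
        let images: Vec<image::DynamicImage> =
            (0..3).map(|_| image::DynamicImage::new_rgb8(8, 8)).collect();
        let processed = parallel_preprocess(images, |img| img.grayscale());
        assert_eq!(processed.len(), 3);
    }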
}