ruvector_scipix/optimize/parallel.rs
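
//! Parallel execution helpers: rayon-based batch maps and multi-stage
//! pipelines for CPU-bound preprocessing, plus a semaphore-gated tokio
//! executor for bounded async concurrency.
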
use rayon::prelude::*;
use image::DynamicImage;
use std::sync::Arc;
use tokio::sync::Semaphore;

use super::parallel_enabled;

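/// Applies `preprocess_fn` to every image: in parallel via rayon when
/// `parallel_enabled()` returns true, sequentially otherwise. Output order
/// matches input order on both paths.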
pub fn parallel_preprocess<F>(images: Vec<DynamicImage>, preprocess_fn: F) -> Vec<DynamicImage>
where
    F: Fn(DynamicImage) -> DynamicImage + Sync + Send,
{
    if !parallel_enabled() {
        return images.into_iter().map(preprocess_fn).collect();
    }

    images.into_par_iter().map(preprocess_fn).collect()
}

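/// Fallible variant of [`parallel_preprocess`]: returns one `Result` per
/// input image so callers can inspect individual failures.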
pub fn parallel_preprocess_result<F, E>(
    images: Vec<DynamicImage>,
    preprocess_fn: F,
) -> Vec<std::result::Result<DynamicImage, E>>
where
    F: Fn(DynamicImage) -> std::result::Result<DynamicImage, E> + Sync + Send,
    E: Send,
{
    if !parallel_enabled() {
        return images.into_iter().map(preprocess_fn).collect();
    }

    images.into_par_iter().map(preprocess_fn).collect()
}

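/// Two-stage processing pipeline. Parallelism is across inputs, not stages:
/// each input runs stage 1 and then stage 2 on the same worker.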
pub struct PipelineExecutor<T, U, V> {
    stage1: Arc<dyn Fn(T) -> U + Send + Sync>,
    stage2: Arc<dyn Fn(U) -> V + Send + Sync>,
}

impl<T, U, V> PipelineExecutor<T, U, V>
where
    T: Send,
    U: Send,
    V: Send,
{
    pub fn new<F1, F2>(stage1: F1, stage2: F2) -> Self
    where
        F1: Fn(T) -> U + Send + Sync + 'static,
        F2: Fn(U) -> V + Send + Sync + 'static,
    {
        Self {
            stage1: Arc::new(stage1),
            stage2: Arc::new(stage2),
        }
    }

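    /// Runs every input through both stages, fanning out across inputs when
    /// parallelism is enabled.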
    pub fn execute_batch(&self, inputs: Vec<T>) -> Vec<V> {
        if !parallel_enabled() {
            return inputs
                .into_iter()
                .map(|input| {
                    let stage1_out = (self.stage1)(input);
                    (self.stage2)(stage1_out)
                })
                .collect();
        }

        inputs
            .into_par_iter()
            .map(|input| {
                let stage1_out = (self.stage1)(input);
                (self.stage2)(stage1_out)
            })
            .collect()
    }
}

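/// Three-stage counterpart of [`PipelineExecutor`].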
pub struct Pipeline3<T, U, V, W> {
    stage1: Arc<dyn Fn(T) -> U + Send + Sync>,
    stage2: Arc<dyn Fn(U) -> V + Send + Sync>,
    stage3: Arc<dyn Fn(V) -> W + Send + Sync>,
}

impl<T, U, V, W> Pipeline3<T, U, V, W>
where
    T: Send,
    U: Send,
    V: Send,
    W: Send,
{
    pub fn new<F1, F2, F3>(stage1: F1, stage2: F2, stage3: F3) -> Self
    where
        F1: Fn(T) -> U + Send + Sync + 'static,
        F2: Fn(U) -> V + Send + Sync + 'static,
        F3: Fn(V) -> W + Send + Sync + 'static,
    {
        Self {
            stage1: Arc::new(stage1),
            stage2: Arc::new(stage2),
            stage3: Arc::new(stage3),
        }
    }

    pub fn execute_batch(&self, inputs: Vec<T>) -> Vec<W> {
        if !parallel_enabled() {
            return inputs
                .into_iter()
                .map(|input| {
                    let out1 = (self.stage1)(input);
                    let out2 = (self.stage2)(out1);
                    (self.stage3)(out2)
                })
                .collect();
        }

        inputs
            .into_par_iter()
            .map(|input| {
                let out1 = (self.stage1)(input);
                let out2 = (self.stage2)(out1);
                (self.stage3)(out2)
            })
            .collect()
    }
}

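/// Parallel map that keeps at least `chunk_size` items per rayon job via
/// `with_min_len`, reducing scheduling overhead when per-item work is cheap.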
pub fn parallel_map_chunked<T, U, F>(
    items: Vec<T>,
    chunk_size: usize,
    map_fn: F,
) -> Vec<U>
where
    T: Send,
    U: Send,
    F: Fn(T) -> U + Sync + Send,
{
    if !parallel_enabled() {
        return items.into_iter().map(map_fn).collect();
    }

    items
        .into_par_iter()
        .with_min_len(chunk_size)
        .map(map_fn)
        .collect()
}

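/// Runs async tasks with a semaphore capping how many execute concurrently.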
pub struct AsyncParallelExecutor {
    semaphore: Arc<Semaphore>,
}

impl AsyncParallelExecutor {
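    /// Creates an executor that allows at most `max_concurrent` tasks in
    /// flight at once.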
    pub fn new(max_concurrent: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(max_concurrent)),
        }
    }

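    /// Spawns one tokio task per input, acquiring a semaphore permit before
    /// each spawn so no more than `max_concurrent` tasks run at a time.
    /// Results come back in input order; tasks that panic are silently
    /// skipped, so the output can be shorter than the input.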
    pub async fn execute<T, F, Fut>(&self, tasks: Vec<T>, executor: F) -> Vec<Fut::Output>
    where
        T: Send + 'static,
        F: Fn(T) -> Fut + Send + Sync + Clone + 'static,
        Fut: std::future::Future + Send + 'static,
        Fut::Output: Send + 'static,
    {
        let mut handles = Vec::new();

        for task in tasks {
            // Blocks until a permit is free; `acquire_owned` only errors if
            // the semaphore is closed, which never happens here.
            let permit = self.semaphore.clone().acquire_owned().await.unwrap();
            let executor = executor.clone();

            let handle = tokio::spawn(async move {
                let result = executor(task).await;
                drop(permit);
                result
            });

            handles.push(handle);
        }

        let mut results = Vec::new();
        for handle in handles {
            if let Ok(result) = handle.await {
                results.push(result);
            }
        }

        results
    }

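    /// Like [`execute`](Self::execute) but for fallible tasks: each task's
    /// `Result` is passed through to the caller, while join failures
    /// (panicked or cancelled tasks) are skipped.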
    pub async fn execute_result<T, F, Fut, R, E>(
        &self,
        tasks: Vec<T>,
        executor: F,
    ) -> Vec<std::result::Result<R, E>>
    where
        T: Send + 'static,
        F: Fn(T) -> Fut + Send + Sync + Clone + 'static,
        Fut: std::future::Future<Output = std::result::Result<R, E>> + Send + 'static,
        R: Send + 'static,
        E: Send + 'static,
    {
        let mut handles = Vec::new();

        for task in tasks {
            let permit = self.semaphore.clone().acquire_owned().await.unwrap();
            let executor = executor.clone();

            let handle = tokio::spawn(async move {
                let result = executor(task).await;
                drop(permit);
                result
            });

            handles.push(handle);
        }

        let mut results = Vec::new();
        for handle in handles {
            match handle.await {
                Ok(result) => results.push(result),
                Err(_) => continue, // task panicked or was cancelled
            }
        }

        results
    }
}

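/// Parallel map for workloads with highly variable per-item cost:
/// `with_min_len(1)` lets rayon steal work down to single items so one slow
/// item does not pin a whole chunk to a single thread.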
pub fn parallel_unbalanced<T, U, F>(items: Vec<T>, map_fn: F) -> Vec<U>
where
    T: Send,
    U: Send,
    F: Fn(T) -> U + Sync + Send,
{
    if !parallel_enabled() {
        return items.into_iter().map(map_fn).collect();
    }

    items
        .into_par_iter()
        .with_min_len(1)
        .map(map_fn)
        .collect()
}

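/// Returns the size of the current rayon thread pool.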
pub fn optimal_thread_count() -> usize {
    rayon::current_num_threads()
}

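/// Tries to configure the global rayon pool to use `threads` threads.
/// `build_global` errors if the pool was already initialized; that error is
/// deliberately ignored, so this only takes effect before rayon's first use.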
pub fn set_thread_count(threads: usize) {
    rayon::ThreadPoolBuilder::new()
        .num_threads(threads)
        .build_global()
        .ok();
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parallel_map() {
        let data: Vec<i32> = (0..100).collect();
        let result = parallel_map_chunked(data, 10, |x| x * 2);

        assert_eq!(result.len(), 100);
        assert_eq!(result[0], 0);
        assert_eq!(result[50], 100);
        assert_eq!(result[99], 198);
    }

    #[test]
    fn test_pipeline_executor() {
        let pipeline = PipelineExecutor::new(
            |x: i32| x + 1,
            |x: i32| x * 2,
        );

        let inputs = vec![1, 2, 3, 4, 5];
        let results = pipeline.execute_batch(inputs);

        assert_eq!(results, vec![4, 6, 8, 10, 12]);
    }

    #[test]
    fn test_pipeline3() {
        let pipeline = Pipeline3::new(
            |x: i32| x + 1,
            |x: i32| x * 2,
            |x: i32| x - 1,
        );

        let inputs = vec![1, 2, 3];
        let results = pipeline.execute_batch(inputs);

        assert_eq!(results, vec![3, 5, 7]);
    }

    #[tokio::test]
    async fn test_async_executor() {
        let executor = AsyncParallelExecutor::new(2);

        let tasks = vec![1, 2, 3, 4, 5];
        let results = executor.execute(tasks, |x| async move {
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            x * 2
        }).await;

        assert_eq!(results.len(), 5);
        assert!(results.contains(&2));
        assert!(results.contains(&10));
    }

    #[test]
    fn test_optimal_threads() {
        let threads = optimal_thread_count();
        assert!(threads > 0);
        assert!(threads <= num_cpus::get());
    }
}