oxirs_core/
parallel.rs

1//! Parallel processing abstractions for OxiRS
2//!
3//! This module provides unified parallel processing operations across the OxiRS ecosystem.
4//! All parallel operations must go through this module - direct Rayon usage in other modules is forbidden.
5
6#[cfg(feature = "parallel")]
7pub use rayon::{
8    // Re-export commonly used items
9    current_num_threads,
10    join,
11    // Re-export all of rayon's prelude
12    prelude::*,
13    scope,
14    spawn,
15    ThreadPool,
16    ThreadPoolBuilder,
17};
18
19// Sequential fallbacks when parallel feature is disabled
20#[cfg(not(feature = "parallel"))]
21pub use self::sequential::*;
22
/// Reports whether this crate was compiled with the `parallel` feature.
///
/// The answer is fixed at compile time; it never changes while the program
/// is running.
pub fn is_parallel_enabled() -> bool {
    #[cfg(feature = "parallel")]
    const ENABLED: bool = true;
    #[cfg(not(feature = "parallel"))]
    const ENABLED: bool = false;

    ENABLED
}
27
/// Returns how many threads parallel operations can use.
///
/// With the `parallel` feature this is whatever `current_num_threads()`
/// reports; without it the answer is always `1`.
pub fn num_threads() -> usize {
    #[cfg(feature = "parallel")]
    let threads = current_num_threads();
    #[cfg(not(feature = "parallel"))]
    let threads = 1;

    threads
}
39
/// Process a slice in parallel chunks
///
/// Splits `slice` into consecutive chunks of at most `chunk_size` elements,
/// applies `f` to each chunk and returns the results in chunk order. The last
/// chunk may be shorter than `chunk_size`. An empty slice yields an empty
/// `Vec`.
///
/// # Panics
///
/// Panics if `chunk_size` is 0 (the underlying chunking call rejects it).
pub fn par_chunks<T, F, R>(slice: &[T], chunk_size: usize, f: F) -> Vec<R>
where
    T: Sync,
    F: Fn(&[T]) -> R + Sync + Send,
    R: Send,
{
    #[cfg(feature = "parallel")]
    {
        slice.par_chunks(chunk_size).map(f).collect()
    }
    #[cfg(not(feature = "parallel"))]
    {
        // Plain loop instead of `.map(f).collect()`: without the `parallel`
        // feature this module glob-imports `sequential::ParallelIterator`,
        // which is implemented for every `Iterator` and declares its own
        // `map`/`collect`, so the method-call form is ambiguous.
        let chunks = slice.chunks(chunk_size);
        let mut results = Vec::with_capacity(chunks.len());
        for chunk in chunks {
            results.push(f(chunk));
        }
        results
    }
}
56
/// Runs two closures and returns both results as a tuple `(a's, b's)`.
///
/// With the `parallel` feature the pair is handed to `join`, which may run
/// them concurrently. Without it, `a` is called first and `b` second, both on
/// the calling thread.
pub fn par_join<A, B, RA, RB>(a: A, b: B) -> (RA, RB)
where
    A: FnOnce() -> RA + Send,
    B: FnOnce() -> RB + Send,
    RA: Send,
    RB: Send,
{
    #[cfg(feature = "parallel")]
    let results = join(a, b);
    #[cfg(not(feature = "parallel"))]
    let results = {
        let first = a();
        let second = b();
        (first, second)
    };

    results
}
74
/// Map a function over a slice in parallel
///
/// Applies `f` to a reference to every element of `slice` and returns the
/// results in the same order as the input. An empty slice yields an empty
/// `Vec`.
pub fn map<T, R, F>(slice: &[T], f: F) -> Vec<R>
where
    T: Sync,
    F: Fn(&T) -> R + Sync + Send,
    R: Send,
{
    #[cfg(feature = "parallel")]
    {
        slice.par_iter().map(f).collect()
    }
    #[cfg(not(feature = "parallel"))]
    {
        // Plain loop instead of `.iter().map(f).collect()`: without the
        // `parallel` feature this module glob-imports
        // `sequential::ParallelIterator`, which is implemented for every
        // `Iterator` and declares its own `map`/`collect`, so the method-call
        // form is ambiguous.
        let mut results = Vec::with_capacity(slice.len());
        for item in slice {
            results.push(f(item));
        }
        results
    }
}
91
92/// Map a function over a slice in parallel (alias for map)
93pub fn parallel_map<T, R, F>(slice: &[T], f: F) -> Vec<R>
94where
95    T: Sync,
96    F: Fn(&T) -> R + Sync + Send,
97    R: Send,
98{
99    map(slice, f)
100}
101
/// Sequential implementations for when parallel feature is disabled
///
/// The parent module glob-re-exports everything in here when the `parallel`
/// feature is off, so these items stand in for the parallel names of the same
/// spelling. All work runs on the calling thread.
#[cfg(not(feature = "parallel"))]
mod sequential {
    use std::iter::{FromIterator, Sum};

    /// Sequential iterator trait (mimics ParallelIterator)
    ///
    /// It is implemented for every `Iterator` (see the blanket impl below).
    /// Many methods share their name with a method on `Iterator`, so the
    /// default bodies call the std versions through fully-qualified paths
    /// (`Iterator::fold(self, ..)`); a plain `self.fold(..)` would match both
    /// traits and be rejected as ambiguous.
    pub trait ParallelIterator: Iterator + Sized {
        /// Transforms every item with `f`.
        fn map<F, R>(self, f: F) -> Map<Self, F>
        where
            F: FnMut(Self::Item) -> R,
        {
            Map { iter: self, f }
        }

        /// Keeps only the items for which `f` returns `true`.
        fn filter<F>(self, f: F) -> Filter<Self, F>
        where
            F: FnMut(&Self::Item) -> bool,
        {
            Filter { iter: self, f }
        }

        /// Transforms every item with `f` and keeps the `Some` results.
        fn filter_map<F, R>(self, f: F) -> FilterMap<Self, F>
        where
            F: FnMut(Self::Item) -> Option<R>,
        {
            FilterMap { iter: self, f }
        }

        /// Maps every item to an iterable and yields the inner items one
        /// after another, in order.
        fn flat_map<F, I>(self, f: F) -> FlatMap<Self, F, I>
        where
            F: FnMut(Self::Item) -> I,
            I: IntoIterator,
        {
            FlatMap {
                iter: self,
                f,
                current: None,
            }
        }

        /// Calls `f` on every item, in order.
        fn for_each<F>(self, f: F)
        where
            F: FnMut(Self::Item),
        {
            Iterator::for_each(self, f)
        }

        /// Gathers all items into a collection `C`.
        fn collect<C>(self) -> C
        where
            C: FromIterator<Self::Item>,
        {
            C::from_iter(self)
        }

        /// Folds all items into an accumulator, starting from `init`.
        fn fold<T, F>(self, init: T, f: F) -> T
        where
            F: FnMut(T, Self::Item) -> T,
        {
            Iterator::fold(self, init, f)
        }

        /// Combines all items pairwise with `f`, left to right.
        ///
        /// Returns `None` when there are no items.
        fn reduce<F>(mut self, f: F) -> Option<Self::Item>
        where
            F: FnMut(Self::Item, Self::Item) -> Self::Item,
        {
            let first = self.next()?;
            Some(Iterator::fold(self, first, f))
        }

        /// Sums all items.
        fn sum<S>(self) -> S
        where
            S: Sum<Self::Item>,
        {
            Iterator::sum(self)
        }

        /// Returns the smallest item, or `None` when there are no items.
        fn min(self) -> Option<Self::Item>
        where
            Self::Item: Ord,
        {
            Iterator::min(self)
        }

        /// Returns the largest item, or `None` when there are no items.
        fn max(self) -> Option<Self::Item>
        where
            Self::Item: Ord,
        {
            Iterator::max(self)
        }

        /// Returns `true` as soon as `f` returns `true` for some item.
        ///
        /// Returns `false` when there are no items.
        fn any<F>(mut self, f: F) -> bool
        where
            F: FnMut(Self::Item) -> bool,
        {
            Iterator::any(&mut self, f)
        }

        /// Returns `false` as soon as `f` returns `false` for some item.
        ///
        /// Returns `true` when there are no items.
        fn all<F>(mut self, f: F) -> bool
        where
            F: FnMut(Self::Item) -> bool,
        {
            Iterator::all(&mut self, f)
        }
    }

    /// Implement ParallelIterator for all iterators
    impl<I: Iterator> ParallelIterator for I {}

    /// Map iterator
    pub struct Map<I, F> {
        iter: I,
        f: F,
    }

    impl<I, F, R> Iterator for Map<I, F>
    where
        I: Iterator,
        F: FnMut(I::Item) -> R,
    {
        type Item = R;

        fn next(&mut self) -> Option<Self::Item> {
            self.iter.next().map(&mut self.f)
        }
    }

    /// Filter iterator
    pub struct Filter<I, F> {
        iter: I,
        f: F,
    }

    impl<I, F> Iterator for Filter<I, F>
    where
        I: Iterator,
        F: FnMut(&I::Item) -> bool,
    {
        type Item = I::Item;

        fn next(&mut self) -> Option<Self::Item> {
            // Advance the source until the predicate accepts an item.
            self.iter.find(&mut self.f)
        }
    }

    /// FilterMap iterator
    pub struct FilterMap<I, F> {
        iter: I,
        f: F,
    }

    impl<I, F, R> Iterator for FilterMap<I, F>
    where
        I: Iterator,
        F: FnMut(I::Item) -> Option<R>,
    {
        type Item = R;

        fn next(&mut self) -> Option<Self::Item> {
            // Advance the source until the closure produces `Some`.
            self.iter.find_map(&mut self.f)
        }
    }

    /// FlatMap iterator
    ///
    /// `J` is the iterable type produced by the closure. It is a type
    /// parameter because the adaptor has to store the inner iterator that is
    /// currently being drained.
    pub struct FlatMap<I, F, J: IntoIterator> {
        iter: I,
        f: F,
        /// Inner iterator currently being drained; `None` before the first
        /// outer item is pulled and after an inner iterator runs dry.
        current: Option<J::IntoIter>,
    }

    impl<I, F, J> Iterator for FlatMap<I, F, J>
    where
        I: Iterator,
        F: FnMut(I::Item) -> J,
        J: IntoIterator,
    {
        type Item = J::Item;

        fn next(&mut self) -> Option<Self::Item> {
            loop {
                // Drain the active inner iterator first.
                if let Some(inner) = self.current.as_mut() {
                    if let Some(item) = inner.next() {
                        return Some(item);
                    }
                    self.current = None;
                }
                // Inner iterator is exhausted (or not started): pull the next
                // outer item, or finish when the outer iterator is done.
                let outer = self.iter.next()?;
                self.current = Some((self.f)(outer).into_iter());
            }
        }
    }

    /// Sequential parallel iterator extension trait
    pub trait IntoParallelIterator {
        /// Item type produced by the iterator.
        type Item;
        /// Concrete iterator type returned by `into_par_iter`.
        type Iter: ParallelIterator<Item = Self::Item>;

        /// Converts `self` into a (sequential) iterator.
        fn into_par_iter(self) -> Self::Iter;
    }

    /// Implementation for ranges
    impl IntoParallelIterator for std::ops::Range<usize> {
        type Item = usize;
        type Iter = std::ops::Range<usize>;

        fn into_par_iter(self) -> Self::Iter {
            self
        }
    }

    /// Implementation for slices
    impl<'a, T> IntoParallelIterator for &'a [T] {
        type Item = &'a T;
        type Iter = std::slice::Iter<'a, T>;

        fn into_par_iter(self) -> Self::Iter {
            self.iter()
        }
    }

    /// Implementation for mutable slices
    impl<'a, T> IntoParallelIterator for &'a mut [T] {
        type Item = &'a mut T;
        type Iter = std::slice::IterMut<'a, T>;

        fn into_par_iter(self) -> Self::Iter {
            self.iter_mut()
        }
    }

    /// Implementation for Vec
    impl<T> IntoParallelIterator for Vec<T> {
        type Item = T;
        type Iter = std::vec::IntoIter<T>;

        fn into_par_iter(self) -> Self::Iter {
            self.into_iter()
        }
    }

    /// Extension trait for slices
    ///
    /// Both methods forward to the std chunking methods and therefore panic
    /// when `chunk_size` is 0.
    pub trait ParallelSlice<T> {
        /// Iterates over consecutive chunks of at most `chunk_size` elements.
        fn par_chunks(&self, chunk_size: usize) -> std::slice::Chunks<'_, T>;
        /// Iterates over consecutive mutable chunks of at most `chunk_size`
        /// elements.
        fn par_chunks_mut(&mut self, chunk_size: usize) -> std::slice::ChunksMut<'_, T>;
    }

    impl<T> ParallelSlice<T> for [T] {
        fn par_chunks(&self, chunk_size: usize) -> std::slice::Chunks<'_, T> {
            self.chunks(chunk_size)
        }

        fn par_chunks_mut(&mut self, chunk_size: usize) -> std::slice::ChunksMut<'_, T> {
            self.chunks_mut(chunk_size)
        }
    }

    /// Sequential scope (no-op)
    ///
    /// Calls `f` immediately and returns its result. The `'scope` lifetime is
    /// not used by this fallback.
    pub fn scope<'scope, F, R>(f: F) -> R
    where
        F: FnOnce() -> R,
    {
        f()
    }

    /// Sequential join
    ///
    /// Calls `a` first, then `b`, and returns both results.
    pub fn join<A, B, RA, RB>(a: A, b: B) -> (RA, RB)
    where
        A: FnOnce() -> RA,
        B: FnOnce() -> RB,
    {
        (a(), b())
    }

    /// Sequential spawn (just executes immediately)
    ///
    /// Returns whatever `f` returns.
    pub fn spawn<F, R>(f: F) -> R
    where
        F: FnOnce() -> R,
    {
        f()
    }

    /// Get current number of threads (always 1 in sequential mode)
    pub fn current_num_threads() -> usize {
        1
    }
}