scirs2_core/
parallel_ops.rs

1//! Parallel operations abstraction layer
2//!
3//! This module provides a unified interface for parallel operations across the SciRS2 project.
4//! It wraps Rayon functionality when the `parallel` feature is enabled, and provides
5//! sequential fallbacks when it's disabled.
6//!
7//! # Usage
8//!
9//! ```rust
10//! use scirs2_core::parallel_ops::*;
11//!
12//! // Works with or without the parallel feature
13//! let results: Vec<i32> = (0..1000)
14//!     .into_par_iter()
15//!     .map(|x| x * x)
16//!     .collect();
17//! ```
18
19// When parallel is enabled, directly re-export Rayon's prelude
20#[cfg(feature = "parallel")]
21pub use rayon::prelude::*;
22
23// Re-export ThreadPoolBuilder and ThreadPool when parallel is enabled
24#[cfg(feature = "parallel")]
25pub use rayon::{ThreadPool, ThreadPoolBuilder};
26
27// When parallel is disabled, provide sequential fallbacks
28#[cfg(not(feature = "parallel"))]
29mod sequential_fallbacks {
30    use std::iter;
31
32    /// Sequential fallback for IntoParallelIterator
33    pub trait IntoParallelIterator: Sized {
34        type Iter: Iterator<Item = Self::Item>;
35        type Item;
36
37        fn into_par_iter(self) -> Self::Iter;
38    }
39
40    /// Sequential fallback for ParallelIterator
41    pub trait ParallelIterator: Iterator + Sized {
42        fn map<F, R>(self, f: F) -> iter::Map<Self, F>
43        where
44            F: FnMut(Self::Item) -> R,
45        {
46            Iterator::map(self, f)
47        }
48
49        fn for_each<F>(self, f: F)
50        where
51            F: FnMut(Self::Item),
52        {
53            Iterator::for_each(self, f)
54        }
55
56        fn try_for_each<F, E>(self, f: F) -> Result<(), E>
57        where
58            F: FnMut(Self::Item) -> Result<(), E>,
59        {
60            Iterator::try_for_each(self, f)
61        }
62
63        fn filter<P>(self, predicate: P) -> iter::Filter<Self, P>
64        where
65            P: FnMut(&Self::Item) -> bool,
66        {
67            Iterator::filter(self, predicate)
68        }
69
70        fn collect<C>(self) -> C
71        where
72            C: FromIterator<Self::Item>,
73        {
74            Iterator::collect(self)
75        }
76
77        fn fold<T, F>(self, init: T, f: F) -> T
78        where
79            F: FnMut(T, Self::Item) -> T,
80        {
81            Iterator::fold(self, init, f)
82        }
83
84        fn reduce<F>(self, f: F) -> Option<Self::Item>
85        where
86            F: FnMut(Self::Item, Self::Item) -> Self::Item,
87        {
88            Iterator::reduce(self, f)
89        }
90
91        fn count(self) -> usize {
92            Iterator::count(self)
93        }
94
95        fn sum<S>(self) -> S
96        where
97            S: std::iter::Sum<Self::Item>,
98        {
99            Iterator::sum(self)
100        }
101
102        fn min(self) -> Option<Self::Item>
103        where
104            Self::Item: Ord,
105        {
106            Iterator::min(self)
107        }
108
109        fn max(self) -> Option<Self::Item>
110        where
111            Self::Item: Ord,
112        {
113            Iterator::max(self)
114        }
115    }
116
117    /// Sequential fallback for ParallelBridge
118    pub trait ParallelBridge: Iterator + Sized {
119        fn par_bridge(self) -> Self {
120            self
121        }
122    }
123
124    // Implement IntoParallelIterator for common types
125    impl IntoParallelIterator for std::ops::Range<usize> {
126        type Item = usize;
127        type Iter = std::ops::Range<usize>;
128
129        fn into_par_iter(self) -> Self::Iter {
130            self
131        }
132    }
133
134    impl<T> IntoParallelIterator for Vec<T> {
135        type Item = T;
136        type Iter = std::vec::IntoIter<T>;
137
138        fn into_par_iter(self) -> Self::Iter {
139            self.into_iter()
140        }
141    }
142
143    impl<'a, T> IntoParallelIterator for &'a [T] {
144        type Item = &'a T;
145        type Iter = std::slice::Iter<'a, T>;
146
147        fn into_par_iter(self) -> Self::Iter {
148            self.iter()
149        }
150    }
151
152    impl<'a, T> IntoParallelIterator for &'a mut [T] {
153        type Item = &'a mut T;
154        type Iter = std::slice::IterMut<'a, T>;
155
156        fn into_par_iter(self) -> Self::Iter {
157            self.iter_mut()
158        }
159    }
160
161    // Implement ParallelIterator for all iterators
162    impl<T: Iterator> ParallelIterator for T {}
163
164    // Implement ParallelBridge for all iterators
165    impl<T: Iterator> ParallelBridge for T {}
166
167    /// Sequential fallback for parallel scope
168    pub fn scope<'scope, F, R>(f: F) -> R
169    where
170        F: FnOnce(&()) -> R,
171    {
172        f(&())
173    }
174
175    /// Sequential fallback for parallel join
176    pub fn join<A, B, RA, RB>(a: A, b: B) -> (RA, RB)
177    where
178        A: FnOnce() -> RA,
179        B: FnOnce() -> RB,
180    {
181        (a(), b())
182    }
183
184    // Re-export traits
185    pub use self::{IntoParallelIterator, ParallelBridge, ParallelIterator};
186}
187
188// Re-export sequential fallbacks when parallel is disabled
189#[cfg(not(feature = "parallel"))]
190pub use sequential_fallbacks::*;
191
192/// Helper function to create a parallel iterator from a range
193#[allow(dead_code)]
194pub fn par_range(start: usize, end: usize) -> impl ParallelIterator<Item = usize> {
195    (start..end).into_par_iter()
196}
197
/// Splits `slice` into Rayon parallel chunks of at most `chunksize` elements.
///
/// Thin wrapper around Rayon's `ParallelSlice::par_chunks`.
#[cfg(feature = "parallel")]
#[allow(dead_code)]
pub fn par_chunks<T: Sync>(slice: &[T], chunksize: usize) -> rayon::slice::Chunks<'_, T> {
    rayon::slice::ParallelSlice::par_chunks(slice, chunksize)
}
204
/// Sequential fallback for `par_chunks`.
///
/// Returns an iterator over consecutive, non-overlapping chunks of `slice`,
/// each holding at most `chunksize` elements (the last one may be shorter).
///
/// # Panics
///
/// Panics if `chunksize` is 0, as `<[T]>::chunks` does.
#[cfg(not(feature = "parallel"))]
#[allow(dead_code)]
pub fn par_chunks<T>(slice: &[T], chunksize: usize) -> std::slice::Chunks<'_, T> {
    slice.chunks(chunksize)
}
211
/// Splits `slice` into mutable Rayon parallel chunks of at most `chunk_size`
/// elements.
///
/// Thin wrapper around Rayon's `ParallelSliceMut::par_chunks_mut`.
#[cfg(feature = "parallel")]
#[allow(dead_code)]
pub fn par_chunks_mut<T: Send>(
    slice: &mut [T],
    chunk_size: usize,
) -> rayon::slice::ChunksMut<'_, T> {
    rayon::slice::ParallelSliceMut::par_chunks_mut(slice, chunk_size)
}
221
/// Sequential fallback for `par_chunks_mut`.
///
/// Returns an iterator over consecutive, non-overlapping mutable chunks of
/// `slice`, each holding at most `chunksize` elements (the last one may be
/// shorter).
///
/// # Panics
///
/// Panics if `chunksize` is 0, as `<[T]>::chunks_mut` does.
#[cfg(not(feature = "parallel"))]
#[allow(dead_code)]
pub fn par_chunks_mut<T>(slice: &mut [T], chunksize: usize) -> std::slice::ChunksMut<'_, T> {
    slice.chunks_mut(chunksize)
}
228
/// Applies `f` to every element of `items` with Rayon and collects the
/// outputs into a `Vec`, in input order.
#[cfg(feature = "parallel")]
#[allow(dead_code)]
pub fn parallel_map<T, U, F>(items: &[T], f: F) -> Vec<U>
where
    T: Sync,
    U: Send,
    F: Fn(&T) -> U + Sync + Send,
{
    use rayon::iter::{IntoParallelRefIterator, ParallelIterator};

    let mapped = items.par_iter().map(f);
    mapped.collect()
}
241
/// Sequential fallback for `parallel_map`: applies `f` to each element of
/// `items` in order and returns the outputs.
#[cfg(not(feature = "parallel"))]
#[allow(dead_code)]
pub fn parallel_map<T, U, F>(items: &[T], f: F) -> Vec<U>
where
    F: Fn(&T) -> U,
{
    let mut mapped = Vec::with_capacity(items.len());
    for item in items {
        mapped.push(f(item));
    }
    mapped
}
251
/// Applies the fallible `f` to every element of `items` with Rayon.
///
/// Returns `Ok` with all outputs when every call succeeds, otherwise an `Err`
/// produced by one of the failing calls.
#[cfg(feature = "parallel")]
#[allow(dead_code)]
pub fn parallel_map_result<T, U, E, F>(items: &[T], f: F) -> Result<Vec<U>, E>
where
    T: Sync,
    U: Send,
    E: Send,
    F: Fn(&T) -> Result<U, E> + Sync + Send,
{
    use rayon::iter::{IntoParallelRefIterator, ParallelIterator};

    let attempts = items.par_iter().map(f);
    attempts.collect()
}
265
/// Sequential fallback for `parallel_map_result`.
///
/// Applies `f` to each element in order and stops at the first `Err`, which
/// is returned; otherwise returns every output wrapped in `Ok`.
#[cfg(not(feature = "parallel"))]
#[allow(dead_code)]
pub fn parallel_map_result<T, U, E, F>(items: &[T], f: F) -> Result<Vec<U>, E>
where
    F: Fn(&T) -> Result<U, E>,
{
    let mut outputs = Vec::with_capacity(items.len());
    for item in items {
        outputs.push(f(item)?);
    }
    Ok(outputs)
}
275
/// Reports whether this build was compiled with the `parallel` feature.
#[allow(dead_code)]
pub fn is_parallel_enabled() -> bool {
    // Resolved at compile time from the crate's feature set.
    const PARALLEL_ENABLED: bool = cfg!(feature = "parallel");
    PARALLEL_ENABLED
}
281
/// Returns the number of threads Rayon reports for the current thread pool.
#[cfg(feature = "parallel")]
#[allow(dead_code)]
pub fn num_threads() -> usize {
    use rayon::current_num_threads as pool_threads;

    pool_threads()
}
288
/// Sequential fallback for `num_threads`: always reports a single thread.
#[cfg(not(feature = "parallel"))]
#[allow(dead_code)]
pub fn num_threads() -> usize {
    const SEQUENTIAL_THREADS: usize = 1;
    SEQUENTIAL_THREADS
}
295
/// Rayon-compatible name for the thread-count query.
///
/// Returns the value of `rayon::current_num_threads()`, so code written
/// against Rayon's free function can call this one instead.
#[cfg(feature = "parallel")]
#[allow(dead_code)]
pub fn current_num_threads() -> usize {
    use rayon::current_num_threads as rayon_thread_count;

    rayon_thread_count()
}
305
/// Sequential fallback for `current_num_threads`: always reports a single
/// thread.
#[cfg(not(feature = "parallel"))]
#[allow(dead_code)]
pub fn current_num_threads() -> usize {
    const SEQUENTIAL_THREADS: usize = 1;
    SEQUENTIAL_THREADS
}
312
/// Alias for `num_threads()` kept for API compatibility.
///
/// Returns whatever `num_threads()` reports for the active feature set.
#[allow(dead_code)]
pub fn get_num_threads() -> usize {
    num_threads()
}
318
/// Configures Rayon's global thread pool to use `numthreads` threads.
///
/// # Panics
///
/// Panics with "Failed to initialize thread pool" if `build_global` returns
/// an error.
#[cfg(feature = "parallel")]
#[allow(dead_code)]
pub fn set_num_threads(numthreads: usize) {
    let builder = rayon::ThreadPoolBuilder::new().num_threads(numthreads);
    builder
        .build_global()
        .expect("Failed to initialize thread pool");
}
328
/// Sequential fallback for `set_num_threads`: accepts the requested thread
/// count and does nothing, because no thread pool exists in this build.
#[cfg(not(feature = "parallel"))]
#[allow(dead_code)]
pub fn set_num_threads(_numthreads: usize) {
    // No-op for sequential fallback
}

/// Earlier name of the sequential `set_num_threads` fallback.
///
/// Kept so existing callers keep compiling; it forwards to
/// `set_num_threads`, which is the name available under every feature set.
#[cfg(not(feature = "parallel"))]
#[allow(dead_code)]
pub fn threads(numthreads: usize) {
    set_num_threads(numthreads)
}
335
336/// Parallel-aware scope helper
337#[cfg(feature = "parallel")]
338pub use rayon::scope as par_scope;
339
340/// Sequential fallback for par_scope
341#[cfg(not(feature = "parallel"))]
342pub use sequential_fallbacks::scope as par_scope;
343
344/// Parallel join helper
345#[cfg(feature = "parallel")]
346pub use rayon::join as par_join;
347
348/// Sequential fallback for par_join
349#[cfg(not(feature = "parallel"))]
350pub use sequential_fallbacks::join as par_join;
351
/// Map-reduce entry point for arbitrary data.
///
/// The current implementation applies `mapper` to `data` exactly once and
/// returns that value: arbitrary `D` cannot be chunked generically, so
/// `chunksize` and `reducer` are accepted but not used yet.
///
/// The parameter list matches the sequential fallback so that callers compile
/// identically with and without the `parallel` feature.
///
/// # Arguments
///
/// * `data` - The data to process (e.g., array view)
/// * `chunksize` - Requested chunk size (currently unused)
/// * `mapper` - Function that processes the data and returns a result
/// * `reducer` - Function that combines two results into one (currently unused)
///
/// # Returns
///
/// The value returned by `mapper(data)`.
#[cfg(feature = "parallel")]
#[allow(dead_code)]
#[allow(unused_variables)]
pub fn parallel_map_reduce<D, T, M, Red>(data: D, chunksize: usize, mapper: M, reducer: Red) -> T
where
    D: Send + Sync,
    T: Send + Clone,
    M: Fn(D) -> T + Sync + Send + Clone,
    Red: Fn(T, T) -> T + Sync + Send,
{
    // Chunking arbitrary `D` would need a data-type-specific specialization,
    // so the mapper is applied to the whole input.
    mapper(data)
}
379
/// Sequential fallback for `parallel_map_reduce`.
///
/// Applies `mapper` to `data` once and returns the result; `chunksize` and
/// `reducer` are accepted but not used.
#[cfg(not(feature = "parallel"))]
#[allow(dead_code)]
pub fn parallel_map_reduce<D, T, M, Red>(data: D, chunksize: usize, mapper: M, reducer: Red) -> T
where
    T: Clone,
    M: Fn(D) -> T,
    Red: Fn(T, T) -> T,
{
    // Explicitly mark the parameters that take no part in the computation.
    let _ = chunksize;
    let _ = &reducer;

    let mapped = mapper(data);
    mapped
}
391
/// Maps every item of a parallel-iterable collection and collects the results.
///
/// # Arguments
///
/// * `items` - Anything convertible into a Rayon parallel iterator
/// * `mapper` - Function applied to each item
///
/// # Returns
///
/// A `Vec` holding the mapped values.
#[cfg(feature = "parallel")]
#[allow(dead_code)]
pub fn parallel_map_collect<I, T, U, M>(items: I, mapper: M) -> Vec<U>
where
    I: IntoParallelIterator<Item = T>,
    T: Send,
    U: Send,
    M: Fn(T) -> U + Sync + Send,
{
    use rayon::prelude::*;

    let source = items.into_par_iter();
    let mapped = source.map(mapper);
    mapped.collect()
}
416
/// Sequential fallback for `parallel_map_collect`: applies `mapper` to each
/// item of `items` in iteration order and returns the results as a `Vec`.
#[cfg(not(feature = "parallel"))]
#[allow(dead_code)]
pub fn parallel_map_collect<I, T, U, M>(items: I, mapper: M) -> Vec<U>
where
    I: IntoIterator<Item = T>,
    M: Fn(T) -> U,
{
    let mut collected = Vec::new();
    collected.extend(items.into_iter().map(mapper));
    collected
}
427
/// Parallel map-reduce operation on indexed chunks
///
/// Collects `range` into a vector of indices, splits it into chunks of
/// `chunk_size`, maps every chunk with `mapper` in parallel and combines the
/// per-chunk results with `reducer`.
///
/// # Arguments
///
/// * `range` - The indices to process (e.g., `0..n`)
/// * `chunk_size` - Size of each chunk; `0` is treated as `1` instead of panicking
/// * `mapper` - Function that processes a slice of indices and returns a result
/// * `reducer` - Function that combines two results into one
///
/// # Returns
///
/// The reduced result, or `mapper(&[])` when `range` yields no indices.
#[cfg(feature = "parallel")]
#[allow(dead_code)]
pub fn parallel_map_reduce_indexed<R, T, M, Red>(
    range: R,
    chunk_size: usize,
    mapper: M,
    reducer: Red,
) -> T
where
    R: Iterator<Item = usize> + Send,
    T: Send + Clone,
    M: Fn(&[usize]) -> T + Sync + Send,
    Red: Fn(T, T) -> T + Sync + Send,
{
    use rayon::prelude::*;

    let indices: Vec<usize> = range.collect();
    // Chunking panics on a zero chunk size; fall back to single-index chunks.
    let chunk_size = chunk_size.max(1);

    // `par_chunks` hands out the chunk slices directly, so no intermediate
    // `Vec` of slices is needed.
    indices
        .par_chunks(chunk_size)
        .map(&mapper)
        .reduce_with(reducer)
        .unwrap_or_else(|| mapper(&[]))
}
470
/// Sequential fallback for `parallel_map_reduce_indexed`.
///
/// Collects `range` into a vector of indices, maps each chunk of
/// `chunk_size` indices with `mapper` in order, and folds the per-chunk
/// results left-to-right with `reducer`.
///
/// A `chunk_size` of `0` is treated as `1` instead of panicking. When `range`
/// yields no indices the result is `mapper(&[])`.
#[cfg(not(feature = "parallel"))]
#[allow(dead_code)]
pub fn parallel_map_reduce_indexed<R, T, M, Red>(
    range: R,
    chunk_size: usize,
    mapper: M,
    reducer: Red,
) -> T
where
    R: Iterator<Item = usize>,
    T: Clone,
    M: Fn(&[usize]) -> T,
    Red: Fn(T, T) -> T,
{
    let indices: Vec<usize> = range.collect();
    // `chunks` panics on a zero chunk size; fall back to single-index chunks.
    let chunk_size = chunk_size.max(1);

    // Map and reduce lazily; no intermediate vector of results is needed.
    indices
        .chunks(chunk_size)
        .map(&mapper)
        .reduce(reducer)
        .unwrap_or_else(|| mapper(&[]))
}
498
#[cfg(test)]
#[allow(clippy::items_after_test_module)]
mod tests {
    use super::*;

    /// `par_range` yields every index of the half-open range.
    #[test]
    fn test_par_range() {
        let collected: Vec<usize> = par_range(0, 10).collect();
        let expected = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
        assert_eq!(collected, expected);
    }

    /// Mapping over an owned vector doubles every element.
    #[test]
    fn test_par_map() {
        let doubled: Vec<i32> = vec![1, 2, 3, 4, 5]
            .into_par_iter()
            .map(|value| value * 2)
            .collect();
        assert_eq!(doubled, vec![2, 4, 6, 8, 10]);
    }

    /// Filtering keeps only the even elements.
    #[test]
    fn test_par_filter() {
        let evens: Vec<i32> = vec![1, 2, 3, 4, 5, 6]
            .into_par_iter()
            .filter(|value| value % 2 == 0)
            .collect();
        assert_eq!(evens, vec![2, 4, 6]);
    }

    /// `try_for_each` succeeds when no element triggers the error branch.
    #[test]
    fn test_par_try_for_each() {
        let outcome = vec![1, 2, 3, 4, 5].into_par_iter().try_for_each(|value| {
            if value < 6 {
                Ok(())
            } else {
                Err("Too large")
            }
        });
        assert!(outcome.is_ok());
    }

    /// Eight elements in chunks of three give two full chunks and a remainder.
    #[test]
    fn test_par_chunks() {
        let data = vec![1, 2, 3, 4, 5, 6, 7, 8];
        let chunks: Vec<Vec<i32>> = par_chunks(&data, 3).map(|chunk| chunk.to_vec()).collect();
        let expected = vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]];
        assert_eq!(chunks.len(), expected.len());
        assert_eq!(chunks, expected);
    }

    /// The runtime flag mirrors the compile-time feature.
    #[test]
    fn test_is_parallel_enabled() {
        assert_eq!(is_parallel_enabled(), cfg!(feature = "parallel"));
    }

    /// At least one thread with Rayon, exactly one without it.
    #[test]
    fn test_num_threads() {
        let threads = num_threads();
        if cfg!(feature = "parallel") {
            assert!(threads > 0);
        } else {
            assert_eq!(threads, 1);
        }
    }
}
561
/// Inclusive scan (prefix operation) over `items`.
///
/// Element `i` of the result is `op` folded over `init` and `items[0..=i]`,
/// i.e. `result[0] = op(&init, &items[0])` and
/// `result[i] = op(&result[i - 1], &items[i])`. An empty input gives an empty
/// vector.
///
/// The scan is currently evaluated sequentially even with the `parallel`
/// feature enabled; the `Send`/`Sync` bounds are kept so a parallel
/// implementation can replace it without changing the signature.
#[cfg(feature = "parallel")]
pub fn parallel_scan<T, F>(items: &[T], init: T, op: F) -> Vec<T>
where
    T: Clone + Send + Sync,
    F: Fn(&T, &T) -> T + Sync,
{
    let mut result: Vec<T> = Vec::with_capacity(items.len());

    for item in items {
        // The previous output (or `init` for the first element) is the running
        // accumulator, so the output never needs to be pre-filled with clones.
        let next = op(result.last().unwrap_or(&init), item);
        result.push(next);
    }

    result
}
589
/// Sequential fallback for `parallel_scan` (inclusive prefix operation).
///
/// Element `i` of the result is `op` folded over `init` and `items[0..=i]`,
/// i.e. `result[0] = op(&init, &items[0])` and
/// `result[i] = op(&result[i - 1], &items[i])`. An empty input gives an empty
/// vector.
#[cfg(not(feature = "parallel"))]
pub fn parallel_scan<T, F>(items: &[T], init: T, op: F) -> Vec<T>
where
    T: Clone,
    F: Fn(&T, &T) -> T,
{
    let mut result: Vec<T> = Vec::with_capacity(items.len());

    for item in items {
        // The previous output (or `init` for the first element) is the running
        // accumulator, so the output never needs to be pre-filled with clones.
        let next = op(result.last().unwrap_or(&init), item);
        result.push(next);
    }

    result
}
609
/// Applies `op` to every row of `matrix` with Rayon.
///
/// `matrix` is a slice of row slices; the result holds one value per row, in
/// row order.
#[cfg(feature = "parallel")]
pub fn parallel_matrix_rows<T, U, F>(matrix: &[&[T]], op: F) -> Vec<U>
where
    T: Sync,
    U: Send,
    F: Fn(&[T]) -> U + Sync,
{
    use rayon::prelude::*;

    let per_row = matrix.par_iter().map(|&row| op(row));
    per_row.collect()
}
624
/// Sequential fallback for `parallel_matrix_rows`: applies `op` to each row
/// of `matrix` in order and returns one value per row.
#[cfg(not(feature = "parallel"))]
pub fn parallel_matrix_rows<T, U, F>(matrix: &[&[T]], op: F) -> Vec<U>
where
    F: Fn(&[T]) -> U,
{
    let mut per_row = Vec::with_capacity(matrix.len());
    for &row in matrix {
        per_row.push(op(row));
    }
    per_row
}
633
/// Combines `a` and `b` element-wise with `op`, using Rayon.
///
/// Elements are paired by position through Rayon's `zip`; the result holds
/// one value per pair.
#[cfg(feature = "parallel")]
pub fn parallel_zip<T, U, V, F>(a: &[T], b: &[U], op: F) -> Vec<V>
where
    T: Sync,
    U: Sync,
    V: Send,
    F: Fn(&T, &U) -> V + Sync,
{
    use rayon::prelude::*;

    let pairs = a.par_iter().zip(b.par_iter());
    pairs.map(|(lhs, rhs)| op(lhs, rhs)).collect()
}
652
/// Sequential fallback for `parallel_zip`.
///
/// Pairs the elements of `a` and `b` by position, applies `op` to each pair
/// and stops at the end of the shorter slice.
#[cfg(not(feature = "parallel"))]
pub fn parallel_zip<T, U, V, F>(a: &[T], b: &[U], op: F) -> Vec<V>
where
    F: Fn(&T, &U) -> V,
{
    let mut combined = Vec::with_capacity(a.len().min(b.len()));
    for (lhs, rhs) in a.iter().zip(b) {
        combined.push(op(lhs, rhs));
    }
    combined
}
661
/// Sorts `items` in place with Rayon's `par_sort_by`, ordering elements with
/// the `compare` function.
#[cfg(feature = "parallel")]
pub fn parallel_sort<T, F>(items: &mut [T], compare: F)
where
    T: Send,
    F: Fn(&T, &T) -> std::cmp::Ordering + Sync,
{
    rayon::slice::ParallelSliceMut::par_sort_by(items, compare);
}
675
/// Sequential fallback for `parallel_sort`: sorts `items` in place with the
/// standard library's `sort_by`, ordering elements with `compare`.
#[cfg(not(feature = "parallel"))]
pub fn parallel_sort<T, F>(items: &mut [T], compare: F)
where
    F: Fn(&T, &T) -> std::cmp::Ordering,
{
    <[T]>::sort_by(items, compare);
}
684
/// Parallel map that splits the input into many small chunks.
///
/// The chunk size is `items.len() / (num_threads() * 4)`, with a minimum of 1,
/// so there are several chunks per thread for Rayon to distribute. Each chunk
/// is mapped element by element with `op` and the results are collected.
#[cfg(feature = "parallel")]
pub fn parallel_map_work_stealing<T, U, F>(items: &[T], op: F) -> Vec<U>
where
    T: Sync,
    U: Send,
    F: Fn(&T) -> U + Sync,
{
    use rayon::prelude::*;

    // Aim for roughly four chunks per thread; never let the size reach zero.
    let target_chunks = num_threads() * 4;
    let chunk_size = (items.len() / target_chunks).max(1);

    let chunks = items.par_chunks(chunk_size);
    chunks
        .flat_map(|chunk| chunk.par_iter().map(&op))
        .collect()
}
706
/// Sequential fallback for `parallel_map_work_stealing`: applies `op` to each
/// element of `items` in order.
#[cfg(not(feature = "parallel"))]
pub fn parallel_map_work_stealing<T, U, F>(items: &[T], op: F) -> Vec<U>
where
    F: Fn(&T) -> U,
{
    let mut mapped = Vec::with_capacity(items.len());
    for item in items {
        mapped.push(op(item));
    }
    mapped
}
715
/// Parallel map that uses one large, 64-element-aligned chunk per thread.
///
/// The per-thread chunk size is `items.len() / num_threads()` (at least 1),
/// rounded up to the next multiple of 64 elements. Each chunk is mapped
/// element by element with `op` and the results are collected.
///
/// NOTE(review): nothing here queries or pins to NUMA nodes; the alignment is
/// the only locality measure visible in this function.
#[cfg(feature = "parallel")]
pub fn parallel_map_numa_aware<T, U, F>(items: &[T], op: F) -> Vec<U>
where
    T: Sync,
    U: Send,
    F: Fn(&T) -> U + Sync,
{
    use rayon::prelude::*;

    const ALIGNMENT: usize = 64;

    let per_thread = (items.len() / num_threads()).max(1);
    // Round up to the next multiple of ALIGNMENT elements.
    let aligned_chunk_size = ((per_thread + (ALIGNMENT - 1)) / ALIGNMENT) * ALIGNMENT;

    let chunks = items.par_chunks(aligned_chunk_size);
    chunks
        .flat_map(|chunk| chunk.par_iter().map(&op))
        .collect()
}
740
/// Sequential fallback for `parallel_map_numa_aware`: applies `op` to each
/// element of `items` in order.
#[cfg(not(feature = "parallel"))]
pub fn parallel_map_numa_aware<T, U, F>(items: &[T], op: F) -> Vec<U>
where
    F: Fn(&T) -> U,
{
    let mut mapped = Vec::with_capacity(items.len());
    for item in items {
        mapped.push(op(item));
    }
    mapped
}
749
/// Parallel reduction of `items` with `op`.
///
/// Clones the elements and combines them with Rayon's `reduce_with`, which
/// needs no identity value. `op` should be associative, because Rayon does
/// not fix the grouping in which elements are combined.
///
/// # Returns
///
/// `None` when `items` is empty, otherwise `Some` of the combined value.
#[cfg(feature = "parallel")]
pub fn parallel_tree_reduce<T, F>(items: &[T], op: F) -> Option<T>
where
    T: Clone + Send + Sync,
    F: Fn(T, T) -> T + Sync,
{
    use rayon::prelude::*;

    // `reduce` would require a true identity element that Rayon may insert any
    // number of times; seeding it with `items[0]` would fold that element in
    // repeatedly. `reduce_with` combines only the actual elements.
    items.par_iter().cloned().reduce_with(&op)
}
769
/// Sequential fallback for `parallel_tree_reduce`.
///
/// Folds cloned elements left-to-right with `op`, starting from the first
/// element. Returns `None` when `items` is empty.
#[cfg(not(feature = "parallel"))]
pub fn parallel_tree_reduce<T, F>(items: &[T], op: F) -> Option<T>
where
    T: Clone,
    F: Fn(T, T) -> T,
{
    let mut remaining = items.iter().cloned();
    let first = remaining.next()?;
    Some(remaining.fold(first, op))
}
779
/// Parallel batch processing with progress reporting
///
/// Splits `items` into batches of `batch_size`, runs `processor` on every
/// batch with Rayon and concatenates the per-batch outputs in batch order.
///
/// After a batch has been processed, `progress_callback(batch_number, total)`
/// is called with the 1-based number of that batch and the total batch count.
/// Batches run in parallel, so callbacks may arrive out of order.
///
/// A `batch_size` of `0` is treated as `1` instead of panicking.
#[cfg(feature = "parallel")]
pub fn parallel_batch_process<T, U, F, P>(
    items: &[T],
    batch_size: usize,
    processor: F,
    progress_callback: P,
) -> Vec<U>
where
    T: Sync,
    U: Send,
    F: Fn(&[T]) -> Vec<U> + Sync,
    P: Fn(usize, usize) + Sync,
{
    use rayon::prelude::*;

    // Guard against division by zero and the zero-chunk-size panic.
    let batch_size = batch_size.max(1);
    // Ceiling division without the `len + batch_size - 1` overflow hazard.
    let len = items.len();
    let total_batches = len / batch_size + usize::from(len % batch_size != 0);

    let per_batch: Vec<Vec<U>> = items
        .par_chunks(batch_size)
        .enumerate()
        .map(|(batch_idx, chunk)| {
            let output = processor(chunk);
            progress_callback(batch_idx + 1, total_batches);
            output
        })
        .collect();

    per_batch.into_iter().flatten().collect()
}
812
/// Sequential fallback for `parallel_batch_process`.
///
/// Splits `items` into batches of `batch_size`, runs `processor` on each
/// batch in order and appends its output to the result. After every batch,
/// `progress_callback(batch_number, total)` is called with the 1-based batch
/// number and the total batch count.
///
/// A `batch_size` of `0` is treated as `1` instead of panicking.
#[cfg(not(feature = "parallel"))]
pub fn parallel_batch_process<T, U, F, P>(
    items: &[T],
    batch_size: usize,
    processor: F,
    progress_callback: P,
) -> Vec<U>
where
    F: Fn(&[T]) -> Vec<U>,
    P: Fn(usize, usize),
{
    // Guard against division by zero and the zero-chunk-size panic.
    let batch_size = batch_size.max(1);
    // Ceiling division without the `len + batch_size - 1` overflow hazard.
    let len = items.len();
    let total_batches = len / batch_size + usize::from(len % batch_size != 0);

    let mut results = Vec::new();
    for (batch_idx, chunk) in items.chunks(batch_size).enumerate() {
        results.extend(processor(chunk));
        progress_callback(batch_idx + 1, total_batches);
    }

    results
}
832
833    results
834}