scirs2_stats/
parallel_enhanced_advanced.rs

//! Advanced parallel statistical processing with intelligent optimization
//!
//! This module provides state-of-the-art parallel implementations that
//! automatically adapt to system characteristics and data patterns for
//! optimal performance across different hardware configurations.

use crate::error::{StatsError, StatsResult};
use crate::error_standardization::ErrorMessages;
use crate::simd_enhanced_core::{mean_enhanced, variance_enhanced, ComprehensiveStats};
use crossbeam;
use scirs2_core::ndarray::{Array1, Array2, ArrayBase, ArrayView1, Data, Ix1, Ix2};
use scirs2_core::numeric::{Float, NumCast, One, Zero};
use scirs2_core::{
    parallel_ops::*,
    simd_ops::{PlatformCapabilities, SimdUnifiedOps},
};
use std::collections::VecDeque;
use std::sync::{atomic::AtomicUsize, Arc, Mutex};
use std::thread;
/// Advanced parallel processing configuration
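///
/// # Example
///
/// A minimal sketch of overriding a few defaults (the values here are
/// illustrative, not tuned recommendations):
///
/// ```ignore
/// let config = AdvancedParallelConfig {
///     parallel_threshold: 50_000,
///     num_threads: Some(8),
///     ..AdvancedParallelConfig::default()
/// };
/// ```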
#[derive(Debug, Clone)]
pub struct AdvancedParallelConfig {
    /// Minimum data size to trigger parallel processing
    pub parallel_threshold: usize,
    /// Number of worker threads (None = auto-detect)
    pub num_threads: Option<usize>,
    /// Enable NUMA-aware processing
    pub numa_aware: bool,
    /// Enable work stealing for better load balancing
    pub work_stealing: bool,
    /// Preferred chunk size strategy
    pub chunk_strategy: ChunkStrategy,
    /// Maximum memory usage for intermediate results (bytes)
    pub max_memory_usage: usize,
}

impl Default for AdvancedParallelConfig {
    fn default() -> Self {
        Self {
            parallel_threshold: 10_000,
            num_threads: None,
            numa_aware: true,
            work_stealing: true,
            chunk_strategy: ChunkStrategy::Adaptive,
            max_memory_usage: 1024 * 1024 * 1024, // 1 GiB
        }
    }
}
/// Chunking strategies for optimal data access patterns
#[derive(Debug, Clone, Copy)]
pub enum ChunkStrategy {
    /// Fixed chunk size
    Fixed(usize),
    /// Cache-aware chunking
    CacheOptimal,
    /// Adaptive chunking based on data characteristics
    Adaptive,
    /// Work stealing with dynamic load balancing
    WorkStealing,
}
/// Advanced parallel statistics processor
pub struct AdvancedParallelProcessor<F: Float + std::fmt::Display> {
    config: AdvancedParallelConfig,
    capabilities: PlatformCapabilities,
    #[allow(dead_code)]
    thread_pool: Option<ThreadPool>,
    #[allow(dead_code)]
    work_queue: Arc<Mutex<VecDeque<ParallelTask<F>>>>,
    #[allow(dead_code)]
    active_workers: Arc<AtomicUsize>,
}

/// Task for parallel execution
enum ParallelTask<F: Float + std::fmt::Display> {
    Mean(Vec<F>),
    Variance(Vec<F>, F, usize), // data, mean, ddof
    Correlation(Vec<F>, Vec<F>),
    Histogram(Vec<F>, usize),
}

/// Result of parallel computation
pub enum ParallelResult<F: Float + std::fmt::Display> {
    Mean(F),
    Variance(F),
    Correlation(F),
    Histogram(Vec<usize>),
}
impl<F> AdvancedParallelProcessor<F>
where
    F: Float
        + NumCast
        + Send
        + Sync
        + SimdUnifiedOps
        + Copy
        + 'static
        + Zero
        + One
        + std::fmt::Debug
        + std::fmt::Display
        + std::iter::Sum<F>,
{
    /// Create a new advanced parallel processor
    pub fn new(config: AdvancedParallelConfig) -> Self {
        let capabilities = PlatformCapabilities::detect();

        Self {
            config,
            capabilities,
            thread_pool: None,
            work_queue: Arc::new(Mutex::new(VecDeque::new())),
            active_workers: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Initialize the thread pool with optimal configuration
    pub fn initialize(&mut self) -> StatsResult<()> {
        let num_threads = self
            .config
            .num_threads
            .unwrap_or_else(|| self.optimal_thread_count());

        self.thread_pool = Some(ThreadPool::new(num_threads, self.config.clone())?);
        Ok(())
    }
    /// Compute mean using advanced parallel processing
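    ///
    /// # Example
    ///
    /// A minimal sketch with `f64` data (which satisfies the trait bounds
    /// on this impl):
    ///
    /// ```ignore
    /// use scirs2_core::ndarray::Array1;
    ///
    /// let processor = create_advanced_parallel_processor::<f64>();
    /// let x = Array1::from(vec![1.0, 2.0, 3.0, 4.0]);
    /// let mean = processor.mean_parallel_advanced(&x)?;
    /// ```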
    pub fn mean_parallel_advanced<D>(&self, x: &ArrayBase<D, Ix1>) -> StatsResult<F>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        if x.is_empty() {
            return Err(ErrorMessages::empty_array("x"));
        }

        let n = x.len();

        // Use sequential processing for small arrays
        if n < self.config.parallel_threshold {
            return mean_enhanced(x);
        }

        // Choose optimal parallel strategy
        match self.config.chunk_strategy {
            ChunkStrategy::WorkStealing => self.mean_work_stealing(x),
            ChunkStrategy::Adaptive => self.mean_adaptive_chunking(x),
            ChunkStrategy::CacheOptimal => self.mean_cache_optimal(x),
            ChunkStrategy::Fixed(chunksize) => self.mean_fixed_chunks(x, chunksize),
        }
    }
    /// Compute variance using advanced parallel processing with numerical stability
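    ///
    /// # Example
    ///
    /// Sketch of a sample variance (`ddof = 1`), reusing `processor` and `x`
    /// from the example above:
    ///
    /// ```ignore
    /// let var = processor.variance_parallel_advanced(&x, 1)?;
    /// ```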
    pub fn variance_parallel_advanced<D>(
        &self,
        x: &ArrayBase<D, Ix1>,
        ddof: usize,
    ) -> StatsResult<F>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        let n = x.len();
        if n == 0 {
            return Err(ErrorMessages::empty_array("x"));
        }
        if n <= ddof {
            return Err(ErrorMessages::insufficientdata(
                "variance calculation",
                ddof + 1,
                n,
            ));
        }

        if n < self.config.parallel_threshold {
            return variance_enhanced(x, ddof);
        }

        // Use parallel Welford's algorithm for better numerical stability
        self.variance_welford_parallel(x, ddof)
    }
    /// Compute correlation matrix in parallel for multivariate data
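    ///
    /// # Example
    ///
    /// Sketch for a samples-by-features matrix; the result is
    /// `n_features x n_features`, symmetric, with a unit diagonal:
    ///
    /// ```ignore
    /// use scirs2_core::ndarray::Array2;
    ///
    /// let data = Array2::<f64>::zeros((1_000, 5)); // 1_000 samples, 5 features
    /// let corr = processor.correlation_matrix_parallel(&data)?;
    /// assert_eq!(corr.dim(), (5, 5));
    /// ```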
    pub fn correlation_matrix_parallel<D>(&self, data: &ArrayBase<D, Ix2>) -> StatsResult<Array2<F>>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        let (n_samples, n_features) = data.dim();

        if n_samples == 0 {
            return Err(ErrorMessages::empty_array("data"));
        }
        if n_features == 0 {
            return Err(ErrorMessages::insufficientdata(
                "correlation matrix",
                2,
                n_features,
            ));
        }

        let mut correlation_matrix = Array2::eye(n_features);

        // Parallel computation of upper triangle
        if n_features > 4 && n_samples > self.config.parallel_threshold {
            self.correlation_matrix_parallel_upper_triangle(data, &mut correlation_matrix)?;
        } else {
            self.correlation_matrix_sequential(data, &mut correlation_matrix)?;
        }

        // Fill lower triangle (correlation matrix is symmetric)
        for i in 0..n_features {
            for j in 0..i {
                correlation_matrix[[i, j]] = correlation_matrix[[j, i]];
            }
        }

        Ok(correlation_matrix)
    }
    /// Batch parallel processing for multiple statistical operations
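    ///
    /// # Example
    ///
    /// Sketch computing all summary statistics in one call:
    ///
    /// ```ignore
    /// let stats = processor.batch_statistics_parallel(&x, 1)?;
    /// println!("mean = {}, std = {}", stats.mean, stats.std);
    /// ```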
    pub fn batch_statistics_parallel<D>(
        &self,
        x: &ArrayBase<D, Ix1>,
        ddof: usize,
    ) -> StatsResult<ComprehensiveStats<F>>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        let n = x.len();
        if n == 0 {
            return Err(ErrorMessages::empty_array("x"));
        }
        if n <= ddof {
            return Err(ErrorMessages::insufficientdata(
                "comprehensive statistics",
                ddof + 1,
                n,
            ));
        }

        if n < self.config.parallel_threshold {
            // Use the enhanced SIMD version for smaller datasets
            return crate::simd_enhanced_core::comprehensive_stats_simd(x, ddof);
        }

        // Parallel single-pass computation of all statistics
        self.comprehensive_stats_single_pass_parallel(x, ddof)
    }
    /// Parallel bootstrap resampling with intelligent load balancing
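    ///
    /// # Example
    ///
    /// Sketch of bootstrapping the sample mean with a fixed seed for
    /// reproducibility, reusing `processor` and `x` from the examples above:
    ///
    /// ```ignore
    /// let means = processor.bootstrap_parallel(
    ///     &x,
    ///     1_000,
    ///     |sample| sample.iter().copied().sum::<f64>() / sample.len() as f64,
    ///     Some(42),
    /// )?;
    /// ```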
    pub fn bootstrap_parallel<D>(
        &self,
        x: &ArrayBase<D, Ix1>,
        n_samples: usize,
        statistic_fn: impl Fn(&ArrayView1<F>) -> F + Send + Sync + Clone,
        seed: Option<u64>,
    ) -> StatsResult<Array1<F>>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        if x.is_empty() {
            return Err(ErrorMessages::empty_array("x"));
        }
        if n_samples == 0 {
            return Err(ErrorMessages::insufficientdata("bootstrap", 1, 0));
        }

        let num_threads = self
            .config
            .num_threads
            .unwrap_or_else(|| self.optimal_thread_count());
        let samples_per_thread = n_samples.div_ceil(num_threads);

        // Distribute the resamples across worker threads
        self.bootstrap_work_stealing(x, n_samples, samples_per_thread, statistic_fn, seed)
    }

    // Private helper methods

    fn optimal_thread_count(&self) -> usize {
        let logical_cores = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(4);

        // These kernels are CPU-bound, so physical cores are usually optimal.
        // Heuristic: more than 2 logical cores likely means 2-way SMT, so halve.
        // Memory-bound workloads may benefit from more threads than this.
        if logical_cores > 2 {
            logical_cores / 2
        } else {
            logical_cores
        }
    }

    fn mean_work_stealing<D>(&self, x: &ArrayBase<D, Ix1>) -> StatsResult<F>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        let n = x.len();
        let num_threads = self
            .config
            .num_threads
            .unwrap_or_else(|| self.optimal_thread_count());
        let initial_chunksize = n.div_ceil(num_threads);

        // Create work queue with initial chunks
        let work_queue: Arc<Mutex<VecDeque<(usize, usize)>>> =
            Arc::new(Mutex::new(VecDeque::new()));

        for i in 0..num_threads {
            let start = i * initial_chunksize;
            let end = ((i + 1) * initial_chunksize).min(n);
            if start < end {
                work_queue.lock().unwrap().push_back((start, end));
            }
        }

        let partial_sums: Arc<Mutex<Vec<F>>> = Arc::new(Mutex::new(Vec::new()));
        let data_slice = x
            .as_slice()
            .ok_or(StatsError::InvalidInput("Data not contiguous".to_string()))?;

        crossbeam::scope(|s| {
            for _ in 0..num_threads {
                let work_queue = Arc::clone(&work_queue);
                let partial_sums = Arc::clone(&partial_sums);

                s.spawn(move |_| {
                    let mut local_sum = F::zero();

                    while let Some((start, end)) = work_queue.lock().unwrap().pop_front() {
                        // Split large chunks *before* processing so idle workers
                        // can steal the second half; splitting after processing
                        // the whole chunk would sum the stolen half twice.
                        let end = if end - start > 1000 {
                            let mid = (start + end) / 2;
                            work_queue.lock().unwrap().push_back((mid, end));
                            mid
                        } else {
                            end
                        };

                        for &val in &data_slice[start..end] {
                            local_sum = local_sum + val;
                        }
                    }

                    partial_sums.lock().unwrap().push(local_sum);
                });
            }
        })
        .unwrap();

        let total_sum = partial_sums
            .lock()
            .unwrap()
            .iter()
            .fold(F::zero(), |acc, &val| acc + val);
        Ok(total_sum / F::from(n).unwrap())
    }

    fn mean_adaptive_chunking<D>(&self, x: &ArrayBase<D, Ix1>) -> StatsResult<F>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        let n = x.len();
        let elementsize = std::mem::size_of::<F>();

        // Adaptive chunk size based on cache hierarchy
        let l1_cache = 32 * 1024; // 32KB L1 cache (typical)
        let l2_cache = 256 * 1024; // 256KB L2 cache (typical)

        let chunksize = if n * elementsize <= l1_cache {
            n // Fits in L1, no chunking needed
        } else if n * elementsize <= l2_cache {
            l1_cache / elementsize // Chunk to fit in L1
        } else {
            l2_cache / elementsize // Chunk to fit in L2
        };

        let num_chunks = n.div_ceil(chunksize);

        // Process cache-sized chunks in parallel, using SIMD within each chunk
        let chunks: Vec<_> = (0..num_chunks)
            .map(|i| {
                let start = i * chunksize;
                let end = ((i + 1) * chunksize).min(n);
                x.slice(scirs2_core::ndarray::s![start..end])
            })
            .collect();

        let partial_sums: Vec<F> = chunks
            .into_par_iter()
            .map(|chunk| {
                if self.capabilities.simd_available && chunk.len() > 64 {
                    F::simd_sum(&chunk)
                } else {
                    chunk.iter().fold(F::zero(), |acc, &val| acc + val)
                }
            })
            .collect();

        let total_sum = partial_sums
            .into_iter()
            .fold(F::zero(), |acc, val| acc + val);
        Ok(total_sum / F::from(n).unwrap())
    }

    fn mean_cache_optimal<D>(&self, x: &ArrayBase<D, Ix1>) -> StatsResult<F>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        // Use cache-oblivious algorithm for optimal performance
        Self::mean_cache_oblivious_static(x, 0, x.len())
    }

    #[allow(dead_code)]
    fn mean_cache_oblivious<D>(
        &self,
        x: &ArrayBase<D, Ix1>,
        start: usize,
        len: usize,
    ) -> StatsResult<F>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        Self::mean_cache_oblivious_static(x, start, len)
    }

    // Static version that can be used in threads
    fn mean_cache_oblivious_static<D>(
        x: &ArrayBase<D, Ix1>,
        start: usize,
        len: usize,
    ) -> StatsResult<F>
    where
        D: Data<Elem = F> + Sync + Send,
        F: Float + Send + Sync + 'static + std::fmt::Display,
    {
        const CACHE_THRESHOLD: usize = 1024; // Empirically determined threshold

        if len <= CACHE_THRESHOLD {
            // Base case: compute directly
            let slice = x.slice(scirs2_core::ndarray::s![start..start + len]);
            let sum = slice.iter().fold(F::zero(), |acc, &val| acc + val);
            Ok(sum / F::from(len).unwrap())
        } else {
            // Divide and conquer (sequential to avoid lifetime issues)
            let mid = len / 2;
            let left_result = Self::mean_cache_oblivious_static(x, start, mid)?;
            let right_result = Self::mean_cache_oblivious_static(x, start + mid, len - mid)?;

            // Combine results weighted by size
            let left_weight = F::from(mid).unwrap();
            let right_weight = F::from(len - mid).unwrap();
            let total_weight = F::from(len).unwrap();

            Ok((left_result * left_weight + right_result * right_weight) / total_weight)
        }
    }

    fn mean_fixed_chunks<D>(&self, x: &ArrayBase<D, Ix1>, chunksize: usize) -> StatsResult<F>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        if chunksize == 0 {
            return Err(StatsError::InvalidInput(
                "chunk size must be greater than 0".to_string(),
            ));
        }

        let n = x.len();
        // exact_chunks drops the remainder, so append the final partial chunk
        let chunks: Vec<_> = x
            .exact_chunks(chunksize)
            .into_iter()
            .chain(if n % chunksize != 0 {
                vec![x.slice(scirs2_core::ndarray::s![n - (n % chunksize)..])]
            } else {
                vec![]
            })
            .collect();

        let partial_sums: Vec<F> = chunks
            .into_par_iter()
            .map(|chunk| chunk.iter().fold(F::zero(), |acc, &val| acc + val))
            .collect();

        let total_sum = partial_sums
            .into_iter()
            .fold(F::zero(), |acc, val| acc + val);
        Ok(total_sum / F::from(n).unwrap())
    }

    fn variance_welford_parallel<D>(&self, x: &ArrayBase<D, Ix1>, ddof: usize) -> StatsResult<F>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        // Parallel Welford's algorithm implementation
        let n = x.len();
        let num_threads = self
            .config
            .num_threads
            .unwrap_or_else(|| self.optimal_thread_count());
        let chunksize = n.div_ceil(num_threads);

        let results: Vec<(F, F, usize)> = (0..num_threads)
            .into_par_iter()
            .map(|i| {
                let start = i * chunksize;
                let end = ((i + 1) * chunksize).min(n);

                if start >= end {
                    return (F::zero(), F::zero(), 0);
                }

                let chunk = x.slice(scirs2_core::ndarray::s![start..end]);
                let mut mean = F::zero();
                let mut m2 = F::zero();
                let count = chunk.len();

                // Standard Welford update over the chunk
                for (j, &val) in chunk.iter().enumerate() {
                    let k = F::from(j + 1).unwrap();
                    let delta = val - mean;
                    mean = mean + delta / k;
                    let delta2 = val - mean;
                    m2 = m2 + delta * delta2;
                }

                (mean, m2, count)
            })
            .collect();

        // Combine results using parallel reduction
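        // Pairwise merge (Chan et al.): for chunks A and B with means m_a, m_b,
        // sums of squared deviations M2_a, M2_b and counts n_a, n_b:
        //   delta = m_b - m_a
        //   M2_ab = M2_a + M2_b + delta^2 * n_a * n_b / (n_a + n_b)
        // so partial Welford states combine exactly, without a second pass.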
        let (_final_mean, final_m2, _final_count) = results.into_iter().fold(
            (F::zero(), F::zero(), 0),
            |(mean_a, m2_a, count_a), (mean_b, m2_b, count_b)| {
                if count_b == 0 {
                    return (mean_a, m2_a, count_a);
                }
                if count_a == 0 {
                    return (mean_b, m2_b, count_b);
                }

                let total_count = count_a + count_b;
                let count_a_f = F::from(count_a).unwrap();
                let count_b_f = F::from(count_b).unwrap();
                let total_count_f = F::from(total_count).unwrap();

                let delta = mean_b - mean_a;
                let combined_mean = (mean_a * count_a_f + mean_b * count_b_f) / total_count_f;
                let combined_m2 =
                    m2_a + m2_b + delta * delta * count_a_f * count_b_f / total_count_f;

                (combined_mean, combined_m2, total_count)
            },
        );

        Ok(final_m2 / F::from(n - ddof).unwrap())
    }

    fn correlation_matrix_parallel_upper_triangle<D>(
        &self,
        data: &ArrayBase<D, Ix2>,
        correlation_matrix: &mut Array2<F>,
    ) -> StatsResult<()>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        let (_, n_features) = data.dim();

        // Generate pairs for upper triangle
        let pairs: Vec<(usize, usize)> = (0..n_features)
            .flat_map(|i| (i + 1..n_features).map(move |j| (i, j)))
            .collect();

        let results: Vec<((usize, usize), F)> = pairs
            .into_par_iter()
            .map(|(i, j)| {
                let x = data.column(i);
                let y = data.column(j);
                // Failures for individual pairs are mapped to zero rather
                // than aborting the whole matrix
                let corr = crate::simd_enhanced_core::correlation_simd_enhanced(&x, &y)
                    .unwrap_or(F::zero());
                ((i, j), corr)
            })
            .collect();

        // Fill the correlation matrix
        for ((i, j), corr) in results {
            correlation_matrix[[i, j]] = corr;
        }

        Ok(())
    }

    fn correlation_matrix_sequential<D>(
        &self,
        data: &ArrayBase<D, Ix2>,
        correlation_matrix: &mut Array2<F>,
    ) -> StatsResult<()>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        let (_, n_features) = data.dim();

        for i in 0..n_features {
            for j in i + 1..n_features {
                let x = data.column(i);
                let y = data.column(j);
                let corr = crate::simd_enhanced_core::correlation_simd_enhanced(&x, &y)?;
                correlation_matrix[[i, j]] = corr;
            }
        }

        Ok(())
    }

    fn comprehensive_stats_single_pass_parallel<D>(
        &self,
        x: &ArrayBase<D, Ix1>,
        ddof: usize,
    ) -> StatsResult<ComprehensiveStats<F>>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        let n = x.len();
        let num_threads = self
            .config
            .num_threads
            .unwrap_or_else(|| self.optimal_thread_count());
        let chunksize = n.div_ceil(num_threads);

        // Parallel computation of per-chunk central moments
        let results: Vec<(F, F, F, F, usize)> = (0..num_threads)
            .into_par_iter()
            .map(|i| {
                let start = i * chunksize;
                let end = ((i + 1) * chunksize).min(n);

                if start >= end {
                    return (F::zero(), F::zero(), F::zero(), F::zero(), 0);
                }

                let chunk = x.slice(scirs2_core::ndarray::s![start..end]);
                let count = chunk.len();
                let count_f = F::from(count).unwrap();

                // Chunk mean, then central moments about that mean
                let mean = chunk.iter().fold(F::zero(), |acc, &val| acc + val) / count_f;

                let (m2, m3, m4) =
                    chunk
                        .iter()
                        .fold((F::zero(), F::zero(), F::zero()), |(m2, m3, m4), &val| {
                            let dev = val - mean;
                            let dev2 = dev * dev;
                            let dev3 = dev2 * dev;
                            let dev4 = dev2 * dev2;
                            (m2 + dev2, m3 + dev3, m4 + dev4)
                        });

                (mean, m2, m3, m4, count)
            })
            .collect();

        // Combine per-chunk moments with the pairwise update formulas of
        // Pébay (2008). The chunks are centered on different means, so the
        // moments cannot simply be summed; each merge needs the correction
        // terms in delta = mean_b - mean_a below.
        let (total_mean, total_m2, total_m3, total_m4, _total_count) = results.into_iter().fold(
            (F::zero(), F::zero(), F::zero(), F::zero(), 0usize),
            |(mean_a, m2_a, m3_a, m4_a, n_a), (mean_b, m2_b, m3_b, m4_b, n_b)| {
                if n_b == 0 {
                    return (mean_a, m2_a, m3_a, m4_a, n_a);
                }
                if n_a == 0 {
                    return (mean_b, m2_b, m3_b, m4_b, n_b);
                }

                let n_ab = n_a + n_b;
                let na = F::from(n_a).unwrap();
                let nb = F::from(n_b).unwrap();
                let nab = F::from(n_ab).unwrap();

                let delta = mean_b - mean_a;
                let delta2 = delta * delta;

                let mean = mean_a + delta * nb / nab;
                let m2 = m2_a + m2_b + delta2 * na * nb / nab;
                let m3 = m3_a
                    + m3_b
                    + delta2 * delta * na * nb * (na - nb) / (nab * nab)
                    + F::from(3.0).unwrap() * delta * (na * m2_b - nb * m2_a) / nab;
                let m4 = m4_a
                    + m4_b
                    + delta2 * delta2 * na * nb * (na * na - na * nb + nb * nb)
                        / (nab * nab * nab)
                    + F::from(6.0).unwrap() * delta2 * (na * na * m2_b + nb * nb * m2_a)
                        / (nab * nab)
                    + F::from(4.0).unwrap() * delta * (na * m3_b - nb * m3_a) / nab;

                (mean, m2, m3, m4, n_ab)
            },
        );

        let variance = total_m2 / F::from(n - ddof).unwrap();
        let std = variance.sqrt();

        let skewness = if variance > F::epsilon() {
            (total_m3 / F::from(n).unwrap()) / variance.powf(F::from(1.5).unwrap())
        } else {
            F::zero()
        };

        let kurtosis = if variance > F::epsilon() {
            (total_m4 / F::from(n).unwrap()) / (variance * variance) - F::from(3.0).unwrap()
        } else {
            F::zero()
        };

        Ok(ComprehensiveStats {
            mean: total_mean,
            variance,
            std,
            skewness,
            kurtosis,
            count: n,
        })
    }

    fn bootstrap_work_stealing<D>(
        &self,
        x: &ArrayBase<D, Ix1>,
        n_samples: usize,
        samples_per_thread: usize,
        statistic_fn: impl Fn(&ArrayView1<F>) -> F + Send + Sync + Clone,
        seed: Option<u64>,
    ) -> StatsResult<Array1<F>>
    where
        D: Data<Elem = F> + Sync + Send,
    {
        use scirs2_core::random::{ChaCha8Rng, Rng, SeedableRng};

        let num_threads = self
            .config
            .num_threads
            .unwrap_or_else(|| self.optimal_thread_count());

        let data_vec: Vec<F> = x.iter().cloned().collect();
        let data_arc = Arc::new(data_vec);

        let partial_results: Arc<Mutex<Vec<F>>> = Arc::new(Mutex::new(Vec::new()));

        crossbeam::scope(|s| {
            for thread_id in 0..num_threads {
                let data_arc = Arc::clone(&data_arc);
                let partial_results = Arc::clone(&partial_results);
                let statistic_fn = statistic_fn.clone();

                s.spawn(move |_| {
                    // Per-thread RNG: offset the seed by thread id so threads
                    // draw independent, reproducible streams
                    let mut rng = if let Some(seed) = seed {
                        ChaCha8Rng::seed_from_u64(seed + thread_id as u64)
                    } else {
                        ChaCha8Rng::from_rng(&mut scirs2_core::random::thread_rng())
                    };

                    let mut local_results = Vec::with_capacity(samples_per_thread);
                    let ndata = data_arc.len();

                    for _ in 0..samples_per_thread {
                        // Generate bootstrap sample by drawing indices with replacement
                        let bootstrap_indices: Vec<usize> =
                            (0..ndata).map(|_| rng.gen_range(0..ndata)).collect();

                        let bootstrap_sample: Vec<F> =
                            bootstrap_indices.into_iter().map(|i| data_arc[i]).collect();

                        let sample_array = Array1::from(bootstrap_sample);
                        let statistic = statistic_fn(&sample_array.view());
                        local_results.push(statistic);
                    }

                    partial_results.lock().unwrap().extend(local_results);
                });
            }
        })
        .unwrap();

        // Threads may overshoot n_samples in total; keep exactly n_samples
        let mut all_results = partial_results.lock().unwrap();
        all_results.truncate(n_samples);

        Ok(Array1::from(all_results.clone()))
    }
}

/// Simple thread pool for parallel execution
struct ThreadPool {
    workers: Vec<thread::JoinHandle<()>>,
    sender: std::sync::mpsc::Sender<Message>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

enum Message {
    NewJob(Job),
    Terminate,
}

impl ThreadPool {
    fn new(size: usize, _config: AdvancedParallelConfig) -> StatsResult<ThreadPool> {
        if size == 0 {
            return Err(StatsError::InvalidInput(
                "thread pool size must be greater than 0".to_string(),
            ));
        }

        let (sender, receiver) = std::sync::mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);

        for _id in 0..size {
            let receiver = Arc::clone(&receiver);

            let worker = thread::spawn(move || loop {
                let message = receiver.lock().unwrap().recv().unwrap();

                match message {
                    Message::NewJob(job) => {
                        job();
                    }
                    Message::Terminate => {
                        break;
                    }
                }
            });

            workers.push(worker);
        }

        Ok(ThreadPool { workers, sender })
    }

    #[allow(dead_code)]
    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(Message::NewJob(job)).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Tell every worker to shut down, then join so no thread outlives the pool
        for _ in &self.workers {
            let _ = self.sender.send(Message::Terminate);
        }

        for worker in self.workers.drain(..) {
            let _ = worker.join();
        }
    }
}

/// Convenience function to create an advanced parallel processor
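///
/// # Example
///
/// A minimal sketch:
///
/// ```ignore
/// let processor = create_advanced_parallel_processor::<f64>();
/// ```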
#[allow(dead_code)]
pub fn create_advanced_parallel_processor<F>() -> AdvancedParallelProcessor<F>
where
    F: Float
        + NumCast
        + Send
        + Sync
        + SimdUnifiedOps
        + Copy
        + 'static
        + Zero
        + One
        + std::fmt::Debug
        + std::fmt::Display
        + std::iter::Sum<F>,
{
    AdvancedParallelProcessor::new(AdvancedParallelConfig::default())
}

/// Convenience function to create a processor with custom configuration
#[allow(dead_code)]
pub fn create_configured_parallel_processor<F>(
    config: AdvancedParallelConfig,
) -> AdvancedParallelProcessor<F>
where
    F: Float
        + NumCast
        + Send
        + Sync
        + SimdUnifiedOps
        + Copy
        + 'static
        + Zero
        + One
        + std::fmt::Debug
        + std::fmt::Display
        + std::iter::Sum<F>,
{
    AdvancedParallelProcessor::new(config)
}