// scirs2_stats/parallel_stats_enhanced.rs

//! Enhanced parallel statistical operations with adaptive processing
//!
//! This module provides enhanced parallel implementations with:
//! - Adaptive threshold selection based on system capabilities
//! - Better load balancing for heterogeneous data
//! - Cache-aware chunking strategies
//! - NUMA-aware processing for large systems
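//!
//! A minimal sketch of how these pieces fit together (illustrative only; the
//! exact thresholds depend on the detected hardware):
//!
//! ```ignore
//! use scirs2_core::ndarray::Array1;
//!
//! // Decide whether a 1M-element f64 reduction is worth parallelizing.
//! let threshold = AdaptiveThreshold::new();
//! let cutoff = threshold.calculate(std::mem::size_of::<f64>(), 1.0);
//! let data: Array1<f64> = Array1::linspace(0.0, 1.0, 1_000_000);
//! if data.len() >= cutoff {
//!     // ... take the parallel path, chunked via `optimal_chunksize` ...
//! }
//! ```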

use crate::descriptive_simd::{mean_simd, variance_simd};
use crate::error::{StatsError, StatsResult};
use scirs2_core::ndarray::{s, Array1, Array2, ArrayBase, ArrayView2, Data, Ix1, Ix2};
use scirs2_core::numeric::{Float, NumCast};
use scirs2_core::parallel_ops::{num_threads, par_chunks, parallel_map, ParallelIterator};
use scirs2_core::simd_ops::{PlatformCapabilities, SimdUnifiedOps};
use std::f64::consts::PI;
use std::sync::Arc;

/// Adaptive threshold calculator based on system capabilities
pub struct AdaptiveThreshold {
    base_threshold: usize,
    cpu_cores: usize,
    simd_available: bool,
    cache_linesize: usize,
}

impl Default for AdaptiveThreshold {
    fn default() -> Self {
        Self::new()
    }
}

impl AdaptiveThreshold {
    /// Create a new adaptive threshold calculator
    pub fn new() -> Self {
        let caps = PlatformCapabilities::detect();
        Self {
            base_threshold: 10_000,
            cpu_cores: num_threads(),
            simd_available: caps.simd_available,
            cache_linesize: 64, // Common cache line size on x86_64 and many ARM cores
        }
    }

    /// Calculate the optimal threshold for parallel processing
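    ///
    /// For intuition, a rough worked example (the exact numbers depend on the
    /// detected hardware): with 8 cores, SIMD available, 8-byte `f64` elements,
    /// and complexity 1.0, we get `simd_factor = 2.0`, `core_factor = sqrt(8) ≈ 2.83`,
    /// and `cache_factor = 64 / 8 = 8`, so the threshold is roughly
    /// `10_000 * 2.0 / 2.83 * sqrt(8) ≈ 20_000` elements.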
    pub fn calculate(&self, elementsize: usize, operationcomplexity: f64) -> usize {
        // Adjust the threshold based on:
        // 1. Number of CPU cores
        // 2. SIMD availability (SIMD reduces the need for parallelism)
        // 3. Operation complexity
        // 4. Cache efficiency

        let simd_factor = if self.simd_available { 2.0 } else { 1.0 };
        let core_factor = (self.cpu_cores as f64).sqrt();
        let cache_factor = (self.cache_linesize / elementsize).max(1) as f64;

        let adjusted_threshold =
            (self.base_threshold as f64 * simd_factor / core_factor / operationcomplexity
                * cache_factor.sqrt()) as usize;

        adjusted_threshold.max(1000) // Enforce a minimum threshold
    }

    /// Calculate optimal chunk size for cache efficiency
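    ///
    /// A rough worked example (assuming the hard-coded 32KB L1 estimate below): for
    /// 1_000_000 `f64` elements on 8 cores, `elements_per_cache = 32_768 / 8 = 4_096`,
    /// `ideal_chunk = 2_048`, `min_chunks = 32`, and `max_chunksize = 31_250`,
    /// so the chosen chunk size is 2_048 elements.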
    pub fn optimal_chunksize(&self, total_elements: usize, elementsize: usize) -> usize {
        // L1 data cache is typically 32KB per core
        let l1_cachesize = 32 * 1024;
        let elements_per_cache = l1_cachesize / elementsize;

        // The chunk should fit comfortably in L1 cache
        let ideal_chunk = elements_per_cache / 2; // Leave room for other data

        // Balance cache efficiency against parallelism
        let min_chunks = self.cpu_cores * 4; // At least 4 chunks per core
        let max_chunksize = total_elements / min_chunks;

        ideal_chunk.min(max_chunksize).max(64)
    }
}

/// Parallel histogram computation with adaptive binning
pub struct ParallelHistogram<F: Float> {
    bins: Vec<F>,
    counts: Vec<usize>,
    min_val: F,
    max_val: F,
    n_bins: usize,
}

impl<F: Float + NumCast + Send + Sync + std::fmt::Display> ParallelHistogram<F> {
    /// Create a new parallel histogram
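    ///
    /// A minimal usage sketch (not compiled as a doctest; assumes `f64` data):
    ///
    /// ```ignore
    /// use scirs2_core::ndarray::Array1;
    ///
    /// let data: Array1<f64> = Array1::linspace(0.0, 100.0, 10_000);
    /// let hist = ParallelHistogram::new(&data.view(), 20)?;
    /// let (edges, counts) = hist.get_histogram();
    /// assert_eq!(edges.len(), 20); // left edge of each bin
    /// assert_eq!(counts.iter().sum::<usize>(), 10_000);
    /// ```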
    pub fn new<D>(data: &ArrayBase<D, Ix1>, nbins: usize) -> StatsResult<Self>
    where
        D: Data<Elem = F> + Sync,
    {
        if data.is_empty() {
            return Err(StatsError::InvalidArgument(
                "Cannot create histogram from empty data".to_string(),
            ));
        }
        if nbins == 0 {
            return Err(StatsError::InvalidArgument(
                "Number of bins must be positive".to_string(),
            ));
        }

        let threshold = AdaptiveThreshold::new();
        let parallel_threshold = threshold.calculate(std::mem::size_of::<F>(), 1.0);

        // Find min/max in parallel if the data is large enough
        let (min_val, max_val) = if data.len() >= parallel_threshold {
            let chunksize = threshold.optimal_chunksize(data.len(), std::mem::size_of::<F>());

            // `as_slice` requires contiguous, standard-layout data
            let (min, max) = par_chunks(data.as_slice().unwrap(), chunksize)
                .map(|chunk| {
                    let mut local_min = chunk[0];
                    let mut local_max = chunk[0];
                    for &val in chunk.iter().skip(1) {
                        if val < local_min {
                            local_min = val;
                        }
                        if val > local_max {
                            local_max = val;
                        }
                    }
                    (local_min, local_max)
                })
                .reduce(
                    || (F::infinity(), F::neg_infinity()),
                    |(min1, max1), (min2, max2)| {
                        (
                            if min1 < min2 { min1 } else { min2 },
                            if max1 > max2 { max1 } else { max2 },
                        )
                    },
                );
            (min, max)
        } else {
            // Sequential scan for small data
            let mut min = data[0];
            let mut max = data[0];
            for &val in data.iter().skip(1) {
                if val < min {
                    min = val;
                }
                if val > max {
                    max = val;
                }
            }
            (min, max)
        };

        // Create bins; guard against a degenerate range (all values identical),
        // which would otherwise produce a zero bin width and NaN bin indices
        let range = max_val - min_val;
        if range <= F::zero() {
            return Err(StatsError::InvalidArgument(
                "Cannot create histogram when all values are identical".to_string(),
            ));
        }
        let bin_width = range / F::from(nbins).unwrap();

        let bins: Vec<F> = (0..=nbins)
            .map(|i| min_val + bin_width * F::from(i).unwrap())
            .collect();

        let mut histogram = Self {
            bins,
            counts: vec![0; nbins],
            min_val,
            max_val,
            n_bins: nbins,
        };

        // Compute counts
        histogram.compute_counts(data)?;

        Ok(histogram)
    }

    /// Compute histogram counts in parallel
    fn compute_counts<D>(&mut self, data: &ArrayBase<D, Ix1>) -> StatsResult<()>
    where
        D: Data<Elem = F> + Sync,
    {
        let threshold = AdaptiveThreshold::new();
        let parallel_threshold = threshold.calculate(std::mem::size_of::<F>(), 2.0);

        if data.len() < parallel_threshold {
            // Sequential computation
            let bin_width = (self.max_val - self.min_val) / F::from(self.n_bins).unwrap();

            for &val in data.iter() {
                if val >= self.min_val && val <= self.max_val {
                    let bin_idx = ((val - self.min_val) / bin_width)
                        .floor()
                        .to_usize()
                        .unwrap()
                        .min(self.n_bins - 1);
                    self.counts[bin_idx] += 1;
                }
            }
        } else {
            // Parallel computation with thread-local histograms
            let chunksize = threshold.optimal_chunksize(data.len(), std::mem::size_of::<F>());
            let bin_width = (self.max_val - self.min_val) / F::from(self.n_bins).unwrap();
            let n_bins = self.n_bins;
            let min_val = self.min_val;
            let max_val = self.max_val;

            // Each thread maintains its own histogram
            let local_histograms: Vec<Vec<usize>> = par_chunks(data.as_slice().unwrap(), chunksize)
                .map(|chunk| {
                    let mut local_counts = vec![0; n_bins];

                    for &val in chunk {
                        if val >= min_val && val <= max_val {
                            let bin_idx = ((val - min_val) / bin_width)
                                .floor()
                                .to_usize()
                                .unwrap()
                                .min(n_bins - 1);
                            local_counts[bin_idx] += 1;
                        }
                    }

                    local_counts
                })
                .collect();

            // Merge the thread-local histograms
            for local_counts in local_histograms {
                for (i, count) in local_counts.into_iter().enumerate() {
                    self.counts[i] += count;
                }
            }
        }

        Ok(())
    }

    /// Get histogram bin edges (left edges) and counts
    pub fn get_histogram(&self) -> (&[F], &[usize]) {
        (&self.bins[..self.n_bins], &self.counts)
    }
}

/// Parallel kernel density estimation with a Gaussian kernel
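///
/// For evaluation point `x`, data `x_1..x_n`, and bandwidth `h`, this estimates
/// `f(x) = 1 / (n * h * sqrt(2 * pi)) * sum_i exp(-(x - x_i)^2 / (2 * h^2))`.
///
/// A minimal usage sketch (not compiled as a doctest):
///
/// ```ignore
/// use scirs2_core::ndarray::{array, Array1};
///
/// let samples = array![1.0_f64, 2.0, 3.0, 4.0, 5.0];
/// let grid = Array1::linspace(0.0, 6.0, 50);
/// let density = kde_parallel(&samples.view(), &grid, 0.5)?;
/// assert_eq!(density.len(), 50);
/// ```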
#[allow(dead_code)]
pub fn kde_parallel<F, D>(
    data: &ArrayBase<D, Ix1>,
    eval_points: &Array1<F>,
    bandwidth: F,
) -> StatsResult<Array1<F>>
where
    F: Float + NumCast + Send + Sync + SimdUnifiedOps,
    D: Data<Elem = F> + Sync,
{
    if data.is_empty() || eval_points.is_empty() {
        return Err(StatsError::InvalidArgument("Empty input data".to_string()));
    }
    if bandwidth <= F::zero() {
        return Err(StatsError::InvalidArgument(
            "Bandwidth must be positive".to_string(),
        ));
    }

    let n = data.len();
    let threshold = AdaptiveThreshold::new();
    let parallel_threshold = threshold.calculate(std::mem::size_of::<F>(), 3.0);

    // Gaussian kernel normalization constant: 1 / (n * h * sqrt(2 * pi))
    let norm_const =
        F::one() / (F::from(2.0 * PI).unwrap().sqrt() * bandwidth * F::from(n).unwrap());

    if eval_points.len() * n < parallel_threshold {
        // Sequential KDE
        let mut result = Array1::zeros(eval_points.len());

        for (i, &x) in eval_points.iter().enumerate() {
            let mut density = F::zero();
            for &xi in data.iter() {
                let u = (x - xi) / bandwidth;
                density = density + (-u * u / F::from(2.0).unwrap()).exp();
            }
            result[i] = density * norm_const;
        }

        Ok(result)
    } else {
        // Parallel KDE: parallelize over evaluation points, and over data
        // chunks within each point (the work-stealing scheduler handles nesting)
        let eval_vec: Vec<F> = eval_points.to_vec();
        let data_slice = data.as_slice().unwrap();
        let chunksize = threshold.optimal_chunksize(n, std::mem::size_of::<F>());

        let densities: Vec<F> = parallel_map(&eval_vec, |&x| {
            // Accumulate the kernel sum for this evaluation point in parallel
            let density: F = par_chunks(data_slice, chunksize)
                .map(|chunk| {
                    let mut local_sum = F::zero();
                    for &xi in chunk {
                        let u = (x - xi) / bandwidth;
                        local_sum = local_sum + (-u * u / F::from(2.0).unwrap()).exp();
                    }
                    local_sum
                })
                .reduce(|| F::zero(), |a, b| a + b);

            density * norm_const
        });

        Ok(Array1::from_vec(densities))
    }
}

/// Parallel moving statistics (rolling mean, standard deviation, etc.)
pub struct ParallelMovingStats<F: Float> {
    windowsize: usize,
    data: Arc<Vec<F>>,
}

impl<F: Float + NumCast + Send + Sync + SimdUnifiedOps + std::fmt::Display> ParallelMovingStats<F> {
    /// Create a new moving statistics calculator
    pub fn new<D>(data: &ArrayBase<D, Ix1>, windowsize: usize) -> StatsResult<Self>
    where
        D: Data<Elem = F>,
    {
        if windowsize == 0 || windowsize > data.len() {
            return Err(StatsError::InvalidArgument(
                "Window size must be between 1 and the data length".to_string(),
            ));
        }

        Ok(Self {
            windowsize,
            data: Arc::new(data.to_vec()),
        })
    }

    /// Compute moving average in parallel
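    ///
    /// The sequential path maintains a running window sum, so each step costs O(1):
    /// for data `[1, 2, 3, 4]` with window 2, the window sums are `3, 5, 7` and the
    /// means `1.5, 2.5, 3.5`. The parallel path computes each window independently.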
    pub fn moving_mean(&self) -> StatsResult<Array1<F>> {
        let n = self.data.len();
        let output_len = n - self.windowsize + 1;

        let threshold = AdaptiveThreshold::new();
        let parallel_threshold = threshold.calculate(std::mem::size_of::<F>(), 2.0);

        if output_len < parallel_threshold {
            // Sequential computation with a sliding window sum: each step subtracts
            // the element leaving the window and adds the one entering it
            let mut result = Array1::zeros(output_len);
            let windowsize_f = F::from(self.windowsize).unwrap();

            // Initial window sum
            let mut window_sum = self.data[..self.windowsize]
                .iter()
                .fold(F::zero(), |acc, &x| acc + x);
            result[0] = window_sum / windowsize_f;

            // Sliding window
            for i in 1..output_len {
                window_sum = window_sum - self.data[i - 1] + self.data[i + self.windowsize - 1];
                result[i] = window_sum / windowsize_f;
            }

            Ok(result)
        } else {
            // Parallel computation: each window sum is computed independently
            let indices: Vec<usize> = (0..output_len).collect();
            let data_ref = Arc::clone(&self.data);
            let windowsize = self.windowsize;
            let windowsize_f = F::from(windowsize).unwrap();

            let means: Vec<F> = parallel_map(&indices, |&i| {
                let window_sum = data_ref[i..i + windowsize]
                    .iter()
                    .fold(F::zero(), |acc, &x| acc + x);
                window_sum / windowsize_f
            });

            Ok(Array1::from_vec(means))
        }
    }

    /// Compute moving standard deviation in parallel
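    ///
    /// `ddof` selects the divisor `windowsize - ddof`: with window `[1.0, 2.0, 3.0]`
    /// and `ddof = 1` (sample standard deviation), the mean is 2, the sum of squared
    /// deviations is 2, the variance is `2 / (3 - 1) = 1`, and the result is 1.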
    pub fn moving_std(&self, ddof: usize) -> StatsResult<Array1<F>> {
        let n = self.data.len();
        let output_len = n - self.windowsize + 1;

        if self.windowsize <= ddof {
            return Err(StatsError::InvalidArgument(
                "Window size must be greater than ddof".to_string(),
            ));
        }

        let threshold = AdaptiveThreshold::new();
        let parallel_threshold = threshold.calculate(std::mem::size_of::<F>(), 3.0);

        if output_len < parallel_threshold {
            // Sequential two-pass computation: mean first, then squared deviations
            let mut result = Array1::zeros(output_len);
            let divisor = F::from(self.windowsize - ddof).unwrap();

            for i in 0..output_len {
                let window = &self.data[i..i + self.windowsize];
                let mean = window.iter().fold(F::zero(), |acc, &x| acc + x)
                    / F::from(self.windowsize).unwrap();

                let variance = window
                    .iter()
                    .map(|&x| {
                        let diff = x - mean;
                        diff * diff
                    })
                    .fold(F::zero(), |acc, x| acc + x)
                    / divisor;

                result[i] = variance.sqrt();
            }

            Ok(result)
        } else {
            // Parallel computation: each window's std is computed independently
            let indices: Vec<usize> = (0..output_len).collect();
            let data_ref = Arc::clone(&self.data);
            let windowsize = self.windowsize;
            let divisor = F::from(windowsize - ddof).unwrap();

            let stds: Vec<F> = parallel_map(&indices, |&i| {
                let window = &data_ref[i..i + windowsize];
                let mean =
                    window.iter().fold(F::zero(), |acc, &x| acc + x) / F::from(windowsize).unwrap();

                let variance = window
                    .iter()
                    .map(|&x| {
                        let diff = x - mean;
                        diff * diff
                    })
                    .fold(F::zero(), |acc, x| acc + x)
                    / divisor;

                variance.sqrt()
            });

            Ok(Array1::from_vec(stds))
        }
    }
}

/// Parallel computation of pairwise distances
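///
/// Supported metrics are `"euclidean"` and `"manhattan"`. A minimal usage sketch
/// (not compiled as a doctest):
///
/// ```ignore
/// use scirs2_core::ndarray::array;
///
/// let points = array![[0.0_f64, 0.0], [3.0, 4.0]];
/// let d = pairwise_distances_parallel(&points.view(), "euclidean")?;
/// assert_eq!(d[(0, 1)], 5.0); // 3-4-5 triangle; the matrix is symmetric
/// ```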
#[allow(dead_code)]
pub fn pairwise_distances_parallel<F, D>(
    x: &ArrayBase<D, Ix2>,
    metric: &str,
) -> StatsResult<Array2<F>>
where
    F: Float + NumCast + Send + Sync + SimdUnifiedOps,
    D: Data<Elem = F> + Sync,
{
    let n = x.nrows();
    let d = x.ncols();

    // Validate the metric up front so the parallel path below cannot
    // silently produce NaN distances for an unknown metric
    if metric != "euclidean" && metric != "manhattan" {
        return Err(StatsError::InvalidArgument("Unknown metric".to_string()));
    }

    let threshold = AdaptiveThreshold::new();
    let parallel_threshold = threshold.calculate(std::mem::size_of::<F>() * d, 2.0);

    if n * n < parallel_threshold {
        // Sequential computation
        let mut distances = Array2::zeros((n, n));

        for i in 0..n {
            for j in i + 1..n {
                let dist = match metric {
                    "euclidean" => {
                        let mut sum = F::zero();
                        for k in 0..d {
                            let diff = x[(i, k)] - x[(j, k)];
                            sum = sum + diff * diff;
                        }
                        sum.sqrt()
                    }
                    "manhattan" => {
                        let mut sum = F::zero();
                        for k in 0..d {
                            sum = sum + (x[(i, k)] - x[(j, k)]).abs();
                        }
                        sum
                    }
                    _ => unreachable!("metric validated above"),
                };

                distances[(i, j)] = dist;
                distances[(j, i)] = dist;
            }
        }

        Ok(distances)
    } else {
        // Parallel computation over all unique (i, j) pairs
        let mut distances = Array2::zeros((n, n));
        let pairs: Vec<(usize, usize)> = (0..n)
            .flat_map(|i| (i + 1..n).map(move |j| (i, j)))
            .collect();

        let computed_distances: Vec<((usize, usize), F)> = parallel_map(&pairs, |&(i, j)| {
            let dist = match metric {
                "euclidean" => {
                    if d > 8 && F::simd_available() {
                        // Use SIMD for the distance computation
                        let row_i = x.slice(s![i, ..]);
                        let row_j = x.slice(s![j, ..]);
                        let diff = F::simd_sub(&row_i, &row_j);
                        let squared = F::simd_mul(&diff.view(), &diff.view());
                        F::simd_sum(&squared.view()).sqrt()
                    } else {
                        let mut sum = F::zero();
                        for k in 0..d {
                            let diff = x[(i, k)] - x[(j, k)];
                            sum = sum + diff * diff;
                        }
                        sum.sqrt()
                    }
                }
                "manhattan" => {
                    let mut sum = F::zero();
                    for k in 0..d {
                        sum = sum + (x[(i, k)] - x[(j, k)]).abs();
                    }
                    sum
                }
                _ => unreachable!("metric validated above"),
            };
            ((i, j), dist)
        });

        // Fill the symmetric distance matrix
        for ((i, j), dist) in computed_distances {
            distances[(i, j)] = dist;
            distances[(j, i)] = dist;
        }

        Ok(distances)
    }
}

/// Parallel cross-validation for model evaluation
pub struct ParallelCrossValidation<F: Float> {
    n_folds: usize,
    shuffle: bool,
    random_state: Option<u64>,
    _phantom: std::marker::PhantomData<F>,
}

impl<F: Float + NumCast + Send + Sync + std::fmt::Display> ParallelCrossValidation<F> {
    /// Create a new cross-validation splitter
    pub fn new(n_folds: usize, shuffle: bool, random_state: Option<u64>) -> Self {
        Self {
            n_folds,
            shuffle,
            random_state,
            _phantom: std::marker::PhantomData,
        }
    }

    /// Perform parallel cross-validation
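    ///
    /// A minimal usage sketch with stub closures (not compiled as a doctest; `x` and `y`
    /// are assumed to be an existing design matrix and target vector, and a real `model`
    /// closure would fit parameters as a side effect for `scorer` to use):
    ///
    /// ```ignore
    /// let cv = ParallelCrossValidation::<f64>::new(5, true, Some(42));
    /// let scores = cv.cross_val_score(
    ///     &x.view(),
    ///     &y,
    ///     |_x_train, _y_train| Ok(()),                        // stub "training"
    ///     |_x_test, y_test| Ok(y_test.mean().unwrap_or(0.0)), // stub "scoring"
    /// )?;
    /// assert_eq!(scores.len(), 5);
    /// ```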
    pub fn cross_val_score<D, M, S>(
        &self,
        x: &ArrayBase<D, Ix2>,
        y: &Array1<F>,
        model: M,
        scorer: S,
    ) -> StatsResult<Array1<F>>
    where
        D: Data<Elem = F> + Sync,
        M: Fn(&ArrayView2<F>, &Array1<F>) -> StatsResult<()> + Send + Sync + Clone,
        S: Fn(&ArrayView2<F>, &Array1<F>) -> StatsResult<F> + Send + Sync,
    {
        let n_samples = x.nrows();
        if n_samples != y.len() {
            return Err(StatsError::DimensionMismatch(
                "x and y must have the same number of samples".to_string(),
            ));
        }
        if self.n_folds < 2 || self.n_folds > n_samples {
            return Err(StatsError::InvalidArgument(
                "n_folds must be between 2 and the number of samples".to_string(),
            ));
        }

        // Create fold indices
        let indices: Vec<usize> = if self.shuffle {
            use crate::random::permutation_int;
            permutation_int(n_samples, self.random_state)
                .unwrap()
                .to_vec()
        } else {
            (0..n_samples).collect()
        };

        // Split into folds; the first `remainder` folds get one extra sample
        let foldsize = n_samples / self.n_folds;
        let remainder = n_samples % self.n_folds;

        let fold_indices: Vec<(usize, usize)> = (0..self.n_folds)
            .map(|i| {
                let start = i * foldsize + i.min(remainder);
                let size = foldsize + if i < remainder { 1 } else { 0 };
                (start, start + size)
            })
            .collect();

        // Evaluate the folds in parallel
        let scores: Vec<F> = parallel_map(&fold_indices, |&(test_start, test_end)| {
            // Create train/test index splits
            let test_indices = &indices[test_start..test_end];
            let train_indices: Vec<usize> = indices[..test_start]
                .iter()
                .chain(indices[test_end..].iter())
                .copied()
                .collect();

            // Materialize train/test data
            let x_train = x.select(scirs2_core::ndarray::Axis(0), &train_indices);
            let y_train = y.select(scirs2_core::ndarray::Axis(0), &train_indices);
            let x_test = x.select(scirs2_core::ndarray::Axis(0), test_indices);
            let y_test = y.select(scirs2_core::ndarray::Axis(0), test_indices);

            // Train the model on this fold's training data
            let fold_model = model.clone();
            fold_model(&x_train.view(), &y_train)?;

            // Score on the held-out test set
            scorer(&x_test.view(), &y_test)
        })
        .into_iter()
        .collect::<StatsResult<Vec<_>>>()?;

        Ok(Array1::from_vec(scores))
    }
}

/// Parallel correlation matrix computation
///
/// Efficiently computes the Pearson correlation matrix for multiple variables using
/// parallel processing and SIMD operations where available. If `rowvar` is true, each
/// row is a variable and each column an observation; otherwise columns are variables.
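///
/// A minimal usage sketch (not compiled as a doctest):
///
/// ```ignore
/// use scirs2_core::ndarray::array;
///
/// // Two variables in columns, four observations in rows (`rowvar = false`).
/// let data = array![[1.0_f64, 2.0], [2.0, 4.0], [3.0, 6.0], [4.0, 8.0]];
/// let r = corrcoef_parallel(&data.view(), false)?;
/// assert!((r[(0, 1)] - 1.0).abs() < 1e-10); // perfectly correlated columns
/// ```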
#[allow(dead_code)]
pub fn corrcoef_parallel<F, D>(data: &ArrayBase<D, Ix2>, rowvar: bool) -> StatsResult<Array2<F>>
where
    F: Float + NumCast + Send + Sync + SimdUnifiedOps,
    D: Data<Elem = F> + Sync,
{
    use crate::correlation_simd::pearson_r_simd;

    let (n_vars, n_obs) = if rowvar {
        (data.nrows(), data.ncols())
    } else {
        (data.ncols(), data.nrows())
    };

    if n_obs < 2 {
        return Err(StatsError::InvalidArgument(
            "Need at least 2 observations to compute correlation".to_string(),
        ));
    }

    let threshold = AdaptiveThreshold::new();
    let parallel_threshold = threshold.calculate(
        std::mem::size_of::<F>() * n_obs,
        2.0, // correlation is an O(n_obs) operation per pair
    );

    let mut corr_matrix = Array2::zeros((n_vars, n_vars));

    // Fill the diagonal with 1s
    for i in 0..n_vars {
        corr_matrix[(i, i)] = F::one();
    }

    // Generate all unique pairs
    let pairs: Vec<(usize, usize)> = (0..n_vars)
        .flat_map(|i| (i + 1..n_vars).map(move |j| (i, j)))
        .collect();

    if pairs.len() * n_obs < parallel_threshold {
        // Sequential computation
        for (i, j) in pairs {
            let var_i = if rowvar {
                data.slice(s![i, ..])
            } else {
                data.slice(s![.., i])
            };

            let var_j = if rowvar {
                data.slice(s![j, ..])
            } else {
                data.slice(s![.., j])
            };

            let corr = pearson_r_simd(&var_i, &var_j)?;
            corr_matrix[(i, j)] = corr;
            corr_matrix[(j, i)] = corr;
        }
    } else {
        // Parallel computation
        let correlations: Vec<((usize, usize), F)> = parallel_map(&pairs, |&(i, j)| {
            let var_i = if rowvar {
                data.slice(s![i, ..])
            } else {
                data.slice(s![.., i])
            };

            let var_j = if rowvar {
                data.slice(s![j, ..])
            } else {
                data.slice(s![.., j])
            };

            let corr = pearson_r_simd(&var_i, &var_j)?;
            Ok(((i, j), corr))
        })
        .into_iter()
        .collect::<StatsResult<Vec<_>>>()?;

        // Fill the symmetric correlation matrix
        for ((i, j), corr) in correlations {
            corr_matrix[(i, j)] = corr;
            corr_matrix[(j, i)] = corr;
        }
    }

    Ok(corr_matrix)
}

/// Parallel partial correlation computation
///
/// Computes partial correlations between pairs of variables while controlling for a set
/// of other variables, using parallel processing for efficiency. Columns are treated as
/// variables and rows as observations.
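///
/// Each partial correlation is obtained by least-squares regressing both variables on
/// the control variables and correlating the residuals; for centered data and a single
/// control variable `z`, this matches the textbook formula
/// `r_xy.z = (r_xy - r_xz * r_yz) / sqrt((1 - r_xz^2) * (1 - r_yz^2))`.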
#[allow(dead_code)]
pub fn partial_corrcoef_parallel<F, D>(
    data: &ArrayBase<D, Ix2>,
    control_vars: &[usize],
) -> StatsResult<Array2<F>>
where
    F: Float
        + NumCast
        + Send
        + Sync
        + SimdUnifiedOps
        + scirs2_core::ndarray::ScalarOperand
        + std::iter::Sum
        + scirs2_core::numeric::NumAssign,
    D: Data<Elem = F> + Sync,
{
    use crate::correlation_simd::pearson_r_simd;
    use scirs2_linalg::lstsq;

    let n_vars = data.ncols();
    let n_obs = data.nrows();

    if control_vars.is_empty() {
        // No control variables: fall back to the regular correlation matrix
        return corrcoef_parallel(data, false);
    }

    // Check that the control variable indices are valid
    for &idx in control_vars {
        if idx >= n_vars {
            return Err(StatsError::InvalidArgument(format!(
                "Control variable index {} out of bounds",
                idx
            )));
        }
    }

    let threshold = AdaptiveThreshold::new();
    let parallel_threshold = threshold.calculate(
        std::mem::size_of::<F>() * n_obs * control_vars.len(),
        3.0, // partial correlation requires least-squares regressions per pair
    );

    let mut partial_corr = Array2::zeros((n_vars, n_vars));

    // Fill the diagonal
    for i in 0..n_vars {
        partial_corr[(i, i)] = F::one();
    }

    // Create the control matrix (observations x control variables)
    let control_matrix = data.select(scirs2_core::ndarray::Axis(1), control_vars);

    // Generate pairs, excluding the control variables themselves; entries
    // involving control variables are left at zero off the diagonal
    let pairs: Vec<(usize, usize)> = (0..n_vars)
        .filter(|i| !control_vars.contains(i))
        .flat_map(|i| {
            (i + 1..n_vars)
                .filter(|j| !control_vars.contains(j))
                .map(move |j| (i, j))
        })
        .collect();

    if pairs.len() * n_obs * control_vars.len() < parallel_threshold {
        // Sequential computation
        for (i, j) in pairs {
            let var_i = data.slice(s![.., i]);
            let var_j = data.slice(s![.., j]);

            // Regress out the control variables and compute residuals
            let solution_i = lstsq(&control_matrix.view(), &var_i.view(), None)
                .map_err(|e| StatsError::ComputationError(format!("Regression failed: {}", e)))?;
            let predicted_i = control_matrix.dot(&solution_i.x);
            let residuals_i = &var_i - &predicted_i;

            let solution_j = lstsq(&control_matrix.view(), &var_j.view(), None)
                .map_err(|e| StatsError::ComputationError(format!("Regression failed: {}", e)))?;
            let predicted_j = control_matrix.dot(&solution_j.x);
            let residuals_j = &var_j - &predicted_j;

            // The partial correlation is the correlation of the residuals
            let partial_r = pearson_r_simd(&residuals_i, &residuals_j)?;

            partial_corr[(i, j)] = partial_r;
            partial_corr[(j, i)] = partial_r;
        }
    } else {
        // Parallel computation
        let partial_correlations: Vec<((usize, usize), F)> = parallel_map(&pairs, |&(i, j)| {
            let var_i = data.slice(s![.., i]);
            let var_j = data.slice(s![.., j]);

            // Regress out the control variables and compute residuals
            let solution_i = lstsq(&control_matrix.view(), &var_i.view(), None)
                .map_err(|e| StatsError::ComputationError(format!("Regression failed: {}", e)))?;
            let predicted_i = control_matrix.dot(&solution_i.x);
            let residuals_i = &var_i - &predicted_i;

            let solution_j = lstsq(&control_matrix.view(), &var_j.view(), None)
                .map_err(|e| StatsError::ComputationError(format!("Regression failed: {}", e)))?;
            let predicted_j = control_matrix.dot(&solution_j.x);
            let residuals_j = &var_j - &predicted_j;

            // The partial correlation is the correlation of the residuals
            let partial_r = pearson_r_simd(&residuals_i, &residuals_j)?;

            Ok(((i, j), partial_r))
        })
        .into_iter()
        .collect::<StatsResult<Vec<_>>>()?;

        // Fill the symmetric matrix
        for ((i, j), corr) in partial_correlations {
            partial_corr[(i, j)] = corr;
            partial_corr[(j, i)] = corr;
        }
    }

    Ok(partial_corr)
}

/// Parallel autocorrelation computation
///
/// Computes the autocorrelation function (ACF) of a time series at lags `0..=max_lag`
/// using parallel processing.
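///
/// The lag-`k` coefficient computed here is
/// `r(k) = sum_{i=0}^{n-k-1} (x_i - mean) * (x_{i+k} - mean) / ((n - k) * var)`,
/// with `var` the ddof-0 variance, so `r(0) = 1` by construction.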
#[allow(dead_code)]
pub fn autocorrelation_parallel<F, D>(
    data: &ArrayBase<D, Ix1>,
    max_lag: usize,
) -> StatsResult<Array1<F>>
where
    F: Float + NumCast + Send + Sync + SimdUnifiedOps,
    D: Data<Elem = F> + Sync,
{
    let n = data.len();
    if max_lag >= n {
        return Err(StatsError::InvalidArgument(
            "max_lag must be less than the data length".to_string(),
        ));
    }

    let threshold = AdaptiveThreshold::new();
    let parallel_threshold = threshold.calculate(
        std::mem::size_of::<F>() * n,
        1.5, // ACF complexity
    );

    // Compute the mean and the (ddof = 0) variance
    let mean = mean_simd(data)?;
    let variance = variance_simd(data, 0)?;

    if variance <= F::epsilon() {
        return Err(StatsError::InvalidArgument(
            "Cannot compute autocorrelation for a constant series".to_string(),
        ));
    }

    let lags: Vec<usize> = (0..=max_lag).collect();

    let autocorr = if max_lag * n < parallel_threshold {
        // Sequential computation
        lags.iter()
            .map(|&lag| {
                if lag == 0 {
                    F::one()
                } else {
                    let mut sum = F::zero();
                    for i in 0..n - lag {
                        sum = sum + (data[i] - mean) * (data[i + lag] - mean);
                    }
                    sum / (F::from(n - lag).unwrap() * variance)
                }
            })
            .collect()
    } else {
        // Parallel computation over lags
        parallel_map(&lags, |&lag| {
            if lag == 0 {
                F::one()
            } else if F::simd_available() && n - lag > 64 {
                // Use SIMD when the overlapping segment is long enough
                let data_start = data.slice(s![..n - lag]);
                let data_lagged = data.slice(s![lag..]);

                let mean_array = scirs2_core::ndarray::Array1::from_elem(n - lag, mean);
                let start_centered = F::simd_sub(&data_start, &mean_array.view());
                let lagged_centered = F::simd_sub(&data_lagged, &mean_array.view());

                let products = F::simd_mul(&start_centered.view(), &lagged_centered.view());
                let sum = F::simd_sum(&products.view());

                sum / (F::from(n - lag).unwrap() * variance)
            } else {
                // Scalar fallback
                let mut sum = F::zero();
                for i in 0..n - lag {
                    sum = sum + (data[i] - mean) * (data[i + lag] - mean);
                }
                sum / (F::from(n - lag).unwrap() * variance)
            }
        })
    };

    Ok(Array1::from_vec(autocorr))
}

#[cfg(test)]
mod tests {
    use super::*;
    use approx::assert_relative_eq;
    use scirs2_core::ndarray::array;

    #[test]
    #[ignore = "timeout"]
    fn test_parallel_histogram() {
        let data = Array1::from_vec((0..10000).map(|i| i as f64 / 100.0).collect());
        let hist = ParallelHistogram::new(&data.view(), 10).unwrap();

        let (bins, counts) = hist.get_histogram();
        assert_eq!(bins.len(), 10);
        assert_eq!(counts.iter().sum::<usize>(), 10000);
    }

    #[test]
    fn test_parallel_kde() {
        let data = array![1.0, 2.0, 3.0, 4.0, 5.0];
        let eval_points = Array1::linspace(0.0, 6.0, 100);

        let kde_result = kde_parallel(&data.view(), &eval_points, 0.5).unwrap();
        assert_eq!(kde_result.len(), 100);

        // The KDE should peak near the data points
        let max_idx = kde_result
            .iter()
            .enumerate()
            .max_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap())
            .map(|(idx, _)| idx)
            .unwrap();

        assert!(max_idx > 40 && max_idx < 60); // Maximum should be near the middle
    }

    #[test]
    fn test_moving_stats() {
        let data = Array1::from_vec((0..100).map(|i| i as f64).collect());
        let moving_stats = ParallelMovingStats::new(&data.view(), 10).unwrap();

        let moving_mean = moving_stats.moving_mean().unwrap();
        assert_eq!(moving_mean.len(), 91); // 100 - 10 + 1

        // First moving average is the mean of 0..10
        assert_relative_eq!(moving_mean[0], 4.5, epsilon = 1e-10);

        // Last moving average is the mean of 90..100
        assert_relative_eq!(moving_mean[90], 94.5, epsilon = 1e-10);
    }

    #[test]
    fn test_pairwise_distances() {
        let data = array![[0.0, 0.0], [1.0, 0.0], [0.0, 1.0], [1.0, 1.0]];

        let distances = pairwise_distances_parallel(&data.view(), "euclidean").unwrap();

        // The diagonal should be zero
        for i in 0..4 {
            assert_relative_eq!(distances[(i, i)], 0.0, epsilon = 1e-10);
        }

        // Check specific distances
        assert_relative_eq!(distances[(0, 1)], 1.0, epsilon = 1e-10);
        assert_relative_eq!(distances[(0, 3)], 2.0_f64.sqrt(), epsilon = 1e-10);

        // The matrix should be symmetric
        for i in 0..4 {
            for j in 0..4 {
                assert_relative_eq!(distances[(i, j)], distances[(j, i)], epsilon = 1e-10);
            }
        }
    }
}
994}