Skip to main content

scirs2_stats/
parallel_stats_enhanced.rs

1//! Enhanced parallel statistical operations with adaptive processing
2//!
3//! This module provides enhanced parallel implementations with:
4//! - Adaptive threshold selection based on system capabilities
5//! - Better load balancing for heterogeneous data
6//! - Cache-aware chunking strategies
7//! - NUMA-aware processing for large systems
8
9use crate::descriptive_simd::{mean_simd, variance_simd};
10use crate::error::{StatsError, StatsResult};
11use scirs2_core::ndarray::{s, Array1, Array2, ArrayBase, ArrayView2, Data, Ix1, Ix2};
12use scirs2_core::numeric::{Float, NumCast};
13use scirs2_core::parallel_ops::{num_threads, par_chunks, parallel_map, ParallelIterator};
14use scirs2_core::simd_ops::{PlatformCapabilities, SimdUnifiedOps};
15use std::f64::consts::PI;
16use std::sync::Arc;
17
/// Adaptive threshold calculator based on system capabilities
///
/// Decides when parallel processing is worthwhile by combining the worker
/// thread count, SIMD availability, and a cache-line-size heuristic.
pub struct AdaptiveThreshold {
    // Baseline element count below which work stays sequential.
    base_threshold: usize,
    // Number of worker threads available (from the parallel runtime).
    cpu_cores: usize,
    // Whether SIMD is available on this platform; SIMD reduces the need
    // for thread-level parallelism, so it raises the threshold.
    simd_available: bool,
    // Assumed cache line size in bytes (hard-coded to 64, not probed).
    cache_linesize: usize,
}
25
26impl Default for AdaptiveThreshold {
27    fn default() -> Self {
28        Self::new()
29    }
30}
31
32impl AdaptiveThreshold {
33    /// Create a new adaptive threshold calculator
34    pub fn new() -> Self {
35        let caps = PlatformCapabilities::detect();
36        Self {
37            base_threshold: 10_000,
38            cpu_cores: num_threads(),
39            simd_available: caps.simd_available,
40            cache_linesize: 64, // Common cache line size
41        }
42    }
43
44    /// Calculate the optimal threshold for parallel processing
45    pub fn calculate(&self, elementsize: usize, operationcomplexity: f64) -> usize {
46        // Adjust threshold based on:
47        // 1. Number of CPU cores
48        // 2. SIMD availability (SIMD reduces the need for parallelism)
49        // 3. Operation _complexity
50        // 4. Cache efficiency
51
52        let simd_factor = if self.simd_available { 2.0 } else { 1.0 };
53        let core_factor = (self.cpu_cores as f64).sqrt();
54        let cache_factor = (self.cache_linesize / elementsize).max(1) as f64;
55
56        let adjusted_threshold =
57            (self.base_threshold as f64 * simd_factor / core_factor / operationcomplexity
58                * cache_factor.sqrt()) as usize;
59
60        adjusted_threshold.max(1000) // Minimum threshold
61    }
62
63    /// Calculate optimal chunk size for cache efficiency
64    pub fn optimal_chunksize(&self, total_elements: usize, elementsize: usize) -> usize {
65        // L1 cache is typically 32KB per core
66        let l1_cachesize = 32 * 1024;
67        let elements_per_cache = l1_cachesize / elementsize;
68
69        // Optimal chunk size should fit in L1 cache
70        let ideal_chunk = elements_per_cache / 2; // Leave room for other data
71
72        // Balance between cache efficiency and parallelism
73        let min_chunks = self.cpu_cores * 4; // At least 4 chunks per core
74        let max_chunksize = total_elements / min_chunks;
75
76        ideal_chunk.min(max_chunksize).max(64)
77    }
78}
79
/// Parallel histogram computation with adaptive binning
///
/// Stores equal-width bin edges and per-bin counts computed over the data's
/// observed [min, max] range.
pub struct ParallelHistogram<F: Float> {
    // Bin edges; length is n_bins + 1 (left edges plus the final right edge).
    bins: Vec<F>,
    // Number of samples that fell into each bin; length is n_bins.
    counts: Vec<usize>,
    // Smallest observed value (left edge of the first bin).
    min_val: F,
    // Largest observed value (right edge of the last bin, inclusive).
    max_val: F,
    // Number of bins.
    n_bins: usize,
}
88
89impl<F: Float + NumCast + Send + Sync + std::fmt::Display> ParallelHistogram<F> {
90    /// Create a new parallel histogram
91    pub fn new<D>(data: &ArrayBase<D, Ix1>, nbins: usize) -> StatsResult<Self>
92    where
93        D: Data<Elem = F> + Sync,
94    {
95        if data.is_empty() {
96            return Err(StatsError::InvalidArgument(
97                "Cannot create histogram from empty data".to_string(),
98            ));
99        }
100
101        let threshold = AdaptiveThreshold::new();
102        let parallel_threshold = threshold.calculate(std::mem::size_of::<F>(), 1.0);
103
104        // Find min/max in parallel if data is large enough
105        let (min_val, max_val) = if data.len() >= parallel_threshold {
106            let chunksize = threshold.optimal_chunksize(data.len(), std::mem::size_of::<F>());
107
108            let (min, max) = par_chunks(data.as_slice().expect("Operation failed"), chunksize)
109                .map(|chunk| {
110                    let mut local_min = chunk[0];
111                    let mut local_max = chunk[0];
112                    for &val in chunk.iter().skip(1) {
113                        if val < local_min {
114                            local_min = val;
115                        }
116                        if val > local_max {
117                            local_max = val;
118                        }
119                    }
120                    (local_min, local_max)
121                })
122                .reduce(
123                    || (F::infinity(), F::neg_infinity()),
124                    |(min1, max1), (min2, max2)| {
125                        (
126                            if min1 < min2 { min1 } else { min2 },
127                            if max1 > max2 { max1 } else { max2 },
128                        )
129                    },
130                );
131            (min, max)
132        } else {
133            // Sequential for small data
134            let mut min = data[0];
135            let mut max = data[0];
136            for &val in data.iter().skip(1) {
137                if val < min {
138                    min = val;
139                }
140                if val > max {
141                    max = val;
142                }
143            }
144            (min, max)
145        };
146
147        // Create _bins
148        let range = max_val - min_val;
149        let bin_width = range / F::from(nbins).expect("Failed to convert to float");
150
151        let bins: Vec<F> = (0..=nbins)
152            .map(|i| min_val + bin_width * F::from(i).expect("Failed to convert to float"))
153            .collect();
154
155        let mut histogram = Self {
156            bins,
157            counts: vec![0; nbins],
158            min_val,
159            max_val,
160            n_bins: nbins,
161        };
162
163        // Compute counts
164        histogram.compute_counts(data)?;
165
166        Ok(histogram)
167    }
168
169    /// Compute histogram counts in parallel
170    fn compute_counts<D>(&mut self, data: &ArrayBase<D, Ix1>) -> StatsResult<()>
171    where
172        D: Data<Elem = F> + Sync,
173    {
174        let threshold = AdaptiveThreshold::new();
175        let parallel_threshold = threshold.calculate(std::mem::size_of::<F>(), 2.0);
176
177        if data.len() < parallel_threshold {
178            // Sequential computation
179            let bin_width = (self.max_val - self.min_val)
180                / F::from(self.n_bins).expect("Failed to convert to float");
181
182            for &val in data.iter() {
183                if val >= self.min_val && val <= self.max_val {
184                    let bin_idx = ((val - self.min_val) / bin_width)
185                        .floor()
186                        .to_usize()
187                        .expect("Operation failed")
188                        .min(self.n_bins - 1);
189                    self.counts[bin_idx] += 1;
190                }
191            }
192        } else {
193            // Parallel computation with thread-local histograms
194            let chunksize = threshold.optimal_chunksize(data.len(), std::mem::size_of::<F>());
195            let bin_width = (self.max_val - self.min_val)
196                / F::from(self.n_bins).expect("Failed to convert to float");
197            let n_bins = self.n_bins;
198            let min_val = self.min_val;
199
200            // Each thread maintains its own histogram
201            let local_histograms: Vec<Vec<usize>> =
202                par_chunks(data.as_slice().expect("Operation failed"), chunksize)
203                    .map(|chunk| {
204                        let mut local_counts = vec![0; n_bins];
205
206                        for &val in chunk {
207                            if val >= min_val && val <= self.max_val {
208                                let bin_idx = ((val - min_val) / bin_width)
209                                    .floor()
210                                    .to_usize()
211                                    .expect("Operation failed")
212                                    .min(n_bins - 1);
213                                local_counts[bin_idx] += 1;
214                            }
215                        }
216
217                        local_counts
218                    })
219                    .collect();
220
221            // Merge local histograms
222            for local_counts in local_histograms {
223                for (i, count) in local_counts.into_iter().enumerate() {
224                    self.counts[i] += count;
225                }
226            }
227        }
228
229        Ok(())
230    }
231
232    /// Get histogram bins and counts
233    pub fn get_histogram(&self) -> (&[F], &[usize]) {
234        (&self.bins[..self.n_bins], &self.counts)
235    }
236}
237
238/// Parallel kernel density estimation
239#[allow(dead_code)]
240pub fn kde_parallel<F, D>(
241    data: &ArrayBase<D, Ix1>,
242    eval_points: &Array1<F>,
243    bandwidth: F,
244) -> StatsResult<Array1<F>>
245where
246    F: Float + NumCast + Send + Sync + SimdUnifiedOps,
247    D: Data<Elem = F> + Sync,
248{
249    if data.is_empty() || eval_points.is_empty() {
250        return Err(StatsError::InvalidArgument("Empty input data".to_string()));
251    }
252
253    let n = data.len();
254    let threshold = AdaptiveThreshold::new();
255    let parallel_threshold = threshold.calculate(std::mem::size_of::<F>(), 3.0);
256
257    // Gaussian kernel constant
258    let norm_const = F::one()
259        / (F::from(2.0 * PI)
260            .expect("Failed to convert to float")
261            .sqrt()
262            * bandwidth
263            * F::from(n).expect("Failed to convert to float"));
264
265    if eval_points.len() * n < parallel_threshold {
266        // Sequential KDE
267        let mut result = Array1::zeros(eval_points.len());
268
269        for (i, &x) in eval_points.iter().enumerate() {
270            let mut density = F::zero();
271            for &xi in data.iter() {
272                let u = (x - xi) / bandwidth;
273                density = density
274                    + (-u * u / F::from(2.0).expect("Failed to convert constant to float")).exp();
275            }
276            result[i] = density * norm_const;
277        }
278
279        Ok(result)
280    } else {
281        // Parallel KDE
282        let eval_vec: Vec<F> = eval_points.to_vec();
283        let data_slice = data.as_slice().expect("Operation failed");
284
285        let densities: Vec<F> = parallel_map(&eval_vec, |&x| {
286            let chunksize = threshold.optimal_chunksize(n, std::mem::size_of::<F>());
287
288            // Compute density for this evaluation point in parallel
289            let density: F = par_chunks(data_slice, chunksize)
290                .map(|chunk| {
291                    let mut local_sum = F::zero();
292                    for &xi in chunk {
293                        let u = (x - xi) / bandwidth;
294                        local_sum = local_sum
295                            + (-u * u / F::from(2.0).expect("Failed to convert constant to float"))
296                                .exp();
297                    }
298                    local_sum
299                })
300                .reduce(|| F::zero(), |a, b| a + b);
301
302            density * norm_const
303        });
304
305        Ok(Array1::from_vec(densities))
306    }
307}
308
/// Parallel moving statistics (rolling mean, std, etc.)
///
/// Owns a copy of the input series behind an `Arc` so window computations
/// can be distributed across threads without re-borrowing the caller's data.
pub struct ParallelMovingStats<F: Float> {
    // Number of consecutive samples per window (validated in `new`:
    // non-zero and no larger than the data length).
    windowsize: usize,
    // Shared, immutable copy of the input series.
    data: Arc<Vec<F>>,
}
314
impl<F: Float + NumCast + Send + Sync + SimdUnifiedOps + std::fmt::Display> ParallelMovingStats<F> {
    /// Create a new moving statistics calculator
    ///
    /// Copies the input into an `Arc<Vec<F>>` so windows can later be
    /// processed in parallel.
    ///
    /// # Errors
    /// Returns `InvalidArgument` if `windowsize` is 0 or exceeds the data
    /// length.
    pub fn new<D>(data: &ArrayBase<D, Ix1>, windowsize: usize) -> StatsResult<Self>
    where
        D: Data<Elem = F>,
    {
        if windowsize == 0 || windowsize > data.len() {
            return Err(StatsError::InvalidArgument(
                "Invalid window size".to_string(),
            ));
        }

        Ok(Self {
            windowsize,
            data: Arc::new(data.to_vec()),
        })
    }

    /// Compute moving average in parallel
    ///
    /// Returns `data.len() - windowsize + 1` window means. Small outputs use
    /// an O(n) sliding-window sum; large outputs recompute each window sum
    /// independently so windows can be distributed across threads.
    pub fn moving_mean(&self) -> StatsResult<Array1<F>> {
        let n = self.data.len();
        // Safe: `new` guarantees windowsize <= n.
        let output_len = n - self.windowsize + 1;

        let threshold = AdaptiveThreshold::new();
        let parallel_threshold = threshold.calculate(std::mem::size_of::<F>(), 2.0);

        if output_len < parallel_threshold {
            // Sequential computation
            let mut result = Array1::zeros(output_len);
            let windowsize_f = F::from(self.windowsize).expect("Failed to convert to float");

            // Initial window sum
            let mut window_sum = self.data[..self.windowsize]
                .iter()
                .fold(F::zero(), |acc, &x| acc + x);
            result[0] = window_sum / windowsize_f;

            // Sliding window: O(1) update per step. NOTE(review): the
            // incremental add/subtract accumulates floating-point rounding
            // relative to the parallel path, which recomputes each sum.
            for i in 1..output_len {
                window_sum = window_sum - self.data[i - 1] + self.data[i + self.windowsize - 1];
                result[i] = window_sum / windowsize_f;
            }

            Ok(result)
        } else {
            // Parallel computation: each window index is independent, so
            // every window sum is recomputed from scratch.
            let indices: Vec<usize> = (0..output_len).collect();
            let data_ref = Arc::clone(&self.data);
            // Copy scalars out so the closure does not capture `self`.
            let windowsize = self.windowsize;
            let windowsize_f = F::from(windowsize).expect("Failed to convert to float");

            let means: Vec<F> = parallel_map(&indices, |&i| {
                let window_sum = data_ref[i..i + windowsize]
                    .iter()
                    .fold(F::zero(), |acc, &x| acc + x);
                window_sum / windowsize_f
            });

            Ok(Array1::from_vec(means))
        }
    }

    /// Compute moving standard deviation in parallel
    ///
    /// Uses a two-pass estimate per window (mean, then sum of squared
    /// deviations) with `windowsize - ddof` in the denominator.
    ///
    /// # Errors
    /// Returns `InvalidArgument` if `windowsize <= ddof` (zero or negative
    /// degrees of freedom).
    pub fn moving_std(&self, ddof: usize) -> StatsResult<Array1<F>> {
        let n = self.data.len();
        let output_len = n - self.windowsize + 1;

        if self.windowsize <= ddof {
            return Err(StatsError::InvalidArgument(
                "Window size must be greater than ddof".to_string(),
            ));
        }

        let threshold = AdaptiveThreshold::new();
        let parallel_threshold = threshold.calculate(std::mem::size_of::<F>(), 3.0);

        if output_len < parallel_threshold {
            // Sequential two-pass computation per window: mean first, then
            // squared deviations (not Welford's single-pass algorithm).
            let mut result = Array1::zeros(output_len);
            let divisor = F::from(self.windowsize - ddof).expect("Failed to convert to float");

            for i in 0..output_len {
                let window = &self.data[i..i + self.windowsize];
                let mean = window.iter().fold(F::zero(), |acc, &x| acc + x)
                    / F::from(self.windowsize).expect("Failed to convert to float");

                let variance = window
                    .iter()
                    .map(|&x| {
                        let diff = x - mean;
                        diff * diff
                    })
                    .fold(F::zero(), |acc, x| acc + x)
                    / divisor;

                result[i] = variance.sqrt();
            }

            Ok(result)
        } else {
            // Parallel computation: identical two-pass estimate per window,
            // one task per window index.
            let indices: Vec<usize> = (0..output_len).collect();
            let data_ref = Arc::clone(&self.data);
            // Copy scalars out so the closure does not capture `self`.
            let windowsize = self.windowsize;
            let divisor = F::from(windowsize - ddof).expect("Failed to convert to float");

            let stds: Vec<F> = parallel_map(&indices, |&i| {
                let window = &data_ref[i..i + windowsize];
                let mean = window.iter().fold(F::zero(), |acc, &x| acc + x)
                    / F::from(windowsize).expect("Failed to convert to float");

                let variance = window
                    .iter()
                    .map(|&x| {
                        let diff = x - mean;
                        diff * diff
                    })
                    .fold(F::zero(), |acc, x| acc + x)
                    / divisor;

                variance.sqrt()
            });

            Ok(Array1::from_vec(stds))
        }
    }
}
442
443/// Parallel computation of pairwise distances
444#[allow(dead_code)]
445pub fn pairwise_distances_parallel<F, D>(
446    x: &ArrayBase<D, Ix2>,
447    metric: &str,
448) -> StatsResult<Array2<F>>
449where
450    F: Float + NumCast + Send + Sync + SimdUnifiedOps,
451    D: Data<Elem = F> + Sync,
452{
453    let n = x.nrows();
454    let d = x.ncols();
455
456    let threshold = AdaptiveThreshold::new();
457    let parallel_threshold = threshold.calculate(std::mem::size_of::<F>() * d, 2.0);
458
459    if n * n < parallel_threshold {
460        // Sequential computation
461        let mut distances = Array2::zeros((n, n));
462
463        for i in 0..n {
464            for j in i + 1..n {
465                let dist = match metric {
466                    "euclidean" => {
467                        let mut sum = F::zero();
468                        for k in 0..d {
469                            let diff = x[(i, k)] - x[(j, k)];
470                            sum = sum + diff * diff;
471                        }
472                        sum.sqrt()
473                    }
474                    "manhattan" => {
475                        let mut sum = F::zero();
476                        for k in 0..d {
477                            sum = sum + (x[(i, k)] - x[(j, k)]).abs();
478                        }
479                        sum
480                    }
481                    _ => return Err(StatsError::InvalidArgument("Unknown metric".to_string())),
482                };
483
484                distances[(i, j)] = dist;
485                distances[(j, i)] = dist;
486            }
487        }
488
489        Ok(distances)
490    } else {
491        // Parallel computation
492        let mut distances = Array2::zeros((n, n));
493        let pairs: Vec<(usize, usize)> = (0..n)
494            .flat_map(|i| (i + 1..n).map(move |j| (i, j)))
495            .collect();
496
497        let computed_distances: Vec<((usize, usize), F)> = parallel_map(&pairs, |&(i, j)| {
498            let dist = match metric {
499                "euclidean" => {
500                    if d > 8 && F::simd_available() {
501                        // Use SIMD for distance computation
502                        let row_i = x.slice(s![i, ..]);
503                        let row_j = x.slice(s![j, ..]);
504                        let diff = F::simd_sub(&row_i, &row_j);
505                        let squared = F::simd_mul(&diff.view(), &diff.view());
506                        F::simd_sum(&squared.view()).sqrt()
507                    } else {
508                        let mut sum = F::zero();
509                        for k in 0..d {
510                            let diff = x[(i, k)] - x[(j, k)];
511                            sum = sum + diff * diff;
512                        }
513                        sum.sqrt()
514                    }
515                }
516                "manhattan" => {
517                    let mut sum = F::zero();
518                    for k in 0..d {
519                        sum = sum + (x[(i, k)] - x[(j, k)]).abs();
520                    }
521                    sum
522                }
523                _ => F::nan(), // Should not reach here
524            };
525            ((i, j), dist)
526        });
527
528        // Fill the distance matrix
529        for ((i, j), dist) in computed_distances {
530            distances[(i, j)] = dist;
531            distances[(j, i)] = dist;
532        }
533
534        Ok(distances)
535    }
536}
537
/// Parallel cross-validation for model evaluation
///
/// Splits samples into `n_folds` contiguous (optionally shuffled) folds and
/// evaluates every fold in parallel via `cross_val_score`.
pub struct ParallelCrossValidation<F: Float> {
    // Number of folds; used for fold sizing in `cross_val_score`.
    n_folds: usize,
    // Whether to shuffle sample indices before splitting into folds.
    shuffle: bool,
    // Seed forwarded to the permutation generator when shuffling.
    // NOTE(review): presumably None means an unseeded permutation — confirm
    // against crate::random::permutation_int.
    random_state: Option<u64>,
    // Ties the scalar type F to the struct without storing a value.
    _phantom: std::marker::PhantomData<F>,
}
545
546impl<F: Float + NumCast + Send + Sync + std::fmt::Display> ParallelCrossValidation<F> {
547    /// Create a new cross-validation splitter
548    pub fn new(n_folds: usize, shuffle: bool, randomstate: Option<u64>) -> Self {
549        Self {
550            n_folds,
551            shuffle,
552            random_state: randomstate,
553            _phantom: std::marker::PhantomData,
554        }
555    }
556
557    /// Perform parallel cross-validation
558    pub fn cross_val_score<D, M, S>(
559        &self,
560        x: &ArrayBase<D, Ix2>,
561        y: &Array1<F>,
562        model: M,
563        scorer: S,
564    ) -> StatsResult<Array1<F>>
565    where
566        D: Data<Elem = F> + Sync,
567        M: Fn(&ArrayView2<F>, &Array1<F>) -> StatsResult<()> + Send + Sync + Clone,
568        S: Fn(&ArrayView2<F>, &Array1<F>) -> StatsResult<F> + Send + Sync,
569    {
570        let n_samples_ = x.nrows();
571        if n_samples_ != y.len() {
572            return Err(StatsError::DimensionMismatch(
573                "X and y must have same number of samples".to_string(),
574            ));
575        }
576
577        // Create fold indices
578        let indices: Vec<usize> = if self.shuffle {
579            use crate::random::permutation_int;
580            permutation_int(n_samples_, self.random_state)
581                .expect("Operation failed")
582                .to_vec()
583        } else {
584            (0..n_samples_).collect()
585        };
586
587        // Split into folds
588        let foldsize = n_samples_ / self.n_folds;
589        let remainder = n_samples_ % self.n_folds;
590
591        let fold_indices: Vec<(usize, usize)> = (0..self.n_folds)
592            .map(|i| {
593                let start = i * foldsize + i.min(remainder);
594                let size = foldsize + if i < remainder { 1 } else { 0 };
595                (start, start + size)
596            })
597            .collect();
598
599        // Parallel cross-validation
600        let scores: Vec<F> = parallel_map(&fold_indices, |&(test_start, test_end)| {
601            // Create train/test splits
602            let test_indices = &indices[test_start..test_end];
603            let train_indices: Vec<usize> = indices[..test_start]
604                .iter()
605                .chain(indices[test_end..].iter())
606                .copied()
607                .collect();
608
609            // Create train/test data
610            let x_train = x.select(scirs2_core::ndarray::Axis(0), &train_indices);
611            let y_train = y.select(scirs2_core::ndarray::Axis(0), &train_indices);
612            let x_test = x.select(scirs2_core::ndarray::Axis(0), test_indices);
613            let y_test = y.select(scirs2_core::ndarray::Axis(0), test_indices);
614
615            // Train model on fold
616            let fold_model = model.clone();
617            fold_model(&x_train.view(), &y_train)?;
618
619            // Score on test set
620            scorer(&x_test.view(), &y_test)
621        })
622        .into_iter()
623        .collect::<StatsResult<Vec<_>>>()?;
624
625        Ok(Array1::from_vec(scores))
626    }
627}
628
629/// Parallel correlation matrix computation
630///
631/// Efficiently computes correlation matrix for multiple variables using parallel processing
632/// and SIMD operations where available.
633#[allow(dead_code)]
634pub fn corrcoef_parallel<F, D>(data: &ArrayBase<D, Ix2>, rowvar: bool) -> StatsResult<Array2<F>>
635where
636    F: Float + NumCast + Send + Sync + SimdUnifiedOps,
637    D: Data<Elem = F> + Sync,
638{
639    use crate::correlation_simd::pearson_r_simd;
640
641    let (n_vars, n_obs) = if rowvar {
642        (data.nrows(), data.ncols())
643    } else {
644        (data.ncols(), data.nrows())
645    };
646
647    if n_obs < 2 {
648        return Err(StatsError::InvalidArgument(
649            "Need at least 2 observations to compute correlation".to_string(),
650        ));
651    }
652
653    let threshold = AdaptiveThreshold::new();
654    let parallel_threshold = threshold.calculate(
655        std::mem::size_of::<F>() * n_obs,
656        2.0, // Correlation is O(n) operation
657    );
658
659    let mut corr_matrix = Array2::zeros((n_vars, n_vars));
660
661    // Fill diagonal with 1s
662    for i in 0..n_vars {
663        corr_matrix[(i, i)] = F::one();
664    }
665
666    // Generate all unique pairs
667    let pairs: Vec<(usize, usize)> = (0..n_vars)
668        .flat_map(|i| (i + 1..n_vars).map(move |j| (i, j)))
669        .collect();
670
671    if pairs.len() * n_obs < parallel_threshold {
672        // Sequential computation
673        for (i, j) in pairs {
674            let var_i = if rowvar {
675                data.slice(s![i, ..])
676            } else {
677                data.slice(s![.., i])
678            };
679
680            let var_j = if rowvar {
681                data.slice(s![j, ..])
682            } else {
683                data.slice(s![.., j])
684            };
685
686            let corr = pearson_r_simd(&var_i, &var_j)?;
687            corr_matrix[(i, j)] = corr;
688            corr_matrix[(j, i)] = corr;
689        }
690    } else {
691        // Parallel computation
692        let correlations: Vec<((usize, usize), F)> = parallel_map(&pairs, |&(i, j)| {
693            let var_i = if rowvar {
694                data.slice(s![i, ..])
695            } else {
696                data.slice(s![.., i])
697            };
698
699            let var_j = if rowvar {
700                data.slice(s![j, ..])
701            } else {
702                data.slice(s![.., j])
703            };
704
705            let corr = pearson_r_simd(&var_i, &var_j)?;
706            Ok(((i, j), corr))
707        })
708        .into_iter()
709        .collect::<StatsResult<Vec<_>>>()?;
710
711        // Fill the correlation matrix
712        for ((i, j), corr) in correlations {
713            corr_matrix[(i, j)] = corr;
714            corr_matrix[(j, i)] = corr;
715        }
716    }
717
718    Ok(corr_matrix)
719}
720
/// Parallel partial correlation computation
///
/// Computes partial correlations between multiple variables, controlling for other variables,
/// using parallel processing for efficiency.
///
/// For each pair (i, j), both variables are regressed (least squares) on the
/// control columns and the Pearson correlation of the two residual series is
/// taken. Variables are the columns of `data`; rows are observations.
/// NOTE(review): assumes `n_obs` exceeds the number of control variables so
/// the regression is well-posed — confirm `lstsq` behavior otherwise.
#[allow(dead_code)]
pub fn partial_corrcoef_parallel<F, D>(
    data: &ArrayBase<D, Ix2>,
    control_vars: &[usize],
) -> StatsResult<Array2<F>>
where
    F: Float
        + NumCast
        + Send
        + Sync
        + SimdUnifiedOps
        + scirs2_core::ndarray::ScalarOperand
        + std::iter::Sum
        + scirs2_core::numeric::NumAssign,
    D: Data<Elem = F> + Sync,
{
    use scirs2_linalg::lstsq;

    let n_vars = data.ncols();
    let n_obs = data.nrows();

    if control_vars.is_empty() {
        // No control variables, just compute regular correlation
        return corrcoef_parallel(data, false);
    }

    // Check control variables are valid column indices
    for &idx in control_vars {
        if idx >= n_vars {
            return Err(StatsError::InvalidArgument(format!(
                "Control variable index {} out of bounds",
                idx
            )));
        }
    }

    let threshold = AdaptiveThreshold::new();
    let parallel_threshold = threshold.calculate(
        std::mem::size_of::<F>() * n_obs * control_vars.len(),
        3.0, // Partial correlation is more complex
    );

    let mut partial_corr = Array2::zeros((n_vars, n_vars));

    // Fill diagonal: each variable is perfectly correlated with itself.
    for i in 0..n_vars {
        partial_corr[(i, i)] = F::one();
    }

    // Create control matrix: the control columns of `data`, used as the
    // regression design matrix for every pair.
    let control_matrix = data.select(scirs2_core::ndarray::Axis(1), control_vars);

    // Generate upper-triangle pairs, excluding control variables themselves
    // (their rows/columns in the result stay at the zero/one defaults).
    let pairs: Vec<(usize, usize)> = (0..n_vars)
        .filter(|i| !control_vars.contains(i))
        .flat_map(|i| {
            (i + 1..n_vars)
                .filter(|j| !control_vars.contains(j))
                .map(move |j| (i, j))
        })
        .collect();

    if pairs.len() * n_obs * control_vars.len() < parallel_threshold {
        // Sequential computation for small problems
        for (i, j) in pairs {
            let var_i = data.slice(s![.., i]);
            let var_j = data.slice(s![.., j]);

            // Regress out control variables and compute residuals:
            // residual = variable - control_matrix * least_squares_coeffs.
            let solution_i = lstsq(&control_matrix.view(), &var_i.view(), None)
                .map_err(|e| StatsError::ComputationError(format!("Regression failed: {}", e)))?;
            let predicted_i = control_matrix.dot(&solution_i.x);
            let residuals_i = &var_i - &predicted_i;

            let solution_j = lstsq(&control_matrix.view(), &var_j.view(), None)
                .map_err(|e| StatsError::ComputationError(format!("Regression failed: {}", e)))?;
            let predicted_j = control_matrix.dot(&solution_j.x);
            let residuals_j = &var_j - &predicted_j;

            // Compute correlation of residuals (the partial correlation)
            use crate::correlation_simd::pearson_r_simd;
            let partial_r = pearson_r_simd(&residuals_i, &residuals_j)?;

            partial_corr[(i, j)] = partial_r;
            partial_corr[(j, i)] = partial_r;
        }
    } else {
        // Parallel computation: one task per pair, same residual method as
        // the sequential branch; the fallible collect propagates errors.
        let partial_correlations: Vec<((usize, usize), F)> = parallel_map(&pairs, |&(i, j)| {
            let var_i = data.slice(s![.., i]);
            let var_j = data.slice(s![.., j]);

            // Regress out control variables and compute residuals
            let solution_i = lstsq(&control_matrix.view(), &var_i.view(), None)
                .map_err(|e| StatsError::ComputationError(format!("Regression failed: {}", e)))?;
            let predicted_i = control_matrix.dot(&solution_i.x);
            let residuals_i = &var_i - &predicted_i;

            let solution_j = lstsq(&control_matrix.view(), &var_j.view(), None)
                .map_err(|e| StatsError::ComputationError(format!("Regression failed: {}", e)))?;
            let predicted_j = control_matrix.dot(&solution_j.x);
            let residuals_j = &var_j - &predicted_j;

            // Compute correlation of residuals
            use crate::correlation_simd::pearson_r_simd;
            let partial_r = pearson_r_simd(&residuals_i, &residuals_j)?;

            Ok(((i, j), partial_r))
        })
        .into_iter()
        .collect::<StatsResult<Vec<_>>>()?;

        // Fill both triangles of the symmetric result matrix
        for ((i, j), corr) in partial_correlations {
            partial_corr[(i, j)] = corr;
            partial_corr[(j, i)] = corr;
        }
    }

    Ok(partial_corr)
}
846
847/// Parallel autocorrelation computation
848///
849/// Computes autocorrelation function (ACF) for time series data using parallel processing.
850#[allow(dead_code)]
851pub fn autocorrelation_parallel<F, D>(
852    data: &ArrayBase<D, Ix1>,
853    max_lag: usize,
854) -> StatsResult<Array1<F>>
855where
856    F: Float + NumCast + Send + Sync + SimdUnifiedOps,
857    D: Data<Elem = F> + Sync,
858{
859    let n = data.len();
860    if max_lag >= n {
861        return Err(StatsError::InvalidArgument(
862            "max_lag must be less than data length".to_string(),
863        ));
864    }
865
866    let threshold = AdaptiveThreshold::new();
867    let parallel_threshold = threshold.calculate(
868        std::mem::size_of::<F>() * n,
869        1.5, // ACF complexity
870    );
871
872    // Compute mean and variance
873    let mean = mean_simd(data)?;
874    let variance = variance_simd(data, 0)?;
875
876    if variance <= F::epsilon() {
877        return Err(StatsError::InvalidArgument(
878            "Cannot compute autocorrelation for constant series".to_string(),
879        ));
880    }
881
882    let lags: Vec<usize> = (0..=max_lag).collect();
883
884    let autocorr = if max_lag * n < parallel_threshold {
885        // Sequential computation
886        lags.iter()
887            .map(|&lag| {
888                if lag == 0 {
889                    F::one()
890                } else {
891                    let mut sum = F::zero();
892                    for i in 0..n - lag {
893                        sum = sum + (data[i] - mean) * (data[i + lag] - mean);
894                    }
895                    sum / (F::from(n - lag).expect("Failed to convert to float") * variance)
896                }
897            })
898            .collect()
899    } else {
900        // Parallel computation
901        parallel_map(&lags, |&lag| {
902            if lag == 0 {
903                F::one()
904            } else if F::simd_available() && n - lag > 64 {
905                // Use SIMD for large lags
906                let data_start = data.slice(s![..n - lag]);
907                let data_lagged = data.slice(s![lag..]);
908
909                let mean_array = scirs2_core::ndarray::Array1::from_elem(n - lag, mean);
910                let start_centered = F::simd_sub(&data_start, &mean_array.view());
911                let lagged_centered = F::simd_sub(&data_lagged, &mean_array.view());
912
913                let products = F::simd_mul(&start_centered.view(), &lagged_centered.view());
914                let sum = F::simd_sum(&products.view());
915
916                sum / (F::from(n - lag).expect("Failed to convert to float") * variance)
917            } else {
918                // Scalar fallback
919                let mut sum = F::zero();
920                for i in 0..n - lag {
921                    sum = sum + (data[i] - mean) * (data[i + lag] - mean);
922                }
923                sum / (F::from(n - lag).expect("Failed to convert to float") * variance)
924            }
925        })
926    };
927
928    Ok(Array1::from_vec(autocorr))
929}
930
#[cfg(test)]
mod tests {
    use super::*;
    use approx::assert_relative_eq;
    use scirs2_core::ndarray::array;

    #[test]
    fn test_parallel_histogram() {
        // 10_000 evenly spaced samples in [0, 100).
        let samples = Array1::from_vec((0..10000).map(|i| i as f64 / 100.0).collect());
        let hist = ParallelHistogram::new(&samples.view(), 10).expect("Operation failed");

        let (edges, counts) = hist.get_histogram();
        assert_eq!(edges.len(), 10);
        // Every sample lands in exactly one bin.
        assert_eq!(counts.iter().sum::<usize>(), 10000);
    }

    #[test]
    fn test_parallel_kde() {
        let samples = array![1.0, 2.0, 3.0, 4.0, 5.0];
        let grid = Array1::linspace(0.0, 6.0, 100);

        let density = kde_parallel(&samples.view(), &grid, 0.5).expect("Operation failed");
        assert_eq!(density.len(), 100);

        // The density estimate should peak near the center of the data.
        let peak_idx = density
            .iter()
            .enumerate()
            .max_by(|(_, lhs), (_, rhs)| lhs.partial_cmp(rhs).expect("Operation failed"))
            .map(|(idx, _)| idx)
            .expect("Operation failed");

        assert!(peak_idx > 40 && peak_idx < 60); // Maximum should be near middle
    }

    #[test]
    fn test_moving_stats() {
        let series = Array1::from_vec((0..100).map(|i| i as f64).collect());
        let stats = ParallelMovingStats::new(&series.view(), 10).expect("Operation failed");

        let means = stats.moving_mean().expect("Operation failed");
        // A window of 10 over 100 points yields 100 - 10 + 1 positions.
        assert_eq!(means.len(), 91);

        // Window over 0..10 averages to 4.5; window over 90..100 to 94.5.
        assert_relative_eq!(means[0], 4.5, epsilon = 1e-10);
        assert_relative_eq!(means[90], 94.5, epsilon = 1e-10);
    }

    #[test]
    fn test_pairwise_distances() {
        // Corners of the unit square.
        let points = array![[0.0, 0.0], [1.0, 0.0], [0.0, 1.0], [1.0, 1.0],];

        let dist =
            pairwise_distances_parallel(&points.view(), "euclidean").expect("Operation failed");

        // Self-distances are zero.
        for i in 0..4 {
            assert_relative_eq!(dist[(i, i)], 0.0, epsilon = 1e-10);
        }

        // Unit edge and unit-square diagonal.
        assert_relative_eq!(dist[(0, 1)], 1.0, epsilon = 1e-10);
        assert_relative_eq!(dist[(0, 3)], 2.0_f64.sqrt(), epsilon = 1e-10);

        // The distance matrix is symmetric.
        for row in 0..4 {
            for col in 0..4 {
                assert_relative_eq!(dist[(row, col)], dist[(col, row)], epsilon = 1e-10);
            }
        }
    }
}