1use crate::descriptive_simd::{mean_simd, variance_simd};
10use crate::error::{StatsError, StatsResult};
11use scirs2_core::ndarray::{s, Array1, Array2, ArrayBase, ArrayView2, Data, Ix1, Ix2};
12use scirs2_core::numeric::{Float, NumCast};
13use scirs2_core::parallel_ops::{num_threads, par_chunks, parallel_map, ParallelIterator};
14use scirs2_core::simd_ops::{PlatformCapabilities, SimdUnifiedOps};
15use std::f64::consts::PI;
16use std::sync::Arc;
17
18pub struct AdaptiveThreshold {
20 base_threshold: usize,
21 cpu_cores: usize,
22 simd_available: bool,
23 cache_linesize: usize,
24}
25
26impl Default for AdaptiveThreshold {
27 fn default() -> Self {
28 Self::new()
29 }
30}
31
32impl AdaptiveThreshold {
33 pub fn new() -> Self {
35 let caps = PlatformCapabilities::detect();
36 Self {
37 base_threshold: 10_000,
38 cpu_cores: num_threads(),
39 simd_available: caps.simd_available,
40 cache_linesize: 64, }
42 }
43
44 pub fn calculate(&self, elementsize: usize, operationcomplexity: f64) -> usize {
46 let simd_factor = if self.simd_available { 2.0 } else { 1.0 };
53 let core_factor = (self.cpu_cores as f64).sqrt();
54 let cache_factor = (self.cache_linesize / elementsize).max(1) as f64;
55
56 let adjusted_threshold =
57 (self.base_threshold as f64 * simd_factor / core_factor / operationcomplexity
58 * cache_factor.sqrt()) as usize;
59
60 adjusted_threshold.max(1000) }
62
63 pub fn optimal_chunksize(&self, total_elements: usize, elementsize: usize) -> usize {
65 let l1_cachesize = 32 * 1024;
67 let elements_per_cache = l1_cachesize / elementsize;
68
69 let ideal_chunk = elements_per_cache / 2; let min_chunks = self.cpu_cores * 4; let max_chunksize = total_elements / min_chunks;
75
76 ideal_chunk.min(max_chunksize).max(64)
77 }
78}
79
80pub struct ParallelHistogram<F: Float> {
82 bins: Vec<F>,
83 counts: Vec<usize>,
84 min_val: F,
85 max_val: F,
86 n_bins: usize,
87}
88
89impl<F: Float + NumCast + Send + Sync + std::fmt::Display> ParallelHistogram<F> {
90 pub fn new<D>(data: &ArrayBase<D, Ix1>, nbins: usize) -> StatsResult<Self>
92 where
93 D: Data<Elem = F> + Sync,
94 {
95 if data.is_empty() {
96 return Err(StatsError::InvalidArgument(
97 "Cannot create histogram from empty data".to_string(),
98 ));
99 }
100
101 let threshold = AdaptiveThreshold::new();
102 let parallel_threshold = threshold.calculate(std::mem::size_of::<F>(), 1.0);
103
104 let (min_val, max_val) = if data.len() >= parallel_threshold {
106 let chunksize = threshold.optimal_chunksize(data.len(), std::mem::size_of::<F>());
107
108 let (min, max) = par_chunks(data.as_slice().unwrap(), chunksize)
109 .map(|chunk| {
110 let mut local_min = chunk[0];
111 let mut local_max = chunk[0];
112 for &val in chunk.iter().skip(1) {
113 if val < local_min {
114 local_min = val;
115 }
116 if val > local_max {
117 local_max = val;
118 }
119 }
120 (local_min, local_max)
121 })
122 .reduce(
123 || (F::infinity(), F::neg_infinity()),
124 |(min1, max1), (min2, max2)| {
125 (
126 if min1 < min2 { min1 } else { min2 },
127 if max1 > max2 { max1 } else { max2 },
128 )
129 },
130 );
131 (min, max)
132 } else {
133 let mut min = data[0];
135 let mut max = data[0];
136 for &val in data.iter().skip(1) {
137 if val < min {
138 min = val;
139 }
140 if val > max {
141 max = val;
142 }
143 }
144 (min, max)
145 };
146
147 let range = max_val - min_val;
149 let bin_width = range / F::from(nbins).unwrap();
150
151 let bins: Vec<F> = (0..=nbins)
152 .map(|i| min_val + bin_width * F::from(i).unwrap())
153 .collect();
154
155 let mut histogram = Self {
156 bins,
157 counts: vec![0; nbins],
158 min_val,
159 max_val,
160 n_bins: nbins,
161 };
162
163 histogram.compute_counts(data)?;
165
166 Ok(histogram)
167 }
168
169 fn compute_counts<D>(&mut self, data: &ArrayBase<D, Ix1>) -> StatsResult<()>
171 where
172 D: Data<Elem = F> + Sync,
173 {
174 let threshold = AdaptiveThreshold::new();
175 let parallel_threshold = threshold.calculate(std::mem::size_of::<F>(), 2.0);
176
177 if data.len() < parallel_threshold {
178 let bin_width = (self.max_val - self.min_val) / F::from(self.n_bins).unwrap();
180
181 for &val in data.iter() {
182 if val >= self.min_val && val <= self.max_val {
183 let bin_idx = ((val - self.min_val) / bin_width)
184 .floor()
185 .to_usize()
186 .unwrap()
187 .min(self.n_bins - 1);
188 self.counts[bin_idx] += 1;
189 }
190 }
191 } else {
192 let chunksize = threshold.optimal_chunksize(data.len(), std::mem::size_of::<F>());
194 let bin_width = (self.max_val - self.min_val) / F::from(self.n_bins).unwrap();
195 let n_bins = self.n_bins;
196 let min_val = self.min_val;
197
198 let local_histograms: Vec<Vec<usize>> = par_chunks(data.as_slice().unwrap(), chunksize)
200 .map(|chunk| {
201 let mut local_counts = vec![0; n_bins];
202
203 for &val in chunk {
204 if val >= min_val && val <= self.max_val {
205 let bin_idx = ((val - min_val) / bin_width)
206 .floor()
207 .to_usize()
208 .unwrap()
209 .min(n_bins - 1);
210 local_counts[bin_idx] += 1;
211 }
212 }
213
214 local_counts
215 })
216 .collect();
217
218 for local_counts in local_histograms {
220 for (i, count) in local_counts.into_iter().enumerate() {
221 self.counts[i] += count;
222 }
223 }
224 }
225
226 Ok(())
227 }
228
229 pub fn get_histogram(&self) -> (&[F], &[usize]) {
231 (&self.bins[..self.n_bins], &self.counts)
232 }
233}
234
235#[allow(dead_code)]
237pub fn kde_parallel<F, D>(
238 data: &ArrayBase<D, Ix1>,
239 eval_points: &Array1<F>,
240 bandwidth: F,
241) -> StatsResult<Array1<F>>
242where
243 F: Float + NumCast + Send + Sync + SimdUnifiedOps,
244 D: Data<Elem = F> + Sync,
245{
246 if data.is_empty() || eval_points.is_empty() {
247 return Err(StatsError::InvalidArgument("Empty input data".to_string()));
248 }
249
250 let n = data.len();
251 let threshold = AdaptiveThreshold::new();
252 let parallel_threshold = threshold.calculate(std::mem::size_of::<F>(), 3.0);
253
254 let norm_const =
256 F::one() / (F::from(2.0 * PI).unwrap().sqrt() * bandwidth * F::from(n).unwrap());
257
258 if eval_points.len() * n < parallel_threshold {
259 let mut result = Array1::zeros(eval_points.len());
261
262 for (i, &x) in eval_points.iter().enumerate() {
263 let mut density = F::zero();
264 for &xi in data.iter() {
265 let u = (x - xi) / bandwidth;
266 density = density + (-u * u / F::from(2.0).unwrap()).exp();
267 }
268 result[i] = density * norm_const;
269 }
270
271 Ok(result)
272 } else {
273 let eval_vec: Vec<F> = eval_points.to_vec();
275 let data_slice = data.as_slice().unwrap();
276
277 let densities: Vec<F> = parallel_map(&eval_vec, |&x| {
278 let chunksize = threshold.optimal_chunksize(n, std::mem::size_of::<F>());
279
280 let density: F = par_chunks(data_slice, chunksize)
282 .map(|chunk| {
283 let mut local_sum = F::zero();
284 for &xi in chunk {
285 let u = (x - xi) / bandwidth;
286 local_sum = local_sum + (-u * u / F::from(2.0).unwrap()).exp();
287 }
288 local_sum
289 })
290 .reduce(|| F::zero(), |a, b| a + b);
291
292 density * norm_const
293 });
294
295 Ok(Array1::from_vec(densities))
296 }
297}
298
299pub struct ParallelMovingStats<F: Float> {
301 windowsize: usize,
302 data: Arc<Vec<F>>,
303}
304
305impl<F: Float + NumCast + Send + Sync + SimdUnifiedOps + std::fmt::Display> ParallelMovingStats<F> {
306 pub fn new<D>(data: &ArrayBase<D, Ix1>, windowsize: usize) -> StatsResult<Self>
308 where
309 D: Data<Elem = F>,
310 {
311 if windowsize == 0 || windowsize > data.len() {
312 return Err(StatsError::InvalidArgument(
313 "Invalid window size".to_string(),
314 ));
315 }
316
317 Ok(Self {
318 windowsize,
319 data: Arc::new(data.to_vec()),
320 })
321 }
322
323 pub fn moving_mean(&self) -> StatsResult<Array1<F>> {
325 let n = self.data.len();
326 let output_len = n - self.windowsize + 1;
327
328 let threshold = AdaptiveThreshold::new();
329 let parallel_threshold = threshold.calculate(std::mem::size_of::<F>(), 2.0);
330
331 if output_len < parallel_threshold {
332 let mut result = Array1::zeros(output_len);
334 let windowsize_f = F::from(self.windowsize).unwrap();
335
336 let mut window_sum = self.data[..self.windowsize]
338 .iter()
339 .fold(F::zero(), |acc, &x| acc + x);
340 result[0] = window_sum / windowsize_f;
341
342 for i in 1..output_len {
344 window_sum = window_sum - self.data[i - 1] + self.data[i + self.windowsize - 1];
345 result[i] = window_sum / windowsize_f;
346 }
347
348 Ok(result)
349 } else {
350 let indices: Vec<usize> = (0..output_len).collect();
352 let data_ref = Arc::clone(&self.data);
353 let windowsize = self.windowsize;
354 let windowsize_f = F::from(windowsize).unwrap();
355
356 let means: Vec<F> = parallel_map(&indices, |&i| {
357 let window_sum = data_ref[i..i + windowsize]
358 .iter()
359 .fold(F::zero(), |acc, &x| acc + x);
360 window_sum / windowsize_f
361 });
362
363 Ok(Array1::from_vec(means))
364 }
365 }
366
367 pub fn moving_std(&self, ddof: usize) -> StatsResult<Array1<F>> {
369 let n = self.data.len();
370 let output_len = n - self.windowsize + 1;
371
372 if self.windowsize <= ddof {
373 return Err(StatsError::InvalidArgument(
374 "Window size must be greater than ddof".to_string(),
375 ));
376 }
377
378 let threshold = AdaptiveThreshold::new();
379 let parallel_threshold = threshold.calculate(std::mem::size_of::<F>(), 3.0);
380
381 if output_len < parallel_threshold {
382 let mut result = Array1::zeros(output_len);
384 let divisor = F::from(self.windowsize - ddof).unwrap();
385
386 for i in 0..output_len {
387 let window = &self.data[i..i + self.windowsize];
388 let mean = window.iter().fold(F::zero(), |acc, &x| acc + x)
389 / F::from(self.windowsize).unwrap();
390
391 let variance = window
392 .iter()
393 .map(|&x| {
394 let diff = x - mean;
395 diff * diff
396 })
397 .fold(F::zero(), |acc, x| acc + x)
398 / divisor;
399
400 result[i] = variance.sqrt();
401 }
402
403 Ok(result)
404 } else {
405 let indices: Vec<usize> = (0..output_len).collect();
407 let data_ref = Arc::clone(&self.data);
408 let windowsize = self.windowsize;
409 let divisor = F::from(windowsize - ddof).unwrap();
410
411 let stds: Vec<F> = parallel_map(&indices, |&i| {
412 let window = &data_ref[i..i + windowsize];
413 let mean =
414 window.iter().fold(F::zero(), |acc, &x| acc + x) / F::from(windowsize).unwrap();
415
416 let variance = window
417 .iter()
418 .map(|&x| {
419 let diff = x - mean;
420 diff * diff
421 })
422 .fold(F::zero(), |acc, x| acc + x)
423 / divisor;
424
425 variance.sqrt()
426 });
427
428 Ok(Array1::from_vec(stds))
429 }
430 }
431}
432
433#[allow(dead_code)]
435pub fn pairwise_distances_parallel<F, D>(
436 x: &ArrayBase<D, Ix2>,
437 metric: &str,
438) -> StatsResult<Array2<F>>
439where
440 F: Float + NumCast + Send + Sync + SimdUnifiedOps,
441 D: Data<Elem = F> + Sync,
442{
443 let n = x.nrows();
444 let d = x.ncols();
445
446 let threshold = AdaptiveThreshold::new();
447 let parallel_threshold = threshold.calculate(std::mem::size_of::<F>() * d, 2.0);
448
449 if n * n < parallel_threshold {
450 let mut distances = Array2::zeros((n, n));
452
453 for i in 0..n {
454 for j in i + 1..n {
455 let dist = match metric {
456 "euclidean" => {
457 let mut sum = F::zero();
458 for k in 0..d {
459 let diff = x[(i, k)] - x[(j, k)];
460 sum = sum + diff * diff;
461 }
462 sum.sqrt()
463 }
464 "manhattan" => {
465 let mut sum = F::zero();
466 for k in 0..d {
467 sum = sum + (x[(i, k)] - x[(j, k)]).abs();
468 }
469 sum
470 }
471 _ => return Err(StatsError::InvalidArgument("Unknown metric".to_string())),
472 };
473
474 distances[(i, j)] = dist;
475 distances[(j, i)] = dist;
476 }
477 }
478
479 Ok(distances)
480 } else {
481 let mut distances = Array2::zeros((n, n));
483 let pairs: Vec<(usize, usize)> = (0..n)
484 .flat_map(|i| (i + 1..n).map(move |j| (i, j)))
485 .collect();
486
487 let computed_distances: Vec<((usize, usize), F)> = parallel_map(&pairs, |&(i, j)| {
488 let dist = match metric {
489 "euclidean" => {
490 if d > 8 && F::simd_available() {
491 let row_i = x.slice(s![i, ..]);
493 let row_j = x.slice(s![j, ..]);
494 let diff = F::simd_sub(&row_i, &row_j);
495 let squared = F::simd_mul(&diff.view(), &diff.view());
496 F::simd_sum(&squared.view()).sqrt()
497 } else {
498 let mut sum = F::zero();
499 for k in 0..d {
500 let diff = x[(i, k)] - x[(j, k)];
501 sum = sum + diff * diff;
502 }
503 sum.sqrt()
504 }
505 }
506 "manhattan" => {
507 let mut sum = F::zero();
508 for k in 0..d {
509 sum = sum + (x[(i, k)] - x[(j, k)]).abs();
510 }
511 sum
512 }
513 _ => F::nan(), };
515 ((i, j), dist)
516 });
517
518 for ((i, j), dist) in computed_distances {
520 distances[(i, j)] = dist;
521 distances[(j, i)] = dist;
522 }
523
524 Ok(distances)
525 }
526}
527
528pub struct ParallelCrossValidation<F: Float> {
530 n_folds: usize,
531 shuffle: bool,
532 random_state: Option<u64>,
533 _phantom: std::marker::PhantomData<F>,
534}
535
536impl<F: Float + NumCast + Send + Sync + std::fmt::Display> ParallelCrossValidation<F> {
537 pub fn new(n_folds: usize, shuffle: bool, randomstate: Option<u64>) -> Self {
539 Self {
540 n_folds,
541 shuffle,
542 random_state: randomstate,
543 _phantom: std::marker::PhantomData,
544 }
545 }
546
547 pub fn cross_val_score<D, M, S>(
549 &self,
550 x: &ArrayBase<D, Ix2>,
551 y: &Array1<F>,
552 model: M,
553 scorer: S,
554 ) -> StatsResult<Array1<F>>
555 where
556 D: Data<Elem = F> + Sync,
557 M: Fn(&ArrayView2<F>, &Array1<F>) -> StatsResult<()> + Send + Sync + Clone,
558 S: Fn(&ArrayView2<F>, &Array1<F>) -> StatsResult<F> + Send + Sync,
559 {
560 let n_samples_ = x.nrows();
561 if n_samples_ != y.len() {
562 return Err(StatsError::DimensionMismatch(
563 "X and y must have same number of samples".to_string(),
564 ));
565 }
566
567 let indices: Vec<usize> = if self.shuffle {
569 use crate::random::permutation_int;
570 permutation_int(n_samples_, self.random_state)
571 .unwrap()
572 .to_vec()
573 } else {
574 (0..n_samples_).collect()
575 };
576
577 let foldsize = n_samples_ / self.n_folds;
579 let remainder = n_samples_ % self.n_folds;
580
581 let fold_indices: Vec<(usize, usize)> = (0..self.n_folds)
582 .map(|i| {
583 let start = i * foldsize + i.min(remainder);
584 let size = foldsize + if i < remainder { 1 } else { 0 };
585 (start, start + size)
586 })
587 .collect();
588
589 let scores: Vec<F> = parallel_map(&fold_indices, |&(test_start, test_end)| {
591 let test_indices = &indices[test_start..test_end];
593 let train_indices: Vec<usize> = indices[..test_start]
594 .iter()
595 .chain(indices[test_end..].iter())
596 .copied()
597 .collect();
598
599 let x_train = x.select(scirs2_core::ndarray::Axis(0), &train_indices);
601 let y_train = y.select(scirs2_core::ndarray::Axis(0), &train_indices);
602 let x_test = x.select(scirs2_core::ndarray::Axis(0), test_indices);
603 let y_test = y.select(scirs2_core::ndarray::Axis(0), test_indices);
604
605 let fold_model = model.clone();
607 fold_model(&x_train.view(), &y_train)?;
608
609 scorer(&x_test.view(), &y_test)
611 })
612 .into_iter()
613 .collect::<StatsResult<Vec<_>>>()?;
614
615 Ok(Array1::from_vec(scores))
616 }
617}
618
619#[allow(dead_code)]
624pub fn corrcoef_parallel<F, D>(data: &ArrayBase<D, Ix2>, rowvar: bool) -> StatsResult<Array2<F>>
625where
626 F: Float + NumCast + Send + Sync + SimdUnifiedOps,
627 D: Data<Elem = F> + Sync,
628{
629 use crate::correlation_simd::pearson_r_simd;
630
631 let (n_vars, n_obs) = if rowvar {
632 (data.nrows(), data.ncols())
633 } else {
634 (data.ncols(), data.nrows())
635 };
636
637 if n_obs < 2 {
638 return Err(StatsError::InvalidArgument(
639 "Need at least 2 observations to compute correlation".to_string(),
640 ));
641 }
642
643 let threshold = AdaptiveThreshold::new();
644 let parallel_threshold = threshold.calculate(
645 std::mem::size_of::<F>() * n_obs,
646 2.0, );
648
649 let mut corr_matrix = Array2::zeros((n_vars, n_vars));
650
651 for i in 0..n_vars {
653 corr_matrix[(i, i)] = F::one();
654 }
655
656 let pairs: Vec<(usize, usize)> = (0..n_vars)
658 .flat_map(|i| (i + 1..n_vars).map(move |j| (i, j)))
659 .collect();
660
661 if pairs.len() * n_obs < parallel_threshold {
662 for (i, j) in pairs {
664 let var_i = if rowvar {
665 data.slice(s![i, ..])
666 } else {
667 data.slice(s![.., i])
668 };
669
670 let var_j = if rowvar {
671 data.slice(s![j, ..])
672 } else {
673 data.slice(s![.., j])
674 };
675
676 let corr = pearson_r_simd(&var_i, &var_j)?;
677 corr_matrix[(i, j)] = corr;
678 corr_matrix[(j, i)] = corr;
679 }
680 } else {
681 let correlations: Vec<((usize, usize), F)> = parallel_map(&pairs, |&(i, j)| {
683 let var_i = if rowvar {
684 data.slice(s![i, ..])
685 } else {
686 data.slice(s![.., i])
687 };
688
689 let var_j = if rowvar {
690 data.slice(s![j, ..])
691 } else {
692 data.slice(s![.., j])
693 };
694
695 let corr = pearson_r_simd(&var_i, &var_j)?;
696 Ok(((i, j), corr))
697 })
698 .into_iter()
699 .collect::<StatsResult<Vec<_>>>()?;
700
701 for ((i, j), corr) in correlations {
703 corr_matrix[(i, j)] = corr;
704 corr_matrix[(j, i)] = corr;
705 }
706 }
707
708 Ok(corr_matrix)
709}
710
711#[allow(dead_code)]
716pub fn partial_corrcoef_parallel<F, D>(
717 data: &ArrayBase<D, Ix2>,
718 control_vars: &[usize],
719) -> StatsResult<Array2<F>>
720where
721 F: Float
722 + NumCast
723 + Send
724 + Sync
725 + SimdUnifiedOps
726 + scirs2_core::ndarray::ScalarOperand
727 + std::iter::Sum
728 + scirs2_core::numeric::NumAssign,
729 D: Data<Elem = F> + Sync,
730{
731 use scirs2_linalg::lstsq;
732
733 let n_vars = data.ncols();
734 let n_obs = data.nrows();
735
736 if control_vars.is_empty() {
737 return corrcoef_parallel(data, false);
739 }
740
741 for &idx in control_vars {
743 if idx >= n_vars {
744 return Err(StatsError::InvalidArgument(format!(
745 "Control variable index {} out of bounds",
746 idx
747 )));
748 }
749 }
750
751 let threshold = AdaptiveThreshold::new();
752 let parallel_threshold = threshold.calculate(
753 std::mem::size_of::<F>() * n_obs * control_vars.len(),
754 3.0, );
756
757 let mut partial_corr = Array2::zeros((n_vars, n_vars));
758
759 for i in 0..n_vars {
761 partial_corr[(i, i)] = F::one();
762 }
763
764 let control_matrix = data.select(scirs2_core::ndarray::Axis(1), control_vars);
766
767 let pairs: Vec<(usize, usize)> = (0..n_vars)
769 .filter(|i| !control_vars.contains(i))
770 .flat_map(|i| {
771 (i + 1..n_vars)
772 .filter(|j| !control_vars.contains(j))
773 .map(move |j| (i, j))
774 })
775 .collect();
776
777 if pairs.len() * n_obs * control_vars.len() < parallel_threshold {
778 for (i, j) in pairs {
780 let var_i = data.slice(s![.., i]);
781 let var_j = data.slice(s![.., j]);
782
783 let solution_i = lstsq(&control_matrix.view(), &var_i.view(), None)
785 .map_err(|e| StatsError::ComputationError(format!("Regression failed: {}", e)))?;
786 let predicted_i = control_matrix.dot(&solution_i.x);
787 let residuals_i = &var_i - &predicted_i;
788
789 let solution_j = lstsq(&control_matrix.view(), &var_j.view(), None)
790 .map_err(|e| StatsError::ComputationError(format!("Regression failed: {}", e)))?;
791 let predicted_j = control_matrix.dot(&solution_j.x);
792 let residuals_j = &var_j - &predicted_j;
793
794 use crate::correlation_simd::pearson_r_simd;
796 let partial_r = pearson_r_simd(&residuals_i, &residuals_j)?;
797
798 partial_corr[(i, j)] = partial_r;
799 partial_corr[(j, i)] = partial_r;
800 }
801 } else {
802 let partial_correlations: Vec<((usize, usize), F)> = parallel_map(&pairs, |&(i, j)| {
804 let var_i = data.slice(s![.., i]);
805 let var_j = data.slice(s![.., j]);
806
807 let solution_i = lstsq(&control_matrix.view(), &var_i.view(), None)
809 .map_err(|e| StatsError::ComputationError(format!("Regression failed: {}", e)))?;
810 let predicted_i = control_matrix.dot(&solution_i.x);
811 let residuals_i = &var_i - &predicted_i;
812
813 let solution_j = lstsq(&control_matrix.view(), &var_j.view(), None)
814 .map_err(|e| StatsError::ComputationError(format!("Regression failed: {}", e)))?;
815 let predicted_j = control_matrix.dot(&solution_j.x);
816 let residuals_j = &var_j - &predicted_j;
817
818 use crate::correlation_simd::pearson_r_simd;
820 let partial_r = pearson_r_simd(&residuals_i, &residuals_j)?;
821
822 Ok(((i, j), partial_r))
823 })
824 .into_iter()
825 .collect::<StatsResult<Vec<_>>>()?;
826
827 for ((i, j), corr) in partial_correlations {
829 partial_corr[(i, j)] = corr;
830 partial_corr[(j, i)] = corr;
831 }
832 }
833
834 Ok(partial_corr)
835}
836
837#[allow(dead_code)]
841pub fn autocorrelation_parallel<F, D>(
842 data: &ArrayBase<D, Ix1>,
843 max_lag: usize,
844) -> StatsResult<Array1<F>>
845where
846 F: Float + NumCast + Send + Sync + SimdUnifiedOps,
847 D: Data<Elem = F> + Sync,
848{
849 let n = data.len();
850 if max_lag >= n {
851 return Err(StatsError::InvalidArgument(
852 "max_lag must be less than data length".to_string(),
853 ));
854 }
855
856 let threshold = AdaptiveThreshold::new();
857 let parallel_threshold = threshold.calculate(
858 std::mem::size_of::<F>() * n,
859 1.5, );
861
862 let mean = mean_simd(data)?;
864 let variance = variance_simd(data, 0)?;
865
866 if variance <= F::epsilon() {
867 return Err(StatsError::InvalidArgument(
868 "Cannot compute autocorrelation for constant series".to_string(),
869 ));
870 }
871
872 let lags: Vec<usize> = (0..=max_lag).collect();
873
874 let autocorr = if max_lag * n < parallel_threshold {
875 lags.iter()
877 .map(|&lag| {
878 if lag == 0 {
879 F::one()
880 } else {
881 let mut sum = F::zero();
882 for i in 0..n - lag {
883 sum = sum + (data[i] - mean) * (data[i + lag] - mean);
884 }
885 sum / (F::from(n - lag).unwrap() * variance)
886 }
887 })
888 .collect()
889 } else {
890 parallel_map(&lags, |&lag| {
892 if lag == 0 {
893 F::one()
894 } else if F::simd_available() && n - lag > 64 {
895 let data_start = data.slice(s![..n - lag]);
897 let data_lagged = data.slice(s![lag..]);
898
899 let mean_array = scirs2_core::ndarray::Array1::from_elem(n - lag, mean);
900 let start_centered = F::simd_sub(&data_start, &mean_array.view());
901 let lagged_centered = F::simd_sub(&data_lagged, &mean_array.view());
902
903 let products = F::simd_mul(&start_centered.view(), &lagged_centered.view());
904 let sum = F::simd_sum(&products.view());
905
906 sum / (F::from(n - lag).unwrap() * variance)
907 } else {
908 let mut sum = F::zero();
910 for i in 0..n - lag {
911 sum = sum + (data[i] - mean) * (data[i + lag] - mean);
912 }
913 sum / (F::from(n - lag).unwrap() * variance)
914 }
915 })
916 };
917
918 Ok(Array1::from_vec(autocorr))
919}
920
921#[cfg(test)]
922mod tests {
923 use super::*;
924 use approx::assert_relative_eq;
925 use scirs2_core::ndarray::array;
926
927 #[test]
928 #[ignore = "timeout"]
929 fn test_parallel_histogram() {
930 let data = Array1::from_vec((0..10000).map(|i| i as f64 / 100.0).collect());
931 let hist = ParallelHistogram::new(&data.view(), 10).unwrap();
932
933 let (bins, counts) = hist.get_histogram();
934 assert_eq!(bins.len(), 10);
935 assert_eq!(counts.iter().sum::<usize>(), 10000);
936 }
937
938 #[test]
939 fn test_parallel_kde() {
940 let data = array![1.0, 2.0, 3.0, 4.0, 5.0];
941 let eval_points = Array1::linspace(0.0, 6.0, 100);
942
943 let kde_result = kde_parallel(&data.view(), &eval_points, 0.5).unwrap();
944 assert_eq!(kde_result.len(), 100);
945
946 let max_idx = kde_result
948 .iter()
949 .enumerate()
950 .max_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap())
951 .map(|(idx_, _)| idx_)
952 .unwrap();
953
954 assert!(max_idx > 40 && max_idx < 60); }
956
957 #[test]
958 fn test_moving_stats() {
959 let data = Array1::from_vec((0..100).map(|i| i as f64).collect());
960 let moving_stats = ParallelMovingStats::new(&data.view(), 10).unwrap();
961
962 let moving_mean = moving_stats.moving_mean().unwrap();
963 assert_eq!(moving_mean.len(), 91); assert_relative_eq!(moving_mean[0], 4.5, epsilon = 1e-10);
967
968 assert_relative_eq!(moving_mean[90], 94.5, epsilon = 1e-10);
970 }
971
972 #[test]
973 fn test_pairwise_distances() {
974 let data = array![[0.0, 0.0], [1.0, 0.0], [0.0, 1.0], [1.0, 1.0],];
975
976 let distances = pairwise_distances_parallel(&data.view(), "euclidean").unwrap();
977
978 for i in 0..4 {
980 assert_relative_eq!(distances[(i, i)], 0.0, epsilon = 1e-10);
981 }
982
983 assert_relative_eq!(distances[(0, 1)], 1.0, epsilon = 1e-10);
985 assert_relative_eq!(distances[(0, 3)], 2.0_f64.sqrt(), epsilon = 1e-10);
986
987 for i in 0..4 {
989 for j in 0..4 {
990 assert_relative_eq!(distances[(i, j)], distances[(j, i)], epsilon = 1e-10);
991 }
992 }
993 }
994}