1use crate::descriptive_simd::{mean_simd, variance_simd};
10use crate::error::{StatsError, StatsResult};
11use scirs2_core::ndarray::{s, Array1, Array2, ArrayBase, ArrayView2, Data, Ix1, Ix2};
12use scirs2_core::numeric::{Float, NumCast};
13use scirs2_core::parallel_ops::{num_threads, par_chunks, parallel_map, ParallelIterator};
14use scirs2_core::simd_ops::{PlatformCapabilities, SimdUnifiedOps};
15use std::f64::consts::PI;
16use std::sync::Arc;
17
/// Heuristic tuner that decides when a computation is large enough to
/// benefit from parallel execution on the current machine.
pub struct AdaptiveThreshold {
    // Baseline element count below which work stays sequential.
    base_threshold: usize,
    // Number of worker threads reported by the parallel runtime.
    cpu_cores: usize,
    // Whether SIMD instructions were detected at startup.
    simd_available: bool,
    // Assumed cache line size in bytes (set to 64 in `new`).
    cache_linesize: usize,
}
25
26impl Default for AdaptiveThreshold {
27 fn default() -> Self {
28 Self::new()
29 }
30}
31
32impl AdaptiveThreshold {
33 pub fn new() -> Self {
35 let caps = PlatformCapabilities::detect();
36 Self {
37 base_threshold: 10_000,
38 cpu_cores: num_threads(),
39 simd_available: caps.simd_available,
40 cache_linesize: 64, }
42 }
43
44 pub fn calculate(&self, elementsize: usize, operationcomplexity: f64) -> usize {
46 let simd_factor = if self.simd_available { 2.0 } else { 1.0 };
53 let core_factor = (self.cpu_cores as f64).sqrt();
54 let cache_factor = (self.cache_linesize / elementsize).max(1) as f64;
55
56 let adjusted_threshold =
57 (self.base_threshold as f64 * simd_factor / core_factor / operationcomplexity
58 * cache_factor.sqrt()) as usize;
59
60 adjusted_threshold.max(1000) }
62
63 pub fn optimal_chunksize(&self, total_elements: usize, elementsize: usize) -> usize {
65 let l1_cachesize = 32 * 1024;
67 let elements_per_cache = l1_cachesize / elementsize;
68
69 let ideal_chunk = elements_per_cache / 2; let min_chunks = self.cpu_cores * 4; let max_chunksize = total_elements / min_chunks;
75
76 ideal_chunk.min(max_chunksize).max(64)
77 }
78}
79
/// Histogram whose counts can be accumulated in parallel over data chunks.
pub struct ParallelHistogram<F: Float> {
    // Bin edges; `n_bins + 1` entries spanning [min_val, max_val].
    bins: Vec<F>,
    // Occurrence count per bin (`n_bins` entries).
    counts: Vec<usize>,
    // Smallest observed value (left edge of the first bin).
    min_val: F,
    // Largest observed value (right edge of the last bin).
    max_val: F,
    // Number of bins.
    n_bins: usize,
}
88
89impl<F: Float + NumCast + Send + Sync + std::fmt::Display> ParallelHistogram<F> {
90 pub fn new<D>(data: &ArrayBase<D, Ix1>, nbins: usize) -> StatsResult<Self>
92 where
93 D: Data<Elem = F> + Sync,
94 {
95 if data.is_empty() {
96 return Err(StatsError::InvalidArgument(
97 "Cannot create histogram from empty data".to_string(),
98 ));
99 }
100
101 let threshold = AdaptiveThreshold::new();
102 let parallel_threshold = threshold.calculate(std::mem::size_of::<F>(), 1.0);
103
104 let (min_val, max_val) = if data.len() >= parallel_threshold {
106 let chunksize = threshold.optimal_chunksize(data.len(), std::mem::size_of::<F>());
107
108 let (min, max) = par_chunks(data.as_slice().expect("Operation failed"), chunksize)
109 .map(|chunk| {
110 let mut local_min = chunk[0];
111 let mut local_max = chunk[0];
112 for &val in chunk.iter().skip(1) {
113 if val < local_min {
114 local_min = val;
115 }
116 if val > local_max {
117 local_max = val;
118 }
119 }
120 (local_min, local_max)
121 })
122 .reduce(
123 || (F::infinity(), F::neg_infinity()),
124 |(min1, max1), (min2, max2)| {
125 (
126 if min1 < min2 { min1 } else { min2 },
127 if max1 > max2 { max1 } else { max2 },
128 )
129 },
130 );
131 (min, max)
132 } else {
133 let mut min = data[0];
135 let mut max = data[0];
136 for &val in data.iter().skip(1) {
137 if val < min {
138 min = val;
139 }
140 if val > max {
141 max = val;
142 }
143 }
144 (min, max)
145 };
146
147 let range = max_val - min_val;
149 let bin_width = range / F::from(nbins).expect("Failed to convert to float");
150
151 let bins: Vec<F> = (0..=nbins)
152 .map(|i| min_val + bin_width * F::from(i).expect("Failed to convert to float"))
153 .collect();
154
155 let mut histogram = Self {
156 bins,
157 counts: vec![0; nbins],
158 min_val,
159 max_val,
160 n_bins: nbins,
161 };
162
163 histogram.compute_counts(data)?;
165
166 Ok(histogram)
167 }
168
169 fn compute_counts<D>(&mut self, data: &ArrayBase<D, Ix1>) -> StatsResult<()>
171 where
172 D: Data<Elem = F> + Sync,
173 {
174 let threshold = AdaptiveThreshold::new();
175 let parallel_threshold = threshold.calculate(std::mem::size_of::<F>(), 2.0);
176
177 if data.len() < parallel_threshold {
178 let bin_width = (self.max_val - self.min_val)
180 / F::from(self.n_bins).expect("Failed to convert to float");
181
182 for &val in data.iter() {
183 if val >= self.min_val && val <= self.max_val {
184 let bin_idx = ((val - self.min_val) / bin_width)
185 .floor()
186 .to_usize()
187 .expect("Operation failed")
188 .min(self.n_bins - 1);
189 self.counts[bin_idx] += 1;
190 }
191 }
192 } else {
193 let chunksize = threshold.optimal_chunksize(data.len(), std::mem::size_of::<F>());
195 let bin_width = (self.max_val - self.min_val)
196 / F::from(self.n_bins).expect("Failed to convert to float");
197 let n_bins = self.n_bins;
198 let min_val = self.min_val;
199
200 let local_histograms: Vec<Vec<usize>> =
202 par_chunks(data.as_slice().expect("Operation failed"), chunksize)
203 .map(|chunk| {
204 let mut local_counts = vec![0; n_bins];
205
206 for &val in chunk {
207 if val >= min_val && val <= self.max_val {
208 let bin_idx = ((val - min_val) / bin_width)
209 .floor()
210 .to_usize()
211 .expect("Operation failed")
212 .min(n_bins - 1);
213 local_counts[bin_idx] += 1;
214 }
215 }
216
217 local_counts
218 })
219 .collect();
220
221 for local_counts in local_histograms {
223 for (i, count) in local_counts.into_iter().enumerate() {
224 self.counts[i] += count;
225 }
226 }
227 }
228
229 Ok(())
230 }
231
232 pub fn get_histogram(&self) -> (&[F], &[usize]) {
234 (&self.bins[..self.n_bins], &self.counts)
235 }
236}
237
238#[allow(dead_code)]
240pub fn kde_parallel<F, D>(
241 data: &ArrayBase<D, Ix1>,
242 eval_points: &Array1<F>,
243 bandwidth: F,
244) -> StatsResult<Array1<F>>
245where
246 F: Float + NumCast + Send + Sync + SimdUnifiedOps,
247 D: Data<Elem = F> + Sync,
248{
249 if data.is_empty() || eval_points.is_empty() {
250 return Err(StatsError::InvalidArgument("Empty input data".to_string()));
251 }
252
253 let n = data.len();
254 let threshold = AdaptiveThreshold::new();
255 let parallel_threshold = threshold.calculate(std::mem::size_of::<F>(), 3.0);
256
257 let norm_const = F::one()
259 / (F::from(2.0 * PI)
260 .expect("Failed to convert to float")
261 .sqrt()
262 * bandwidth
263 * F::from(n).expect("Failed to convert to float"));
264
265 if eval_points.len() * n < parallel_threshold {
266 let mut result = Array1::zeros(eval_points.len());
268
269 for (i, &x) in eval_points.iter().enumerate() {
270 let mut density = F::zero();
271 for &xi in data.iter() {
272 let u = (x - xi) / bandwidth;
273 density = density
274 + (-u * u / F::from(2.0).expect("Failed to convert constant to float")).exp();
275 }
276 result[i] = density * norm_const;
277 }
278
279 Ok(result)
280 } else {
281 let eval_vec: Vec<F> = eval_points.to_vec();
283 let data_slice = data.as_slice().expect("Operation failed");
284
285 let densities: Vec<F> = parallel_map(&eval_vec, |&x| {
286 let chunksize = threshold.optimal_chunksize(n, std::mem::size_of::<F>());
287
288 let density: F = par_chunks(data_slice, chunksize)
290 .map(|chunk| {
291 let mut local_sum = F::zero();
292 for &xi in chunk {
293 let u = (x - xi) / bandwidth;
294 local_sum = local_sum
295 + (-u * u / F::from(2.0).expect("Failed to convert constant to float"))
296 .exp();
297 }
298 local_sum
299 })
300 .reduce(|| F::zero(), |a, b| a + b);
301
302 density * norm_const
303 });
304
305 Ok(Array1::from_vec(densities))
306 }
307}
308
/// Sliding-window statistics over an owned copy of the data, shareable
/// across worker threads via `Arc`.
pub struct ParallelMovingStats<F: Float> {
    // Window length in elements; validated in `new` to be 1..=data.len().
    windowsize: usize,
    // Owned data, reference-counted so parallel closures can share it.
    data: Arc<Vec<F>>,
}
314
315impl<F: Float + NumCast + Send + Sync + SimdUnifiedOps + std::fmt::Display> ParallelMovingStats<F> {
316 pub fn new<D>(data: &ArrayBase<D, Ix1>, windowsize: usize) -> StatsResult<Self>
318 where
319 D: Data<Elem = F>,
320 {
321 if windowsize == 0 || windowsize > data.len() {
322 return Err(StatsError::InvalidArgument(
323 "Invalid window size".to_string(),
324 ));
325 }
326
327 Ok(Self {
328 windowsize,
329 data: Arc::new(data.to_vec()),
330 })
331 }
332
333 pub fn moving_mean(&self) -> StatsResult<Array1<F>> {
335 let n = self.data.len();
336 let output_len = n - self.windowsize + 1;
337
338 let threshold = AdaptiveThreshold::new();
339 let parallel_threshold = threshold.calculate(std::mem::size_of::<F>(), 2.0);
340
341 if output_len < parallel_threshold {
342 let mut result = Array1::zeros(output_len);
344 let windowsize_f = F::from(self.windowsize).expect("Failed to convert to float");
345
346 let mut window_sum = self.data[..self.windowsize]
348 .iter()
349 .fold(F::zero(), |acc, &x| acc + x);
350 result[0] = window_sum / windowsize_f;
351
352 for i in 1..output_len {
354 window_sum = window_sum - self.data[i - 1] + self.data[i + self.windowsize - 1];
355 result[i] = window_sum / windowsize_f;
356 }
357
358 Ok(result)
359 } else {
360 let indices: Vec<usize> = (0..output_len).collect();
362 let data_ref = Arc::clone(&self.data);
363 let windowsize = self.windowsize;
364 let windowsize_f = F::from(windowsize).expect("Failed to convert to float");
365
366 let means: Vec<F> = parallel_map(&indices, |&i| {
367 let window_sum = data_ref[i..i + windowsize]
368 .iter()
369 .fold(F::zero(), |acc, &x| acc + x);
370 window_sum / windowsize_f
371 });
372
373 Ok(Array1::from_vec(means))
374 }
375 }
376
377 pub fn moving_std(&self, ddof: usize) -> StatsResult<Array1<F>> {
379 let n = self.data.len();
380 let output_len = n - self.windowsize + 1;
381
382 if self.windowsize <= ddof {
383 return Err(StatsError::InvalidArgument(
384 "Window size must be greater than ddof".to_string(),
385 ));
386 }
387
388 let threshold = AdaptiveThreshold::new();
389 let parallel_threshold = threshold.calculate(std::mem::size_of::<F>(), 3.0);
390
391 if output_len < parallel_threshold {
392 let mut result = Array1::zeros(output_len);
394 let divisor = F::from(self.windowsize - ddof).expect("Failed to convert to float");
395
396 for i in 0..output_len {
397 let window = &self.data[i..i + self.windowsize];
398 let mean = window.iter().fold(F::zero(), |acc, &x| acc + x)
399 / F::from(self.windowsize).expect("Failed to convert to float");
400
401 let variance = window
402 .iter()
403 .map(|&x| {
404 let diff = x - mean;
405 diff * diff
406 })
407 .fold(F::zero(), |acc, x| acc + x)
408 / divisor;
409
410 result[i] = variance.sqrt();
411 }
412
413 Ok(result)
414 } else {
415 let indices: Vec<usize> = (0..output_len).collect();
417 let data_ref = Arc::clone(&self.data);
418 let windowsize = self.windowsize;
419 let divisor = F::from(windowsize - ddof).expect("Failed to convert to float");
420
421 let stds: Vec<F> = parallel_map(&indices, |&i| {
422 let window = &data_ref[i..i + windowsize];
423 let mean = window.iter().fold(F::zero(), |acc, &x| acc + x)
424 / F::from(windowsize).expect("Failed to convert to float");
425
426 let variance = window
427 .iter()
428 .map(|&x| {
429 let diff = x - mean;
430 diff * diff
431 })
432 .fold(F::zero(), |acc, x| acc + x)
433 / divisor;
434
435 variance.sqrt()
436 });
437
438 Ok(Array1::from_vec(stds))
439 }
440 }
441}
442
443#[allow(dead_code)]
445pub fn pairwise_distances_parallel<F, D>(
446 x: &ArrayBase<D, Ix2>,
447 metric: &str,
448) -> StatsResult<Array2<F>>
449where
450 F: Float + NumCast + Send + Sync + SimdUnifiedOps,
451 D: Data<Elem = F> + Sync,
452{
453 let n = x.nrows();
454 let d = x.ncols();
455
456 let threshold = AdaptiveThreshold::new();
457 let parallel_threshold = threshold.calculate(std::mem::size_of::<F>() * d, 2.0);
458
459 if n * n < parallel_threshold {
460 let mut distances = Array2::zeros((n, n));
462
463 for i in 0..n {
464 for j in i + 1..n {
465 let dist = match metric {
466 "euclidean" => {
467 let mut sum = F::zero();
468 for k in 0..d {
469 let diff = x[(i, k)] - x[(j, k)];
470 sum = sum + diff * diff;
471 }
472 sum.sqrt()
473 }
474 "manhattan" => {
475 let mut sum = F::zero();
476 for k in 0..d {
477 sum = sum + (x[(i, k)] - x[(j, k)]).abs();
478 }
479 sum
480 }
481 _ => return Err(StatsError::InvalidArgument("Unknown metric".to_string())),
482 };
483
484 distances[(i, j)] = dist;
485 distances[(j, i)] = dist;
486 }
487 }
488
489 Ok(distances)
490 } else {
491 let mut distances = Array2::zeros((n, n));
493 let pairs: Vec<(usize, usize)> = (0..n)
494 .flat_map(|i| (i + 1..n).map(move |j| (i, j)))
495 .collect();
496
497 let computed_distances: Vec<((usize, usize), F)> = parallel_map(&pairs, |&(i, j)| {
498 let dist = match metric {
499 "euclidean" => {
500 if d > 8 && F::simd_available() {
501 let row_i = x.slice(s![i, ..]);
503 let row_j = x.slice(s![j, ..]);
504 let diff = F::simd_sub(&row_i, &row_j);
505 let squared = F::simd_mul(&diff.view(), &diff.view());
506 F::simd_sum(&squared.view()).sqrt()
507 } else {
508 let mut sum = F::zero();
509 for k in 0..d {
510 let diff = x[(i, k)] - x[(j, k)];
511 sum = sum + diff * diff;
512 }
513 sum.sqrt()
514 }
515 }
516 "manhattan" => {
517 let mut sum = F::zero();
518 for k in 0..d {
519 sum = sum + (x[(i, k)] - x[(j, k)]).abs();
520 }
521 sum
522 }
523 _ => F::nan(), };
525 ((i, j), dist)
526 });
527
528 for ((i, j), dist) in computed_distances {
530 distances[(i, j)] = dist;
531 distances[(j, i)] = dist;
532 }
533
534 Ok(distances)
535 }
536}
537
/// K-fold cross-validation driver that scores folds in parallel.
pub struct ParallelCrossValidation<F: Float> {
    // Number of folds the samples are partitioned into.
    n_folds: usize,
    // Whether to shuffle sample indices before folding.
    shuffle: bool,
    // Seed for the shuffle; unused when `shuffle` is false.
    random_state: Option<u64>,
    // Ties the element type `F` to the struct without storing any data.
    _phantom: std::marker::PhantomData<F>,
}
545
546impl<F: Float + NumCast + Send + Sync + std::fmt::Display> ParallelCrossValidation<F> {
547 pub fn new(n_folds: usize, shuffle: bool, randomstate: Option<u64>) -> Self {
549 Self {
550 n_folds,
551 shuffle,
552 random_state: randomstate,
553 _phantom: std::marker::PhantomData,
554 }
555 }
556
557 pub fn cross_val_score<D, M, S>(
559 &self,
560 x: &ArrayBase<D, Ix2>,
561 y: &Array1<F>,
562 model: M,
563 scorer: S,
564 ) -> StatsResult<Array1<F>>
565 where
566 D: Data<Elem = F> + Sync,
567 M: Fn(&ArrayView2<F>, &Array1<F>) -> StatsResult<()> + Send + Sync + Clone,
568 S: Fn(&ArrayView2<F>, &Array1<F>) -> StatsResult<F> + Send + Sync,
569 {
570 let n_samples_ = x.nrows();
571 if n_samples_ != y.len() {
572 return Err(StatsError::DimensionMismatch(
573 "X and y must have same number of samples".to_string(),
574 ));
575 }
576
577 let indices: Vec<usize> = if self.shuffle {
579 use crate::random::permutation_int;
580 permutation_int(n_samples_, self.random_state)
581 .expect("Operation failed")
582 .to_vec()
583 } else {
584 (0..n_samples_).collect()
585 };
586
587 let foldsize = n_samples_ / self.n_folds;
589 let remainder = n_samples_ % self.n_folds;
590
591 let fold_indices: Vec<(usize, usize)> = (0..self.n_folds)
592 .map(|i| {
593 let start = i * foldsize + i.min(remainder);
594 let size = foldsize + if i < remainder { 1 } else { 0 };
595 (start, start + size)
596 })
597 .collect();
598
599 let scores: Vec<F> = parallel_map(&fold_indices, |&(test_start, test_end)| {
601 let test_indices = &indices[test_start..test_end];
603 let train_indices: Vec<usize> = indices[..test_start]
604 .iter()
605 .chain(indices[test_end..].iter())
606 .copied()
607 .collect();
608
609 let x_train = x.select(scirs2_core::ndarray::Axis(0), &train_indices);
611 let y_train = y.select(scirs2_core::ndarray::Axis(0), &train_indices);
612 let x_test = x.select(scirs2_core::ndarray::Axis(0), test_indices);
613 let y_test = y.select(scirs2_core::ndarray::Axis(0), test_indices);
614
615 let fold_model = model.clone();
617 fold_model(&x_train.view(), &y_train)?;
618
619 scorer(&x_test.view(), &y_test)
621 })
622 .into_iter()
623 .collect::<StatsResult<Vec<_>>>()?;
624
625 Ok(Array1::from_vec(scores))
626 }
627}
628
#[allow(dead_code)]
/// Pearson correlation matrix of the variables in `data`.
///
/// `rowvar = true` treats rows as variables and columns as observations;
/// `rowvar = false` the opposite. Unique variable pairs are computed
/// sequentially or via `parallel_map` depending on problem size, with the
/// per-pair correlation delegated to `pearson_r_simd`.
pub fn corrcoef_parallel<F, D>(data: &ArrayBase<D, Ix2>, rowvar: bool) -> StatsResult<Array2<F>>
where
    F: Float + NumCast + Send + Sync + SimdUnifiedOps,
    D: Data<Elem = F> + Sync,
{
    use crate::correlation_simd::pearson_r_simd;

    // Interpret the matrix layout according to `rowvar`.
    let (n_vars, n_obs) = if rowvar {
        (data.nrows(), data.ncols())
    } else {
        (data.ncols(), data.nrows())
    };

    if n_obs < 2 {
        return Err(StatsError::InvalidArgument(
            "Need at least 2 observations to compute correlation".to_string(),
        ));
    }

    let threshold = AdaptiveThreshold::new();
    let parallel_threshold = threshold.calculate(
        std::mem::size_of::<F>() * n_obs,
        2.0,
    );

    let mut corr_matrix = Array2::zeros((n_vars, n_vars));

    // A variable correlates perfectly with itself.
    for i in 0..n_vars {
        corr_matrix[(i, i)] = F::one();
    }

    // Unique upper-triangle pairs (i < j); results are mirrored below.
    let pairs: Vec<(usize, usize)> = (0..n_vars)
        .flat_map(|i| (i + 1..n_vars).map(move |j| (i, j)))
        .collect();

    if pairs.len() * n_obs < parallel_threshold {
        for (i, j) in pairs {
            let var_i = if rowvar {
                data.slice(s![i, ..])
            } else {
                data.slice(s![.., i])
            };

            let var_j = if rowvar {
                data.slice(s![j, ..])
            } else {
                data.slice(s![.., j])
            };

            let corr = pearson_r_simd(&var_i, &var_j)?;
            corr_matrix[(i, j)] = corr;
            corr_matrix[(j, i)] = corr;
        }
    } else {
        // Parallel path: each task returns ((i, j), r) so per-pair errors
        // can be propagated through the fallible collect below.
        let correlations: Vec<((usize, usize), F)> = parallel_map(&pairs, |&(i, j)| {
            let var_i = if rowvar {
                data.slice(s![i, ..])
            } else {
                data.slice(s![.., i])
            };

            let var_j = if rowvar {
                data.slice(s![j, ..])
            } else {
                data.slice(s![.., j])
            };

            let corr = pearson_r_simd(&var_i, &var_j)?;
            Ok(((i, j), corr))
        })
        .into_iter()
        .collect::<StatsResult<Vec<_>>>()?;

        for ((i, j), corr) in correlations {
            corr_matrix[(i, j)] = corr;
            corr_matrix[(j, i)] = corr;
        }
    }

    Ok(corr_matrix)
}
720
#[allow(dead_code)]
/// Partial correlation matrix: Pearson correlation between each pair of
/// non-control variables after regressing out the `control_vars` columns.
///
/// Each variable is regressed (least squares) on the control columns and
/// the residuals are correlated. Falls back to `corrcoef_parallel` when
/// there are no control variables. Columns are always treated as variables
/// here (equivalent to `rowvar = false`).
///
/// # Errors
/// Returns `InvalidArgument` for an out-of-bounds control index and
/// `ComputationError` if a least-squares fit fails.
pub fn partial_corrcoef_parallel<F, D>(
    data: &ArrayBase<D, Ix2>,
    control_vars: &[usize],
) -> StatsResult<Array2<F>>
where
    F: Float
        + NumCast
        + Send
        + Sync
        + SimdUnifiedOps
        + scirs2_core::ndarray::ScalarOperand
        + std::iter::Sum
        + scirs2_core::numeric::NumAssign,
    D: Data<Elem = F> + Sync,
{
    use scirs2_linalg::lstsq;

    let n_vars = data.ncols();
    let n_obs = data.nrows();

    // Nothing to partial out: plain correlation matrix.
    if control_vars.is_empty() {
        return corrcoef_parallel(data, false);
    }

    for &idx in control_vars {
        if idx >= n_vars {
            return Err(StatsError::InvalidArgument(format!(
                "Control variable index {} out of bounds",
                idx
            )));
        }
    }

    let threshold = AdaptiveThreshold::new();
    let parallel_threshold = threshold.calculate(
        std::mem::size_of::<F>() * n_obs * control_vars.len(),
        3.0,
    );

    let mut partial_corr = Array2::zeros((n_vars, n_vars));

    // Diagonal is 1 by definition.
    for i in 0..n_vars {
        partial_corr[(i, i)] = F::one();
    }

    // Design matrix of the control columns, shared by every regression.
    let control_matrix = data.select(scirs2_core::ndarray::Axis(1), control_vars);

    // Unique pairs of non-control variables; rows/columns belonging to
    // control variables keep their zero off-diagonal entries.
    let pairs: Vec<(usize, usize)> = (0..n_vars)
        .filter(|i| !control_vars.contains(i))
        .flat_map(|i| {
            (i + 1..n_vars)
                .filter(|j| !control_vars.contains(j))
                .map(move |j| (i, j))
        })
        .collect();

    if pairs.len() * n_obs * control_vars.len() < parallel_threshold {
        for (i, j) in pairs {
            let var_i = data.slice(s![.., i]);
            let var_j = data.slice(s![.., j]);

            // Residual of var_i after removing the control variables' fit.
            let solution_i = lstsq(&control_matrix.view(), &var_i.view(), None)
                .map_err(|e| StatsError::ComputationError(format!("Regression failed: {}", e)))?;
            let predicted_i = control_matrix.dot(&solution_i.x);
            let residuals_i = &var_i - &predicted_i;

            // Residual of var_j, same procedure.
            let solution_j = lstsq(&control_matrix.view(), &var_j.view(), None)
                .map_err(|e| StatsError::ComputationError(format!("Regression failed: {}", e)))?;
            let predicted_j = control_matrix.dot(&solution_j.x);
            let residuals_j = &var_j - &predicted_j;

            use crate::correlation_simd::pearson_r_simd;
            let partial_r = pearson_r_simd(&residuals_i, &residuals_j)?;

            partial_corr[(i, j)] = partial_r;
            partial_corr[(j, i)] = partial_r;
        }
    } else {
        // Parallel path: identical math per pair, with fallible results
        // collected into a single StatsResult.
        let partial_correlations: Vec<((usize, usize), F)> = parallel_map(&pairs, |&(i, j)| {
            let var_i = data.slice(s![.., i]);
            let var_j = data.slice(s![.., j]);

            let solution_i = lstsq(&control_matrix.view(), &var_i.view(), None)
                .map_err(|e| StatsError::ComputationError(format!("Regression failed: {}", e)))?;
            let predicted_i = control_matrix.dot(&solution_i.x);
            let residuals_i = &var_i - &predicted_i;

            let solution_j = lstsq(&control_matrix.view(), &var_j.view(), None)
                .map_err(|e| StatsError::ComputationError(format!("Regression failed: {}", e)))?;
            let predicted_j = control_matrix.dot(&solution_j.x);
            let residuals_j = &var_j - &predicted_j;

            use crate::correlation_simd::pearson_r_simd;
            let partial_r = pearson_r_simd(&residuals_i, &residuals_j)?;

            Ok(((i, j), partial_r))
        })
        .into_iter()
        .collect::<StatsResult<Vec<_>>>()?;

        for ((i, j), corr) in partial_correlations {
            partial_corr[(i, j)] = corr;
            partial_corr[(j, i)] = corr;
        }
    }

    Ok(partial_corr)
}
846
#[allow(dead_code)]
/// Sample autocorrelation of `data` at lags `0..=max_lag`.
///
/// Lag 0 is defined as 1. Each other lag is the mean-centered lagged
/// product sum divided by `(n - lag) * variance`, where the variance is
/// computed with ddof = 0. Large problems evaluate lags in parallel, with
/// a SIMD path for long overlaps.
///
/// # Errors
/// Returns `InvalidArgument` when `max_lag >= data.len()` or the series is
/// (numerically) constant.
pub fn autocorrelation_parallel<F, D>(
    data: &ArrayBase<D, Ix1>,
    max_lag: usize,
) -> StatsResult<Array1<F>>
where
    F: Float + NumCast + Send + Sync + SimdUnifiedOps,
    D: Data<Elem = F> + Sync,
{
    let n = data.len();
    if max_lag >= n {
        return Err(StatsError::InvalidArgument(
            "max_lag must be less than data length".to_string(),
        ));
    }

    let threshold = AdaptiveThreshold::new();
    let parallel_threshold = threshold.calculate(
        std::mem::size_of::<F>() * n,
        1.5,
    );

    let mean = mean_simd(data)?;
    let variance = variance_simd(data, 0)?;

    // A constant series has zero variance; the normalization is undefined.
    if variance <= F::epsilon() {
        return Err(StatsError::InvalidArgument(
            "Cannot compute autocorrelation for constant series".to_string(),
        ));
    }

    let lags: Vec<usize> = (0..=max_lag).collect();

    let autocorr = if max_lag * n < parallel_threshold {
        // Sequential: direct centered cross-product per lag.
        lags.iter()
            .map(|&lag| {
                if lag == 0 {
                    F::one()
                } else {
                    let mut sum = F::zero();
                    for i in 0..n - lag {
                        sum = sum + (data[i] - mean) * (data[i + lag] - mean);
                    }
                    sum / (F::from(n - lag).expect("Failed to convert to float") * variance)
                }
            })
            .collect()
    } else {
        // Parallel: one task per lag.
        parallel_map(&lags, |&lag| {
            if lag == 0 {
                F::one()
            } else if F::simd_available() && n - lag > 64 {
                // SIMD path: center both the leading and the lagged views,
                // then sum their elementwise products.
                let data_start = data.slice(s![..n - lag]);
                let data_lagged = data.slice(s![lag..]);

                let mean_array = scirs2_core::ndarray::Array1::from_elem(n - lag, mean);
                let start_centered = F::simd_sub(&data_start, &mean_array.view());
                let lagged_centered = F::simd_sub(&data_lagged, &mean_array.view());

                let products = F::simd_mul(&start_centered.view(), &lagged_centered.view());
                let sum = F::simd_sum(&products.view());

                sum / (F::from(n - lag).expect("Failed to convert to float") * variance)
            } else {
                // Scalar fallback for short overlaps.
                let mut sum = F::zero();
                for i in 0..n - lag {
                    sum = sum + (data[i] - mean) * (data[i + lag] - mean);
                }
                sum / (F::from(n - lag).expect("Failed to convert to float") * variance)
            }
        })
    };

    Ok(Array1::from_vec(autocorr))
}
930
#[cfg(test)]
mod tests {
    use super::*;
    use approx::assert_relative_eq;
    use scirs2_core::ndarray::array;

    #[test]
    fn test_parallel_histogram() {
        // 10k evenly spaced values; all counts together must equal the input size.
        let data = Array1::from_vec((0..10000).map(|i| i as f64 / 100.0).collect());
        let hist = ParallelHistogram::new(&data.view(), 10).expect("Operation failed");

        let (bins, counts) = hist.get_histogram();
        assert_eq!(bins.len(), 10);
        assert_eq!(counts.iter().sum::<usize>(), 10000);
    }

    #[test]
    fn test_parallel_kde() {
        let data = array![1.0, 2.0, 3.0, 4.0, 5.0];
        let eval_points = Array1::linspace(0.0, 6.0, 100);

        let kde_result = kde_parallel(&data.view(), &eval_points, 0.5).expect("Operation failed");
        assert_eq!(kde_result.len(), 100);

        // Density should peak near the center of the data (around x = 3,
        // i.e. the middle of the 100 evaluation points).
        let max_idx = kde_result
            .iter()
            .enumerate()
            .max_by(|(_, a), (_, b)| a.partial_cmp(b).expect("Operation failed"))
            .map(|(idx_, _)| idx_)
            .expect("Operation failed");

        assert!(max_idx > 40 && max_idx < 60);
    }

    #[test]
    fn test_moving_stats() {
        // 0..99 with window 10: first window mean is 4.5, last is 94.5.
        let data = Array1::from_vec((0..100).map(|i| i as f64).collect());
        let moving_stats = ParallelMovingStats::new(&data.view(), 10).expect("Operation failed");

        let moving_mean = moving_stats.moving_mean().expect("Operation failed");
        assert_eq!(moving_mean.len(), 91);
        assert_relative_eq!(moving_mean[0], 4.5, epsilon = 1e-10);

        assert_relative_eq!(moving_mean[90], 94.5, epsilon = 1e-10);
    }

    #[test]
    fn test_pairwise_distances() {
        // Corners of the unit square.
        let data = array![[0.0, 0.0], [1.0, 0.0], [0.0, 1.0], [1.0, 1.0],];

        let distances =
            pairwise_distances_parallel(&data.view(), "euclidean").expect("Operation failed");

        // Zero diagonal.
        for i in 0..4 {
            assert_relative_eq!(distances[(i, i)], 0.0, epsilon = 1e-10);
        }

        // Adjacent corners are 1 apart; opposite corners sqrt(2).
        assert_relative_eq!(distances[(0, 1)], 1.0, epsilon = 1e-10);
        assert_relative_eq!(distances[(0, 3)], 2.0_f64.sqrt(), epsilon = 1e-10);

        // The matrix must be symmetric.
        for i in 0..4 {
            for j in 0..4 {
                assert_relative_eq!(distances[(i, j)], distances[(j, i)], epsilon = 1e-10);
            }
        }
    }
}