sklears_tree/
parallel.rs

1//! Parallel processing utilities for tree algorithms
2//!
3//! This module provides parallel implementations of tree construction,
4//! prediction, and ensemble methods using rayon.
5
6use scirs2_core::ndarray::{Array1, Array2};
7use scirs2_core::random::thread_rng;
8use scirs2_core::SliceRandomExt; // For shuffle method
9use sklears_core::error::Result;
10
11#[cfg(feature = "parallel")]
12use rayon::prelude::*;
13
/// Parallel utilities for tree algorithms.
///
/// This is a stateless unit struct used purely as a namespace: every helper is an
/// associated function and is called as `ParallelUtils::name(...)`.
pub struct ParallelUtils;
16
17impl ParallelUtils {
18    /// Execute a function in parallel if the parallel feature is enabled
19    #[cfg(feature = "parallel")]
20    pub fn maybe_parallel_map<T, U, F>(items: Vec<T>, f: F) -> Vec<U>
21    where
22        T: Send + Sync,
23        U: Send,
24        F: Fn(T) -> U + Sync + Send,
25    {
26        items.into_par_iter().map(f).collect()
27    }
28
29    /// Execute a function sequentially if parallel feature is disabled
30    #[cfg(not(feature = "parallel"))]
31    pub fn maybe_parallel_map<T, U, F>(items: Vec<T>, f: F) -> Vec<U>
32    where
33        F: Fn(T) -> U,
34    {
35        items.into_iter().map(f).collect()
36    }
37
38    /// Parallel bootstrap sampling for ensemble methods
39    #[cfg(feature = "parallel")]
40    pub fn parallel_bootstrap_samples(
41        x: &Array2<f64>,
42        y: &Array1<i32>,
43        n_estimators: usize,
44        n_samples: usize,
45        random_seeds: &[u64],
46    ) -> Vec<(Array2<f64>, Array1<i32>)> {
47        (0..n_estimators)
48            .into_par_iter()
49            .map(|i| {
50                let seed = random_seeds.get(i).copied().unwrap_or(42 + i as u64);
51                Self::bootstrap_sample(x, y, n_samples, seed)
52            })
53            .collect()
54    }
55
56    /// Sequential bootstrap sampling fallback
57    #[cfg(not(feature = "parallel"))]
58    pub fn parallel_bootstrap_samples(
59        x: &Array2<f64>,
60        y: &Array1<i32>,
61        n_estimators: usize,
62        n_samples: usize,
63        random_seeds: &[u64],
64    ) -> Vec<(Array2<f64>, Array1<i32>)> {
65        (0..n_estimators)
66            .map(|i| {
67                let seed = random_seeds.get(i).copied().unwrap_or(42 + i as u64);
68                Self::bootstrap_sample(x, y, n_samples, seed)
69            })
70            .collect()
71    }
72
73    /// Create a bootstrap sample from the data
74    fn bootstrap_sample(
75        x: &Array2<f64>,
76        y: &Array1<i32>,
77        n_samples: usize,
78        seed: u64,
79    ) -> (Array2<f64>, Array1<i32>) {
80        let mut rng = thread_rng();
81        let original_n_samples = x.nrows();
82        let n_features = x.ncols();
83
84        let mut bootstrap_x = Array2::zeros((n_samples, n_features));
85        let mut bootstrap_y = Array1::zeros(n_samples);
86
87        for i in 0..n_samples {
88            let idx = rng.gen_range(0..original_n_samples);
89            bootstrap_x.row_mut(i).assign(&x.row(idx));
90            bootstrap_y[i] = y[idx];
91        }
92
93        (bootstrap_x, bootstrap_y)
94    }
95
96    /// Parallel prediction aggregation for ensemble methods
97    #[cfg(feature = "parallel")]
98    pub fn parallel_predict_proba_aggregate(predictions: Vec<Array2<f64>>) -> Result<Array2<f64>> {
99        if predictions.is_empty() {
100            return Err(sklears_core::error::SklearsError::InvalidData {
101                reason: "No predictions to aggregate".to_string(),
102            });
103        }
104
105        let n_samples = predictions[0].nrows();
106        let n_classes = predictions[0].ncols();
107
108        // Parallel aggregation of predictions
109        let aggregated: Vec<Vec<f64>> = (0..n_samples)
110            .into_par_iter()
111            .map(|sample_idx| {
112                let mut class_votes = vec![0.0; n_classes];
113                for pred_matrix in &predictions {
114                    let row = pred_matrix.row(sample_idx);
115                    for (class_idx, &prob) in row.iter().enumerate() {
116                        class_votes[class_idx] += prob;
117                    }
118                }
119
120                // Normalize by number of estimators
121                let n_estimators = predictions.len() as f64;
122                class_votes
123                    .iter()
124                    .map(|&vote| vote / n_estimators)
125                    .collect()
126            })
127            .collect();
128
129        // Convert back to Array2
130        let mut result = Array2::zeros((n_samples, n_classes));
131        for (i, row) in aggregated.iter().enumerate() {
132            for (j, &val) in row.iter().enumerate() {
133                result[[i, j]] = val;
134            }
135        }
136
137        Ok(result)
138    }
139
140    /// Sequential prediction aggregation fallback
141    #[cfg(not(feature = "parallel"))]
142    pub fn parallel_predict_proba_aggregate(predictions: Vec<Array2<f64>>) -> Result<Array2<f64>> {
143        if predictions.is_empty() {
144            return Err(sklears_core::error::SklearsError::InvalidData {
145                reason: "No predictions to aggregate".to_string(),
146            });
147        }
148
149        let n_samples = predictions[0].nrows();
150        let n_classes = predictions[0].ncols();
151        let n_estimators = predictions.len() as f64;
152
153        let mut result = Array2::zeros((n_samples, n_classes));
154
155        for pred_matrix in predictions {
156            result = result + pred_matrix;
157        }
158
159        result = result / n_estimators;
160        Ok(result)
161    }
162
163    /// Parallel feature importance calculation using permutation
164    #[cfg(feature = "parallel")]
165    pub fn parallel_permutation_importance<F>(
166        x: &Array2<f64>,
167        y: &Array1<i32>,
168        baseline_score: f64,
169        scoring_fn: F,
170        n_repeats: usize,
171        random_seeds: &[u64],
172    ) -> Result<Array1<f64>>
173    where
174        F: Fn(&Array2<f64>, &Array1<i32>) -> Result<f64> + Sync + Send,
175    {
176        let n_features = x.ncols();
177
178        let importances: Result<Vec<f64>> = (0..n_features)
179            .into_par_iter()
180            .map(|feature_idx| {
181                let mut importance_scores = Vec::new();
182
183                for repeat in 0..n_repeats {
184                    let seed = random_seeds
185                        .get(repeat)
186                        .copied()
187                        .unwrap_or(42 + repeat as u64);
188                    let mut x_permuted = x.clone();
189                    Self::permute_feature(&mut x_permuted, feature_idx, seed)?;
190
191                    let permuted_score = scoring_fn(&x_permuted, y)?;
192                    importance_scores.push(baseline_score - permuted_score);
193                }
194
195                // Average importance across repeats
196                let avg_importance =
197                    importance_scores.iter().sum::<f64>() / importance_scores.len() as f64;
198                Ok(avg_importance)
199            })
200            .collect();
201
202        let importances = importances?;
203        Ok(Array1::from_vec(importances))
204    }
205
206    /// Sequential feature importance calculation fallback
207    #[cfg(not(feature = "parallel"))]
208    pub fn parallel_permutation_importance<F>(
209        x: &Array2<f64>,
210        y: &Array1<i32>,
211        baseline_score: f64,
212        scoring_fn: F,
213        n_repeats: usize,
214        random_seeds: &[u64],
215    ) -> Result<Array1<f64>>
216    where
217        F: Fn(&Array2<f64>, &Array1<i32>) -> Result<f64>,
218    {
219        let n_features = x.ncols();
220        let mut importances = Vec::with_capacity(n_features);
221
222        for feature_idx in 0..n_features {
223            let mut importance_scores = Vec::new();
224
225            for repeat in 0..n_repeats {
226                let seed = random_seeds
227                    .get(repeat)
228                    .copied()
229                    .unwrap_or(42 + repeat as u64);
230                let mut x_permuted = x.clone();
231                Self::permute_feature(&mut x_permuted, feature_idx, seed)?;
232
233                let permuted_score = scoring_fn(&x_permuted, y)?;
234                importance_scores.push(baseline_score - permuted_score);
235            }
236
237            let avg_importance =
238                importance_scores.iter().sum::<f64>() / importance_scores.len() as f64;
239            importances.push(avg_importance);
240        }
241
242        Ok(Array1::from_vec(importances))
243    }
244
245    /// Permute values in a specific feature column
246    fn permute_feature(x: &mut Array2<f64>, feature_idx: usize, seed: u64) -> Result<()> {
247        let mut rng = thread_rng();
248        let mut column_values: Vec<f64> = x.column(feature_idx).to_vec();
249        column_values.shuffle(&mut rng);
250
251        for (i, &value) in column_values.iter().enumerate() {
252            x[[i, feature_idx]] = value;
253        }
254
255        Ok(())
256    }
257
258    /// Determine optimal number of threads to use
259    pub fn optimal_n_threads(n_jobs: Option<i32>) -> usize {
260        match n_jobs {
261            Some(n) if n > 0 => n as usize,
262            Some(-1) => num_cpus::get(),
263            Some(n) if n < -1 => (num_cpus::get() as i32 + n + 1).max(1) as usize,
264            _ => 1,
265        }
266    }
267
268    /// Initialize thread pool with specified number of threads
269    #[cfg(feature = "parallel")]
270    pub fn with_thread_pool<T, F>(n_threads: usize, f: F) -> T
271    where
272        F: FnOnce() -> T + Send,
273        T: Send,
274    {
275        if n_threads <= 1 {
276            f()
277        } else {
278            let pool = rayon::ThreadPoolBuilder::new()
279                .num_threads(n_threads)
280                .build()
281                .unwrap_or_else(|_| rayon::ThreadPoolBuilder::new().build().unwrap());
282            pool.install(f)
283        }
284    }
285
286    /// Sequential execution fallback
287    #[cfg(not(feature = "parallel"))]
288    pub fn with_thread_pool<T, F>(n_threads: usize, f: F) -> T
289    where
290        F: FnOnce() -> T,
291    {
292        f() // Always execute sequentially
293    }
294
295    /// Parallel evaluation of all features to find the best split
296    #[cfg(feature = "parallel")]
297    pub fn parallel_find_best_split(
298        x: &Array2<f64>,
299        y: &Array1<f64>,
300        sample_indices: &[usize],
301        config: &ParallelFeatureConfig,
302    ) -> Result<Option<FeatureSplit>> {
303        let n_features = x.ncols();
304        let n_samples = sample_indices.len();
305
306        if n_samples < config.min_samples_split {
307            return Ok(None);
308        }
309
310        // Select features to evaluate
311        let feature_indices = Self::select_features(n_features, config)?;
312
313        // Parallel evaluation of features
314        let feature_splits: Vec<Option<FeatureSplit>> = feature_indices
315            .into_par_iter()
316            .map(|feature_idx| {
317                Self::evaluate_feature_split(x, y, sample_indices, feature_idx, config)
318                    .unwrap_or(None)
319            })
320            .collect();
321
322        // Find the best split among all features
323        let mut best_split = None;
324        let mut best_score = f64::NEG_INFINITY;
325
326        for split in feature_splits.into_iter().flatten() {
327            if split.is_valid() && split.quality_score() > best_score {
328                best_score = split.quality_score();
329                best_split = Some(split);
330            }
331        }
332
333        Ok(best_split)
334    }
335
336    /// Sequential fallback for finding best split
337    #[cfg(not(feature = "parallel"))]
338    pub fn parallel_find_best_split(
339        x: &Array2<f64>,
340        y: &Array1<f64>,
341        sample_indices: &[usize],
342        config: &ParallelFeatureConfig,
343    ) -> Result<Option<FeatureSplit>> {
344        let n_features = x.ncols();
345        let n_samples = sample_indices.len();
346
347        if n_samples < config.min_samples_split {
348            return Ok(None);
349        }
350
351        let feature_indices = Self::select_features(n_features, config)?;
352
353        let mut best_split = None;
354        let mut best_score = f64::NEG_INFINITY;
355
356        for feature_idx in feature_indices {
357            if let Some(split) =
358                Self::evaluate_feature_split(x, y, sample_indices, feature_idx, config)?
359            {
360                if split.is_valid() && split.quality_score() > best_score {
361                    best_score = split.quality_score();
362                    best_split = Some(split);
363                }
364            }
365        }
366
367        Ok(best_split)
368    }
369
370    /// Parallel evaluation of classification splits
371    #[cfg(feature = "parallel")]
372    pub fn parallel_find_best_classification_split(
373        x: &Array2<f64>,
374        y: &Array1<i32>,
375        sample_indices: &[usize],
376        n_classes: usize,
377        config: &ParallelFeatureConfig,
378    ) -> Result<Option<FeatureSplit>> {
379        let n_features = x.ncols();
380        let n_samples = sample_indices.len();
381
382        if n_samples < config.min_samples_split {
383            return Ok(None);
384        }
385
386        let feature_indices = Self::select_features(n_features, config)?;
387
388        // Parallel evaluation of features for classification
389        let feature_splits: Vec<Option<FeatureSplit>> = feature_indices
390            .into_par_iter()
391            .map(|feature_idx| {
392                Self::evaluate_classification_split(
393                    x,
394                    y,
395                    sample_indices,
396                    feature_idx,
397                    n_classes,
398                    config,
399                )
400                .unwrap_or(None)
401            })
402            .collect();
403
404        // Find the best split
405        let mut best_split = None;
406        let mut best_score = f64::NEG_INFINITY;
407
408        for split in feature_splits.into_iter().flatten() {
409            if split.is_valid() && split.quality_score() > best_score {
410                best_score = split.quality_score();
411                best_split = Some(split);
412            }
413        }
414
415        Ok(best_split)
416    }
417
418    /// Sequential fallback for classification splits
419    #[cfg(not(feature = "parallel"))]
420    pub fn parallel_find_best_classification_split(
421        x: &Array2<f64>,
422        y: &Array1<i32>,
423        sample_indices: &[usize],
424        n_classes: usize,
425        config: &ParallelFeatureConfig,
426    ) -> Result<Option<FeatureSplit>> {
427        let n_features = x.ncols();
428        let n_samples = sample_indices.len();
429
430        if n_samples < config.min_samples_split {
431            return Ok(None);
432        }
433
434        let feature_indices = Self::select_features(n_features, config)?;
435
436        let mut best_split = None;
437        let mut best_score = f64::NEG_INFINITY;
438
439        for feature_idx in feature_indices {
440            if let Some(split) = Self::evaluate_classification_split(
441                x,
442                y,
443                sample_indices,
444                feature_idx,
445                n_classes,
446                config,
447            )? {
448                if split.is_valid() && split.quality_score() > best_score {
449                    best_score = split.quality_score();
450                    best_split = Some(split);
451                }
452            }
453        }
454
455        Ok(best_split)
456    }
457
458    /// Parallel computation of feature statistics for multiple features
459    #[cfg(feature = "parallel")]
460    pub fn parallel_compute_feature_stats(
461        x: &Array2<f64>,
462        sample_indices: &[usize],
463        feature_indices: &[usize],
464    ) -> Vec<FeatureStats> {
465        feature_indices
466            .par_iter()
467            .map(|&feature_idx| Self::compute_feature_stats(x, sample_indices, feature_idx))
468            .collect()
469    }
470
471    /// Sequential fallback for feature statistics
472    #[cfg(not(feature = "parallel"))]
473    pub fn parallel_compute_feature_stats(
474        x: &Array2<f64>,
475        sample_indices: &[usize],
476        feature_indices: &[usize],
477    ) -> Vec<FeatureStats> {
478        feature_indices
479            .iter()
480            .map(|&feature_idx| Self::compute_feature_stats(x, sample_indices, feature_idx))
481            .collect()
482    }
483
484    /// Select features to evaluate based on configuration
485    fn select_features(n_features: usize, config: &ParallelFeatureConfig) -> Result<Vec<usize>> {
486        let max_features = config.max_features.unwrap_or(n_features);
487        let n_features_to_use = max_features.min(n_features);
488
489        if n_features_to_use >= n_features {
490            // Use all features
491            Ok((0..n_features).collect())
492        } else {
493            // Randomly sample features
494            let mut rng = thread_rng();
495
496            let mut all_features: Vec<usize> = (0..n_features).collect();
497            all_features.shuffle(&mut rng);
498            all_features.truncate(n_features_to_use);
499
500            Ok(all_features)
501        }
502    }
503
504    /// Evaluate a specific feature for regression splits
505    fn evaluate_feature_split(
506        x: &Array2<f64>,
507        y: &Array1<f64>,
508        sample_indices: &[usize],
509        feature_idx: usize,
510        config: &ParallelFeatureConfig,
511    ) -> Result<Option<FeatureSplit>> {
512        let n_samples = sample_indices.len();
513
514        if n_samples < config.min_samples_split {
515            return Ok(None);
516        }
517
518        // Collect feature values and targets for samples
519        let mut feature_target_pairs: Vec<(f64, f64)> = sample_indices
520            .iter()
521            .map(|&idx| (x[[idx, feature_idx]], y[idx]))
522            .collect();
523
524        // Sort by feature value
525        feature_target_pairs.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
526
527        let mut best_threshold = 0.0;
528        let mut best_impurity_reduction = f64::NEG_INFINITY;
529        let mut best_n_left = 0;
530        let mut best_n_right = 0;
531
532        // Calculate initial statistics
533        let total_sum: f64 = feature_target_pairs.iter().map(|(_, target)| target).sum();
534        let total_sum_sq: f64 = feature_target_pairs
535            .iter()
536            .map(|(_, target)| target * target)
537            .sum();
538        let total_variance =
539            (total_sum_sq / n_samples as f64) - (total_sum / n_samples as f64).powi(2);
540
541        // Try different split points
542        for i in 1..n_samples {
543            let current_val = feature_target_pairs[i - 1].0;
544            let next_val = feature_target_pairs[i].0;
545
546            if (next_val - current_val).abs() < 1e-10 {
547                continue; // Skip if values are the same
548            }
549
550            let n_left = i;
551            let n_right = n_samples - i;
552
553            if n_left < config.min_samples_leaf || n_right < config.min_samples_leaf {
554                continue;
555            }
556
557            // Calculate left and right statistics
558            let left_sum: f64 = feature_target_pairs[..i]
559                .iter()
560                .map(|(_, target)| target)
561                .sum();
562            let left_sum_sq: f64 = feature_target_pairs[..i]
563                .iter()
564                .map(|(_, target)| target * target)
565                .sum();
566            let left_variance = (left_sum_sq / n_left as f64) - (left_sum / n_left as f64).powi(2);
567
568            let right_sum = total_sum - left_sum;
569            let right_sum_sq = total_sum_sq - left_sum_sq;
570            let right_variance =
571                (right_sum_sq / n_right as f64) - (right_sum / n_right as f64).powi(2);
572
573            // Calculate weighted variance reduction
574            let weighted_variance = (n_left as f64 / n_samples as f64) * left_variance
575                + (n_right as f64 / n_samples as f64) * right_variance;
576            let impurity_reduction = total_variance - weighted_variance;
577
578            if impurity_reduction > best_impurity_reduction {
579                best_impurity_reduction = impurity_reduction;
580                best_threshold = (current_val + next_val) / 2.0;
581                best_n_left = n_left;
582                best_n_right = n_right;
583            }
584        }
585
586        if best_impurity_reduction > config.min_impurity_decrease {
587            Ok(Some(FeatureSplit::new(
588                feature_idx,
589                best_threshold,
590                best_impurity_reduction,
591                best_n_left,
592                best_n_right,
593            )))
594        } else {
595            Ok(None)
596        }
597    }
598
599    /// Evaluate a specific feature for classification splits
600    fn evaluate_classification_split(
601        x: &Array2<f64>,
602        y: &Array1<i32>,
603        sample_indices: &[usize],
604        feature_idx: usize,
605        n_classes: usize,
606        config: &ParallelFeatureConfig,
607    ) -> Result<Option<FeatureSplit>> {
608        let n_samples = sample_indices.len();
609
610        if n_samples < config.min_samples_split {
611            return Ok(None);
612        }
613
614        // Collect feature values and targets
615        let mut feature_target_pairs: Vec<(f64, i32)> = sample_indices
616            .iter()
617            .map(|&idx| (x[[idx, feature_idx]], y[idx]))
618            .collect();
619
620        // Sort by feature value
621        feature_target_pairs.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
622
623        let mut best_threshold = 0.0;
624        let mut best_information_gain = f64::NEG_INFINITY;
625        let mut best_n_left = 0;
626        let mut best_n_right = 0;
627
628        // Calculate initial entropy
629        let mut class_counts = vec![0; n_classes];
630        for (_, class) in &feature_target_pairs {
631            if *class >= 0 && (*class as usize) < n_classes {
632                class_counts[*class as usize] += 1;
633            }
634        }
635
636        let initial_entropy = Self::calculate_entropy(&class_counts, n_samples);
637
638        // Try different split points
639        for i in 1..n_samples {
640            let current_val = feature_target_pairs[i - 1].0;
641            let next_val = feature_target_pairs[i].0;
642
643            if (next_val - current_val).abs() < 1e-10 {
644                continue;
645            }
646
647            let n_left = i;
648            let n_right = n_samples - i;
649
650            if n_left < config.min_samples_leaf || n_right < config.min_samples_leaf {
651                continue;
652            }
653
654            // Calculate left and right class distributions
655            let mut left_counts = vec![0; n_classes];
656            let mut right_counts = vec![0; n_classes];
657
658            for j in 0..i {
659                let class = feature_target_pairs[j].1;
660                if class >= 0 && (class as usize) < n_classes {
661                    left_counts[class as usize] += 1;
662                }
663            }
664
665            for j in i..n_samples {
666                let class = feature_target_pairs[j].1;
667                if class >= 0 && (class as usize) < n_classes {
668                    right_counts[class as usize] += 1;
669                }
670            }
671
672            // Calculate entropies
673            let left_entropy = Self::calculate_entropy(&left_counts, n_left);
674            let right_entropy = Self::calculate_entropy(&right_counts, n_right);
675
676            // Calculate weighted entropy
677            let weighted_entropy = (n_left as f64 / n_samples as f64) * left_entropy
678                + (n_right as f64 / n_samples as f64) * right_entropy;
679
680            let information_gain = initial_entropy - weighted_entropy;
681
682            if information_gain > best_information_gain {
683                best_information_gain = information_gain;
684                best_threshold = (current_val + next_val) / 2.0;
685                best_n_left = n_left;
686                best_n_right = n_right;
687            }
688        }
689
690        if best_information_gain > config.min_impurity_decrease {
691            Ok(Some(
692                FeatureSplit::new(
693                    feature_idx,
694                    best_threshold,
695                    best_information_gain, // Use information gain as impurity reduction
696                    best_n_left,
697                    best_n_right,
698                )
699                .with_information_gain(best_information_gain),
700            ))
701        } else {
702            Ok(None)
703        }
704    }
705
706    /// Calculate entropy for classification
707    fn calculate_entropy(class_counts: &[usize], total_samples: usize) -> f64 {
708        if total_samples == 0 {
709            return 0.0;
710        }
711
712        let mut entropy = 0.0;
713        for &count in class_counts {
714            if count > 0 {
715                let probability = count as f64 / total_samples as f64;
716                entropy -= probability * probability.log2();
717            }
718        }
719
720        entropy
721    }
722
723    /// Compute statistics for a single feature
724    fn compute_feature_stats(
725        x: &Array2<f64>,
726        sample_indices: &[usize],
727        feature_idx: usize,
728    ) -> FeatureStats {
729        let values: Vec<f64> = sample_indices
730            .iter()
731            .map(|&idx| x[[idx, feature_idx]])
732            .collect();
733
734        if values.is_empty() {
735            return FeatureStats::default();
736        }
737
738        let mut sorted_values = values.clone();
739        sorted_values.sort_by(|a, b| a.partial_cmp(b).unwrap());
740
741        let n = values.len();
742        let sum: f64 = values.iter().sum();
743        let mean = sum / n as f64;
744
745        let variance = values.iter().map(|&x| (x - mean).powi(2)).sum::<f64>() / n as f64;
746
747        let std_dev = variance.sqrt();
748        let min = sorted_values[0];
749        let max = sorted_values[n - 1];
750        let median = if n % 2 == 0 {
751            (sorted_values[n / 2 - 1] + sorted_values[n / 2]) / 2.0
752        } else {
753            sorted_values[n / 2]
754        };
755
756        // Count unique values
757        let mut unique_values = sorted_values.clone();
758        unique_values.dedup_by(|a, b| (*a - *b).abs() < 1e-10);
759        let n_unique = unique_values.len();
760
761        FeatureStats {
762            feature_idx,
763            mean,
764            std_dev,
765            min,
766            max,
767            median,
768            n_unique,
769            n_samples: n,
770        }
771    }
772}
773
/// Extension trait that adds a "parallel if available" map to anything iterable.
pub trait ParallelTreeExt<T> {
    /// Apply `f` to every item and collect the results into a `Vec`.
    ///
    /// With the `parallel` feature the items are processed through rayon;
    /// without it they are processed sequentially on the calling thread.
    fn maybe_parallel_process<U, F>(self, f: F) -> Vec<U>
    where
        T: Send + Sync,
        U: Send,
        F: Fn(T) -> U + Sync + Send;
}

/// Blanket implementation for every `IntoIterator` whose iterator is `Send`
/// and whose items are `Send + Sync`.
impl<I, T> ParallelTreeExt<T> for I
where
    I: IntoIterator<Item = T>,
    I::IntoIter: Send,
    T: Send + Sync,
{
    #[cfg(feature = "parallel")]
    fn maybe_parallel_process<U, F>(self, f: F) -> Vec<U>
    where
        T: Send + Sync,
        U: Send,
        F: Fn(T) -> U + Sync + Send,
    {
        // Materialize the items first so rayon can split the work by index.
        let buffered: Vec<T> = self.into_iter().collect();
        buffered.into_par_iter().map(f).collect()
    }

    #[cfg(not(feature = "parallel"))]
    fn maybe_parallel_process<U, F>(self, f: F) -> Vec<U>
    where
        F: Fn(T) -> U,
    {
        let mut processed = Vec::new();
        for item in self {
            processed.push(f(item));
        }
        processed
    }
}
812
/// Outcome of evaluating one feature as a split candidate during tree construction.
#[derive(Debug, Clone)]
pub struct FeatureSplit {
    /// Index of the feature the split is made on
    pub feature_idx: usize,
    /// Threshold chosen for this feature
    pub threshold: f64,
    /// Impurity reduction obtained by splitting at `threshold`
    pub impurity_reduction: f64,
    /// Number of samples falling on the left side
    pub n_left: usize,
    /// Number of samples falling on the right side
    pub n_right: usize,
    /// Information gain, set only for classification splits
    pub information_gain: Option<f64>,
}

impl FeatureSplit {
    /// Build a split without information gain (`information_gain` is `None`).
    pub fn new(
        feature_idx: usize,
        threshold: f64,
        impurity_reduction: f64,
        n_left: usize,
        n_right: usize,
    ) -> Self {
        Self {
            information_gain: None,
            feature_idx,
            threshold,
            impurity_reduction,
            n_left,
            n_right,
        }
    }

    /// Return the same split with `information_gain` set to `gain`.
    pub fn with_information_gain(self, gain: f64) -> Self {
        Self {
            information_gain: Some(gain),
            ..self
        }
    }

    /// A split is valid when both sides receive at least one sample.
    pub fn is_valid(&self) -> bool {
        self.n_left.min(self.n_right) > 0
    }

    /// Score used to rank splits: the information gain when present,
    /// otherwise the impurity reduction.
    pub fn quality_score(&self) -> f64 {
        match self.information_gain {
            Some(gain) => gain,
            None => self.impurity_reduction,
        }
    }
}
865
/// Settings that control how features are evaluated as split candidates.
#[derive(Debug, Clone)]
pub struct ParallelFeatureConfig {
    /// Smallest number of samples a node must hold to be split
    pub min_samples_split: usize,
    /// Smallest number of samples each side of a split must receive
    pub min_samples_leaf: usize,
    /// A split is accepted only if its impurity decrease exceeds this value
    pub min_impurity_decrease: f64,
    /// Upper bound on the number of features considered (`None` = all features)
    pub max_features: Option<usize>,
    /// Random seed for feature sampling
    pub random_state: Option<u64>,
}

impl Default for ParallelFeatureConfig {
    /// Defaults: split nodes with at least 2 samples, allow single-sample leaves,
    /// no minimum impurity decrease, consider all features, no seed.
    fn default() -> Self {
        let max_features = None;
        let random_state = None;

        Self {
            min_samples_split: 2,
            min_samples_leaf: 1,
            min_impurity_decrease: 0.0,
            max_features,
            random_state,
        }
    }
}
892
/// Summary statistics of a single feature.
///
/// The derived `Default` yields index 0, all statistics `0.0` and both counts 0.
#[derive(Debug, Clone, Default)]
pub struct FeatureStats {
    /// Index of the described feature
    pub feature_idx: usize,
    /// Arithmetic mean
    pub mean: f64,
    /// Standard deviation
    pub std_dev: f64,
    /// Smallest value
    pub min: f64,
    /// Largest value
    pub max: f64,
    /// Median value
    pub median: f64,
    /// Number of distinct values
    pub n_unique: usize,
    /// Number of samples the statistics were computed from
    pub n_samples: usize,
}

impl FeatureStats {
    /// Whether the feature varies enough to be worth splitting on:
    /// more than one distinct value and a standard deviation above `1e-10`.
    pub fn is_informative(&self) -> bool {
        let has_distinct_values = self.n_unique > 1;
        let has_spread = self.std_dev > 1e-10;
        has_distinct_values && has_spread
    }

    /// Width of the value range (`max - min`).
    pub fn range(&self) -> f64 {
        self.max - self.min
    }

    /// Coefficient of variation, `std_dev / |mean|`.
    ///
    /// Returns `f64::INFINITY` when `|mean|` is not greater than `1e-10`.
    pub fn coefficient_of_variation(&self) -> f64 {
        match self.mean.abs() {
            magnitude if magnitude > 1e-10 => self.std_dev / magnitude,
            _ => f64::INFINITY,
        }
    }
}
948}