sklears_feature_selection/parallel.rs

use rayon::prelude::*;
use scirs2_core::ndarray::{Array1, Array2};
use sklears_core::{error::Result as SklResult, types::Float};

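/// Scores features in parallel against an integer target.
///
/// Implementors compute one score per feature column of `x`, either for every
/// column or for an explicit subset of column indices.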
pub trait ParallelFeatureEvaluator {
    fn evaluate_features_parallel(
        &self,
        x: &Array2<Float>,
        y: &Array1<i32>,
    ) -> SklResult<Array1<Float>>;

    fn evaluate_features_subset_parallel(
        &self,
        x: &Array2<Float>,
        y: &Array1<i32>,
        feature_indices: &[usize],
    ) -> SklResult<Array1<Float>>;
}

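/// Wraps a user-supplied univariate scoring function and applies it to each
/// feature column in parallel. The closure receives one feature column and the
/// integer target vector and returns a single score.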
#[derive(Debug, Clone)]
pub struct ParallelUnivariateScorer<F>
where
    F: Fn(&Array1<Float>, &Array1<i32>) -> SklResult<Float> + Sync + Send,
{
    score_func: F,
}

impl<F> ParallelUnivariateScorer<F>
where
    F: Fn(&Array1<Float>, &Array1<i32>) -> SklResult<Float> + Sync + Send,
{
    pub fn new(score_func: F) -> Self {
        Self { score_func }
    }

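    /// Scores every column of `x` against `y`, evaluating columns in parallel
    /// with rayon. Returns one score per feature, or the first error produced
    /// by the scoring closure.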
    pub fn evaluate_parallel(
        &self,
        x: &Array2<Float>,
        y: &Array1<i32>,
    ) -> SklResult<Array1<Float>> {
        let n_features = x.ncols();

        let scores: SklResult<Vec<Float>> = (0..n_features)
            .into_par_iter()
            .map(|feature_idx| {
                let feature_column = x.column(feature_idx);
                let feature_owned = feature_column.to_owned();
                (self.score_func)(&feature_owned, y)
            })
            .collect();

        scores.map(Array1::from_vec)
    }

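    /// Scores only the columns listed in `feature_indices`, in parallel. The
    /// returned scores are ordered to match `feature_indices`.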
    pub fn evaluate_subset_parallel(
        &self,
        x: &Array2<Float>,
        y: &Array1<i32>,
        feature_indices: &[usize],
    ) -> SklResult<Array1<Float>> {
        let scores: SklResult<Vec<Float>> = feature_indices
            .par_iter()
            .map(|&feature_idx| {
                let feature_column = x.column(feature_idx);
                let feature_owned = feature_column.to_owned();
                (self.score_func)(&feature_owned, y)
            })
            .collect();

        scores.map(Array1::from_vec)
    }
}

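/// Parallel univariate scorer for continuous targets: identical to
/// `ParallelUnivariateScorer`, except the scoring closure receives a
/// real-valued target vector instead of an integer one.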
#[derive(Debug, Clone)]
pub struct ParallelUnivariateRegressionScorer<F>
where
    F: Fn(&Array1<Float>, &Array1<Float>) -> SklResult<Float> + Sync + Send,
{
    score_func: F,
}

impl<F> ParallelUnivariateRegressionScorer<F>
where
    F: Fn(&Array1<Float>, &Array1<Float>) -> SklResult<Float> + Sync + Send,
{
    pub fn new(score_func: F) -> Self {
        Self { score_func }
    }

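    /// Scores every column of `x` against the continuous target `y` in
    /// parallel, returning one score per feature or the first error produced
    /// by the scoring closure.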
    pub fn evaluate_parallel(
        &self,
        x: &Array2<Float>,
        y: &Array1<Float>,
    ) -> SklResult<Array1<Float>> {
        let n_features = x.ncols();

        let scores: SklResult<Vec<Float>> = (0..n_features)
            .into_par_iter()
            .map(|feature_idx| {
                let feature_column = x.column(feature_idx);
                let feature_owned = feature_column.to_owned();
                (self.score_func)(&feature_owned, y)
            })
            .collect();

        scores.map(Array1::from_vec)
    }
}

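/// Ranks features by score and selects the highest-scoring subset, switching
/// to a parallel sort for large score vectors.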
pub struct ParallelFeatureRanker;

impl ParallelFeatureRanker {
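    /// Returns all feature indices sorted by descending score. Score vectors
    /// with more than 1000 entries are sorted with rayon's parallel sort;
    /// smaller ones use the standard sort. The comparison unwraps
    /// `partial_cmp`, so NaN scores will panic.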
    pub fn rank_features_parallel(scores: &Array1<Float>) -> Vec<usize> {
        let mut indexed_scores: Vec<(usize, Float)> = scores
            .iter()
            .enumerate()
            .map(|(idx, &score)| (idx, score))
            .collect();

        if indexed_scores.len() > 1000 {
            indexed_scores.par_sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
        } else {
            indexed_scores.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
        }

        indexed_scores.into_iter().map(|(idx, _)| idx).collect()
    }

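    /// Returns the indices of the `k` highest-scoring features in descending
    /// score order; if `k` exceeds the number of features, all indices are
    /// returned.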
    pub fn select_top_k_parallel(scores: &Array1<Float>, k: usize) -> Vec<usize> {
        let ranked_features = Self::rank_features_parallel(scores);
        ranked_features.into_iter().take(k).collect()
    }
}

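/// Computes Pearson correlations in parallel: feature-target correlations for
/// integer or continuous targets, and the full feature-feature correlation
/// matrix.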
pub struct ParallelCorrelationComputer;

impl ParallelCorrelationComputer {
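    /// Pearson correlation between each feature column and an integer target,
    /// with the target cast to `Float` and columns processed in parallel.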
    pub fn compute_feature_target_correlation_parallel(
        x: &Array2<Float>,
        y: &Array1<i32>,
    ) -> SklResult<Array1<Float>> {
        let n_features = x.ncols();
        let y_float: Array1<Float> = y.mapv(|v| v as Float);

        let correlations: SklResult<Vec<Float>> = (0..n_features)
            .into_par_iter()
            .map(|feature_idx| {
                let feature_column = x.column(feature_idx);
                let feature_owned = feature_column.to_owned();
                Self::compute_correlation(&feature_owned, &y_float)
            })
            .collect();

        correlations.map(Array1::from_vec)
    }

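    /// Pearson correlation between each feature column and a continuous
    /// target, with columns processed in parallel.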
    pub fn compute_feature_target_correlation_regression_parallel(
        x: &Array2<Float>,
        y: &Array1<Float>,
    ) -> SklResult<Array1<Float>> {
        let n_features = x.ncols();

        let correlations: SklResult<Vec<Float>> = (0..n_features)
            .into_par_iter()
            .map(|feature_idx| {
                let feature_column = x.column(feature_idx);
                let feature_owned = feature_column.to_owned();
                Self::compute_correlation(&feature_owned, y)
            })
            .collect();

        correlations.map(Array1::from_vec)
    }

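    /// Builds the symmetric feature-feature Pearson correlation matrix. Only
    /// the upper triangle is computed (in parallel); values are mirrored into
    /// the lower triangle and the diagonal is fixed at 1.0.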
    pub fn compute_feature_correlation_matrix_parallel(
        x: &Array2<Float>,
    ) -> SklResult<Array2<Float>> {
        let n_features = x.ncols();
        let mut correlation_matrix = Array2::zeros((n_features, n_features));

        let upper_triangular: SklResult<Vec<((usize, usize), Float)>> = (0..n_features)
            .into_par_iter()
            .flat_map(|i| {
                (i..n_features).into_par_iter().map(move |j| {
                    let corr = if i == j {
                        Ok(1.0)
                    } else {
                        let feature_i = x.column(i).to_owned();
                        let feature_j = x.column(j).to_owned();
                        Self::compute_correlation(&feature_i, &feature_j)
                    };
                    corr.map(|c| ((i, j), c))
                })
            })
            .collect();

        for ((i, j), corr) in upper_triangular? {
            correlation_matrix[[i, j]] = corr;
            if i != j {
                correlation_matrix[[j, i]] = corr;
            }
        }

        Ok(correlation_matrix)
    }

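    /// Pearson correlation between two equal-length vectors. Returns an error
    /// on length mismatch, and 0.0 when fewer than two samples are available
    /// or when either vector has (near-)zero variance.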
    fn compute_correlation(x: &Array1<Float>, y: &Array1<Float>) -> SklResult<Float> {
        let n = x.len();
        if n != y.len() {
            return Err(sklears_core::error::SklearsError::InvalidInput(
                "Arrays must have the same length".to_string(),
            ));
        }

        if n < 2 {
            return Ok(0.0);
        }

        let mean_x = x.mean().unwrap();
        let mean_y = y.mean().unwrap();

        let mut sum_xy = 0.0;
        let mut sum_xx = 0.0;
        let mut sum_yy = 0.0;

        for i in 0..n {
            let dx = x[i] - mean_x;
            let dy = y[i] - mean_y;
            sum_xy += dx * dy;
            sum_xx += dx * dx;
            sum_yy += dy * dy;
        }

        let denominator = (sum_xx * sum_yy).sqrt();
        if denominator < 1e-10 {
            Ok(0.0)
        } else {
            Ok(sum_xy / denominator)
        }
    }
}

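/// Computes per-feature sample variances with columns evaluated in parallel.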
pub struct ParallelVarianceComputer;

impl ParallelVarianceComputer {
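    /// Returns the sample variance of every feature column of `x`, computed in
    /// parallel with rayon.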
    pub fn compute_feature_variances_parallel(x: &Array2<Float>) -> Array1<Float> {
        let n_features = x.ncols();

        let variances: Vec<Float> = (0..n_features)
            .into_par_iter()
            .map(|feature_idx| {
                let feature_column = x.column(feature_idx);
                Self::compute_variance(&feature_column.to_owned())
            })
            .collect();

        Array1::from_vec(variances)
    }

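    /// Unbiased sample variance (divides by `n - 1`); returns 0.0 when fewer
    /// than two samples are present.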
    fn compute_variance(x: &Array1<Float>) -> Float {
        let n = x.len();
        if n < 2 {
            return 0.0;
        }

        let mean = x.mean().unwrap();
        let sum_sq_diff: Float = x.iter().map(|&val| (val - mean).powi(2)).sum();
        sum_sq_diff / (n - 1) as Float
    }
}

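/// Helpers that turn a vector of feature scores into a set of selected feature
/// indices using a score threshold or a percentile cut-off.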
pub struct ParallelSelectionUtils;

impl ParallelSelectionUtils {
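    /// Returns the indices of all features whose score is greater than or
    /// equal to `threshold`, evaluated in parallel.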
    pub fn apply_threshold_parallel(scores: &Array1<Float>, threshold: Float) -> Vec<usize> {
        (0..scores.len())
            .into_par_iter()
            .filter(|&i| scores[i] >= threshold)
            .collect()
    }

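    /// Sorts the scores in descending order, takes the score at position
    /// `(1 - percentile / 100) * n` as the threshold (or negative infinity if
    /// that position is out of range), and returns the indices of all features
    /// whose score meets or exceeds it.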
    pub fn apply_percentile_parallel(scores: &Array1<Float>, percentile: Float) -> Vec<usize> {
        let mut sorted_scores = scores.to_vec();
        sorted_scores.par_sort_by(|a, b| b.partial_cmp(a).unwrap());

        let threshold_idx = ((1.0 - percentile / 100.0) * sorted_scores.len() as Float) as usize;
        let threshold = if threshold_idx < sorted_scores.len() {
            sorted_scores[threshold_idx]
        } else {
            Float::NEG_INFINITY
        };

        Self::apply_threshold_parallel(scores, threshold)
    }
}

#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use super::*;
    use scirs2_core::ndarray::Array2;

    #[test]
    fn test_parallel_univariate_scorer() {
        let x = Array2::from_shape_vec(
            (5, 3),
            vec![
                1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0,
            ],
        )
        .unwrap();
        let y = Array1::from_vec(vec![0, 1, 0, 1, 0]);

        let scorer =
            ParallelUnivariateScorer::new(|feature: &Array1<Float>, target: &Array1<i32>| {
                Ok(feature
                    .iter()
                    .zip(target.iter())
                    .map(|(f, t)| f * (*t as Float))
                    .sum())
            });

        let scores = scorer.evaluate_parallel(&x, &y).unwrap();
        assert_eq!(scores.len(), 3);
    }

    #[test]
    fn test_parallel_feature_ranker() {
        let scores = Array1::from_vec(vec![0.1, 0.5, 0.3, 0.8, 0.2]);
        let ranked = ParallelFeatureRanker::rank_features_parallel(&scores);

        assert_eq!(ranked[0], 3);
        assert_eq!(ranked[1], 1);
        assert_eq!(ranked[2], 2);
    }

    #[test]
    fn test_parallel_correlation_computer() {
        let x =
            Array2::from_shape_vec((4, 2), vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]).unwrap();
        let y = Array1::from_vec(vec![0, 1, 0, 1]);

        let correlations =
            ParallelCorrelationComputer::compute_feature_target_correlation_parallel(&x, &y)
                .unwrap();
        assert_eq!(correlations.len(), 2);
    }

    #[test]
    fn test_parallel_variance_computer() {
        let x =
            Array2::from_shape_vec((4, 2), vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]).unwrap();

        let variances = ParallelVarianceComputer::compute_feature_variances_parallel(&x);
        assert_eq!(variances.len(), 2);
        assert!(variances[0] > 0.0);
        assert!(variances[1] > 0.0);
    }

    #[test]
    fn test_parallel_selection_utils() {
        let scores = Array1::from_vec(vec![0.1, 0.5, 0.3, 0.8, 0.2]);

        let threshold_selected = ParallelSelectionUtils::apply_threshold_parallel(&scores, 0.4);
        assert_eq!(threshold_selected.len(), 2);

        let percentile_selected = ParallelSelectionUtils::apply_percentile_parallel(&scores, 60.0);
        assert_eq!(percentile_selected.len(), 3);
    }
}