// sklears_datasets/parallel_rng.rs

//! Parallel random number generation for dataset creation.
//!
//! This module provides thread-safe parallel random number generation
//! for efficient multi-threaded dataset generation.

6use rayon::prelude::*;
7use scirs2_core::ndarray::{Array1, Array2, Axis};
8use scirs2_core::random::{Distribution, RandNormal, Random, Rng};
9use scirs2_core::rngs::StdRng;
/// Seedable source of independent, reproducible RNG streams for parallel work.
///
/// Only a base seed is stored; each per-stream RNG is created on demand by
/// [`ParallelRng::get_thread_rng`]. Because the struct holds nothing but a
/// `u64`, a `ParallelRng` can be shared between threads freely.
#[derive(Clone, Debug)]
pub struct ParallelRng {
    // Seed from which every per-stream RNG is derived.
    base_seed: u64,
}
17impl ParallelRng {
18    /// Create a new parallel RNG with a base seed
19    pub fn new(seed: u64) -> Self {
20        Self { base_seed: seed }
21    }
22
23    /// Get a thread-local RNG for a specific thread ID
24    pub fn get_thread_rng(&self, thread_id: usize) -> Random<StdRng> {
25        // Combine base seed with thread ID for deterministic per-thread RNG
26        let thread_seed = self.base_seed.wrapping_add(thread_id as u64);
27        Random::seed(thread_seed)
28    }
29
30    /// Generate a normal distribution matrix in parallel
31    pub fn generate_normal_matrix_parallel(
32        &self,
33        n_samples: usize,
34        n_features: usize,
35        mean: f64,
36        std: f64,
37        n_threads: Option<usize>,
38    ) -> Array2<f64> {
39        // Set thread pool size if specified
40        if let Some(threads) = n_threads {
41            rayon::ThreadPoolBuilder::new()
42                .num_threads(threads)
43                .build_global()
44                .ok();
45        }
46
47        // Determine chunk size for parallel processing
48        let num_workers = rayon::current_num_threads();
49        let chunk_size = (n_samples + num_workers - 1) / num_workers;
50
51        // Generate rows in parallel
52        let rows: Vec<Array1<f64>> = (0..n_samples)
53            .into_par_iter()
54            .chunks(chunk_size)
55            .enumerate()
56            .flat_map(|(chunk_id, chunk)| {
57                let mut rng = self.get_thread_rng(chunk_id);
58                let normal = RandNormal::new(mean, std).unwrap();
59
60                chunk
61                    .into_iter()
62                    .map(|_| Array1::from_shape_fn(n_features, |_| normal.sample(&mut rng)))
63                    .collect::<Vec<_>>()
64            })
65            .collect();
66
67        // Stack rows into matrix
68        let mut matrix = Array2::zeros((n_samples, n_features));
69        for (i, row) in rows.into_iter().enumerate() {
70            matrix.row_mut(i).assign(&row);
71        }
72
73        matrix
74    }
75
76    /// Generate uniform random values in parallel
77    pub fn generate_uniform_parallel(&self, n_samples: usize, low: f64, high: f64) -> Array1<f64> {
78        let num_workers = rayon::current_num_threads();
79        let chunk_size = (n_samples + num_workers - 1) / num_workers;
80
81        (0..n_samples)
82            .into_par_iter()
83            .chunks(chunk_size)
84            .enumerate()
85            .flat_map(|(chunk_id, chunk)| {
86                let mut rng = self.get_thread_rng(chunk_id);
87
88                chunk
89                    .into_iter()
90                    .map(|_| {
91                        let u: f64 = rng.gen();
92                        low + u * (high - low)
93                    })
94                    .collect::<Vec<_>>()
95            })
96            .collect::<Vec<_>>()
97            .into()
98    }
99}
101/// Parallel classification dataset generation
102pub fn make_classification_parallel(
103    n_samples: usize,
104    n_features: usize,
105    n_classes: usize,
106    class_sep: f64,
107    random_state: u64,
108    n_threads: Option<usize>,
109) -> (Array2<f64>, Array1<i32>) {
110    let rng = ParallelRng::new(random_state);
111
112    // Generate features in parallel
113    let features = rng.generate_normal_matrix_parallel(n_samples, n_features, 0.0, 1.0, n_threads);
114
115    // Generate targets
116    let targets = Array1::from_shape_fn(n_samples, |i| (i % n_classes) as i32);
117
118    // Apply class separation in parallel
119    let separated_features = features
120        .axis_iter(Axis(0))
121        .into_par_iter()
122        .enumerate()
123        .map(|(i, row)| {
124            let offset = targets[i] as f64 * class_sep;
125            row.mapv(|x| x + offset)
126        })
127        .collect::<Vec<_>>();
128
129    let mut result = Array2::zeros((n_samples, n_features));
130    for (i, row) in separated_features.into_iter().enumerate() {
131        result.row_mut(i).assign(&row);
132    }
133
134    (result, targets)
135}
137/// Parallel regression dataset generation
138pub fn make_regression_parallel(
139    n_samples: usize,
140    n_features: usize,
141    noise: f64,
142    random_state: u64,
143    n_threads: Option<usize>,
144) -> (Array2<f64>, Array1<f64>) {
145    let rng = ParallelRng::new(random_state);
146
147    // Generate features in parallel
148    let features = rng.generate_normal_matrix_parallel(n_samples, n_features, 0.0, 1.0, n_threads);
149
150    // Generate coefficients
151    let mut coef_rng = rng.get_thread_rng(0);
152    let normal_coef = RandNormal::new(0.0, 1.0).unwrap();
153    let coef = Array1::from_shape_fn(n_features, |_| normal_coef.sample(&mut coef_rng));
154
155    // Compute targets in parallel: y = X @ coef
156    let targets_base: Vec<f64> = features
157        .axis_iter(Axis(0))
158        .into_par_iter()
159        .map(|row| row.dot(&coef))
160        .collect();
161
162    // Add noise in parallel if requested
163    let targets = if noise > 0.0 {
164        let noise_values: Vec<f64> = (0..n_samples)
165            .into_par_iter()
166            .chunks((n_samples + rayon::current_num_threads() - 1) / rayon::current_num_threads())
167            .enumerate()
168            .flat_map(|(chunk_id, chunk)| {
169                let mut chunk_rng = rng.get_thread_rng(chunk_id + 1);
170                let normal_noise = RandNormal::new(0.0, noise).unwrap();
171
172                chunk
173                    .into_iter()
174                    .map(|_| normal_noise.sample(&mut chunk_rng))
175                    .collect::<Vec<_>>()
176            })
177            .collect();
178
179        targets_base
180            .par_iter()
181            .zip(noise_values.par_iter())
182            .map(|(&t, &n)| t + n)
183            .collect::<Vec<_>>()
184            .into()
185    } else {
186        targets_base.into()
187    };
188
189    (features, targets)
190}
192/// Parallel blob generation for clustering
193pub fn make_blobs_parallel(
194    n_samples: usize,
195    n_features: usize,
196    centers: usize,
197    cluster_std: f64,
198    random_state: u64,
199    _n_threads: Option<usize>,
200) -> (Array2<f64>, Array1<i32>) {
201    let rng = ParallelRng::new(random_state);
202
203    // Generate cluster centers
204    let mut center_rng = rng.get_thread_rng(0);
205    let center_normal = RandNormal::new(0.0, 10.0).unwrap();
206    let cluster_centers = Array2::from_shape_fn((centers, n_features), |_| {
207        center_normal.sample(&mut center_rng)
208    });
209
210    // Assign samples to clusters
211    let samples_per_cluster = n_samples / centers;
212    let targets = Array1::from_shape_fn(n_samples, |i| {
213        (i / samples_per_cluster).min(centers - 1) as i32
214    });
215
216    // Generate samples in parallel
217    let samples: Vec<Array1<f64>> = (0..n_samples)
218        .into_par_iter()
219        .chunks((n_samples + rayon::current_num_threads() - 1) / rayon::current_num_threads())
220        .enumerate()
221        .flat_map(|(chunk_id, chunk)| {
222            let mut chunk_rng = rng.get_thread_rng(chunk_id + 1);
223            let sample_normal = RandNormal::new(0.0, cluster_std).unwrap();
224
225            chunk
226                .into_iter()
227                .map(|i| {
228                    let cluster_id = targets[i] as usize;
229                    let center = cluster_centers.row(cluster_id);
230                    center.mapv(|c| c + sample_normal.sample(&mut chunk_rng))
231                })
232                .collect::<Vec<_>>()
233        })
234        .collect();
235
236    let mut features = Array2::zeros((n_samples, n_features));
237    for (i, sample) in samples.into_iter().enumerate() {
238        features.row_mut(i).assign(&sample);
239    }
240
241    (features, targets)
242}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that every label lies in `0..n` and that each of the `n` labels occurs.
    fn assert_all_labels_present(labels: &Array1<i32>, n: usize) {
        let mut seen = vec![false; n];
        for &label in labels {
            assert!(label >= 0 && label < n as i32);
            seen[label as usize] = true;
        }
        assert!(seen.iter().all(|&hit| hit));
    }

    #[test]
    fn test_parallel_rng_creation() {
        let mut stream = ParallelRng::new(42).get_thread_rng(0);
        // Two successive draws from a freshly created stream must be unit-interval values.
        for value in [stream.gen::<f64>(), stream.gen::<f64>()] {
            assert!((0.0..=1.0).contains(&value));
        }
    }

    #[test]
    fn test_generate_normal_matrix_parallel() {
        let matrix =
            ParallelRng::new(42).generate_normal_matrix_parallel(100, 10, 0.0, 1.0, Some(2));

        assert_eq!(matrix.dim(), (100, 10));
        // Standard-normal draws beyond 5 sigma are vanishingly unlikely.
        assert!(matrix.iter().all(|v| v.abs() < 5.0));
    }

    #[test]
    fn test_generate_uniform_parallel() {
        let values = ParallelRng::new(42).generate_uniform_parallel(100, 0.0, 10.0);

        assert_eq!(values.len(), 100);
        assert!(values.iter().all(|v| (0.0..=10.0).contains(v)));
    }

    #[test]
    fn test_make_classification_parallel() {
        let (features, targets) = make_classification_parallel(100, 5, 3, 1.0, 42, Some(2));

        assert_eq!(features.dim(), (100, 5));
        assert_eq!(targets.len(), 100);
        assert_all_labels_present(&targets, 3);
    }

    #[test]
    fn test_make_regression_parallel() {
        let (features, targets) = make_regression_parallel(100, 5, 0.1, 42, Some(2));

        assert_eq!(features.dim(), (100, 5));
        assert_eq!(targets.len(), 100);
    }

    #[test]
    fn test_make_blobs_parallel() {
        let (features, targets) = make_blobs_parallel(150, 5, 3, 1.0, 42, Some(2));

        assert_eq!(features.dim(), (150, 5));
        assert_eq!(targets.len(), 150);
        assert_all_labels_present(&targets, 3);
    }

    #[test]
    fn test_deterministic_parallel_generation() {
        // Identical seed and arguments must reproduce the feature matrix exactly.
        let (first, _) = make_classification_parallel(50, 3, 2, 1.0, 42, Some(2));
        let (second, _) = make_classification_parallel(50, 3, 2, 1.0, 42, Some(2));

        assert_eq!(first, second);
    }
}