1use rayon::prelude::*;
7use scirs2_core::ndarray::{Array1, Array2, Axis};
8use scirs2_core::random::{Distribution, RandNormal, Random, Rng};
9use scirs2_core::rngs::StdRng;
10
/// Seed container for reproducible parallel random number generation.
///
/// Each worker derives its own deterministic RNG stream from `base_seed`
/// via [`ParallelRng::get_thread_rng`], so parallel generation is
/// reproducible for a fixed seed and a fixed chunking of the work.
#[derive(Clone)]
pub struct ParallelRng {
    // Root seed; per-worker seeds are derived by wrapping addition.
    base_seed: u64,
}
16
17impl ParallelRng {
18 pub fn new(seed: u64) -> Self {
20 Self { base_seed: seed }
21 }
22
23 pub fn get_thread_rng(&self, thread_id: usize) -> Random<StdRng> {
25 let thread_seed = self.base_seed.wrapping_add(thread_id as u64);
27 Random::seed(thread_seed)
28 }
29
30 pub fn generate_normal_matrix_parallel(
32 &self,
33 n_samples: usize,
34 n_features: usize,
35 mean: f64,
36 std: f64,
37 n_threads: Option<usize>,
38 ) -> Array2<f64> {
39 if let Some(threads) = n_threads {
41 rayon::ThreadPoolBuilder::new()
42 .num_threads(threads)
43 .build_global()
44 .ok();
45 }
46
47 let num_workers = rayon::current_num_threads();
49 let chunk_size = (n_samples + num_workers - 1) / num_workers;
50
51 let rows: Vec<Array1<f64>> = (0..n_samples)
53 .into_par_iter()
54 .chunks(chunk_size)
55 .enumerate()
56 .flat_map(|(chunk_id, chunk)| {
57 let mut rng = self.get_thread_rng(chunk_id);
58 let normal = RandNormal::new(mean, std).unwrap();
59
60 chunk
61 .into_iter()
62 .map(|_| Array1::from_shape_fn(n_features, |_| normal.sample(&mut rng)))
63 .collect::<Vec<_>>()
64 })
65 .collect();
66
67 let mut matrix = Array2::zeros((n_samples, n_features));
69 for (i, row) in rows.into_iter().enumerate() {
70 matrix.row_mut(i).assign(&row);
71 }
72
73 matrix
74 }
75
76 pub fn generate_uniform_parallel(&self, n_samples: usize, low: f64, high: f64) -> Array1<f64> {
78 let num_workers = rayon::current_num_threads();
79 let chunk_size = (n_samples + num_workers - 1) / num_workers;
80
81 (0..n_samples)
82 .into_par_iter()
83 .chunks(chunk_size)
84 .enumerate()
85 .flat_map(|(chunk_id, chunk)| {
86 let mut rng = self.get_thread_rng(chunk_id);
87
88 chunk
89 .into_iter()
90 .map(|_| {
91 let u: f64 = rng.gen();
92 low + u * (high - low)
93 })
94 .collect::<Vec<_>>()
95 })
96 .collect::<Vec<_>>()
97 .into()
98 }
99}
100
101pub fn make_classification_parallel(
103 n_samples: usize,
104 n_features: usize,
105 n_classes: usize,
106 class_sep: f64,
107 random_state: u64,
108 n_threads: Option<usize>,
109) -> (Array2<f64>, Array1<i32>) {
110 let rng = ParallelRng::new(random_state);
111
112 let features = rng.generate_normal_matrix_parallel(n_samples, n_features, 0.0, 1.0, n_threads);
114
115 let targets = Array1::from_shape_fn(n_samples, |i| (i % n_classes) as i32);
117
118 let separated_features = features
120 .axis_iter(Axis(0))
121 .into_par_iter()
122 .enumerate()
123 .map(|(i, row)| {
124 let offset = targets[i] as f64 * class_sep;
125 row.mapv(|x| x + offset)
126 })
127 .collect::<Vec<_>>();
128
129 let mut result = Array2::zeros((n_samples, n_features));
130 for (i, row) in separated_features.into_iter().enumerate() {
131 result.row_mut(i).assign(&row);
132 }
133
134 (result, targets)
135}
136
137pub fn make_regression_parallel(
139 n_samples: usize,
140 n_features: usize,
141 noise: f64,
142 random_state: u64,
143 n_threads: Option<usize>,
144) -> (Array2<f64>, Array1<f64>) {
145 let rng = ParallelRng::new(random_state);
146
147 let features = rng.generate_normal_matrix_parallel(n_samples, n_features, 0.0, 1.0, n_threads);
149
150 let mut coef_rng = rng.get_thread_rng(0);
152 let normal_coef = RandNormal::new(0.0, 1.0).unwrap();
153 let coef = Array1::from_shape_fn(n_features, |_| normal_coef.sample(&mut coef_rng));
154
155 let targets_base: Vec<f64> = features
157 .axis_iter(Axis(0))
158 .into_par_iter()
159 .map(|row| row.dot(&coef))
160 .collect();
161
162 let targets = if noise > 0.0 {
164 let noise_values: Vec<f64> = (0..n_samples)
165 .into_par_iter()
166 .chunks((n_samples + rayon::current_num_threads() - 1) / rayon::current_num_threads())
167 .enumerate()
168 .flat_map(|(chunk_id, chunk)| {
169 let mut chunk_rng = rng.get_thread_rng(chunk_id + 1);
170 let normal_noise = RandNormal::new(0.0, noise).unwrap();
171
172 chunk
173 .into_iter()
174 .map(|_| normal_noise.sample(&mut chunk_rng))
175 .collect::<Vec<_>>()
176 })
177 .collect();
178
179 targets_base
180 .par_iter()
181 .zip(noise_values.par_iter())
182 .map(|(&t, &n)| t + n)
183 .collect::<Vec<_>>()
184 .into()
185 } else {
186 targets_base.into()
187 };
188
189 (features, targets)
190}
191
192pub fn make_blobs_parallel(
194 n_samples: usize,
195 n_features: usize,
196 centers: usize,
197 cluster_std: f64,
198 random_state: u64,
199 _n_threads: Option<usize>,
200) -> (Array2<f64>, Array1<i32>) {
201 let rng = ParallelRng::new(random_state);
202
203 let mut center_rng = rng.get_thread_rng(0);
205 let center_normal = RandNormal::new(0.0, 10.0).unwrap();
206 let cluster_centers = Array2::from_shape_fn((centers, n_features), |_| {
207 center_normal.sample(&mut center_rng)
208 });
209
210 let samples_per_cluster = n_samples / centers;
212 let targets = Array1::from_shape_fn(n_samples, |i| {
213 (i / samples_per_cluster).min(centers - 1) as i32
214 });
215
216 let samples: Vec<Array1<f64>> = (0..n_samples)
218 .into_par_iter()
219 .chunks((n_samples + rayon::current_num_threads() - 1) / rayon::current_num_threads())
220 .enumerate()
221 .flat_map(|(chunk_id, chunk)| {
222 let mut chunk_rng = rng.get_thread_rng(chunk_id + 1);
223 let sample_normal = RandNormal::new(0.0, cluster_std).unwrap();
224
225 chunk
226 .into_iter()
227 .map(|i| {
228 let cluster_id = targets[i] as usize;
229 let center = cluster_centers.row(cluster_id);
230 center.mapv(|c| c + sample_normal.sample(&mut chunk_rng))
231 })
232 .collect::<Vec<_>>()
233 })
234 .collect();
235
236 let mut features = Array2::zeros((n_samples, n_features));
237 for (i, sample) in samples.into_iter().enumerate() {
238 features.row_mut(i).assign(&sample);
239 }
240
241 (features, targets)
242}
243
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parallel_rng_creation() {
        // Consecutive draws from a seeded worker RNG stay in [0, 1].
        let rng = ParallelRng::new(42);
        let mut thread_rng = rng.get_thread_rng(0);
        for _ in 0..2 {
            let v = thread_rng.gen::<f64>();
            assert!((0.0..=1.0).contains(&v));
        }
    }

    #[test]
    fn test_generate_normal_matrix_parallel() {
        let rng = ParallelRng::new(42);
        let matrix = rng.generate_normal_matrix_parallel(100, 10, 0.0, 1.0, Some(2));

        assert_eq!(matrix.dim(), (100, 10));
        // Standard-normal draws should stay well within 5 sigma.
        assert!(matrix.iter().all(|v| v.abs() < 5.0));
    }

    #[test]
    fn test_generate_uniform_parallel() {
        let rng = ParallelRng::new(42);
        let values = rng.generate_uniform_parallel(100, 0.0, 10.0);

        assert_eq!(values.len(), 100);
        assert!(values.iter().all(|v| (0.0..=10.0).contains(v)));
    }

    #[test]
    fn test_make_classification_parallel() {
        let (features, targets) = make_classification_parallel(100, 5, 3, 1.0, 42, Some(2));

        assert_eq!(features.dim(), (100, 5));
        assert_eq!(targets.len(), 100);

        // Labels are in range and every class appears at least once.
        let mut seen = [false; 3];
        for &t in targets.iter() {
            assert!((0..3).contains(&t));
            seen[t as usize] = true;
        }
        assert!(seen.iter().all(|&s| s));
    }

    #[test]
    fn test_make_regression_parallel() {
        let (features, targets) = make_regression_parallel(100, 5, 0.1, 42, Some(2));

        assert_eq!(features.dim(), (100, 5));
        assert_eq!(targets.len(), 100);
    }

    #[test]
    fn test_make_blobs_parallel() {
        let (features, targets) = make_blobs_parallel(150, 5, 3, 1.0, 42, Some(2));

        assert_eq!(features.dim(), (150, 5));
        assert_eq!(targets.len(), 150);

        // Labels are in range and every cluster appears at least once.
        let mut seen = [false; 3];
        for &t in targets.iter() {
            assert!((0..3).contains(&t));
            seen[t as usize] = true;
        }
        assert!(seen.iter().all(|&s| s));
    }

    #[test]
    fn test_deterministic_parallel_generation() {
        // Identical seeds and thread counts must reproduce identical data.
        let (first, _) = make_classification_parallel(50, 3, 2, 1.0, 42, Some(2));
        let (second, _) = make_classification_parallel(50, 3, 2, 1.0, 42, Some(2));
        assert_eq!(first, second);
    }
}