Skip to main content

ferray_random/
parallel.rs

1// ferray-random: Parallel generation via jump-ahead / stream splitting
2//
3// Provides deterministic parallel generation that produces the same output
4// regardless of thread count, by using fixed index-range assignment.
5
6use ferray_core::{Array, FerrayError, IxDyn};
7
8use crate::bitgen::BitGenerator;
9use crate::distributions::normal::standard_normal_single;
10use crate::generator::{Generator, shape_size, vec_to_array_f64};
11use crate::shape::IntoShape;
12
13impl<B: BitGenerator + Clone> Generator<B> {
14    /// Generate standard normal variates in parallel, deterministically.
15    ///
16    /// Uses `spawn()` to create independent child generators (via jump-ahead
17    /// for Xoshiro256**, stream IDs for Philox, or seed-from-parent for
18    /// PCG64), then generates chunks in parallel using Rayon.
19    ///
20    /// The output is **deterministic** for a given seed and thread count,
21    /// but does **not** match `standard_normal(size)` (which is sequential).
22    /// The parallel version produces different values because the child
23    /// generators have different internal states.
24    ///
25    /// # Errors
26    /// Returns `FerrayError::InvalidValue` if `shape` is invalid.
27    pub fn standard_normal_parallel(
28        &mut self,
29        size: impl IntoShape,
30    ) -> Result<Array<f64, IxDyn>, FerrayError> {
31        let shape_vec = size.into_shape()?;
32        let n = shape_size(&shape_vec);
33
34        // For small sizes, sequential is faster (no spawn/thread overhead)
35        let num_threads = rayon::current_num_threads().max(1);
36        if n < 10_000 || num_threads <= 1 {
37            // Sequential fallback
38            let mut data = Vec::with_capacity(n);
39            for _ in 0..n {
40                data.push(standard_normal_single(&mut self.bg));
41            }
42            return vec_to_array_f64(data, &shape_vec);
43        }
44
45        // Spawn independent child generators
46        let mut children = self.spawn(num_threads)?;
47        let chunk_size = n.div_ceil(num_threads);
48
49        // Generate chunks in parallel
50        use rayon::prelude::*;
51        let chunks: Vec<Vec<f64>> = children
52            .par_iter_mut()
53            .enumerate()
54            .map(|(i, child)| {
55                let start = i * chunk_size;
56                let end = (start + chunk_size).min(n);
57                let count = end - start;
58                let mut chunk = Vec::with_capacity(count);
59                for _ in 0..count {
60                    chunk.push(standard_normal_single(&mut child.bg));
61                }
62                chunk
63            })
64            .collect();
65
66        // Concatenate chunks
67        let mut data = Vec::with_capacity(n);
68        for chunk in chunks {
69            data.extend_from_slice(&chunk);
70        }
71        data.truncate(n);
72
73        vec_to_array_f64(data, &shape_vec)
74    }
75
76    /// Spawn `n` independent child generators for manual parallel use.
77    ///
78    /// Uses `jump()` if available, otherwise seeds children from parent output.
79    ///
80    /// # Arguments
81    /// * `n` - Number of child generators to create.
82    ///
83    /// # Errors
84    /// Returns `FerrayError::InvalidValue` if `n` is zero.
85    pub fn spawn(&mut self, n: usize) -> Result<Vec<Self>, FerrayError> {
86        crate::generator::spawn_generators(self, n)
87    }
88}
89
90#[cfg(test)]
91mod tests {
92    use crate::default_rng_seeded;
93
94    #[test]
95    fn parallel_correct_length_and_stats() {
96        // Parallel output has correct length and reasonable statistics
97        let mut rng = default_rng_seeded(42);
98        let par = rng.standard_normal_parallel(10_000).unwrap();
99        assert_eq!(par.shape(), &[10_000]);
100        let slice = par.as_slice().unwrap();
101        let mean: f64 = slice.iter().sum::<f64>() / slice.len() as f64;
102        let var: f64 =
103            slice.iter().map(|x| (x - mean) * (x - mean)).sum::<f64>() / slice.len() as f64;
104        // Mean should be near 0, variance near 1
105        assert!(mean.abs() < 0.05, "mean = {mean}");
106        assert!((var - 1.0).abs() < 0.1, "var = {var}");
107    }
108
109    #[test]
110    fn parallel_deterministic() {
111        let mut rng1 = default_rng_seeded(42);
112        let mut rng2 = default_rng_seeded(42);
113
114        let a = rng1.standard_normal_parallel(50_000).unwrap();
115        let b = rng2.standard_normal_parallel(50_000).unwrap();
116
117        assert_eq!(a.as_slice().unwrap(), b.as_slice().unwrap());
118    }
119
120    #[test]
121    fn parallel_large() {
122        let mut rng = default_rng_seeded(42);
123        let arr = rng.standard_normal_parallel(1_000_000).unwrap();
124        assert_eq!(arr.shape(), &[1_000_000]);
125        // Check mean is roughly 0
126        let slice = arr.as_slice().unwrap();
127        let mean: f64 = slice.iter().sum::<f64>() / slice.len() as f64;
128        assert!(mean.abs() < 0.01, "parallel mean {mean} too far from 0");
129    }
130
131    #[test]
132    fn spawn_creates_independent_generators() {
133        let mut rng = default_rng_seeded(42);
134        let mut children = rng.spawn(4).unwrap();
135        assert_eq!(children.len(), 4);
136
137        // Each child should produce different sequences
138        let outputs: Vec<u64> = children
139            .iter_mut()
140            .map(super::super::generator::Generator::next_u64)
141            .collect();
142        for i in 0..outputs.len() {
143            for j in (i + 1)..outputs.len() {
144                assert_ne!(
145                    outputs[i], outputs[j],
146                    "children {i} and {j} produced same first value"
147                );
148            }
149        }
150    }
151
152    #[test]
153    fn spawn_deterministic() {
154        let mut rng1 = default_rng_seeded(42);
155        let mut rng2 = default_rng_seeded(42);
156
157        let mut children1 = rng1.spawn(4).unwrap();
158        let mut children2 = rng2.spawn(4).unwrap();
159
160        for (c1, c2) in children1.iter_mut().zip(children2.iter_mut()) {
161            for _ in 0..100 {
162                assert_eq!(c1.next_u64(), c2.next_u64());
163            }
164        }
165    }
166
167    #[test]
168    fn parallel_zero_size_returns_empty() {
169        // Issue #264, #455: NumPy treats size=0 as a valid request that
170        // produces an empty array. ferray-random now matches.
171        let mut rng = default_rng_seeded(42);
172        let arr = rng.standard_normal_parallel(0).unwrap();
173        assert_eq!(arr.shape(), &[0]);
174        assert_eq!(arr.size(), 0);
175    }
176}