oxiphysics_core/parallel/functions.rs

//! Auto-generated module
//!
//! 🤖 Generated with [SplitRS](https://github.com/cool-japan/splitrs)

use std::sync::Arc;
use std::thread;

/// Applies `f` to every element of `data` and returns the results.
///
/// Sequential fallback -- the API is parallel-ready (requires `Sync`).
pub fn parallel_map_f64(data: &[f64], f: impl Fn(f64) -> f64 + Sync) -> Vec<f64> {
    data.iter().map(|&x| f(x)).collect()
}
/// Reduces `data` to a single value using `f`, starting from `identity`.
///
/// Sequential fallback -- the API is parallel-ready (requires `Sync + Send`).
pub fn parallel_reduce_f64(
    data: &[f64],
    identity: f64,
    f: impl Fn(f64, f64) -> f64 + Sync + Send,
) -> f64 {
    data.iter().copied().fold(identity, f)
}
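// Illustrative usage sketch (added for clarity, not part of the SplitRS-generated
// output): squares each element, then sums the squares with the sequential
// fallbacks above.
#[cfg(test)]
mod map_reduce_f64_example {
    use super::*;

    #[test]
    fn square_then_sum() {
        let data = [1.0, 2.0, 3.0];
        let squares = parallel_map_f64(&data, |x| x * x);
        assert_eq!(squares, vec![1.0, 4.0, 9.0]);
        let total = parallel_reduce_f64(&squares, 0.0, |a, b| a + b);
        assert!((total - 14.0).abs() < 1e-12);
    }
}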
/// Splits `data` into `n_parts` roughly equal slices.
///
/// If `n_parts` is zero or `data` is empty the result is an empty `Vec`.
/// Extra items are distributed to the leading partitions.
pub fn scatter_gather<T: Clone + Default>(data: &[T], n_parts: usize) -> Vec<Vec<T>> {
    if n_parts == 0 || data.is_empty() {
        return vec![];
    }
    let len = data.len();
    let base = len / n_parts;
    let remainder = len % n_parts;
    let mut result = Vec::with_capacity(n_parts);
    let mut offset = 0;
    for i in 0..n_parts {
        let size = base + if i < remainder { 1 } else { 0 };
        result.push(data[offset..offset + size].to_vec());
        offset += size;
    }
    result
}
/// Concatenates a list of parts back into a single `Vec`.
pub fn gather<T: Clone>(parts: Vec<Vec<T>>) -> Vec<T> {
    parts.into_iter().flatten().collect()
}
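// Round-trip sketch (illustrative, not part of the generated module): 5 items
// scattered over 3 parts get sizes 2, 2, 1 because extras go to the leading
// partitions; gathering restores the original order.
#[cfg(test)]
mod scatter_gather_example {
    use super::*;

    #[test]
    fn round_trip() {
        let data = [10, 20, 30, 40, 50];
        let parts = scatter_gather(&data, 3);
        assert_eq!(parts, vec![vec![10, 20], vec![30, 40], vec![50]]);
        assert_eq!(gather(parts), data.to_vec());
    }
}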
/// Computes the dot product of `a` and `b`.
///
/// Panics if the slices have different lengths.
pub fn parallel_dot_product(a: &[f64], b: &[f64]) -> f64 {
    assert_eq!(a.len(), b.len(), "parallel_dot_product: length mismatch");
    a.iter().zip(b.iter()).map(|(&x, &y)| x * y).sum()
}
/// Computes the matrix-vector product: the dot product of each row of
/// `matrix_rows` with `vector`.
///
/// Panics if any row's length differs from `vector.len()`.
pub fn parallel_matrix_vec_multiply(matrix_rows: &[Vec<f64>], vector: &[f64]) -> Vec<f64> {
    matrix_rows
        .iter()
        .map(|row| {
            assert_eq!(
                row.len(),
                vector.len(),
                "parallel_matrix_vec_multiply: dimension mismatch"
            );
            row.iter().zip(vector.iter()).map(|(&a, &b)| a * b).sum()
        })
        .collect()
}
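// Sketch (illustrative, not part of the generated module): a 2x3 row-major
// matrix times a length-3 vector, plus the dot product of the first row with
// that vector.
#[cfg(test)]
mod linear_algebra_example {
    use super::*;

    #[test]
    fn matvec_and_dot() {
        let rows = vec![vec![1.0, 0.0, 2.0], vec![0.0, 3.0, 1.0]];
        let v = [1.0, 2.0, 3.0];
        assert_eq!(parallel_matrix_vec_multiply(&rows, &v), vec![7.0, 9.0]);
        assert_eq!(parallel_dot_product(&rows[0], &v), 7.0);
    }
}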
/// Computes the sum of all elements in `data`.
pub fn parallel_sum(data: &[f64]) -> f64 {
    data.iter().copied().sum()
}
/// Apply `f` to every item in `items` in parallel (no return value).
///
/// Like `parallel_map` but discards results, useful for side-effecting work.
pub fn parallel_for_each<T>(items: &[T], f: impl Fn(&T) + Send + Sync + 'static)
where
    T: Send + Sync + Clone + 'static,
{
    if items.is_empty() {
        return;
    }
    let n_threads = thread::available_parallelism()
        .map(|p| p.get())
        .unwrap_or(4)
        .min(items.len());
    let f = Arc::new(f);
    let items_arc: Arc<[T]> = items.into();
    let chunk_size = items.len().div_ceil(n_threads);
    let mut handles = Vec::new();
    let mut start = 0;
    while start < items.len() {
        let end = (start + chunk_size).min(items.len());
        let f_clone = Arc::clone(&f);
        let items_clone = Arc::clone(&items_arc);
        let h = thread::spawn(move || {
            for i in start..end {
                f_clone(&items_clone[i]);
            }
        });
        handles.push(h);
        start = end;
    }
    for h in handles {
        h.join().expect("parallel_for_each: worker thread panicked");
    }
}
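// Side-effect sketch (illustrative, not part of the generated module): the
// closure only sees `&T` and returns nothing, so observable work goes through
// something thread-safe -- here a static atomic counter.
#[cfg(test)]
mod for_each_example {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    #[test]
    fn counts_all_items() {
        static SEEN: AtomicUsize = AtomicUsize::new(0);
        let items: Vec<u32> = (0..100).collect();
        parallel_for_each(&items, |_item| {
            SEEN.fetch_add(1, Ordering::Relaxed);
        });
        assert_eq!(SEEN.load(Ordering::Relaxed), 100);
    }
}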
/// Apply `f` to every item in `items` in parallel and return the results.
///
/// Uses `std::thread::spawn` to fan out work across chunks, then collects
/// results in the original order.
///
/// # Type constraints
/// - `T: Send + Sync` -- items must be shareable across threads.
/// - `R: Send` -- results must be sendable back to the main thread.
/// - `f: Fn(&T) -> R + Send + Sync` -- the closure must be thread-safe.
pub fn parallel_map<T, R>(items: &[T], f: impl Fn(&T) -> R + Send + Sync + 'static) -> Vec<R>
where
    T: Send + Sync + Clone + 'static,
    R: Send + 'static,
{
    if items.is_empty() {
        return Vec::new();
    }
    let n_threads = thread::available_parallelism()
        .map(|p| p.get())
        .unwrap_or(4)
        .min(items.len());
    let f = Arc::new(f);
    let items_arc: Arc<[T]> = items.into();
    let chunk_size = items.len().div_ceil(n_threads);
    let mut handles: Vec<thread::JoinHandle<Vec<R>>> = Vec::new();
    let mut start = 0;
    while start < items.len() {
        let end = (start + chunk_size).min(items.len());
        let f_clone = Arc::clone(&f);
        let items_clone = Arc::clone(&items_arc);
        let h = thread::spawn(move || (start..end).map(|i| f_clone(&items_clone[i])).collect());
        handles.push(h);
        start = end;
    }
    let mut result = Vec::with_capacity(items.len());
    for h in handles {
        result.extend(h.join().expect("parallel_map: worker thread panicked"));
    }
    result
}
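// Sketch (illustrative, not part of the generated module): fan out a pure
// computation; results come back in input order even though chunks finish on
// different threads.
#[cfg(test)]
mod parallel_map_example {
    use super::*;

    #[test]
    fn preserves_order() {
        let items: Vec<u64> = (0..1_000).collect();
        let doubled = parallel_map(&items, |&x| x * 2);
        assert_eq!(doubled.len(), items.len());
        assert!(doubled.iter().zip(&items).all(|(d, i)| *d == i * 2));
    }
}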
/// Filter `items` in parallel, keeping only those for which `predicate` returns `true`.
///
/// Preserves original order.
pub fn parallel_filter<T>(
    items: &[T],
    predicate: impl Fn(&T) -> bool + Send + Sync + 'static,
) -> Vec<T>
where
    T: Send + Sync + Clone + 'static,
{
    if items.is_empty() {
        return Vec::new();
    }
    let n_threads = thread::available_parallelism()
        .map(|p| p.get())
        .unwrap_or(4)
        .min(items.len());
    let predicate = Arc::new(predicate);
    let items_arc: Arc<[T]> = items.into();
    let chunk_size = items.len().div_ceil(n_threads);
    let mut handles: Vec<thread::JoinHandle<Vec<T>>> = Vec::new();
    let mut start = 0;
    while start < items.len() {
        let end = (start + chunk_size).min(items.len());
        let pred_clone = Arc::clone(&predicate);
        let items_clone = Arc::clone(&items_arc);
        let h = thread::spawn(move || {
            (start..end)
                .filter(|&i| pred_clone(&items_clone[i]))
                .map(|i| items_clone[i].clone())
                .collect()
        });
        handles.push(h);
        start = end;
    }
    let mut result = Vec::new();
    for h in handles {
        result.extend(h.join().expect("parallel_filter: worker thread panicked"));
    }
    result
}
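// Sketch (illustrative, not part of the generated module): keep only even
// numbers; the left-to-right order survives because chunk results are
// concatenated in chunk order.
#[cfg(test)]
mod parallel_filter_example {
    use super::*;

    #[test]
    fn keeps_evens_in_order() {
        let items: Vec<i64> = (0..20).collect();
        let evens = parallel_filter(&items, |x| *x % 2 == 0);
        assert_eq!(evens, vec![0, 2, 4, 6, 8, 10, 12, 14, 16, 18]);
    }
}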
/// Reduce `items` to a single value using `f` with the given `identity` element.
///
/// Fans out the reduction into chunks (one per available hardware thread),
/// reduces each chunk sequentially, then combines the chunk results.
/// Because `identity` seeds every chunk and the final combine, `f` should be
/// associative and `identity` should be a true identity for `f` (e.g. `0.0`
/// for addition), otherwise the result can differ from a sequential fold.
///
/// # Type constraints
/// - `T: Clone + Send + Sync` -- items and identity must be cloneable and thread-safe.
/// - `f: Fn(T, T) -> T + Send + Sync` -- the combining function must be thread-safe.
pub fn parallel_reduce<T>(
    items: &[T],
    f: impl Fn(T, T) -> T + Send + Sync + 'static,
    identity: T,
) -> T
where
    T: Clone + Send + Sync + 'static,
{
    if items.is_empty() {
        return identity;
    }
    let n_threads = thread::available_parallelism()
        .map(|p| p.get())
        .unwrap_or(4)
        .min(items.len());
    let f = Arc::new(f);
    let items_arc: Arc<[T]> = items.into();
    let chunk_size = items.len().div_ceil(n_threads);
    let mut handles: Vec<thread::JoinHandle<T>> = Vec::new();
    let mut start = 0;
    while start < items.len() {
        let end = (start + chunk_size).min(items.len());
        let f_clone = Arc::clone(&f);
        let items_clone = Arc::clone(&items_arc);
        let id = identity.clone();
        let h = thread::spawn(move || {
            let mut acc = id;
            for i in start..end {
                acc = f_clone(acc, items_clone[i].clone());
            }
            acc
        });
        handles.push(h);
        start = end;
    }
    let mut acc = identity;
    for h in handles {
        let chunk_result = h.join().expect("parallel_reduce: worker thread panicked");
        acc = f(acc, chunk_result);
    }
    acc
}
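// Sketch (illustrative, not part of the generated module): an associative
// operation with its proper identity (0.0 for addition) reduces to the same
// value a sequential fold would produce.
#[cfg(test)]
mod parallel_reduce_example {
    use super::*;

    #[test]
    fn sums_with_additive_identity() {
        let data: Vec<f64> = (1..=100).map(|i| i as f64).collect();
        let total = parallel_reduce(&data, |a, b| a + b, 0.0);
        assert!((total - 5050.0).abs() < 1e-9);
    }
}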
/// Custom operator for more complex reductions.
///
/// Implementors provide `identity`, `fold`, `combine`, and `finalize` methods.
pub trait ReduceOperator: Send + Sync + 'static {
    /// The accumulator type.
    type Acc: Clone + Send + Sync + 'static;
    /// The input element type.
    type Item: Clone + Send + Sync + 'static;
    /// The final result type.
    type Result;
    /// Identity element for the accumulator.
    fn identity(&self) -> Self::Acc;
    /// Fold a single item into the accumulator.
    fn fold(&self, acc: Self::Acc, item: Self::Item) -> Self::Acc;
    /// Combine two accumulators.
    fn combine(&self, left: Self::Acc, right: Self::Acc) -> Self::Acc;
    /// Convert the final accumulator to the result.
    fn finalize(&self, acc: Self::Acc) -> Self::Result;
}
/// Reduce with a custom operator.
///
/// Sequential fallback -- the API is parallel-ready; `combine` is not called here.
pub fn parallel_reduce_with_op<Op>(items: &[Op::Item], op: &Op) -> Op::Result
where
    Op: ReduceOperator,
{
    if items.is_empty() {
        return op.finalize(op.identity());
    }
    let mut acc = op.identity();
    for item in items {
        acc = op.fold(acc, item.clone());
    }
    op.finalize(acc)
}
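// Sketch (illustrative, not part of the generated module): `MeanOp` is a
// hypothetical operator that accumulates (sum, count) and finalizes to the
// arithmetic mean -- the kind of reduction a plain fold-with-identity cannot
// express as cleanly.
#[cfg(test)]
mod reduce_operator_example {
    use super::*;

    struct MeanOp;

    impl ReduceOperator for MeanOp {
        type Acc = (f64, usize);
        type Item = f64;
        type Result = f64;

        fn identity(&self) -> Self::Acc {
            (0.0, 0)
        }
        fn fold(&self, acc: Self::Acc, item: Self::Item) -> Self::Acc {
            (acc.0 + item, acc.1 + 1)
        }
        fn combine(&self, left: Self::Acc, right: Self::Acc) -> Self::Acc {
            (left.0 + right.0, left.1 + right.1)
        }
        fn finalize(&self, acc: Self::Acc) -> Self::Result {
            if acc.1 == 0 { 0.0 } else { acc.0 / acc.1 as f64 }
        }
    }

    #[test]
    fn mean_of_values() {
        let data = [2.0, 4.0, 6.0];
        assert!((parallel_reduce_with_op(&data, &MeanOp) - 4.0).abs() < 1e-12);
    }
}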
/// Parallel merge sort for slices of `f64`.
///
/// Splits the data into chunks, sorts each chunk on a separate thread,
/// then merges the sorted chunks pairwise.
pub fn parallel_merge_sort(data: &mut [f64]) {
    let n = data.len();
    if n <= 1 {
        return;
    }
    if n <= 64 {
        insertion_sort(data);
        return;
    }
    let n_threads = thread::available_parallelism()
        .map(|p| p.get())
        .unwrap_or(4)
        .min(n / 32)
        .max(1);
    let chunk_size = n.div_ceil(n_threads);
    let chunks: Vec<Vec<f64>> = data.chunks(chunk_size).map(|c| c.to_vec()).collect();
    let handles: Vec<thread::JoinHandle<Vec<f64>>> = chunks
        .into_iter()
        .map(|mut chunk| {
            thread::spawn(move || {
                chunk.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
                chunk
            })
        })
        .collect();
    let mut sorted_chunks: Vec<Vec<f64>> = handles
        .into_iter()
        .map(|h| h.join().expect("parallel_merge_sort: worker panicked"))
        .collect();
    while sorted_chunks.len() > 1 {
        let mut merged = Vec::new();
        let mut i = 0;
        while i < sorted_chunks.len() {
            if i + 1 < sorted_chunks.len() {
                merged.push(merge_sorted(&sorted_chunks[i], &sorted_chunks[i + 1]));
                i += 2;
            } else {
                merged.push(sorted_chunks[i].clone());
                i += 1;
            }
        }
        sorted_chunks = merged;
    }
    data.copy_from_slice(&sorted_chunks[0]);
}
/// Merge two sorted slices into a single sorted vector.
pub fn merge_sorted(a: &[f64], b: &[f64]) -> Vec<f64> {
    let mut result = Vec::with_capacity(a.len() + b.len());
    let (mut i, mut j) = (0, 0);
    while i < a.len() && j < b.len() {
        if a[i] <= b[j] {
            result.push(a[i]);
            i += 1;
        } else {
            result.push(b[j]);
            j += 1;
        }
    }
    result.extend_from_slice(&a[i..]);
    result.extend_from_slice(&b[j..]);
    result
}
/// Simple insertion sort for small arrays.
pub fn insertion_sort(data: &mut [f64]) {
    for i in 1..data.len() {
        let key = data[i];
        let mut j = i;
        while j > 0 && data[j - 1] > key {
            data[j] = data[j - 1];
            j -= 1;
        }
        data[j] = key;
    }
}
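// Sketch (illustrative, not part of the generated module): sort a reversed
// array, and use `merge_sorted` on its own to merge two sorted runs.
#[cfg(test)]
mod merge_sort_example {
    use super::*;

    #[test]
    fn sorts_reversed_input() {
        let mut data: Vec<f64> = (0..500).rev().map(|i| i as f64).collect();
        parallel_merge_sort(&mut data);
        assert!(data.windows(2).all(|w| w[0] <= w[1]));
        assert_eq!(merge_sorted(&[1.0, 3.0], &[2.0, 4.0]), vec![1.0, 2.0, 3.0, 4.0]);
    }
}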
/// Computes the inclusive prefix sum of `data`.
///
/// Returns a vector where element `i` is `sum(data[0..=i])`.
pub fn prefix_sum(data: &[f64]) -> Vec<f64> {
    let mut result = Vec::with_capacity(data.len());
    let mut acc = 0.0_f64;
    for &x in data {
        acc += x;
        result.push(acc);
    }
    result
}
/// Computes the exclusive prefix sum of `data`.
///
/// Returns a vector where element `i` is `sum(data[0..i])`.
pub fn exclusive_prefix_sum(data: &[f64]) -> Vec<f64> {
    let mut result = Vec::with_capacity(data.len());
    let mut acc = 0.0_f64;
    for &x in data {
        result.push(acc);
        acc += x;
    }
    result
}
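// Sketch (illustrative, not part of the generated module): inclusive vs.
// exclusive scans of the same input; the exclusive scan is the inclusive scan
// shifted right by one, starting at 0.
#[cfg(test)]
mod prefix_sum_example {
    use super::*;

    #[test]
    fn inclusive_vs_exclusive() {
        let data = [1.0, 2.0, 3.0, 4.0];
        assert_eq!(prefix_sum(&data), vec![1.0, 3.0, 6.0, 10.0]);
        assert_eq!(exclusive_prefix_sum(&data), vec![0.0, 1.0, 3.0, 6.0]);
    }
}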
/// Find the minimum value in `data` in parallel.
///
/// Returns `f64::INFINITY` for empty input.
pub fn parallel_min(data: &[f64]) -> f64 {
    if data.is_empty() {
        return f64::INFINITY;
    }
    parallel_reduce(data, |a: f64, b: f64| a.min(b), f64::INFINITY)
}
/// Find the maximum value in `data` in parallel.
///
/// Returns `f64::NEG_INFINITY` for empty input.
pub fn parallel_max(data: &[f64]) -> f64 {
    if data.is_empty() {
        return f64::NEG_INFINITY;
    }
    parallel_reduce(data, |a: f64, b: f64| a.max(b), f64::NEG_INFINITY)
}
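// Sketch (illustrative, not part of the generated module): min/max fall back
// to their reduction identities on empty input.
#[cfg(test)]
mod min_max_example {
    use super::*;

    #[test]
    fn empty_input_returns_identities() {
        assert_eq!(parallel_min(&[]), f64::INFINITY);
        assert_eq!(parallel_max(&[]), f64::NEG_INFINITY);
        assert_eq!(parallel_min(&[3.0, -1.0, 2.0]), -1.0);
    }
}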
/// Returns the number of logical CPU cores available to this process.
///
/// Falls back to 1 if the OS query fails.
pub fn available_threads() -> usize {
    thread::available_parallelism()
        .map(|p| p.get())
        .unwrap_or(1)
}
/// Returns a suggested number of threads for CPU-bound work.
///
/// Uses half the available cores (minimum 1) to leave headroom for the OS and
/// other processes.
pub fn suggested_thread_count() -> usize {
    (available_threads() / 2).max(1)
}
/// Process `data` in fixed-size chunks, applying `f` to each chunk and
/// collecting the results into a flat `Vec<R>`.
///
/// This pattern mirrors hand-written SIMD loops where a fixed "vector width"
/// is processed per iteration. Here `chunk_size` plays the role of the SIMD
/// lane count.
pub fn chunk_process<T, R, F>(data: &[T], chunk_size: usize, f: F) -> Vec<R>
where
    T: Clone,
    F: Fn(&[T]) -> Vec<R>,
{
    if data.is_empty() || chunk_size == 0 {
        return Vec::new();
    }
    data.chunks(chunk_size).flat_map(f).collect()
}
/// Apply `f` element-wise on `a` and `b` in chunks of `chunk_size`, returning
/// a `Vec<f64>`.
///
/// Panics if `a.len() != b.len()`.
pub fn chunk_zip_map(
    a: &[f64],
    b: &[f64],
    chunk_size: usize,
    f: impl Fn(f64, f64) -> f64,
) -> Vec<f64> {
    assert_eq!(
        a.len(),
        b.len(),
        "chunk_zip_map: slices must be same length"
    );
    if a.is_empty() || chunk_size == 0 {
        return Vec::new();
    }
    let n = a.len();
    let mut result = Vec::with_capacity(n);
    let mut i = 0;
    while i < n {
        let end = (i + chunk_size).min(n);
        for k in i..end {
            result.push(f(a[k], b[k]));
        }
        i = end;
    }
    result
}
/// Compute the dot product of `a` and `b` using chunk-based accumulation.
///
/// Accumulates partial sums over chunks of `chunk_size`, then sums the
/// partials -- mimics a SIMD reduction.
pub fn chunk_dot_product(a: &[f64], b: &[f64], chunk_size: usize) -> f64 {
    assert_eq!(a.len(), b.len(), "chunk_dot_product: length mismatch");
    if a.is_empty() {
        return 0.0;
    }
    let chunk_size = chunk_size.max(1);
    let mut total = 0.0;
    let mut i = 0;
    while i < a.len() {
        let end = (i + chunk_size).min(a.len());
        let mut partial = 0.0;
        for k in i..end {
            partial += a[k] * b[k];
        }
        total += partial;
        i = end;
    }
    total
}
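// Sketch (illustrative, not part of the generated module): the three chunked
// helpers with a "lane width" of 4; results match the unchunked versions, only
// the traversal pattern differs.
#[cfg(test)]
mod chunked_example {
    use super::*;

    #[test]
    fn chunked_matches_unchunked() {
        let a: Vec<f64> = (0..10).map(|i| i as f64).collect();
        let b: Vec<f64> = (10..20).map(|i| i as f64).collect();
        let sums = chunk_zip_map(&a, &b, 4, |x, y| x + y);
        assert_eq!(sums.len(), a.len());
        let dot = chunk_dot_product(&a, &b, 4);
        assert!((dot - parallel_dot_product(&a, &b)).abs() < 1e-9);
        // 10 items in chunks of 4 -> 3 chunks, one per-chunk maximum each.
        let maxes = chunk_process(&a, 4, |c| vec![c.iter().cloned().fold(f64::MIN, f64::max)]);
        assert_eq!(maxes.len(), 3);
    }
}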
/// In-place quicksort (serial reference for a parallel sort API).
///
/// Uses the median-of-three pivot selection and falls back to insertion sort
/// for subarrays with ≤ 16 elements.
pub fn parallel_sort(data: &mut [f64]) {
    quicksort_recursive(data);
}
/// Recursive quicksort implementation for f64 slices.
pub fn quicksort_recursive(data: &mut [f64]) {
    let n = data.len();
    if n <= 1 {
        return;
    }
    if n <= 16 {
        insertion_sort(data);
        return;
    }
    let pivot_idx = median_of_three(data);
    data.swap(pivot_idx, n - 1);
    let pivot = data[n - 1];
    let mut store = 0;
    for i in 0..n - 1 {
        if data[i] <= pivot {
            data.swap(i, store);
            store += 1;
        }
    }
    data.swap(store, n - 1);
    let (left, right) = data.split_at_mut(store);
    quicksort_recursive(left);
    quicksort_recursive(&mut right[1..]);
}
/// Returns the index of the median-of-three pivot (first, mid, last).
pub fn median_of_three(data: &[f64]) -> usize {
    let n = data.len();
    let mid = n / 2;
    let (a, b, c) = (data[0], data[mid], data[n - 1]);
    if (a <= b && b <= c) || (c <= b && b <= a) {
        mid
    } else if (b <= a && a <= c) || (c <= a && a <= b) {
        0
    } else {
        n - 1
    }
}
/// Returns a sorted copy of `data`, sorted with `parallel_sort`.
pub fn sorted_copy(data: &[f64]) -> Vec<f64> {
    let mut v = data.to_vec();
    parallel_sort(&mut v);
    v
}
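// Sketch (illustrative, not part of the generated module): `sorted_copy`
// leaves the input untouched, and `median_of_three` picks the pivot index the
// quicksort uses.
#[cfg(test)]
mod quicksort_example {
    use super::*;

    #[test]
    fn sorted_copy_does_not_mutate_input() {
        let data = [3.0, 1.0, 4.0, 1.5, 9.0, 2.6];
        let sorted = sorted_copy(&data);
        assert_eq!(sorted, vec![1.0, 1.5, 2.6, 3.0, 4.0, 9.0]);
        assert_eq!(data[0], 3.0);
        // 3.0 (the last element) is the median of {1.0, 5.0, 3.0}.
        assert_eq!(median_of_three(&[1.0, 5.0, 3.0]), 2);
    }
}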
/// Accumulates a histogram of `data` into `n_bins` equal-width bins.
///
/// Sequential fallback -- the API is parallel-ready.
/// Returns a `Vec<usize>` of length `n_bins` with counts.
/// Empty input or `n_bins == 0` returns an empty `Vec`.
pub fn parallel_histogram(data: &[f64], n_bins: usize) -> Vec<usize> {
    if data.is_empty() || n_bins == 0 {
        return vec![];
    }
    let min = data.iter().cloned().fold(f64::INFINITY, f64::min);
    let max = data.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
    let width = if (max - min).abs() < f64::EPSILON {
        1.0
    } else {
        (max - min) / n_bins as f64
    };
    let mut counts = vec![0usize; n_bins];
    for &v in data {
        let idx = ((v - min) / width) as usize;
        counts[idx.min(n_bins - 1)] += 1;
    }
    counts
}
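// Sketch (illustrative, not part of the generated module): 8 values spread
// over [0, 4] into 4 equal-width bins; the maximum lands in the last bin
// because of the `idx.min(n_bins - 1)` clamp.
#[cfg(test)]
mod histogram_example {
    use super::*;

    #[test]
    fn four_equal_bins() {
        let data = [0.0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 4.0];
        assert_eq!(parallel_histogram(&data, 4), vec![2, 2, 2, 2]);
    }
}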