// oxiphysics_core/parallel/functions.rs
use std::sync::Arc;
use std::thread;
/// Applies `f` to every element of `data` in parallel, preserving order.
///
/// The slice is split into contiguous chunks, one per worker (bounded by
/// available parallelism), and the per-chunk outputs are concatenated in
/// chunk order, so the result is identical to a sequential
/// `data.iter().map(|&x| f(x)).collect()`.
///
/// # Panics
/// Propagates a panic from any worker thread.
pub fn parallel_map_f64(data: &[f64], f: impl Fn(f64) -> f64 + Sync) -> Vec<f64> {
    if data.is_empty() {
        return Vec::new();
    }
    let n_threads = thread::available_parallelism()
        .map(|p| p.get())
        .unwrap_or(4)
        .min(data.len());
    let chunk_size = data.len().div_ceil(n_threads);
    // Scoped threads borrow `data` and `f` directly; no 'static bound needed.
    let f = &f;
    thread::scope(|s| {
        // Spawn all workers before joining so chunks run concurrently.
        let handles: Vec<_> = data
            .chunks(chunk_size)
            .map(|chunk| s.spawn(move || chunk.iter().map(|&x| f(x)).collect::<Vec<f64>>()))
            .collect();
        let mut out = Vec::with_capacity(data.len());
        for h in handles {
            out.extend(h.join().expect("parallel_map_f64: worker thread panicked"));
        }
        out
    })
}
/// Reduces `data` left to right with `f`, starting from `identity`.
///
/// The reduction is strictly sequential, so floating-point results are
/// deterministic regardless of thread count. Returns `identity` for an
/// empty slice.
pub fn parallel_reduce_f64(
    data: &[f64],
    identity: f64,
    f: impl Fn(f64, f64) -> f64 + Sync + Send,
) -> f64 {
    let mut acc = identity;
    for &value in data {
        acc = f(acc, value);
    }
    acc
}
/// Splits `data` into `n_parts` contiguous parts of near-equal size.
///
/// The first `len % n_parts` parts receive one extra element, so sizes
/// differ by at most one and concatenating the parts reproduces `data`.
/// Returns an empty vector when `n_parts == 0` or `data` is empty; when
/// `n_parts > data.len()`, the trailing parts are empty.
///
/// Only `Clone` is required of `T` (the previous `Default` bound was
/// never used and needlessly restricted callers).
pub fn scatter_gather<T: Clone>(data: &[T], n_parts: usize) -> Vec<Vec<T>> {
    if n_parts == 0 || data.is_empty() {
        return vec![];
    }
    let len = data.len();
    let base = len / n_parts;
    let remainder = len % n_parts;
    let mut result = Vec::with_capacity(n_parts);
    let mut offset = 0;
    for i in 0..n_parts {
        // The first `remainder` parts absorb the leftover elements.
        let size = base + usize::from(i < remainder);
        result.push(data[offset..offset + size].to_vec());
        offset += size;
    }
    result
}
/// Concatenates `parts` into a single vector, consuming them in order.
///
/// Inverse of `scatter_gather` for non-degenerate inputs. No trait bound
/// is needed: the parts are owned, so nothing is cloned (the previous
/// `Clone` bound was unused).
pub fn gather<T>(parts: Vec<Vec<T>>) -> Vec<T> {
    parts.into_iter().flatten().collect()
}
/// Dot product of two equal-length slices, accumulated left to right.
///
/// # Panics
/// Panics if the slices differ in length.
pub fn parallel_dot_product(a: &[f64], b: &[f64]) -> f64 {
    assert_eq!(a.len(), b.len(), "parallel_dot_product: length mismatch");
    let mut acc = 0.0;
    for (x, y) in a.iter().zip(b.iter()) {
        acc += x * y;
    }
    acc
}
/// Multiplies a row-major matrix by a vector: one output element per row.
///
/// # Panics
/// Panics if any row's length differs from `vector.len()`.
pub fn parallel_matrix_vec_multiply(matrix_rows: &[Vec<f64>], vector: &[f64]) -> Vec<f64> {
    let mut out = Vec::with_capacity(matrix_rows.len());
    for row in matrix_rows {
        // Dimension check happens row by row, as each row is consumed.
        assert_eq!(
            row.len(),
            vector.len(),
            "parallel_matrix_vec_multiply: dimension mismatch"
        );
        let mut dot = 0.0;
        for (a, b) in row.iter().zip(vector.iter()) {
            dot += a * b;
        }
        out.push(dot);
    }
    out
}
/// Sums a slice sequentially, left to right. Returns 0.0 for an empty slice.
pub fn parallel_sum(data: &[f64]) -> f64 {
    data.iter().fold(0.0, |acc, &x| acc + x)
}
/// Runs `f` on every element of `items`, splitting the slice into
/// contiguous chunks processed on separate threads.
///
/// Uses scoped threads, so the input slice is borrowed directly rather
/// than being copied element-by-element into an `Arc<[T]>`; no clones of
/// the input are made. Thread count is bounded by available parallelism
/// and by `items.len()`.
///
/// # Panics
/// Propagates a panic from any worker thread.
pub fn parallel_for_each<T>(items: &[T], f: impl Fn(&T) + Send + Sync + 'static)
where
    T: Send + Sync + Clone + 'static,
{
    if items.is_empty() {
        return;
    }
    let n_threads = thread::available_parallelism()
        .map(|p| p.get())
        .unwrap_or(4)
        .min(items.len());
    let chunk_size = items.len().div_ceil(n_threads);
    let f = &f;
    thread::scope(|s| {
        // Spawn all workers first so chunks run concurrently.
        let handles: Vec<_> = items
            .chunks(chunk_size)
            .map(|chunk| {
                s.spawn(move || {
                    for item in chunk {
                        f(item);
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().expect("parallel_for_each: worker thread panicked");
        }
    });
}
/// Applies `f` to every element of `items` in parallel, preserving order.
///
/// The slice is split into contiguous chunks, one per worker; per-chunk
/// results are concatenated in chunk order, so the output matches a
/// sequential `items.iter().map(f).collect()`.
///
/// Uses scoped threads, so the input slice is borrowed directly rather
/// than cloned element-by-element into an `Arc<[T]>`.
///
/// # Panics
/// Propagates a panic from any worker thread.
pub fn parallel_map<T, R>(items: &[T], f: impl Fn(&T) -> R + Send + Sync + 'static) -> Vec<R>
where
    T: Send + Sync + Clone + 'static,
    R: Send + 'static,
{
    if items.is_empty() {
        return Vec::new();
    }
    let n_threads = thread::available_parallelism()
        .map(|p| p.get())
        .unwrap_or(4)
        .min(items.len());
    let chunk_size = items.len().div_ceil(n_threads);
    let f = &f;
    thread::scope(|s| {
        // Spawn all workers before joining so chunks run concurrently.
        let handles: Vec<_> = items
            .chunks(chunk_size)
            .map(|chunk| s.spawn(move || chunk.iter().map(|item| f(item)).collect::<Vec<R>>()))
            .collect();
        let mut result = Vec::with_capacity(items.len());
        for h in handles {
            result.extend(h.join().expect("parallel_map: worker thread panicked"));
        }
        result
    })
}
/// Returns the elements of `items` satisfying `predicate`, in their
/// original order, evaluating the predicate in parallel over contiguous
/// chunks.
///
/// Uses scoped threads: the input slice is borrowed directly, and only
/// the surviving elements are cloned (previously the entire slice was
/// cloned into an `Arc<[T]>` up front and matches were cloned again).
///
/// # Panics
/// Propagates a panic from any worker thread.
pub fn parallel_filter<T>(
    items: &[T],
    predicate: impl Fn(&T) -> bool + Send + Sync + 'static,
) -> Vec<T>
where
    T: Send + Sync + Clone + 'static,
{
    if items.is_empty() {
        return Vec::new();
    }
    let n_threads = thread::available_parallelism()
        .map(|p| p.get())
        .unwrap_or(4)
        .min(items.len());
    let chunk_size = items.len().div_ceil(n_threads);
    let predicate = &predicate;
    thread::scope(|s| {
        // Spawn all workers before joining so chunks run concurrently.
        let handles: Vec<_> = items
            .chunks(chunk_size)
            .map(|chunk| {
                s.spawn(move || {
                    chunk
                        .iter()
                        .filter(|item| predicate(item))
                        .cloned()
                        .collect::<Vec<T>>()
                })
            })
            .collect();
        let mut result = Vec::new();
        for h in handles {
            result.extend(h.join().expect("parallel_filter: worker thread panicked"));
        }
        result
    })
}
/// Reduces `items` with `f`, folding contiguous chunks on worker threads
/// and then combining the per-chunk results left to right.
///
/// Each worker folds its chunk starting from a clone of `identity`; the
/// calling thread then folds the partials starting from `identity`. The
/// chunk boundaries are the same as `items.chunks(len.div_ceil(n_threads))`,
/// so results match a sequential fold only when `f` is associative and
/// `identity` is a true identity for `f`.
///
/// Uses scoped threads, so the input slice is borrowed directly rather
/// than cloned element-by-element into an `Arc<[T]>`.
///
/// # Panics
/// Propagates a panic from any worker thread.
pub fn parallel_reduce<T>(
    items: &[T],
    f: impl Fn(T, T) -> T + Send + Sync + 'static,
    identity: T,
) -> T
where
    T: Clone + Send + Sync + 'static,
{
    if items.is_empty() {
        return identity;
    }
    let n_threads = thread::available_parallelism()
        .map(|p| p.get())
        .unwrap_or(4)
        .min(items.len());
    let chunk_size = items.len().div_ceil(n_threads);
    let f = &f;
    // Per-chunk partial results, in chunk order.
    let partials: Vec<T> = thread::scope(|s| {
        let handles: Vec<_> = items
            .chunks(chunk_size)
            .map(|chunk| {
                let seed = identity.clone();
                s.spawn(move || chunk.iter().cloned().fold(seed, |acc, item| f(acc, item)))
            })
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("parallel_reduce: worker thread panicked"))
            .collect()
    });
    partials.into_iter().fold(identity, |acc, part| f(acc, part))
}
/// Strategy for a fold/combine-style reduction over a slice.
///
/// `fold` absorbs items into a per-chunk accumulator, `combine` merges
/// two accumulators, and `finalize` turns the merged accumulator into
/// the final result. NOTE(review): for chunked/parallel use, `combine`
/// should be consistent with `fold` (combining two folded chunks equals
/// folding the concatenation) — confirm implementors uphold this.
pub trait ReduceOperator: Send + Sync + 'static {
    /// Accumulator type threaded through `fold` and `combine`.
    type Acc: Clone + Send + Sync + 'static;
    /// Input element type.
    type Item: Clone + Send + Sync + 'static;
    /// Final value produced by `finalize`.
    type Result;
    /// Neutral starting accumulator (named as an identity for `combine`).
    fn identity(&self) -> Self::Acc;
    /// Absorbs one item into the accumulator.
    fn fold(&self, acc: Self::Acc, item: Self::Item) -> Self::Acc;
    /// Merges two partial accumulators.
    fn combine(&self, left: Self::Acc, right: Self::Acc) -> Self::Acc;
    /// Converts the final accumulator into the result value.
    fn finalize(&self, acc: Self::Acc) -> Self::Result;
}
258pub fn parallel_reduce_with_op<Op>(items: &[Op::Item], op: &Op) -> Op::Result
260where
261 Op: ReduceOperator,
262{
263 if items.is_empty() {
264 return op.finalize(op.identity());
265 }
266 let mut acc = op.identity();
267 for item in items {
268 acc = op.fold(acc, item.clone());
269 }
270 op.finalize(acc)
271}
/// Sorts `data` ascending using a chunked, threaded merge sort.
///
/// Inputs of <= 64 elements fall back to `insertion_sort`. Larger inputs
/// are split into roughly equal chunks, each sorted on its own thread,
/// then merged pairwise until a single sorted run remains.
///
/// NaN handling: comparisons fall back to `Ordering::Equal` when
/// `partial_cmp` fails, so the position of NaN values is unspecified.
///
/// # Panics
/// Panics if a worker thread panics.
pub fn parallel_merge_sort(data: &mut [f64]) {
    let n = data.len();
    if n <= 1 {
        return;
    }
    // Threading overhead dominates on small slices; sort in place instead.
    if n <= 64 {
        insertion_sort(data);
        return;
    }
    // Cap thread count so each chunk holds at least ~32 elements.
    let n_threads = thread::available_parallelism()
        .map(|p| p.get())
        .unwrap_or(4)
        .min(n / 32)
        .max(1);
    let chunk_size = n.div_ceil(n_threads);
    // Copy each chunk out so the spawned ('static) threads own their data.
    let chunks: Vec<Vec<f64>> = data.chunks(chunk_size).map(|c| c.to_vec()).collect();
    let handles: Vec<thread::JoinHandle<Vec<f64>>> = chunks
        .into_iter()
        .map(|mut chunk| {
            thread::spawn(move || {
                chunk.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
                chunk
            })
        })
        .collect();
    let mut sorted_chunks: Vec<Vec<f64>> = handles
        .into_iter()
        .map(|h| h.join().expect("parallel_merge_sort: worker panicked"))
        .collect();
    // Pairwise merge passes: each pass roughly halves the number of runs.
    while sorted_chunks.len() > 1 {
        let mut merged = Vec::new();
        let mut i = 0;
        while i < sorted_chunks.len() {
            if i + 1 < sorted_chunks.len() {
                merged.push(merge_sorted(&sorted_chunks[i], &sorted_chunks[i + 1]));
                i += 2;
            } else {
                // Odd run out: carried unchanged into the next pass.
                merged.push(sorted_chunks[i].clone());
                i += 1;
            }
        }
        sorted_chunks = merged;
    }
    data.copy_from_slice(&sorted_chunks[0]);
}
/// Merges two ascending-sorted slices into one ascending-sorted vector.
///
/// Ties favor elements of `a` (stable with respect to the `a`-then-`b`
/// ordering). Inputs that are not sorted produce an unspecified ordering.
pub fn merge_sorted(a: &[f64], b: &[f64]) -> Vec<f64> {
    let mut out = Vec::with_capacity(a.len() + b.len());
    let mut left = 0;
    let mut right = 0;
    loop {
        match (a.get(left), b.get(right)) {
            (Some(&x), Some(&y)) => {
                // `<=` keeps equal elements from `a` ahead of `b`.
                if x <= y {
                    out.push(x);
                    left += 1;
                } else {
                    out.push(y);
                    right += 1;
                }
            }
            (Some(_), None) => {
                out.extend_from_slice(&a[left..]);
                break;
            }
            (None, _) => {
                out.extend_from_slice(&b[right..]);
                break;
            }
        }
    }
    out
}
/// In-place insertion sort, ascending.
///
/// Elements shift only while strictly greater than the key, so equal
/// elements keep their relative order; comparisons against NaN are
/// false, so a NaN key does not move left.
pub fn insertion_sort(data: &mut [f64]) {
    for end in 1..data.len() {
        let value = data[end];
        let mut slot = end;
        // Shift larger elements of the sorted prefix one step right.
        while slot > 0 && data[slot - 1] > value {
            data[slot] = data[slot - 1];
            slot -= 1;
        }
        data[slot] = value;
    }
}
/// Inclusive prefix sums: `out[i]` is the sum of `data[0..=i]`.
/// Returns an empty vector for empty input.
pub fn prefix_sum(data: &[f64]) -> Vec<f64> {
    data.iter()
        .scan(0.0_f64, |running, &x| {
            *running += x;
            Some(*running)
        })
        .collect()
}
/// Exclusive prefix sums: `out[i]` is the sum of `data[0..i]`, so
/// `out[0]` is always 0.0. Returns an empty vector for empty input.
pub fn exclusive_prefix_sum(data: &[f64]) -> Vec<f64> {
    data.iter()
        .scan(0.0_f64, |running, &x| {
            let before = *running;
            *running += x;
            Some(before)
        })
        .collect()
}
374pub fn parallel_min(data: &[f64]) -> f64 {
378 if data.is_empty() {
379 return f64::INFINITY;
380 }
381 parallel_reduce(data, |a: f64, b: f64| a.min(b), f64::INFINITY)
382}
383pub fn parallel_max(data: &[f64]) -> f64 {
387 if data.is_empty() {
388 return f64::NEG_INFINITY;
389 }
390 parallel_reduce(data, |a: f64, b: f64| a.max(b), f64::NEG_INFINITY)
391}
/// Number of hardware threads reported by the OS; falls back to 1 when
/// the query fails. Always returns at least 1.
pub fn available_threads() -> usize {
    match thread::available_parallelism() {
        Ok(count) => count.get(),
        Err(_) => 1,
    }
}
400pub fn suggested_thread_count() -> usize {
405 (available_threads() / 2).max(1)
406}
/// Applies `f` to consecutive chunks of `data` (each at most
/// `chunk_size` long) and concatenates the per-chunk outputs in order.
/// Returns an empty vector when `data` is empty or `chunk_size` is 0.
pub fn chunk_process<T, R, F>(data: &[T], chunk_size: usize, f: F) -> Vec<R>
where
    T: Clone,
    F: Fn(&[T]) -> Vec<R>,
{
    if chunk_size == 0 || data.is_empty() {
        return Vec::new();
    }
    let mut out = Vec::new();
    for chunk in data.chunks(chunk_size) {
        out.extend(f(chunk));
    }
    out
}
/// Applies `f` pairwise to `a` and `b`, iterating chunk by chunk; the
/// output has one element per input pair, in order. Returns an empty
/// vector when the inputs are empty or `chunk_size` is 0.
///
/// # Panics
/// Panics if the slices differ in length.
pub fn chunk_zip_map(
    a: &[f64],
    b: &[f64],
    chunk_size: usize,
    f: impl Fn(f64, f64) -> f64,
) -> Vec<f64> {
    assert_eq!(
        a.len(),
        b.len(),
        "chunk_zip_map: slices must be same length"
    );
    if a.is_empty() || chunk_size == 0 {
        return Vec::new();
    }
    let mut out = Vec::with_capacity(a.len());
    // Matching chunks of equal-length slices line up element for element.
    for (ca, cb) in a.chunks(chunk_size).zip(b.chunks(chunk_size)) {
        for (&x, &y) in ca.iter().zip(cb.iter()) {
            out.push(f(x, y));
        }
    }
    out
}
/// Dot product computed as a sum of per-chunk partial sums (each chunk
/// covers at most `chunk_size` pairs; a `chunk_size` of 0 is treated as
/// 1). The chunked grouping fixes the floating-point summation order.
///
/// # Panics
/// Panics if the slices differ in length.
pub fn chunk_dot_product(a: &[f64], b: &[f64], chunk_size: usize) -> f64 {
    assert_eq!(a.len(), b.len(), "chunk_dot_product: length mismatch");
    if a.is_empty() {
        return 0.0;
    }
    let step = chunk_size.max(1);
    a.chunks(step)
        .zip(b.chunks(step))
        .map(|(ca, cb)| {
            // Partial sum over one chunk, accumulated left to right.
            ca.iter().zip(cb.iter()).map(|(&x, &y)| x * y).sum::<f64>()
        })
        .sum()
}
/// Sorts `data` ascending in place.
///
/// NOTE(review): despite the name, this delegates to the single-threaded
/// `quicksort_recursive`; `parallel_merge_sort` is the threaded variant.
pub fn parallel_sort(data: &mut [f64]) {
    quicksort_recursive(data);
}
/// Recursive quicksort (Lomuto partition) with a median-of-three pivot
/// and an insertion-sort cutoff for slices of <= 16 elements.
///
/// Comparisons involving NaN are false, so NaN inputs may end up out of
/// sorted position. NOTE(review): worst-case recursion depth is O(n) for
/// adversarial inputs; the median-of-three pivot makes this unlikely in
/// practice but it is not bounded — confirm acceptable for callers.
pub fn quicksort_recursive(data: &mut [f64]) {
    let n = data.len();
    if n <= 1 {
        return;
    }
    // Small slices: insertion sort is cheaper and terminates the recursion.
    if n <= 16 {
        insertion_sort(data);
        return;
    }
    // Choose the pivot and move it to the end for the Lomuto scan.
    let pivot_idx = median_of_three(data);
    data.swap(pivot_idx, n - 1);
    let pivot = data[n - 1];
    // Lomuto partition: elements <= pivot are swapped into [0, store).
    let mut store = 0;
    for i in 0..n - 1 {
        if data[i] <= pivot {
            data.swap(i, store);
            store += 1;
        }
    }
    // Put the pivot between the two partitions; it is now in final position.
    data.swap(store, n - 1);
    let (left, right) = data.split_at_mut(store);
    quicksort_recursive(left);
    // right[0] is the pivot itself, so recurse past it.
    quicksort_recursive(&mut right[1..]);
}
/// Index of the median of `data[0]`, `data[mid]`, and `data[n - 1]`
/// (one of 0, `mid`, or `n - 1`).
///
/// # Panics
/// Panics on an empty slice (indexes element 0).
pub fn median_of_three(data: &[f64]) -> usize {
    let n = data.len();
    let mid = n / 2;
    let first = data[0];
    let middle = data[mid];
    let last = data[n - 1];
    let middle_is_median =
        (first <= middle && middle <= last) || (last <= middle && middle <= first);
    if middle_is_median {
        return mid;
    }
    let first_is_median =
        (middle <= first && first <= last) || (last <= first && first <= middle);
    if first_is_median {
        0
    } else {
        n - 1
    }
}
521pub fn sorted_copy(data: &[f64]) -> Vec<f64> {
523 let mut v = data.to_vec();
524 parallel_sort(&mut v);
525 v
526}
/// Counts `data` into `n_bins` equal-width bins spanning [min, max].
///
/// Values at or beyond the upper edge land in the last bin. When all
/// values are (nearly) equal the bin width falls back to 1.0, putting
/// everything in bin 0. Returns an empty vector for empty input or
/// `n_bins == 0`.
pub fn parallel_histogram(data: &[f64], n_bins: usize) -> Vec<usize> {
    if n_bins == 0 || data.is_empty() {
        return vec![];
    }
    let lo = data.iter().cloned().fold(f64::INFINITY, f64::min);
    let hi = data.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
    let span = hi - lo;
    // Degenerate range: avoid a zero-width bin (division by zero).
    let width = if span.abs() < f64::EPSILON {
        1.0
    } else {
        span / n_bins as f64
    };
    let mut counts = vec![0usize; n_bins];
    for &value in data {
        let bin = ((value - lo) / width) as usize;
        // Clamp so the maximum value falls into the last bin.
        counts[bin.min(n_bins - 1)] += 1;
    }
    counts
}