use anyhow::{Result, anyhow};
/// Runs `run_job` on every element of `jobs` using scoped threads, with at most
/// `parallelism` jobs in flight at once, and returns the outputs in `jobs` order.
///
/// Jobs are processed in consecutive chunks of `parallelism` elements. One thread
/// is spawned per job in a chunk, and the next chunk is not started until every
/// thread of the current chunk has been joined, so a single slow job delays the
/// start of the following chunk. A `parallelism` of `0` is treated as `1`.
///
/// `stage_name` is only used to label errors produced by this function itself.
///
/// # Errors
///
/// Returns the error of the earliest job (in `jobs` order) that failed; no later
/// chunk is started once a chunk contains a failure. Other jobs in the failing
/// chunk have already run to completion and their results are discarded.
///
/// A job that panics is reported as `"<stage_name> worker thread panicked"` (the
/// panic payload is not included), and a worker the OS refuses to create is
/// reported as `"<stage_name> failed to spawn worker thread: <io error>"`.
pub fn run_parallel_chunks<J, T, F>(
    jobs: &[J],
    parallelism: usize,
    stage_name: &'static str,
    run_job: F,
) -> Result<Vec<T>>
where
    J: Sync,
    T: Send,
    F: Fn(&J) -> Result<T> + Sync,
{
    let parallelism = parallelism.max(1);
    // The `move` closures below must copy a `&F` rather than move the `F` itself,
    // because the same callback is shared by every worker thread.
    let run_job = &run_job;
    let mut results: Vec<T> = Vec::with_capacity(jobs.len());
    for chunk in jobs.chunks(parallelism) {
        let chunk_results: Vec<Result<T>> = std::thread::scope(|s| {
            // `move` copies `job` (a `&J` borrowed from `jobs`) into the worker;
            // without it the closure would borrow the short-lived `job` binding.
            // `Builder::spawn_scoped` reports thread-creation failure as an
            // `io::Error` instead of panicking like `Scope::spawn` does.
            let spawned: Vec<_> = chunk
                .iter()
                .map(|job| std::thread::Builder::new().spawn_scoped(s, move || run_job(job)))
                .collect();
            // Join every successfully spawned handle explicitly so that a panicking
            // worker surfaces as `Err` from `join` instead of being re-raised by `scope`.
            spawned
                .into_iter()
                .map(|handle| match handle {
                    Ok(h) => h.join().unwrap_or_else(|_| {
                        Err(anyhow!("{} worker thread panicked", stage_name))
                    }),
                    Err(e) => Err(anyhow!(
                        "{} failed to spawn worker thread: {}",
                        stage_name,
                        e
                    )),
                })
                .collect()
        });
        // Stop at the first failure in submission order; later chunks never start.
        for r in chunk_results {
            results.push(r?);
        }
    }
    Ok(results)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Results line up with the input slice regardless of which worker finishes first.
    #[test]
    fn preserves_submission_order() {
        let inputs: Vec<u32> = (0..20).collect();
        let expected: Vec<u32> = inputs.iter().map(|&n| n * 10).collect();
        let actual = run_parallel_chunks(&inputs, 4, "test", |n| Ok(*n * 10)).unwrap();
        assert_eq!(actual, expected);
    }

    /// The number of jobs executing at the same moment never exceeds `parallelism`.
    #[test]
    fn bounded_concurrency() {
        let inputs: Vec<u32> = (0..10).collect();
        let active = AtomicUsize::new(0);
        let max_seen = AtomicUsize::new(0);
        let tracked_job = |_: &u32| -> Result<()> {
            let current = active.fetch_add(1, Ordering::SeqCst) + 1;
            max_seen.fetch_max(current, Ordering::SeqCst);
            // Hold the job open long enough for sibling jobs to overlap with it.
            std::thread::sleep(std::time::Duration::from_millis(10));
            active.fetch_sub(1, Ordering::SeqCst);
            Ok(())
        };
        run_parallel_chunks(&inputs, 2, "test", tracked_job).unwrap();
        let observed = max_seen.load(Ordering::SeqCst);
        assert!(
            observed <= 2,
            "peak in-flight workers exceeded parallelism bound"
        );
    }

    /// A job's `Err` is handed back to the caller instead of a result vector.
    #[test]
    fn propagates_first_error() {
        let inputs: Vec<u32> = (0..4).collect();
        let outcome = run_parallel_chunks(&inputs, 2, "test", |n| match *n {
            2 => Err(anyhow!("job 2 failed")),
            other => Ok(other),
        });
        let err = outcome.unwrap_err();
        let message = err.to_string();
        assert!(message.contains("job 2 failed"), "unexpected error: {}", err);
    }

    /// A `parallelism` of zero still processes every job, one at a time.
    #[test]
    fn zero_parallelism_clamps_to_one() {
        let inputs: Vec<u32> = vec![0, 1, 2];
        let out = run_parallel_chunks(&inputs, 0, "test", |n| Ok(*n + 1)).unwrap();
        assert_eq!(out, vec![1, 2, 3]);
    }

    /// An empty job list yields an empty result vector.
    #[test]
    fn empty_jobs_returns_empty() {
        let inputs: Vec<u32> = Vec::new();
        let out: Vec<u32> = run_parallel_chunks(&inputs, 4, "test", |_| Ok(0)).unwrap();
        assert!(out.is_empty());
    }

    /// A panic inside a job is converted into an error labelled with the stage name.
    #[test]
    fn panic_in_worker_becomes_anyhow_error() {
        let inputs: Vec<u32> = vec![1, 2, 3];
        let outcome = run_parallel_chunks(&inputs, 2, "explode-stage", |n| -> Result<u32> {
            match *n {
                2 => panic!("boom"),
                value => Ok(value),
            }
        });
        let err = outcome.unwrap_err();
        let message = err.to_string();
        assert!(
            message.contains("explode-stage worker thread panicked"),
            "unexpected error: {}",
            err
        );
    }
}