anodizer_core/parallel.rs
1//! Shared bounded-parallelism helper used by stages that run one subprocess
2//! per sub-config (makeself, nfpm, snapcraft, flatpak, upx, …).
3//!
4//! The stages share the same Phase 1 / Phase 2 / Phase 3 shape:
5//!
6//! 1. **Phase 1** (serial, `&mut ctx`): render templates, stage files,
7//! collect a `Vec<Job>` of fully-owned work units.
8//! 2. **Phase 2** (parallel, bounded by `ctx.options.parallelism`): run one
9//! subprocess per job in `std::thread::scope`.
10//! 3. **Phase 3** (serial, `&mut ctx`): register the returned artifacts.
11//!
12//! Before this helper every stage hand-rolled the Phase 2 loop —
13//! `for chunk in jobs.chunks(n) { thread::scope(|s| …) }` with its own
14//! join-unwrap-or-panic handling. The pattern is now shared here so new
15//! parallelized stages just write `run_job`.
16//!
17//! Semantics match the previous hand-rolled loops exactly:
18//!
19//! - **Bounded concurrency**: at most `parallelism` workers run at once,
20//! enforced by chunking the job list and scoping threads per-chunk.
//! - **Fail-fast between chunks**: if any worker in a chunk fails, the rest
//!   of that chunk still runs to completion (its threads are already
//!   spawned), but the caller receives the first error in submission order
//!   and no further chunks are started.
//! - **Panic-safe**: a worker panic becomes an `anyhow::Error` annotated
//!   with `stage_name`, so a panicked thread surfaces as an ordinary error
//!   instead of being re-raised by `thread::scope` and unwinding through the
//!   caller.
27//! - **Order-preserving**: results are collected in job-submission order, so
28//! downstream artifact registration remains deterministic.
29
30use anyhow::{Result, anyhow};
31
32/// Run `run_job` across `jobs` with bounded parallelism. Returns the
33/// per-job results in submission order.
34///
35/// `stage_name` is embedded in the panic error message so a crash in one
36/// stage is attributable at a glance (`"nfpm worker thread panicked"` vs
37/// `"snapcraft worker thread panicked"`).
38///
39/// `parallelism` is clamped to `>= 1` internally, so callers can pass
40/// `ctx.options.parallelism` without pre-clamping.
41pub fn run_parallel_chunks<J, T, F>(
42 jobs: &[J],
43 parallelism: usize,
44 stage_name: &'static str,
45 run_job: F,
46) -> Result<Vec<T>>
47where
48 J: Sync,
49 T: Send,
50 F: Fn(&J) -> Result<T> + Sync,
51{
52 let parallelism = parallelism.max(1);
53 let mut results: Vec<T> = Vec::with_capacity(jobs.len());
54
55 for chunk in jobs.chunks(parallelism) {
56 let chunk_results: Vec<Result<T>> = std::thread::scope(|s| {
57 let handles: Vec<_> = chunk.iter().map(|job| s.spawn(|| run_job(job))).collect();
58 handles
59 .into_iter()
60 .map(|h| {
61 h.join()
62 .unwrap_or_else(|_| Err(anyhow!("{} worker thread panicked", stage_name)))
63 })
64 .collect()
65 });
66
67 for r in chunk_results {
68 results.push(r?);
69 }
70 }
71 Ok(results)
72}
73
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    #[test]
    fn preserves_submission_order() {
        // Even with multi-threaded execution, the returned Vec must mirror
        // the input slice order so downstream artifact registration is
        // deterministic across runs.
        let jobs: Vec<u32> = (0..20).collect();
        let out = run_parallel_chunks(&jobs, 4, "test", |job| Ok(*job * 10)).unwrap();
        assert_eq!(out, (0..20).map(|i| i * 10).collect::<Vec<_>>());
    }

    #[test]
    fn parallelism_larger_than_job_count() {
        // A bound larger than the job list yields a single short chunk.
        let jobs: Vec<u32> = (0..3).collect();
        let out = run_parallel_chunks(&jobs, 64, "test", |job| Ok(*job)).unwrap();
        assert_eq!(out, vec![0, 1, 2]);
    }

    #[test]
    fn bounded_concurrency() {
        // With parallelism=2 across 10 jobs, no more than 2 workers should
        // be in-flight at once. We observe this via an AtomicUsize peak
        // counter that each worker increments on entry and decrements on
        // exit, with a small sleep to force overlap.
        let jobs: Vec<u32> = (0..10).collect();
        let in_flight = AtomicUsize::new(0);
        let peak = AtomicUsize::new(0);

        run_parallel_chunks(&jobs, 2, "test", |_| {
            let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
            peak.fetch_max(now, Ordering::SeqCst);
            std::thread::sleep(std::time::Duration::from_millis(10));
            in_flight.fetch_sub(1, Ordering::SeqCst);
            Ok(())
        })
        .unwrap();

        assert!(
            peak.load(Ordering::SeqCst) <= 2,
            "peak in-flight workers exceeded parallelism bound"
        );
    }

    #[test]
    fn propagates_first_error() {
        // A single failing job must fail the batch, and the caller must see
        // that job's error rather than having it swallowed by the successes
        // around it.
        let jobs: Vec<u32> = (0..4).collect();
        let result = run_parallel_chunks(&jobs, 2, "test", |job| {
            if *job == 2 {
                Err(anyhow!("job 2 failed"))
            } else {
                Ok(*job)
            }
        });
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("job 2 failed"),
            "unexpected error: {}",
            err
        );
    }

    #[test]
    fn first_error_in_submission_order_wins() {
        // Both jobs of the only chunk fail. The caller must get the earlier
        // job's error regardless of which thread finished first.
        let jobs: Vec<u32> = vec![0, 1];
        let err = run_parallel_chunks(&jobs, 2, "test", |job| -> Result<u32> {
            Err(anyhow!("job {} failed", job))
        })
        .unwrap_err();
        assert!(
            err.to_string().contains("job 0 failed"),
            "unexpected error: {}",
            err
        );
    }

    #[test]
    fn stops_after_failing_chunk() {
        // The first chunk [0, 1] contains a failure. It runs to completion,
        // but chunks [2, 3] and [4, 5] must never be started.
        let jobs: Vec<u32> = (0..6).collect();
        let started = AtomicUsize::new(0);
        let result = run_parallel_chunks(&jobs, 2, "test", |job| {
            started.fetch_add(1, Ordering::SeqCst);
            if *job == 1 {
                Err(anyhow!("job 1 failed"))
            } else {
                Ok(*job)
            }
        });
        assert!(result.is_err());
        assert_eq!(
            started.load(Ordering::SeqCst),
            2,
            "jobs from chunks after the failing one were started"
        );
    }

    #[test]
    fn zero_parallelism_clamps_to_one() {
        // `ctx.options.parallelism` can legitimately be 0 (unset) —
        // callers must not need to pre-clamp. Verify the helper runs
        // sequentially in that case rather than spawning 0 threads.
        let jobs: Vec<u32> = (0..3).collect();
        let out = run_parallel_chunks(&jobs, 0, "test", |job| Ok(*job + 1)).unwrap();
        assert_eq!(out, vec![1, 2, 3]);
    }

    #[test]
    fn empty_jobs_returns_empty() {
        let out: Vec<u32> = run_parallel_chunks::<u32, u32, _>(&[], 4, "test", |_| Ok(0)).unwrap();
        assert!(out.is_empty());
    }

    #[test]
    fn panic_in_worker_becomes_anyhow_error() {
        // A panicking worker must not take down the whole thread::scope
        // silently — we want an attributable error with the stage name.
        let jobs: Vec<u32> = vec![1, 2, 3];
        let result = run_parallel_chunks(&jobs, 2, "explode-stage", |job| -> Result<u32> {
            if *job == 2 {
                panic!("boom");
            }
            Ok(*job)
        });
        let err = result.unwrap_err();
        assert!(
            err.to_string()
                .contains("explode-stage worker thread panicked"),
            "unexpected error: {}",
            err
        );
    }
}
170}