Skip to main content

anodizer_core/
parallel.rs

1//! Shared bounded-parallelism helper used by stages that run one subprocess
2//! per sub-config (makeself, nfpm, snapcraft, flatpak, upx, …).
3//!
4//! The stages share the same Step 1 / Step 2 / Step 3 shape:
5//!
6//! 1. **Step 1** (serial, `&mut ctx`): render templates, stage files,
7//!    collect a `Vec<Job>` of fully-owned work units.
8//! 2. **Step 2** (parallel, bounded by `ctx.options.parallelism`): run one
9//!    subprocess per job in `std::thread::scope`.
10//! 3. **Step 3** (serial, `&mut ctx`): register the returned artifacts.
11//!
12//! Before this helper every stage hand-rolled the Step 2 loop —
13//! `for chunk in jobs.chunks(n) { thread::scope(|s| …) }` with its own
14//! join-unwrap-or-panic handling. The pattern is now shared here so new
15//! parallelized stages just write `run_job`.
16//!
17//! Semantics match the previous hand-rolled loops exactly:
18//!
19//! - **Bounded concurrency**: at most `parallelism` workers run at once,
20//!   enforced by chunking the job list and scoping threads per-chunk.
//! - **Fail-fast between chunks**: if any worker in a chunk fails, the rest
//!   of that chunk still runs to completion (its threads are already
//!   spawned), but the caller receives the first error in submission order
//!   and no further chunks are started.
24//! - **Panic-safe**: a worker panic becomes an `anyhow::Error` annotated
25//!   with `stage_name`, so a panicked thread doesn't leave the pool
26//!   deadlocked or drop all other results on the floor.
27//! - **Order-preserving**: results are collected in job-submission order, so
28//!   downstream artifact registration remains deterministic.
29
30use anyhow::{Result, anyhow};
31
32use crate::log::StageLogger;
33use std::sync::{Mutex, MutexGuard};
34
35/// Acquire a `Mutex` guard, recovering from poison rather than panicking.
36///
37/// A poisoned lock means a sibling worker thread panicked while holding
38/// the guard. For the data shapes this helper is used on (counters,
39/// `Vec` accumulators), the inner state has no invariant a panic could
40/// have broken — the worst case is one partial write missing. Panicking
41/// the current worker too would abandon its already-completed network
42/// call without updating the count, silently inflating the operator's
43/// `failed` bucket.
44pub fn lock_recover<'a, T>(m: &'a Mutex<T>, log: &StageLogger, label: &str) -> MutexGuard<'a, T> {
45    match m.lock() {
46        Ok(g) => g,
47        Err(poisoned) => {
48            log.warn(&format!(
49                "{label}: mutex poisoned by sibling thread panic; recovering state"
50            ));
51            poisoned.into_inner()
52        }
53    }
54}
55
56/// Translate a `thread::JoinHandle::join` result's panic payload into
57/// an `anyhow::Error` tagged with `label`. The two common panic
58/// payload shapes (`&'static str` / `String`) are downcast so the
59/// surfaced message is readable rather than the opaque `Any`
60/// placeholder.
61///
62/// Accepts `Result<T, Box<dyn Any + Send>>` rather than the handle
63/// itself so a single helper covers both [`std::thread::JoinHandle`]
64/// and [`std::thread::ScopedJoinHandle`] — both expose `.join()`
65/// returning the same `Result` shape.
66///
67/// Use when the worker returns `T` and the caller wants `Result<T>`
68/// so a panic doesn't propagate as a silently-lost result. For
69/// workers that already return `Result<T, anyhow::Error>`, prefer
70/// [`run_parallel_chunks`] which bakes this in.
71pub fn join_panic_to_err<T>(join_result: std::thread::Result<T>, label: &str) -> Result<T> {
72    join_result.map_err(|panic_payload| {
73        let msg = if let Some(s) = panic_payload.downcast_ref::<&'static str>() {
74            (*s).to_string()
75        } else if let Some(s) = panic_payload.downcast_ref::<String>() {
76            s.clone()
77        } else {
78            format!("{:?}", panic_payload)
79        };
80        anyhow!("{label} worker thread panicked: {msg}")
81    })
82}
83
84/// Run `run_job` across `jobs` with bounded parallelism. Returns the
85/// per-job results in submission order.
86///
87/// `stage_name` is embedded in the panic error message so a crash in one
88/// stage is attributable at a glance (`"nfpm worker thread panicked"` vs
89/// `"snapcraft worker thread panicked"`).
90///
91/// `parallelism` is clamped to `>= 1` internally, so callers can pass
92/// `ctx.options.parallelism` without pre-clamping.
93pub fn run_parallel_chunks<J, T, F>(
94    jobs: &[J],
95    parallelism: usize,
96    stage_name: &'static str,
97    run_job: F,
98) -> Result<Vec<T>>
99where
100    J: Sync,
101    T: Send,
102    F: Fn(&J) -> Result<T> + Sync,
103{
104    let parallelism = parallelism.max(1);
105    let mut results: Vec<T> = Vec::with_capacity(jobs.len());
106
107    for chunk in jobs.chunks(parallelism) {
108        let chunk_results: Vec<Result<T>> = std::thread::scope(|s| {
109            let handles: Vec<_> = chunk.iter().map(|job| s.spawn(|| run_job(job))).collect();
110            handles
111                .into_iter()
112                .map(|h| {
113                    h.join()
114                        .unwrap_or_else(|_| Err(anyhow!("{} worker thread panicked", stage_name)))
115                })
116                .collect()
117        });
118
119        for r in chunk_results {
120            results.push(r?);
121        }
122    }
123    Ok(results)
124}
125
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    // ---------- run_parallel_chunks ----------

    /// Output order must follow input order regardless of which worker
    /// finishes first, so artifact registration stays deterministic.
    #[test]
    fn preserves_submission_order() {
        let jobs: Vec<u32> = (0..20).collect();
        let expected: Vec<u32> = (0..20).map(|i| i * 10).collect();

        let actual = run_parallel_chunks(&jobs, 4, "test", |job| Ok(*job * 10)).unwrap();

        assert_eq!(actual, expected);
    }

    /// With `parallelism = 2` over 10 jobs, the number of simultaneously
    /// running workers must never exceed 2. Each worker bumps a live
    /// counter on entry, records the high-water mark, sleeps briefly to
    /// force overlap, then decrements on exit.
    #[test]
    fn bounded_concurrency() {
        let jobs: Vec<u32> = (0..10).collect();
        let active = AtomicUsize::new(0);
        let high_water = AtomicUsize::new(0);

        let outcome = run_parallel_chunks(&jobs, 2, "test", |_| {
            let current = active.fetch_add(1, Ordering::SeqCst) + 1;
            high_water.fetch_max(current, Ordering::SeqCst);
            std::thread::sleep(Duration::from_millis(10));
            active.fetch_sub(1, Ordering::SeqCst);
            Ok(())
        });
        outcome.unwrap();

        let observed_peak = high_water.load(Ordering::SeqCst);
        assert!(
            observed_peak <= 2,
            "peak in-flight workers exceeded parallelism bound"
        );
    }

    /// One failing job fails the whole batch, and the error handed back
    /// is that job's error rather than being masked by a later success.
    #[test]
    fn propagates_first_error() {
        let jobs: Vec<u32> = (0..4).collect();

        let err = run_parallel_chunks(&jobs, 2, "test", |job| match *job {
            2 => Err(anyhow!("job 2 failed")),
            other => Ok(other),
        })
        .unwrap_err();

        let rendered = err.to_string();
        assert!(
            rendered.contains("job 2 failed"),
            "unexpected error: {}",
            rendered
        );
    }

    /// A `parallelism` of 0 (unset option) must behave like 1 — callers
    /// are not required to clamp before calling.
    #[test]
    fn zero_parallelism_clamps_to_one() {
        let jobs: Vec<u32> = (0..3).collect();

        let incremented = run_parallel_chunks(&jobs, 0, "test", |job| Ok(*job + 1)).unwrap();

        assert_eq!(incremented, vec![1, 2, 3]);
    }

    /// No jobs in, no results out, no error.
    #[test]
    fn empty_jobs_returns_empty() {
        let no_jobs: [u32; 0] = [];

        let out: Vec<u32> = run_parallel_chunks(&no_jobs[..], 4, "test", |_| Ok(0)).unwrap();

        assert!(out.is_empty());
    }

    /// A worker panic must surface as an error naming the stage instead
    /// of tearing down the enclosing `thread::scope`.
    #[test]
    fn panic_in_worker_becomes_anyhow_error() {
        let jobs: Vec<u32> = vec![1, 2, 3];

        let err = run_parallel_chunks(&jobs, 2, "explode-stage", |job| -> Result<u32> {
            if *job == 2 {
                panic!("boom");
            }
            Ok(*job)
        })
        .unwrap_err();

        let rendered = err.to_string();
        assert!(
            rendered.contains("explode-stage worker thread panicked"),
            "unexpected error: {}",
            rendered
        );
    }

    // ---------- lock_recover ----------

    /// On a healthy mutex the helper is indistinguishable from a plain
    /// `.lock().unwrap()`: writes through the guard are visible afterwards.
    #[test]
    fn lock_recover_returns_inner_when_unpoisoned() {
        let log = StageLogger::new("test", crate::log::Verbosity::Quiet);
        let m = Mutex::new(0u32);

        // The temporary guard is released at the end of this statement.
        *lock_recover(&m, &log, "test") = 42;

        assert_eq!(*m.lock().unwrap(), 42);
    }

    /// After a thread panics while holding the guard, the helper must
    /// still return the stored value instead of panicking itself.
    #[test]
    fn lock_recover_recovers_from_poison() {
        let log = StageLogger::new("test", crate::log::Verbosity::Quiet);
        let shared = Arc::new(Mutex::new(7u32));

        let poisoner = {
            let shared = Arc::clone(&shared);
            std::thread::spawn(move || {
                let _guard = shared.lock().unwrap();
                panic!("poison the mutex");
            })
        };
        // The join error is the expected panic; only the poisoning matters.
        let _ = poisoner.join();
        assert!(shared.is_poisoned(), "test setup: mutex should be poisoned");

        let guard = lock_recover(&shared, &log, "test");
        assert_eq!(*guard, 7);
    }

    // ---------- join_panic_to_err ----------

    /// A thread that returns normally yields its value untouched.
    #[test]
    fn join_panic_to_err_passes_through_success() {
        let handle = std::thread::spawn(|| 42u32);

        let value = join_panic_to_err(handle.join(), "worker").unwrap();

        assert_eq!(value, 42);
    }

    /// `panic!("msg")` carries a `&'static str` payload; both the label
    /// prefix and the panic text must appear in the resulting error.
    #[test]
    fn join_panic_to_err_translates_str_panic() {
        let handle = std::thread::spawn(|| -> u32 {
            panic!("kaboom");
        });

        let s = join_panic_to_err(handle.join(), "worker")
            .unwrap_err()
            .to_string();

        let has_prefix = s.contains("worker worker thread panicked");
        let has_payload = s.contains("kaboom");
        assert!(has_prefix && has_payload, "unexpected error: {}", s);
    }

    /// Formatted panics carry a `String` payload; that text must be
    /// extracted too rather than rendered as an opaque `Any`.
    #[test]
    fn join_panic_to_err_translates_string_panic() {
        let handle = std::thread::spawn(|| -> u32 {
            panic!("{}", String::from("string-panic"));
        });

        let err = join_panic_to_err(handle.join(), "worker").unwrap_err();

        let rendered = err.to_string();
        assert!(
            rendered.contains("string-panic"),
            "unexpected error: {}",
            rendered
        );
    }

    /// `ScopedJoinHandle::join` has the same result shape as
    /// `JoinHandle::join`, so the one helper serves `thread::scope` users.
    #[test]
    fn join_panic_to_err_works_on_scoped_handle() {
        let out: Result<u32> = std::thread::scope(|s| {
            let handle = s.spawn(|| 99u32);
            join_panic_to_err(handle.join(), "scoped")
        });

        assert_eq!(out.unwrap(), 99);
    }
}