Skip to main content

taktora_executor/
fatal.rs

1//! Fatal-handler API — `FatalContext`, `FatalSite`, `FatalHandler`, and the
2//! crate-internal `FatalDispatch` that owns the handler plus a swappable
3//! terminal action (production: `std::process::abort`; test: recording stub).
4//!
5//! This module also houses `panic_payload_message`, moved here from
6//! `executor.rs` (Task 1) because it is the natural extraction point for
7//! panic-payload introspection shared by the fatal path.
8
9// This is a private module; pub(crate) on items is intentional — they are used
10// by executor.rs / pool.rs once Task 3 wires the hot path.
11#![allow(clippy::redundant_pub_crate)]
12
13use std::sync::Arc;
14
15// ── Public API ────────────────────────────────────────────────────────────────
16
17/// Why the runtime is about to abort. Passed to the fatal handler.
18///
19/// Marked `#[non_exhaustive]` so future fields (e.g. a thread name, a
20/// stack-trace fragment) do not break existing match/struct-init expressions.
21#[non_exhaustive]
22pub struct FatalContext {
23    /// Best-effort message extracted from the panic payload.
24    pub cause: String,
25    /// Which runtime boundary caught it.
26    pub site: FatalSite,
27}
28
/// Identifies the executor boundary at which the unrecoverable fault was
/// detected.
///
/// The enum is `#[non_exhaustive]`: further boundaries (a future timer thread,
/// for instance) may be introduced later, so `match` expressions in downstream
/// code need a wildcard arm and keep compiling when a variant is added.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum FatalSite {
    /// The `catch_unwind` boundary of a pool worker thread re-panicked.
    PoolWorker,
    /// A second panic was caught on the inline-submit path (pool size 0).
    InlineSubmit,
    /// An unrecoverable panic was caught by the executor's main run-loop.
    ExecutorRunLoop,
}
43
/// An `Arc`-wrapped callback invoked once on the fail-fast path.
///
/// The callback receives a shared reference to the [`FatalContext`] describing
/// the fault. The `Send + Sync + 'static` bounds on the trait object allow the
/// same handler to be shared across threads through the `Arc`.
///
/// **Contract** (from [`crate::ExecutorBuilder::on_fatal`]):
/// - Runs over known-unsound executor state.
/// - MUST NOT touch executor internals.
/// - A panic inside the handler routes straight to `abort()`.
pub type FatalHandler = Arc<dyn Fn(&FatalContext) + Send + Sync + 'static>;
51
52// ── crate-internal dispatch ────────────────────────────────────────────────────
53
/// Owns a user handler and a terminal action. The terminal is
/// `std::process::abort` in production; tests may substitute a recording stub
/// via `FatalDispatch::with_terminal` (available in `#[cfg(test)]` only).
///
/// `fire` runs the two in order: handler first, terminal second.
pub(crate) struct FatalDispatch {
    /// User-supplied callback; `fire` runs it first, under `catch_unwind`.
    handler: FatalHandler,
    /// Final action, run by `fire` after the handler. It takes the same
    /// `&FatalContext` argument as the handler, which lets a test stub record
    /// what was fired.
    terminal: Arc<dyn Fn(&FatalContext) + Send + Sync + 'static>,
}
61
62impl FatalDispatch {
63    /// Production constructor. Terminal is `std::process::abort`.
64    pub(crate) fn new(handler: FatalHandler) -> Self {
65        Self {
66            handler,
67            terminal: Arc::new(|_ctx| std::process::abort()),
68        }
69    }
70
71    /// Test-only constructor that allows substituting the terminal.
72    ///
73    /// Only available in `cfg(test)` builds — the abort terminal is the only
74    /// terminal reachable in release.
75    #[cfg(test)]
76    pub(crate) fn with_terminal(
77        handler: FatalHandler,
78        terminal: impl Fn(&FatalContext) + Send + Sync + 'static,
79    ) -> Self {
80        Self {
81            handler,
82            terminal: Arc::new(terminal),
83        }
84    }
85
86    /// Return a reference to the stored handler.
87    ///
88    /// Only available in test builds — the handler is an implementation detail;
89    /// production code has no need to inspect it.
90    #[cfg(test)]
91    pub(crate) fn handler(&self) -> &FatalHandler {
92        &self.handler
93    }
94
95    /// Invoke the handler (catch-guarded so a handler panic still reaches the
96    /// terminal), then invoke the terminal.
97    ///
98    /// In production the terminal calls `std::process::abort()` and therefore
99    /// this function diverges. In tests the terminal records and returns.
100    pub(crate) fn fire(&self, ctx: &FatalContext) {
101        // SAFETY: on the production path the terminal calls std::process::abort()
102        // and the process never resumes use of `ctx` or any state captured by the
103        // handler closure, so inconsistent state is never observable. The test
104        // terminal returns, but test closures hold no cross-unwind invariants.
105        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| (self.handler)(ctx)));
106        // Production terminal diverges (abort). Test terminal records + returns.
107        //
108        // Deliberately NOT catch-guarded: the production terminal is
109        // `std::process::abort()`, which cannot unwind, so there is nothing to
110        // catch. A panicking terminal can only come from a `#[cfg(test)]`
111        // fixture, where it is a test bug that must surface loudly rather than
112        // be masked.
113        (self.terminal)(ctx);
114    }
115}
116
117/// Run `f`, converting any escaping (framework-internal) panic into a fail-fast.
118/// Returns `Some(r)` on success. On panic, calls `fatal.fire(...)`; in production
119/// `fire` aborts and this never returns, so the `None` is observable only under a
120/// test terminal.
121pub(crate) fn guard_or_fatal<R>(
122    fatal: &FatalDispatch,
123    site: FatalSite,
124    f: impl FnOnce() -> R,
125) -> Option<R> {
126    // SAFETY: on the production path `fatal.fire` calls std::process::abort() and
127    // the process never resumes use of any state captured by `f`, so a
128    // possibly-inconsistent captured state is never observed after the panic.
129    // (The test terminal returns, but test closures hold no cross-unwind
130    // invariants.) This matches the existing AssertUnwindSafe convention in this
131    // crate's catch-unwind boundaries.
132    match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
133        Ok(r) => Some(r),
134        Err(payload) => {
135            let cause =
136                panic_payload_message(&*payload).unwrap_or_else(|| "framework panic".to_string());
137            fatal.fire(&FatalContext { cause, site });
138            None
139        }
140    }
141}
142
143// ── Shared helper (moved from executor.rs Task 1) ─────────────────────────────
144
/// Pull a printable message out of a caught panic payload.
///
/// A payload is only understood when it is a `&str` or a `String`; either one
/// is returned as an owned `String` inside `Some`. Every other payload type
/// produces `None`, leaving the choice of fallback text to the caller so that
/// different catch-unwind boundaries can use different default messages.
pub(crate) fn panic_payload_message(payload: &(dyn core::any::Any + Send)) -> Option<String> {
    // `&str` is tried first, then `String`.
    if let Some(text) = payload.downcast_ref::<&str>() {
        return Some(String::from(*text));
    }
    payload.downcast_ref::<String>().cloned()
}
157
158// ── Tests ─────────────────────────────────────────────────────────────────────
159
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    // ── Fixtures ──────────────────────────────────────────────────────────────

    /// `(site, cause)` pairs captured by the terminal of `recording_dispatch`.
    type Recorder = Arc<Mutex<Vec<(FatalSite, String)>>>;

    /// Shorthand for the context handed to `fire`.
    fn context(cause: &str, site: FatalSite) -> FatalContext {
        FatalContext {
            cause: cause.to_string(),
            site,
        }
    }

    /// Runs `f`, which is expected to panic, and hands back the caught payload.
    fn caught_payload(f: impl FnOnce()) -> Box<dyn core::any::Any + Send> {
        std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)).unwrap_err()
    }

    /// A terminal that appends `terminal:<cause>` to a shared log, returned
    /// together with that log.
    fn recording_terminal() -> (
        Arc<Mutex<Vec<String>>>,
        impl Fn(&FatalContext) + Send + Sync + 'static,
    ) {
        let log: Arc<Mutex<Vec<String>>> = Arc::default();
        let sink = Arc::clone(&log);
        let terminal = move |ctx: &FatalContext| {
            let line = format!("terminal:{}", ctx.cause);
            sink.lock().unwrap().push(line);
        };
        (log, terminal)
    }

    /// A `FatalDispatch` with a no-op handler whose terminal records
    /// `(site, cause)` into a shared Vec instead of aborting, so the boundary
    /// is observable in-process.
    fn recording_dispatch() -> (Recorder, FatalDispatch) {
        let rec = Recorder::default();
        let sink = Arc::clone(&rec);
        let handler: FatalHandler = Arc::new(|_ctx: &FatalContext| {});
        let terminal = move |ctx: &FatalContext| {
            sink.lock().unwrap().push((ctx.site, ctx.cause.clone()));
        };
        (rec, FatalDispatch::with_terminal(handler, terminal))
    }

    // ── panic_payload_message ─────────────────────────────────────────────────

    #[test]
    fn panic_payload_message_str_payload() {
        let payload = caught_payload(|| panic!("static str msg"));
        let expected = Some(String::from("static str msg"));
        assert_eq!(panic_payload_message(&*payload), expected);
    }

    #[test]
    fn panic_payload_message_string_payload() {
        // A formatted panic with a runtime argument carries a `String`.
        let msg = "owned string msg".to_string();
        let payload = caught_payload(|| panic!("{}", msg));
        let expected = Some(String::from("owned string msg"));
        assert_eq!(panic_payload_message(&*payload), expected);
    }

    #[test]
    fn panic_payload_message_non_string_payload() {
        let payload = caught_payload(|| std::panic::panic_any(42_u32));
        assert_eq!(panic_payload_message(&*payload), None);
    }

    // ── FatalDispatch ─────────────────────────────────────────────────────────

    #[test]
    fn fire_runs_handler_then_terminal_in_order() {
        let trace: Arc<Mutex<Vec<&'static str>>> = Arc::default();

        let from_handler = Arc::clone(&trace);
        let handler: FatalHandler = Arc::new(move |_ctx: &FatalContext| {
            from_handler.lock().unwrap().push("handler");
        });
        let from_terminal = Arc::clone(&trace);
        let terminal = move |_ctx: &FatalContext| {
            from_terminal.lock().unwrap().push("terminal");
        };

        FatalDispatch::with_terminal(handler, terminal)
            .fire(&context("boom", FatalSite::PoolWorker));

        let seen = trace.lock().unwrap();
        assert_eq!(
            *seen,
            ["handler", "terminal"],
            "handler must run before terminal"
        );
    }

    #[test]
    fn fire_handler_panic_still_reaches_terminal() {
        let (log, terminal) = recording_terminal();
        let exploding: FatalHandler = Arc::new(|_ctx: &FatalContext| panic!("handler exploded"));

        FatalDispatch::with_terminal(exploding, terminal)
            .fire(&context("cause-xyz", FatalSite::ExecutorRunLoop));

        // The handler's panic must not stop the terminal from running.
        let guard = log.lock().unwrap();
        let entries: &[String] = &guard;
        let reached = entries.iter().any(|e| e.contains("terminal:cause-xyz"));
        assert!(
            reached,
            "terminal not reached after handler panic; log: {entries:?}"
        );
    }

    #[test]
    fn fire_default_noop_handler_reaches_terminal() {
        let (log, terminal) = recording_terminal();

        // No-op handler — same as what ExecutorBuilder::build produces when
        // on_fatal is not called.
        let noop: FatalHandler = Arc::new(|_ctx: &FatalContext| {});

        FatalDispatch::with_terminal(noop, terminal)
            .fire(&context("default", FatalSite::InlineSubmit));

        let guard = log.lock().unwrap();
        let entries: &[String] = &guard;
        let reached = entries.iter().any(|e| e.contains("terminal:default"));
        assert!(
            reached,
            "terminal not reached for default no-op handler; log: {entries:?}"
        );
    }

    // ── guard_or_fatal (TEST_0823 mechanism) ──────────────────────────────────

    #[test]
    fn guard_or_fatal_success_returns_some_and_does_not_fire() {
        let (rec, dispatch) = recording_dispatch();

        let out = guard_or_fatal(&dispatch, FatalSite::ExecutorRunLoop, || 7_u32);

        assert_eq!(out, Some(7));
        let fired = rec.lock().unwrap();
        assert!(fired.is_empty(), "terminal must not fire on success");
    }

    #[test]
    fn guard_or_fatal_panic_fires_once_with_site_and_cause() {
        let (rec, dispatch) = recording_dispatch();

        let out: Option<()> = guard_or_fatal(&dispatch, FatalSite::PoolWorker, || {
            panic!("synthetic infra panic")
        });

        // The recording terminal lets `fire` return, which is the only way
        // `guard_or_fatal` can hand back `None`.
        assert!(
            out.is_none(),
            "panic path must yield None under test terminal"
        );
        let fired = rec.lock().unwrap();
        assert_eq!(fired.len(), 1, "fatal must fire exactly once");
        let (site, cause) = &fired[0];
        assert_eq!(*site, FatalSite::PoolWorker);
        assert_eq!(cause.as_str(), "synthetic infra panic");
    }

    #[test]
    fn guard_or_fatal_propagates_run_loop_site() {
        // Covers the ExecutorRunLoop site via the same mechanism (a full
        // end-to-end executor trigger that panics *inside* the WaitSet drive is
        // impractical to provoke deterministically without an artificial fault
        // injection seam, so the boundary is proven at the helper level).
        let (rec, dispatch) = recording_dispatch();

        let out: Option<()> = guard_or_fatal(&dispatch, FatalSite::ExecutorRunLoop, || {
            panic!("run-loop boom")
        });

        assert!(out.is_none());
        let fired = rec.lock().unwrap();
        assert_eq!(fired.len(), 1);
        let (site, cause) = &fired[0];
        assert_eq!(*site, FatalSite::ExecutorRunLoop);
        assert_eq!(cause.as_str(), "run-loop boom");
    }

    #[test]
    fn guard_or_fatal_non_string_payload_uses_fallback_cause() {
        let (rec, dispatch) = recording_dispatch();

        let out: Option<()> = guard_or_fatal(&dispatch, FatalSite::InlineSubmit, || {
            std::panic::panic_any(42_u32)
        });

        assert!(out.is_none());
        let fired = rec.lock().unwrap();
        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0].1.as_str(), "framework panic");
    }
}