taktora_executor/
fatal.rs1#![allow(clippy::redundant_pub_crate)]
12
13use std::sync::Arc;
14
15#[non_exhaustive]
22pub struct FatalContext {
23 pub cause: String,
25 pub site: FatalSite,
27}
28
29#[non_exhaustive]
34#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35pub enum FatalSite {
36 PoolWorker,
38 InlineSubmit,
40 ExecutorRunLoop,
42}
43
44pub type FatalHandler = Arc<dyn Fn(&FatalContext) + Send + Sync + 'static>;
51
52pub(crate) struct FatalDispatch {
58 handler: FatalHandler,
59 terminal: Arc<dyn Fn(&FatalContext) + Send + Sync + 'static>,
60}
61
62impl FatalDispatch {
63 pub(crate) fn new(handler: FatalHandler) -> Self {
65 Self {
66 handler,
67 terminal: Arc::new(|_ctx| std::process::abort()),
68 }
69 }
70
71 #[cfg(test)]
76 pub(crate) fn with_terminal(
77 handler: FatalHandler,
78 terminal: impl Fn(&FatalContext) + Send + Sync + 'static,
79 ) -> Self {
80 Self {
81 handler,
82 terminal: Arc::new(terminal),
83 }
84 }
85
86 #[cfg(test)]
91 pub(crate) fn handler(&self) -> &FatalHandler {
92 &self.handler
93 }
94
95 pub(crate) fn fire(&self, ctx: &FatalContext) {
101 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| (self.handler)(ctx)));
106 (self.terminal)(ctx);
114 }
115}
116
117pub(crate) fn guard_or_fatal<R>(
122 fatal: &FatalDispatch,
123 site: FatalSite,
124 f: impl FnOnce() -> R,
125) -> Option<R> {
126 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
133 Ok(r) => Some(r),
134 Err(payload) => {
135 let cause =
136 panic_payload_message(&*payload).unwrap_or_else(|| "framework panic".to_string());
137 fatal.fire(&FatalContext { cause, site });
138 None
139 }
140 }
141}
142
143pub(crate) fn panic_payload_message(payload: &(dyn core::any::Any + Send)) -> Option<String> {
152 payload
153 .downcast_ref::<&str>()
154 .map(|s| (*s).to_string())
155 .or_else(|| payload.downcast_ref::<String>().cloned())
156}
157
158#[cfg(test)]
161mod tests {
162 use super::*;
163 use std::sync::{Arc, Mutex};
164
165 #[test]
168 fn panic_payload_message_str_payload() {
169 let payload = std::panic::catch_unwind(|| panic!("static str msg")).unwrap_err();
170 assert_eq!(
171 panic_payload_message(&*payload),
172 Some("static str msg".to_string())
173 );
174 }
175
176 #[test]
177 fn panic_payload_message_string_payload() {
178 let msg = "owned string msg".to_string();
179 let payload = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| panic!("{}", msg)))
180 .unwrap_err();
181 assert_eq!(
182 panic_payload_message(&*payload),
183 Some("owned string msg".to_string())
184 );
185 }
186
187 #[test]
188 fn panic_payload_message_non_string_payload() {
189 let payload = std::panic::catch_unwind(|| std::panic::panic_any(42_u32)).unwrap_err();
190 assert_eq!(panic_payload_message(&*payload), None);
191 }
192
193 fn recording_terminal() -> (
197 Arc<Mutex<Vec<String>>>,
198 impl Fn(&FatalContext) + Send + Sync + 'static,
199 ) {
200 let log: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
201 let log2 = Arc::clone(&log);
202 let terminal = move |ctx: &FatalContext| {
203 log2.lock().unwrap().push(format!("terminal:{}", ctx.cause));
204 };
205 (log, terminal)
206 }
207
208 #[test]
209 fn fire_runs_handler_then_terminal_in_order() {
210 let order: Arc<Mutex<Vec<&'static str>>> = Arc::new(Mutex::new(Vec::new()));
211
212 let order_h = Arc::clone(&order);
213 let handler: FatalHandler = Arc::new(move |_ctx| {
214 order_h.lock().unwrap().push("handler");
215 });
216
217 let order_t = Arc::clone(&order);
218 let terminal = move |_ctx: &FatalContext| {
219 order_t.lock().unwrap().push("terminal");
220 };
221
222 let dispatch = FatalDispatch::with_terminal(handler, terminal);
223 dispatch.fire(&FatalContext {
224 cause: "boom".to_string(),
225 site: FatalSite::PoolWorker,
226 });
227
228 let log = order.lock().unwrap().clone();
229 assert_eq!(
230 log,
231 vec!["handler", "terminal"],
232 "handler must run before terminal"
233 );
234 }
235
236 #[test]
237 fn fire_handler_panic_still_reaches_terminal() {
238 let (log, terminal) = recording_terminal();
239
240 let panicking_handler: FatalHandler = Arc::new(|_ctx| panic!("handler exploded"));
241
242 let dispatch = FatalDispatch::with_terminal(panicking_handler, terminal);
243 dispatch.fire(&FatalContext {
244 cause: "cause-xyz".to_string(),
245 site: FatalSite::ExecutorRunLoop,
246 });
247
248 let entries = log.lock().unwrap().clone();
249 assert!(
251 entries.iter().any(|e| e.contains("terminal:cause-xyz")),
252 "terminal not reached after handler panic; log: {entries:?}"
253 );
254 }
255
256 type Recorder = Arc<Mutex<Vec<(FatalSite, String)>>>;
261
262 fn recording_dispatch() -> (Recorder, FatalDispatch) {
263 let rec: Recorder = Arc::new(Mutex::new(Vec::new()));
264 let rec2 = Arc::clone(&rec);
265 let handler: FatalHandler = Arc::new(|_ctx| {});
266 let dispatch = FatalDispatch::with_terminal(handler, move |ctx| {
267 rec2.lock().unwrap().push((ctx.site, ctx.cause.clone()));
268 });
269 (rec, dispatch)
270 }
271
272 #[test]
273 fn guard_or_fatal_success_returns_some_and_does_not_fire() {
274 let (rec, dispatch) = recording_dispatch();
275 let out = guard_or_fatal(&dispatch, FatalSite::ExecutorRunLoop, || 7_u32);
276 assert_eq!(out, Some(7));
277 assert!(
278 rec.lock().unwrap().is_empty(),
279 "terminal must not fire on success"
280 );
281 }
282
283 #[test]
284 fn guard_or_fatal_panic_fires_once_with_site_and_cause() {
285 let (rec, dispatch) = recording_dispatch();
286 let out: Option<()> = guard_or_fatal(&dispatch, FatalSite::PoolWorker, || {
287 panic!("synthetic infra panic")
288 });
289 assert!(
292 out.is_none(),
293 "panic path must yield None under test terminal"
294 );
295 let entries = rec.lock().unwrap().clone();
296 assert_eq!(entries.len(), 1, "fatal must fire exactly once");
297 assert_eq!(entries[0].0, FatalSite::PoolWorker);
298 assert_eq!(entries[0].1, "synthetic infra panic");
299 }
300
301 #[test]
302 fn guard_or_fatal_propagates_run_loop_site() {
303 let (rec, dispatch) = recording_dispatch();
308 let out: Option<()> = guard_or_fatal(&dispatch, FatalSite::ExecutorRunLoop, || {
309 panic!("run-loop boom")
310 });
311 assert!(out.is_none());
312 let entries = rec.lock().unwrap().clone();
313 assert_eq!(entries.len(), 1);
314 assert_eq!(entries[0].0, FatalSite::ExecutorRunLoop);
315 assert_eq!(entries[0].1, "run-loop boom");
316 }
317
318 #[test]
319 fn guard_or_fatal_non_string_payload_uses_fallback_cause() {
320 let (rec, dispatch) = recording_dispatch();
321 let out: Option<()> = guard_or_fatal(&dispatch, FatalSite::InlineSubmit, || {
322 std::panic::panic_any(42_u32)
323 });
324 assert!(out.is_none());
325 let entries = rec.lock().unwrap().clone();
326 assert_eq!(entries.len(), 1);
327 assert_eq!(entries[0].1, "framework panic");
328 }
329
330 #[test]
331 fn fire_default_noop_handler_reaches_terminal() {
332 let (log, terminal) = recording_terminal();
333
334 let noop: FatalHandler = Arc::new(|_ctx| {});
337
338 let dispatch = FatalDispatch::with_terminal(noop, terminal);
339 dispatch.fire(&FatalContext {
340 cause: "default".to_string(),
341 site: FatalSite::InlineSubmit,
342 });
343
344 let entries = log.lock().unwrap().clone();
345 assert!(
346 entries.iter().any(|e| e.contains("terminal:default")),
347 "terminal not reached for default no-op handler; log: {entries:?}"
348 );
349 }
350}