Skip to main content

taktora_executor/
fault.rs

1//! Cycle-overrun fault primitive — implements `FEAT_0018` (`REQ_0070`,
2//! `REQ_0071`, `REQ_0073`) and feeds `REQ_0102` (overrun counter).
3//!
4//! State machines are `AtomicU64`-packed so the dispatch hot path
5//! (`REQ_0060`, `REQ_0104`) can read/write them wait-free without
6//! `Mutex` or allocation.
7//!
8//! `BB_0093`.
9
10// The `FaultAtomic` / `ExecutorFaultAtomic` packed-atomic storage is
11// `pub(crate)` for use by `executor.rs` / `TaskEntry` in later tasks
12// (the cycle-overrun fault primitive lands in stages — BB_0093 ships
13// the state-machine module first; integration follows in Task 6+).
14// Until then, the storage types are unused — silence the dead-code
15// and redundant-pub-crate lints uniformly.
16#![allow(dead_code)]
17#![allow(clippy::redundant_pub_crate)]
18
19use core::sync::atomic::{AtomicU64, Ordering};
20use core::time::Duration;
21use std::time::Instant;
22
23/// Per-task fault state. Stored as packed `AtomicU64` in `TaskEntry`;
24/// the public API hands callers this snapshot view.
25#[derive(Clone, Copy, Debug, PartialEq, Eq)]
26pub enum FaultState {
27    /// Task is healthy and dispatches normally.
28    Running,
29    /// Task is faulted; main item is not dispatched until cleared.
30    Faulted {
31        /// Why the task transitioned to `Faulted`.
32        reason: FaultReason,
33        /// Approximate transition time, executor-relative milliseconds.
34        /// Resolve to `Instant` via the executor's `start_time`.
35        since_ms: u32,
36    },
37}
38
/// Cause of a task's transition to `Faulted`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FaultReason {
    /// The task's `execute()` ran longer than its declared budget.
    ///
    /// Both fields are whole milliseconds held in a `u32`, so they top
    /// out at `u32::MAX` ms (~49.7 days). The packed per-task storage
    /// (`FaultAtomic`) reserves only 30 bits for `took_ms`, so a value
    /// read back from it never exceeds `0x3FFF_FFFF`.
    BudgetExceeded {
        /// Observed execution time in ms (saturated).
        took_ms: u32,
        /// Declared budget in ms (saturated).
        budget_ms: u32,
    },
    /// The executor entered `Faulted` while this task was `Running`.
    ///
    /// This cascade transition is automatic. Per-task observers do not
    /// fire for it; only `Observer::on_executor_fault` does.
    ExecutorFaulted,
}
55
56/// Executor-wide fault state.
57#[derive(Clone, Copy, Debug, PartialEq, Eq)]
58pub enum ExecutorFaultState {
59    /// Executor is healthy; all tasks dispatch normally.
60    Running,
61    /// Executor-wide budget breached. All tasks halt (or route to
62    /// their handlers) until cleared.
63    Faulted {
64        /// Why the executor transitioned to `Faulted`.
65        reason: ExecutorFaultReason,
66        /// Approximate transition time, executor-relative milliseconds.
67        since_ms: u32,
68    },
69}
70
/// Cause of the executor's transition to `Faulted`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutorFaultReason {
    /// A task's `execute()` exceeded the executor-wide iteration budget.
    ///
    /// `task_idx` is an internal task-table index, not a `TaskId`; look
    /// it up in the executor's task table to obtain the `TaskId`.
    IterationBudgetExceeded {
        /// Position of the offending task in the executor's task vec.
        task_idx: u32,
        /// Observed execution time in ms (saturated).
        took_ms: u32,
        /// Declared executor-wide budget in ms (saturated).
        budget_ms: u32,
    },
}
86
/// Wait-free storage for a `FaultState`, packed into one `AtomicU64`.
///
/// Bit layout of the packed word (MSB first):
///
/// | bits   | width | contents                                         |
/// |--------|-------|--------------------------------------------------|
/// | 63..62 | 2     | discriminant (see below)                         |
/// | 61..32 | 30    | `took_ms` (always 0 for `ExecutorFaulted`)       |
/// | 31..0  | 32    | `since_ms`                                       |
///
/// Discriminant values:
///   0 = `Running`
///   1 = `Faulted { BudgetExceeded }`
///   2 = `Faulted { ExecutorFaulted }`
///   3 = reserved
///
/// `budget_ms` is not part of the packed word: it is a static property
/// of the task rather than a runtime measurement, so the caller of
/// `unpack` passes it in from the task's stored `budget`.
#[derive(Default, Debug)]
pub(crate) struct FaultAtomic(AtomicU64);
103
104impl FaultAtomic {
105    /// Construct a fresh `FaultAtomic` in the `Running` state.
106    pub(crate) const fn new() -> Self {
107        Self(AtomicU64::new(0))
108    }
109
110    /// Pack `state` into a u64.
111    #[allow(clippy::cast_possible_truncation)]
112    pub(crate) fn pack(state: FaultState) -> u64 {
113        match state {
114            FaultState::Running => 0,
115            FaultState::Faulted { reason, since_ms } => {
116                let (disc, took_ms) = match reason {
117                    FaultReason::BudgetExceeded { took_ms, .. } => (1_u64, took_ms),
118                    FaultReason::ExecutorFaulted => (2_u64, 0_u32),
119                };
120                (disc << 62) | ((u64::from(took_ms) & 0x3FFF_FFFF) << 32) | u64::from(since_ms)
121            }
122        }
123    }
124
125    /// Recover `FaultState` from a packed u64 + the task's stored budget.
126    #[allow(clippy::cast_possible_truncation)]
127    pub(crate) const fn unpack(packed: u64, budget_ms: u32) -> FaultState {
128        let disc = (packed >> 62) & 0x3;
129        let took_ms = ((packed >> 32) & 0x3FFF_FFFF) as u32;
130        let since_ms = (packed & 0xFFFF_FFFF) as u32;
131        match disc {
132            1 => FaultState::Faulted {
133                reason: FaultReason::BudgetExceeded { took_ms, budget_ms },
134                since_ms,
135            },
136            2 => FaultState::Faulted {
137                reason: FaultReason::ExecutorFaulted,
138                since_ms,
139            },
140            // 0 = Running; 3 = reserved, treat as Running
141            _ => FaultState::Running,
142        }
143    }
144
145    /// Load the current state.
146    pub(crate) fn load(&self, budget_ms: u32) -> FaultState {
147        Self::unpack(self.0.load(Ordering::Acquire), budget_ms)
148    }
149
150    /// Store `state` and return the previous state. Callers use the
151    /// returned value to detect "first transition" (Observer callback
152    /// fires) vs "redundant store" (no callback).
153    pub(crate) fn swap(&self, state: FaultState, budget_ms: u32) -> FaultState {
154        Self::unpack(self.0.swap(Self::pack(state), Ordering::AcqRel), budget_ms)
155    }
156}
157
/// Wait-free storage for an `ExecutorFaultState`, packed into one
/// `AtomicU64`. It has the same shape as `FaultAtomic`; the offending
/// task's u32 index is not part of this word and is kept in adjacent
/// atomics (see `Executor` storage in Task 6).
///
/// Bit layout of the packed word (MSB first):
///
/// | bits   | width | contents                 |
/// |--------|-------|--------------------------|
/// | 63..62 | 2     | discriminant (see below) |
/// | 61..32 | 30    | `took_ms`                |
/// | 31..0  | 32    | `since_ms`               |
///
/// Discriminant values:
///   0 = `Running`
///   1 = `Faulted { IterationBudgetExceeded }`
///   2..3 = reserved
#[derive(Default, Debug)]
pub(crate) struct ExecutorFaultAtomic(AtomicU64);
171
172impl ExecutorFaultAtomic {
173    /// Construct a fresh `ExecutorFaultAtomic` in the `Running` state.
174    pub(crate) const fn new() -> Self {
175        Self(AtomicU64::new(0))
176    }
177
178    /// Pack `state` into a u64.
179    #[allow(clippy::cast_possible_truncation)]
180    pub(crate) fn pack(state: ExecutorFaultState) -> u64 {
181        match state {
182            ExecutorFaultState::Running => 0,
183            ExecutorFaultState::Faulted { reason, since_ms } => {
184                let took_ms = match reason {
185                    ExecutorFaultReason::IterationBudgetExceeded { took_ms, .. } => took_ms,
186                };
187                (1_u64 << 62) | ((u64::from(took_ms) & 0x3FFF_FFFF) << 32) | u64::from(since_ms)
188            }
189        }
190    }
191
192    /// Recover `ExecutorFaultState`. `task_idx` and `budget_ms` are
193    /// supplied externally — they live in adjacent atomics next to
194    /// this one on the `Executor` (see Task 6).
195    #[allow(clippy::cast_possible_truncation)]
196    pub(crate) const fn unpack(packed: u64, task_idx: u32, budget_ms: u32) -> ExecutorFaultState {
197        let disc = (packed >> 62) & 0x3;
198        let took_ms = ((packed >> 32) & 0x3FFF_FFFF) as u32;
199        let since_ms = (packed & 0xFFFF_FFFF) as u32;
200        match disc {
201            1 => ExecutorFaultState::Faulted {
202                reason: ExecutorFaultReason::IterationBudgetExceeded {
203                    task_idx,
204                    took_ms,
205                    budget_ms,
206                },
207                since_ms,
208            },
209            // 0 = Running; 2..=3 = reserved, treat as Running
210            _ => ExecutorFaultState::Running,
211        }
212    }
213
214    /// Load the current state.
215    pub(crate) fn load(&self, task_idx: u32, budget_ms: u32) -> ExecutorFaultState {
216        Self::unpack(self.0.load(Ordering::Acquire), task_idx, budget_ms)
217    }
218
219    /// Store `state` and return the previous state.
220    pub(crate) fn swap(
221        &self,
222        state: ExecutorFaultState,
223        task_idx: u32,
224        budget_ms: u32,
225    ) -> ExecutorFaultState {
226        Self::unpack(
227            self.0.swap(Self::pack(state), Ordering::AcqRel),
228            task_idx,
229            budget_ms,
230        )
231    }
232}
233
/// Helper: whole milliseconds in `d`, clamped to `u32::MAX`.
///
/// Sub-millisecond remainders are dropped (`Duration::as_millis`
/// truncates); anything that does not fit in a `u32` yields `u32::MAX`.
pub(crate) fn duration_to_ms_sat(d: Duration) -> u32 {
    // The checked conversion fails exactly when the value exceeds
    // `u32::MAX`, which is the saturation case.
    u32::try_from(d.as_millis()).unwrap_or(u32::MAX)
}
244
245/// Helper: convert an executor-relative `Instant` to ms since start,
246/// saturated to `u32::MAX`.
247pub(crate) fn instant_to_since_ms(at: Instant, start: Instant) -> u32 {
248    duration_to_ms_sat(at.saturating_duration_since(start))
249}
250
/// Unit tests for the packed fault-state encodings and the time helpers.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn pack_unpack_running() {
        let s = FaultState::Running;
        let p = FaultAtomic::pack(s);
        assert_eq!(p, 0);
        assert_eq!(FaultAtomic::unpack(p, 100), FaultState::Running);
    }

    #[test]
    fn pack_unpack_budget_exceeded_round_trip() {
        let s = FaultState::Faulted {
            reason: FaultReason::BudgetExceeded {
                took_ms: 17,
                budget_ms: 10,
            },
            since_ms: 1_234,
        };
        let p = FaultAtomic::pack(s);
        // budget_ms is provided externally on unpack; pack stored took_ms only
        let restored = FaultAtomic::unpack(p, 10);
        assert_eq!(restored, s);
    }

    #[test]
    fn pack_unpack_executor_faulted_round_trip() {
        let s = FaultState::Faulted {
            reason: FaultReason::ExecutorFaulted,
            since_ms: u32::MAX,
        };
        let p = FaultAtomic::pack(s);
        // budget_ms is unused for this discriminant
        let restored = FaultAtomic::unpack(p, 999);
        assert_eq!(restored, s);
    }

    #[test]
    fn pack_unpack_saturates_took_ms() {
        // The largest representable value survives a round trip unchanged.
        let s = FaultState::Faulted {
            reason: FaultReason::BudgetExceeded {
                took_ms: 0x3FFF_FFFF,
                budget_ms: 5,
            },
            since_ms: 42,
        };
        let p = FaultAtomic::pack(s);
        let restored = FaultAtomic::unpack(p, 5);
        assert_eq!(restored, s);

        // An out-of-range measurement comes back as the 30-bit maximum
        // and must not spill into the discriminant or `since_ms`.
        let oversized = FaultState::Faulted {
            reason: FaultReason::BudgetExceeded {
                took_ms: u32::MAX,
                budget_ms: 5,
            },
            since_ms: 42,
        };
        assert_eq!(FaultAtomic::unpack(FaultAtomic::pack(oversized), 5), s);
    }

    #[test]
    fn unpack_reserved_discriminants_as_running() {
        assert_eq!(FaultAtomic::unpack(3_u64 << 62, 0), FaultState::Running);
        assert_eq!(
            ExecutorFaultAtomic::unpack(2_u64 << 62, 0, 0),
            ExecutorFaultState::Running
        );
        assert_eq!(
            ExecutorFaultAtomic::unpack(3_u64 << 62, 0, 0),
            ExecutorFaultState::Running
        );
    }

    #[test]
    fn fault_atomic_swap_returns_previous() {
        let fa = FaultAtomic::new();
        assert_eq!(fa.load(0), FaultState::Running);
        let prev = fa.swap(
            FaultState::Faulted {
                reason: FaultReason::BudgetExceeded {
                    took_ms: 5,
                    budget_ms: 3,
                },
                since_ms: 100,
            },
            3,
        );
        assert_eq!(prev, FaultState::Running);
        let prev = fa.swap(FaultState::Running, 3);
        assert!(matches!(
            prev,
            FaultState::Faulted {
                reason: FaultReason::BudgetExceeded { .. },
                ..
            }
        ));
    }

    #[test]
    fn executor_pack_unpack_round_trip() {
        assert_eq!(ExecutorFaultAtomic::pack(ExecutorFaultState::Running), 0);
        let s = ExecutorFaultState::Faulted {
            reason: ExecutorFaultReason::IterationBudgetExceeded {
                task_idx: 7,
                took_ms: 20,
                budget_ms: 10,
            },
            since_ms: u32::MAX,
        };
        let p = ExecutorFaultAtomic::pack(s);
        // task_idx and budget_ms are provided externally on unpack
        assert_eq!(ExecutorFaultAtomic::unpack(p, 7, 10), s);
    }

    #[test]
    fn executor_fault_atomic_swap_returns_previous() {
        let efa = ExecutorFaultAtomic::new();
        assert_eq!(efa.load(0, 0), ExecutorFaultState::Running);
        let faulted = ExecutorFaultState::Faulted {
            reason: ExecutorFaultReason::IterationBudgetExceeded {
                task_idx: 3,
                took_ms: 20,
                budget_ms: 10,
            },
            since_ms: 50,
        };
        let prev = efa.swap(faulted, 3, 10);
        assert_eq!(prev, ExecutorFaultState::Running);
        // Clearing the fault hands back the state stored above.
        let prev = efa.swap(ExecutorFaultState::Running, 3, 10);
        assert_eq!(prev, faulted);
        assert_eq!(efa.load(3, 10), ExecutorFaultState::Running);
    }

    #[test]
    fn duration_helpers_saturate() {
        assert_eq!(duration_to_ms_sat(Duration::from_millis(5)), 5);
        let huge = Duration::from_secs(60 * 60 * 24 * 365 * 100); // 100 years
        assert_eq!(duration_to_ms_sat(huge), u32::MAX);
    }

    #[test]
    fn instant_to_since_ms_measures_from_start() {
        let start = Instant::now();
        let later = start + Duration::from_millis(250);
        assert_eq!(instant_to_since_ms(later, start), 250);
        // An `at` earlier than `start` clamps to zero instead of panicking.
        assert_eq!(instant_to_since_ms(start, later), 0);
    }
}