Skip to main content

taktora_executor/
observer.rs

1//! `Observer` trait — lifecycle hooks invoked by the executor.
2
3use crate::error::ExecutorError;
4use crate::fault::{ExecutorFaultReason, FaultReason};
5use crate::stats::CycleObservation;
6use crate::task_id::TaskId;
7
/// Generic user event carried by [`Observer::on_send_event`].
///
/// # Construction
///
/// Use [`UserEvent::new`] to create a value; struct literal syntax is not
/// available from outside this crate because `UserEvent` is `#[non_exhaustive]`.
///
/// # Comparison
///
/// Two events compare equal when `kind`, `int_data` and `string_data` all
/// match, so events can be checked with `==` / `assert_eq!` and stored in
/// hashed collections.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct UserEvent {
    /// User-defined event kind.
    pub kind: u32,
    /// Numeric payload.
    pub int_data: i64,
    /// Optional string payload.
    pub string_data: Option<String>,
}
24
25impl UserEvent {
26    /// Create a new event with the given `kind` and `int_data`.
27    #[must_use]
28    pub const fn new(kind: u32, int_data: i64) -> Self {
29        Self {
30            kind,
31            int_data,
32            string_data: None,
33        }
34    }
35
36    /// Attach an optional string payload to this event.
37    #[must_use]
38    pub fn with_string(mut self, s: impl Into<String>) -> Self {
39        self.string_data = Some(s.into());
40        self
41    }
42}
43
44/// Lifecycle observer invoked by the executor at well-defined points.
45///
46/// All methods have no-op defaults. The executor never blocks on observer
47/// callbacks — heavy work should be queued internally.
48pub trait Observer: Send + Sync {
49    /// Called once just before the dispatch loop begins.
50    fn on_executor_up(&self) {}
51    /// Called once just after the dispatch loop finishes cleanly.
52    fn on_executor_down(&self) {}
53    /// Called when the dispatch loop returns an error.
54    fn on_executor_error(&self, _e: &ExecutorError) {}
55
56    /// Called before an item with `app_id().is_some()` runs (per invocation).
57    fn on_app_start(&self, _task: TaskId, _app: u32, _instance: Option<u32>) {}
58    /// Called after such an item runs.
59    fn on_app_stop(&self, _task: TaskId) {}
60    /// Called when an item returns `Err` or panics.
61    fn on_app_error(&self, _task: TaskId, _e: &(dyn std::error::Error + 'static)) {}
62
63    /// Called when an item invokes `Context::send_event`.
64    fn on_send_event(&self, _task: TaskId, _ev: UserEvent) {}
65
66    /// Called once when a task transitions from `Running` to `Faulted`
67    /// (per-task budget overrun, `REQ_0070`). The cascade transition
68    /// triggered by an executor-wide fault does NOT fire this hook —
69    /// see [`Observer::on_executor_fault`]. `REQ_0073`.
70    fn on_task_fault(&self, _task: TaskId, _reason: FaultReason) {}
71
72    /// Called once when a task transitions from `Faulted` back to
73    /// `Running` (manual clear via `Executor::clear_task_fault`).
74    fn on_task_clear(&self, _task: TaskId) {}
75
76    /// Called once when the executor transitions from `Running` to
77    /// `Faulted` (executor-wide iteration budget breach, `REQ_0071`).
78    fn on_executor_fault(&self, _reason: ExecutorFaultReason) {}
79
80    /// Called once when the executor transitions from `Faulted` back
81    /// to `Running` (manual clear via `Executor::clear_executor_fault`).
82    fn on_executor_clear(&self) {}
83
84    /// Fires once per scan cycle of a cyclic task, including a faulted scan
85    /// (`REQ_0103`, `REQ_0107`). Default no-op for backward compatibility.
86    ///
87    /// **Containment:** runs on the executor's `WaitSet` thread outside the
88    /// per-item panic catch — a panic here routes to the fail-fast boundary
89    /// (`REQ_0123`). Implementations must not panic.
90    fn on_cycle_stats(&self, _obs: &CycleObservation) {}
91}
92
93/// No-op observer used when the user does not configure one.
94pub struct NoopObserver;
95impl Observer for NoopObserver {}
96
/// Unit tests for the `Observer::on_cycle_stats` hook: the default body and
/// an overriding implementation.
#[cfg(test)]
mod cycle_stats_hook_tests {
    use super::*;
    use crate::TaskId;
    use crate::stats::CycleObservation;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Observer that bumps a shared counter each time `on_cycle_stats` runs.
    struct TallyObserver {
        hits: Arc<AtomicU64>,
    }

    impl Observer for TallyObserver {
        fn on_cycle_stats(&self, _: &CycleObservation) {
            self.hits.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Observation with every counter zeroed and every optional field unset.
    fn blank_observation() -> CycleObservation {
        CycleObservation {
            cycle_index: 0,
            task_id: TaskId::from("t"),
            task_index: 0,
            faulted: false,
            period_ns: 0,
            pre_ns: 0,
            actual_period_ns: None,
            jitter_ns: None,
            lateness_ns: None,
            took_ns: None,
        }
    }

    #[test]
    fn default_on_cycle_stats_is_noop() {
        // The default body only has to compile and return without panicking.
        let observation = blank_observation();
        let observer = NoopObserver;
        observer.on_cycle_stats(&observation);
    }

    #[test]
    fn overridden_on_cycle_stats_fires() {
        let hits = Arc::new(AtomicU64::new(0));
        let observer = TallyObserver {
            hits: Arc::clone(&hits),
        };
        let observation = blank_observation();
        observer.on_cycle_stats(&observation);
        assert_eq!(hits.load(Ordering::Relaxed), 1);
    }
}