taktora_executor/observer.rs
1//! `Observer` trait — lifecycle hooks invoked by the executor.
2
3use crate::error::ExecutorError;
4use crate::fault::{ExecutorFaultReason, FaultReason};
5use crate::stats::CycleObservation;
6use crate::task_id::TaskId;
7
/// Generic user event carried by [`Observer::on_send_event`].
///
/// # Construction
///
/// Use [`UserEvent::new`] to create a value (optionally chained with
/// [`UserEvent::with_string`]); struct literal syntax is not available from
/// outside this crate because `UserEvent` is `#[non_exhaustive]`.
///
/// # Comparison
///
/// Events compare field by field (`PartialEq` / `Eq`), so an observer or a
/// test can check a received event against an expected one directly.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct UserEvent {
    /// User-defined event kind.
    pub kind: u32,
    /// Numeric payload.
    pub int_data: i64,
    /// Optional string payload.
    pub string_data: Option<String>,
}
24
25impl UserEvent {
26 /// Create a new event with the given `kind` and `int_data`.
27 #[must_use]
28 pub const fn new(kind: u32, int_data: i64) -> Self {
29 Self {
30 kind,
31 int_data,
32 string_data: None,
33 }
34 }
35
36 /// Attach an optional string payload to this event.
37 #[must_use]
38 pub fn with_string(mut self, s: impl Into<String>) -> Self {
39 self.string_data = Some(s.into());
40 self
41 }
42}
43
44/// Lifecycle observer invoked by the executor at well-defined points.
45///
46/// All methods have no-op defaults. The executor never blocks on observer
47/// callbacks — heavy work should be queued internally.
48pub trait Observer: Send + Sync {
49 /// Called once just before the dispatch loop begins.
50 fn on_executor_up(&self) {}
51 /// Called once just after the dispatch loop finishes cleanly.
52 fn on_executor_down(&self) {}
53 /// Called when the dispatch loop returns an error.
54 fn on_executor_error(&self, _e: &ExecutorError) {}
55
56 /// Called before an item with `app_id().is_some()` runs (per invocation).
57 fn on_app_start(&self, _task: TaskId, _app: u32, _instance: Option<u32>) {}
58 /// Called after such an item runs.
59 fn on_app_stop(&self, _task: TaskId) {}
60 /// Called when an item returns `Err` or panics.
61 fn on_app_error(&self, _task: TaskId, _e: &(dyn std::error::Error + 'static)) {}
62
63 /// Called when an item invokes `Context::send_event`.
64 fn on_send_event(&self, _task: TaskId, _ev: UserEvent) {}
65
66 /// Called once when a task transitions from `Running` to `Faulted`
67 /// (per-task budget overrun, `REQ_0070`). The cascade transition
68 /// triggered by an executor-wide fault does NOT fire this hook —
69 /// see [`Observer::on_executor_fault`]. `REQ_0073`.
70 fn on_task_fault(&self, _task: TaskId, _reason: FaultReason) {}
71
72 /// Called once when a task transitions from `Faulted` back to
73 /// `Running` (manual clear via `Executor::clear_task_fault`).
74 fn on_task_clear(&self, _task: TaskId) {}
75
76 /// Called once when the executor transitions from `Running` to
77 /// `Faulted` (executor-wide iteration budget breach, `REQ_0071`).
78 fn on_executor_fault(&self, _reason: ExecutorFaultReason) {}
79
80 /// Called once when the executor transitions from `Faulted` back
81 /// to `Running` (manual clear via `Executor::clear_executor_fault`).
82 fn on_executor_clear(&self) {}
83
84 /// Fires once per scan cycle of a cyclic task, including a faulted scan
85 /// (`REQ_0103`, `REQ_0107`). Default no-op for backward compatibility.
86 ///
87 /// **Containment:** runs on the executor's `WaitSet` thread outside the
88 /// per-item panic catch — a panic here routes to the fail-fast boundary
89 /// (`REQ_0123`). Implementations must not panic.
90 fn on_cycle_stats(&self, _obs: &CycleObservation) {}
91}
92
93/// No-op observer used when the user does not configure one.
94pub struct NoopObserver;
95impl Observer for NoopObserver {}
96
#[cfg(test)]
mod cycle_stats_hook_tests {
    use super::*;
    use crate::TaskId;
    use crate::stats::CycleObservation;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Observer that bumps a shared counter on every `on_cycle_stats` call.
    struct HitCounter {
        hits: Arc<AtomicU64>,
    }

    impl Observer for HitCounter {
        fn on_cycle_stats(&self, _observation: &CycleObservation) {
            self.hits.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Observation with zeroed counters and no optional measurements; the
    /// tests only need some value to hand to the hook.
    fn blank_observation() -> CycleObservation {
        CycleObservation {
            cycle_index: 0,
            task_id: TaskId::from("t"),
            task_index: 0,
            faulted: false,
            period_ns: 0,
            pre_ns: 0,
            actual_period_ns: None,
            jitter_ns: None,
            lateness_ns: None,
            took_ns: None,
        }
    }

    /// The trait's default `on_cycle_stats` must compile and must not panic.
    #[test]
    fn default_on_cycle_stats_is_noop() {
        let observation = blank_observation();
        NoopObserver.on_cycle_stats(&observation);
    }

    /// An overriding implementation is the one that actually runs.
    #[test]
    fn overridden_on_cycle_stats_fires() {
        let hits = Arc::new(AtomicU64::new(0));
        let observer = HitCounter {
            hits: Arc::clone(&hits),
        };

        observer.on_cycle_stats(&blank_observation());

        assert_eq!(hits.load(Ordering::Relaxed), 1);
    }
}
140}