taskvisor/events/event.rs
1//! # Runtime events emitted by the supervisor and task actors.
2//!
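//! The [`EventKind`] enum classifies events into the following categories:
//! - **Subscriber events**: event-subscriber failures (panicked, overflow)
//! - **Shutdown events**: shutdown flow (requested, all stopped within grace, grace exceeded)
//! - **Lifecycle events**: task execution flow (starting, stopped, failed, timeout, backoff scheduled)
//! - **Management events**: runtime task control (add/remove requests and confirmations)
//! - **Terminal events**: actor final states (exhausted policy, dead)
//! - **Controller events** (only with the `controller` feature): slot submissions, rejections, and state transitions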
7//!
8//! The [`Event`] struct carries additional metadata such as timestamps, task name,
9//! reasons, and backoff delays.
10//!
11//! ## Ordering guarantees
12//! Each event has a globally unique sequence number (`seq`) that increases monotonically.
13//! Use `seq` to restore the exact order when events are delivered out of order.
14//!
15//! ## Example
16//! ```rust
17//! use std::time::Duration;
18//! use taskvisor::{Event, EventKind};
19//!
20//! let ev = Event::new(EventKind::TaskFailed)
21//! .with_task("demo-task")
22//! .with_reason("boom")
23//! .with_attempt(3)
24//! .with_timeout(Duration::from_secs(5));
25//!
26//! assert_eq!(ev.kind, EventKind::TaskFailed);
27//! assert_eq!(ev.task.as_deref(), Some("demo-task"));
28//! assert_eq!(ev.reason.as_deref(), Some("boom"));
29//! ```
30
31use std::sync::Arc;
32use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
33use std::time::{Duration, SystemTime};
34
/// Process-wide counter that hands out the `seq` value of every new [`Event`], starting at zero.
static EVENT_SEQ: AtomicU64 = AtomicU64::new(0_u64);
37
/// Classification of runtime events.
///
/// Every [`Event`] always carries `at` (wall-clock timestamp) and `seq` (global
/// sequence number); the per-variant docs below list which optional fields are set
/// in addition to those.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventKind {
    // === Subscriber events ===
    /// Subscriber panicked during event processing.
    ///
    /// Sets:
    /// - `task`: subscriber name
    /// - `reason`: panic info/message
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    SubscriberPanicked,

    /// Subscriber dropped an event (queue full or worker closed).
    ///
    /// Sets:
    /// - `task`: subscriber name
    /// - `reason`: when built via [`Event::subscriber_overflow`], formatted as
    ///   `subscriber={name} reason={cause}`, where `{cause}` is a short string
    ///   such as `full` or `closed`
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    SubscriberOverflow,

    // === Shutdown events ===
    /// Shutdown requested (OS signal observed).
    ///
    /// Sets:
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    ShutdownRequested,

    /// All tasks stopped within the configured grace period.
    ///
    /// Sets:
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    AllStoppedWithinGrace,

    /// Grace period exceeded; some tasks did not stop in time.
    ///
    /// Sets:
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    GraceExceeded,

    // === Task lifecycle events ===
    /// Task is starting an attempt.
    ///
    /// Sets:
    /// - `task`: task name
    /// - `attempt`: attempt number (1-based, per actor)
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    TaskStarting,

    /// Task has stopped (finished successfully **or** was cancelled gracefully).
    ///
    /// Sets:
    /// - `task`: task name
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    TaskStopped,

    /// Task failed with a (non-fatal) error for this attempt.
    ///
    /// Sets:
    /// - `task`: task name
    /// - `attempt`: attempt number
    /// - `reason`: failure message
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    TaskFailed,

    /// Task exceeded its configured timeout for this attempt.
    ///
    /// Sets:
    /// - `task`: task name
    /// - `attempt`: attempt number
    /// - `timeout_ms`: configured attempt timeout (ms)
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    TimeoutHit,

    /// Next attempt scheduled (after success or failure).
    ///
    /// Sets:
    /// - `task`: task name
    /// - `attempt`: previous attempt number
    /// - `delay_ms`: delay before the next attempt (ms)
    /// - `backoff_source`: `Success` or `Failure`
    /// - `reason`: last failure message (only for failure-driven backoff)
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    BackoffScheduled,

    // === Runtime task management events ===
    /// Request to add a new task to the supervisor.
    ///
    /// Sets:
    /// - `task`: logical task name
    /// - `spec` (crate-private): task spec to spawn
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    TaskAddRequested,

    /// Task was successfully added (actor spawned and registered).
    ///
    /// Sets:
    /// - `task`: task name
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    TaskAdded,

    /// Request to remove a task from the supervisor.
    ///
    /// Sets:
    /// - `task`: task name
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    TaskRemoveRequested,

    /// Task was removed from the supervisor (after join/cleanup).
    ///
    /// Sets:
    /// - `task`: task name
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    TaskRemoved,

    // === Actor terminal states ===
    /// Actor exhausted its restart policy and will not restart.
    ///
    /// Emitted when:
    /// - `RestartPolicy::Never` → task completed (success or handled case)
    /// - `RestartPolicy::OnFailure` → task completed successfully
    ///
    /// Sets:
    /// - `task`: task name
    /// - `attempt`: last attempt number
    /// - `reason`: optional message
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    ActorExhausted,

    /// Actor terminated permanently due to a fatal error.
    ///
    /// Emitted when:
    /// - Task returned `TaskError::Fatal`
    /// - (Future) max retries exceeded
    ///
    /// Sets:
    /// - `task`: task name
    /// - `attempt`: last attempt number
    /// - `reason`: fatal error message
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    ActorDead,

    // === Controller events (feature `controller`) ===
    /// Controller submission rejected (queue full, add failed, etc.).
    ///
    /// Sets:
    /// - `task`: slot name
    /// - `reason`: rejection reason (`queue_full`, `add_failed: ...`, etc.)
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    #[cfg(feature = "controller")]
    ControllerRejected,

    /// Task submitted successfully to a controller slot.
    ///
    /// Sets:
    /// - `task`: slot name
    /// - `reason`: `admission={admission} status={status} depth={N}`
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    #[cfg(feature = "controller")]
    ControllerSubmitted,

    /// Slot transitioned state (Running → Terminating, etc.).
    ///
    /// Sets:
    /// - `task`: slot name
    /// - `reason`: `running→terminating` (Replace), `terminating→idle`, etc.
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    #[cfg(feature = "controller")]
    ControllerSlotTransition,
}
219
/// Why the next run was scheduled: which attempt outcome triggered the backoff.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackoffSource {
    /// The backoff follows a successful attempt.
    Success,
    /// The backoff follows a failed attempt.
    Failure,
}
226
227/// Runtime event with optional metadata.
228///
229/// - `seq`: monotonic global sequence for ordering
230/// - `at`: wall-clock timestamp (for logs)
231/// - other optional fields are set depending on the [`EventKind`]
232#[derive(Clone)]
233pub struct Event {
234 /// Globally unique, monotonically increasing sequence number.
235 pub seq: u64,
236 /// Wall-clock timestamp.
237 pub at: SystemTime,
238
239 /// Task timeout in milliseconds (compact).
240 pub timeout_ms: Option<u32>,
241 /// Backoff delay before next attempt in milliseconds (compact).
242 pub delay_ms: Option<u32>,
243 /// Human-readable reason (errors, overflow details, etc.).
244 pub reason: Option<Arc<str>>,
245 /// Attempt count (starting from 1).
246 pub attempt: Option<u32>,
247 /// Name of the task, if applicable.
248 pub task: Option<Arc<str>>,
249 /// Event classification.
250 pub kind: EventKind,
251 /// Source for backoff scheduling (success vs failure).
252 pub backoff_source: Option<BackoffSource>,
253
254 /// Internal task specification (used only for TaskAddRequested).
255 pub(crate) spec: Option<crate::tasks::TaskSpec>,
256}
257
258impl Event {
259 /// Creates a new event of the given kind with current timestamp and next sequence number.
260 pub fn new(kind: EventKind) -> Self {
261 Self {
262 seq: EVENT_SEQ.fetch_add(1, AtomicOrdering::Release),
263 kind,
264 at: SystemTime::now(),
265 attempt: None,
266 timeout_ms: None,
267 reason: None,
268 delay_ms: None,
269 backoff_source: None,
270 task: None,
271 spec: None,
272 }
273 }
274
275 /// Attaches a human-readable reason.
276 #[inline]
277 pub fn with_reason(mut self, reason: impl Into<Arc<str>>) -> Self {
278 self.reason = Some(reason.into());
279 self
280 }
281
282 /// Attaches a task name.
283 #[inline]
284 pub fn with_task(mut self, task: impl Into<Arc<str>>) -> Self {
285 self.task = Some(task.into());
286 self
287 }
288
289 /// Attaches a timeout duration (stored as milliseconds).
290 #[inline]
291 pub fn with_timeout(mut self, d: Duration) -> Self {
292 let ms = d.as_millis().min(u128::from(u32::MAX)) as u32;
293 self.timeout_ms = Some(ms);
294 self
295 }
296
297 /// Attaches a backoff delay (stored as milliseconds).
298 #[inline]
299 pub fn with_delay(mut self, d: Duration) -> Self {
300 let ms = d.as_millis().min(u128::from(u32::MAX)) as u32;
301 self.delay_ms = Some(ms);
302 self
303 }
304
305 /// Attaches an attempt count.
306 #[inline]
307 pub fn with_attempt(mut self, n: u32) -> Self {
308 self.attempt = Some(n);
309 self
310 }
311
312 /// Marks that this backoff comes from a successful attempt.
313 #[inline]
314 pub fn with_backoff_success(mut self) -> Self {
315 self.backoff_source = Some(BackoffSource::Success);
316 self
317 }
318
319 /// Marks that this backoff comes from a failed attempt.
320 #[inline]
321 pub fn with_backoff_failure(mut self) -> Self {
322 self.backoff_source = Some(BackoffSource::Failure);
323 self
324 }
325
326 /// Creates a subscriber overflow event.
327 #[inline]
328 pub fn subscriber_overflow(subscriber: &'static str, reason: &'static str) -> Self {
329 Event::new(EventKind::SubscriberOverflow)
330 .with_task(subscriber)
331 .with_reason(format!("subscriber={subscriber} reason={reason}"))
332 }
333
334 /// Creates a subscriber panic event.
335 #[inline]
336 pub fn subscriber_panicked(subscriber: &'static str, info: String) -> Self {
337 Event::new(EventKind::SubscriberPanicked)
338 .with_task(subscriber)
339 .with_reason(info)
340 }
341
342 #[inline]
343 pub fn is_subscriber_overflow(&self) -> bool {
344 matches!(self.kind, EventKind::SubscriberOverflow)
345 }
346
347 #[inline]
348 pub fn is_subscriber_panic(&self) -> bool {
349 matches!(self.kind, EventKind::SubscriberPanicked)
350 }
351
352 #[inline]
353 pub(crate) fn with_spec(mut self, spec: crate::tasks::TaskSpec) -> Self {
354 self.spec = Some(spec);
355 self
356 }
357}