Skip to main content

taskvisor/events/
event.rs

//! # Runtime events emitted by the supervisor and task actors.
//!
//! The [`EventKind`] enum classifies event types across the following categories:
//! - **Subscriber events**: subscriber health (panicked, overflow)
//! - **Shutdown events**: shutdown flow (requested, all stopped within grace, grace exceeded)
//! - **Lifecycle events**: task execution flow (starting, stopped, failed, timeout, backoff scheduled)
//! - **Management events**: runtime task control (add/remove requests and confirmations)
//! - **Terminal events**: actor final states (exhausted policy, dead)
//! - **Controller events** (only with the `controller` feature): submissions, rejections, slot transitions
//!
//! The [`Event`] struct carries additional metadata such as timestamps, task name,
//! reasons, and backoff delays.
//!
//! ## Ordering guarantees
//! Each event has a globally unique sequence number (`seq`) that increases monotonically.
//! Use `seq` to restore the exact order when events are delivered out of order.
//!
//! ## Example
//! ```rust
//! use std::time::Duration;
//! use taskvisor::{Event, EventKind};
//!
//! let ev = Event::new(EventKind::TaskFailed)
//!     .with_task("demo-task")
//!     .with_reason("boom")
//!     .with_attempt(3)
//!     .with_timeout(Duration::from_secs(5));
//!
//! assert_eq!(ev.kind, EventKind::TaskFailed);
//! assert_eq!(ev.task.as_deref(), Some("demo-task"));
//! assert_eq!(ev.reason.as_deref(), Some("boom"));
//! ```
30
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
use std::time::{Duration, SystemTime};

/// Global sequence counter for event ordering.
///
/// Starts at 0. `Event::new` takes the next value with an atomic `fetch_add(1)`,
/// so every event created in this process receives a distinct `seq`.
static EVENT_SEQ: AtomicU64 = AtomicU64::new(0);
37
/// Classification of runtime events.
///
/// Variants are grouped by origin: subscriber health, shutdown flow, task lifecycle,
/// runtime task management, actor terminal states and (behind the `controller` feature)
/// controller slot activity.
///
/// Each variant documents which [`Event`] fields accompany it. `at` and `seq` are filled in
/// by `Event::new` for every event regardless of kind.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventKind {
    // === Subscriber events ===
    /// Subscriber panicked during event processing.
    ///
    /// Sets:
    /// - `task`: subscriber name
    /// - `reason`: panic info/message
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    SubscriberPanicked,

    /// Subscriber dropped an event (queue full or worker closed).
    ///
    /// Sets:
    /// - `task`: subscriber name
    /// - `reason`: overflow details; `Event::subscriber_overflow` formats them as
    ///   `subscriber={name} reason={why}` (e.g., `why` = "full", "closed")
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    SubscriberOverflow,

    // === Shutdown events ===
    /// Shutdown requested (OS signal observed).
    ///
    /// Sets:
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    ShutdownRequested,

    /// All tasks stopped within the configured grace period.
    ///
    /// Sets:
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    AllStoppedWithinGrace,

    /// Grace period exceeded; some tasks did not stop in time.
    ///
    /// Sets:
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    GraceExceeded,

    // === Task lifecycle events ===
    /// Task is starting an attempt.
    ///
    /// Sets:
    /// - `task`: task name
    /// - `attempt`: attempt number (1-based, per actor)
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    TaskStarting,

    /// Task has stopped (finished successfully **or** was cancelled gracefully).
    ///
    /// Sets:
    /// - `task`: task name
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    TaskStopped,

    /// Task failed with a (non-fatal) error for this attempt.
    ///
    /// Sets:
    /// - `task`: task name
    /// - `attempt`: attempt number
    /// - `reason`: failure message
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    TaskFailed,

    /// Task exceeded its configured timeout for this attempt.
    ///
    /// Sets:
    /// - `task`: task name
    /// - `attempt`: attempt number
    /// - `timeout_ms`: configured attempt timeout (ms)
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    TimeoutHit,

    /// Next attempt scheduled (after success or failure).
    ///
    /// Sets:
    /// - `task`: task name
    /// - `attempt`: previous attempt number
    /// - `delay_ms`: delay before the next attempt (ms)
    /// - `backoff_source`: `Success` or `Failure`
    /// - `reason`: last failure message (only for failure-driven backoff)
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    BackoffScheduled,

    // === Runtime task management events ===
    /// Request to add a new task to the supervisor.
    ///
    /// Sets:
    /// - `task`: logical task name
    /// - `spec` (private): task spec to spawn
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    TaskAddRequested,

    /// Task was successfully added (actor spawned and registered).
    ///
    /// Sets:
    /// - `task`: task name
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    TaskAdded,

    /// Request to remove a task from the supervisor.
    ///
    /// Sets:
    /// - `task`: task name
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    TaskRemoveRequested,

    /// Task was removed from the supervisor (after join/cleanup).
    ///
    /// Sets:
    /// - `task`: task name
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    TaskRemoved,

    // === Actor terminal states ===
    /// Actor exhausted its restart policy and will not restart.
    ///
    /// Emitted when:
    /// - `RestartPolicy::Never` → task completed (success or handled case)
    /// - `RestartPolicy::OnFailure` → task completed successfully
    ///
    /// Sets:
    /// - `task`: task name
    /// - `attempt`: last attempt number
    /// - `reason`: optional message
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    ActorExhausted,

    /// Actor terminated permanently due to a fatal error.
    ///
    /// Emitted when:
    /// - Task returned `TaskError::Fatal`
    /// - (Future) max retries exceeded
    ///
    /// Sets:
    /// - `task`: task name
    /// - `attempt`: last attempt number
    /// - `reason`: fatal error message
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    ActorDead,

    // === Controller events (feature = "controller") ===
    /// Controller submission rejected (queue full, add failed, etc).
    ///
    /// Sets:
    /// - `task`: slot name
    /// - `reason`: rejection reason ("queue_full", "add_failed: ...", etc)
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    #[cfg(feature = "controller")]
    ControllerRejected,

    /// Task submitted successfully to controller slot.
    ///
    /// Sets:
    /// - `task`: slot name
    /// - `reason`: "admission={admission} status={status} depth={N}"
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    #[cfg(feature = "controller")]
    ControllerSubmitted,

    /// Slot transitioned state (Running → Terminating, etc).
    ///
    /// Sets:
    /// - `task`: slot name
    /// - `reason`: "running→terminating" (Replace), "terminating→idle", etc
    /// - `at`: wall-clock timestamp
    /// - `seq`: global sequence
    #[cfg(feature = "controller")]
    ControllerSlotTransition,
}
219
/// Reason for scheduling the next run/backoff.
///
/// Carried in the `backoff_source` field of [`EventKind::BackoffScheduled`] events.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackoffSource {
    /// The backoff follows a successful attempt.
    Success,
    /// The backoff follows a failed attempt.
    Failure,
}
226
227/// Runtime event with optional metadata.
228///
229/// - `seq`: monotonic global sequence for ordering
230/// - `at`: wall-clock timestamp (for logs)
231/// - other optional fields are set depending on the [`EventKind`]
232#[derive(Clone)]
233pub struct Event {
234    /// Globally unique, monotonically increasing sequence number.
235    pub seq: u64,
236    /// Wall-clock timestamp.
237    pub at: SystemTime,
238
239    /// Task timeout in milliseconds (compact).
240    pub timeout_ms: Option<u32>,
241    /// Backoff delay before next attempt in milliseconds (compact).
242    pub delay_ms: Option<u32>,
243    /// Human-readable reason (errors, overflow details, etc.).
244    pub reason: Option<Arc<str>>,
245    /// Attempt count (starting from 1).
246    pub attempt: Option<u32>,
247    /// Name of the task, if applicable.
248    pub task: Option<Arc<str>>,
249    /// Event classification.
250    pub kind: EventKind,
251    /// Source for backoff scheduling (success vs failure).
252    pub backoff_source: Option<BackoffSource>,
253
254    /// Internal task specification (used only for TaskAddRequested).
255    pub(crate) spec: Option<crate::tasks::TaskSpec>,
256}
257
258impl Event {
259    /// Creates a new event of the given kind with current timestamp and next sequence number.
260    pub fn new(kind: EventKind) -> Self {
261        Self {
262            seq: EVENT_SEQ.fetch_add(1, AtomicOrdering::Release),
263            kind,
264            at: SystemTime::now(),
265            attempt: None,
266            timeout_ms: None,
267            reason: None,
268            delay_ms: None,
269            backoff_source: None,
270            task: None,
271            spec: None,
272        }
273    }
274
275    /// Attaches a human-readable reason.
276    #[inline]
277    pub fn with_reason(mut self, reason: impl Into<Arc<str>>) -> Self {
278        self.reason = Some(reason.into());
279        self
280    }
281
282    /// Attaches a task name.
283    #[inline]
284    pub fn with_task(mut self, task: impl Into<Arc<str>>) -> Self {
285        self.task = Some(task.into());
286        self
287    }
288
289    /// Attaches a timeout duration (stored as milliseconds).
290    #[inline]
291    pub fn with_timeout(mut self, d: Duration) -> Self {
292        let ms = d.as_millis().min(u128::from(u32::MAX)) as u32;
293        self.timeout_ms = Some(ms);
294        self
295    }
296
297    /// Attaches a backoff delay (stored as milliseconds).
298    #[inline]
299    pub fn with_delay(mut self, d: Duration) -> Self {
300        let ms = d.as_millis().min(u128::from(u32::MAX)) as u32;
301        self.delay_ms = Some(ms);
302        self
303    }
304
305    /// Attaches an attempt count.
306    #[inline]
307    pub fn with_attempt(mut self, n: u32) -> Self {
308        self.attempt = Some(n);
309        self
310    }
311
312    /// Marks that this backoff comes from a successful attempt.
313    #[inline]
314    pub fn with_backoff_success(mut self) -> Self {
315        self.backoff_source = Some(BackoffSource::Success);
316        self
317    }
318
319    /// Marks that this backoff comes from a failed attempt.
320    #[inline]
321    pub fn with_backoff_failure(mut self) -> Self {
322        self.backoff_source = Some(BackoffSource::Failure);
323        self
324    }
325
326    /// Creates a subscriber overflow event.
327    #[inline]
328    pub fn subscriber_overflow(subscriber: &'static str, reason: &'static str) -> Self {
329        Event::new(EventKind::SubscriberOverflow)
330            .with_task(subscriber)
331            .with_reason(format!("subscriber={subscriber} reason={reason}"))
332    }
333
334    /// Creates a subscriber panic event.
335    #[inline]
336    pub fn subscriber_panicked(subscriber: &'static str, info: String) -> Self {
337        Event::new(EventKind::SubscriberPanicked)
338            .with_task(subscriber)
339            .with_reason(info)
340    }
341
342    #[inline]
343    pub fn is_subscriber_overflow(&self) -> bool {
344        matches!(self.kind, EventKind::SubscriberOverflow)
345    }
346
347    #[inline]
348    pub fn is_subscriber_panic(&self) -> bool {
349        matches!(self.kind, EventKind::SubscriberPanicked)
350    }
351
352    #[inline]
353    pub(crate) fn with_spec(mut self, spec: crate::tasks::TaskSpec) -> Self {
354        self.spec = Some(spec);
355        self
356    }
357}