use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
use std::time::{Duration, SystemTime};
static EVENT_SEQ: AtomicU64 = AtomicU64::new(0);
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventKind {
SubscriberPanicked,
SubscriberOverflow,
ShutdownRequested,
AllStoppedWithinGrace,
GraceExceeded,
TaskStarting,
TaskStopped,
TaskFailed,
TimeoutHit,
BackoffScheduled,
TaskAddRequested,
TaskAdded,
TaskRemoveRequested,
TaskRemoved,
ActorExhausted,
ActorDead,
#[cfg(feature = "controller")]
ControllerRejected,
#[cfg(feature = "controller")]
ControllerSubmitted,
#[cfg(feature = "controller")]
ControllerSlotTransition,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackoffSource {
Success,
Failure,
}
#[derive(Clone)]
pub struct Event {
pub seq: u64,
pub at: SystemTime,
pub timeout_ms: Option<u32>,
pub delay_ms: Option<u32>,
pub reason: Option<Arc<str>>,
pub attempt: Option<u32>,
pub task: Option<Arc<str>>,
pub exit_code: Option<i32>,
pub kind: EventKind,
pub backoff_source: Option<BackoffSource>,
}
impl Event {
pub fn new(kind: EventKind) -> Self {
Self {
seq: EVENT_SEQ.fetch_add(1, AtomicOrdering::Release),
kind,
at: SystemTime::now(),
backoff_source: None,
timeout_ms: None,
delay_ms: None,
attempt: None,
reason: None,
task: None,
exit_code: None,
}
}
#[inline]
pub fn with_reason(mut self, reason: impl Into<Arc<str>>) -> Self {
self.reason = Some(reason.into());
self
}
#[inline]
pub fn with_task(mut self, task: impl Into<Arc<str>>) -> Self {
self.task = Some(task.into());
self
}
#[inline]
pub fn with_timeout(mut self, d: Duration) -> Self {
let ms = d.as_millis().min(u128::from(u32::MAX)) as u32;
self.timeout_ms = Some(ms);
self
}
#[inline]
pub fn with_delay(mut self, d: Duration) -> Self {
let ms = d.as_millis().min(u128::from(u32::MAX)) as u32;
self.delay_ms = Some(ms);
self
}
#[inline]
pub fn with_attempt(mut self, n: u32) -> Self {
self.attempt = Some(n);
self
}
#[inline]
pub fn with_exit_code(mut self, code: i32) -> Self {
self.exit_code = Some(code);
self
}
#[inline]
pub fn with_backoff_success(mut self) -> Self {
self.backoff_source = Some(BackoffSource::Success);
self
}
#[inline]
pub fn with_backoff_failure(mut self) -> Self {
self.backoff_source = Some(BackoffSource::Failure);
self
}
#[inline]
pub fn subscriber_overflow(subscriber: &'static str, reason: &'static str) -> Self {
Event::new(EventKind::SubscriberOverflow)
.with_task(subscriber)
.with_reason(format!("subscriber={subscriber} reason={reason}"))
}
#[inline]
pub fn subscriber_panicked(subscriber: &'static str, info: String) -> Self {
Event::new(EventKind::SubscriberPanicked)
.with_task(subscriber)
.with_reason(info)
}
#[inline]
pub fn is_subscriber_overflow(&self) -> bool {
matches!(self.kind, EventKind::SubscriberOverflow)
}
#[inline]
pub fn is_subscriber_panic(&self) -> bool {
matches!(self.kind, EventKind::SubscriberPanicked)
}
#[inline]
pub fn is_internal_diagnostic(&self) -> bool {
matches!(
self.kind,
EventKind::SubscriberOverflow | EventKind::SubscriberPanicked
)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn seq_increases_monotonically() {
let a = Event::new(EventKind::TaskStarting);
let b = Event::new(EventKind::TaskStopped);
assert!(b.seq > a.seq, "seq must grow: {} vs {}", a.seq, b.seq);
}
#[test]
fn with_timeout_clamps_large_duration() {
let huge = Duration::from_millis(u64::from(u32::MAX) + 1000);
let ev = Event::new(EventKind::TimeoutHit).with_timeout(huge);
assert_eq!(ev.timeout_ms, Some(u32::MAX));
}
#[test]
fn with_delay_clamps_large_duration() {
let huge = Duration::from_millis(u64::from(u32::MAX) + 1000);
let ev = Event::new(EventKind::BackoffScheduled).with_delay(huge);
assert_eq!(ev.delay_ms, Some(u32::MAX));
}
#[test]
fn is_internal_diagnostic_covers_both_variants() {
let overflow = Event::new(EventKind::SubscriberOverflow);
let panic = Event::new(EventKind::SubscriberPanicked);
let normal = Event::new(EventKind::TaskStarting);
assert!(overflow.is_internal_diagnostic());
assert!(panic.is_internal_diagnostic());
assert!(!normal.is_internal_diagnostic());
}
#[test]
fn subscriber_overflow_factory_sets_fields() {
let ev = Event::subscriber_overflow("my-sub", "full");
assert_eq!(ev.kind, EventKind::SubscriberOverflow);
assert_eq!(ev.task.as_deref(), Some("my-sub"));
assert!(ev.reason.as_deref().unwrap().contains("subscriber=my-sub"));
assert!(ev.reason.as_deref().unwrap().contains("reason=full"));
}
#[test]
fn new_event_has_no_exit_code() {
let ev = Event::new(EventKind::TaskFailed);
assert_eq!(ev.exit_code, None);
}
#[test]
fn with_exit_code_populates_field() {
let ev = Event::new(EventKind::TaskFailed).with_exit_code(42);
assert_eq!(ev.exit_code, Some(42));
let neg = Event::new(EventKind::ActorDead).with_exit_code(-1);
assert_eq!(neg.exit_code, Some(-1));
}
}
impl std::fmt::Debug for Event {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut d = f.debug_struct("Event");
d.field("seq", &self.seq);
d.field("kind", &self.kind);
if let Some(ref task) = self.task {
d.field("task", task);
}
if let Some(attempt) = self.attempt {
d.field("attempt", &attempt);
}
if let Some(ref reason) = self.reason {
d.field("reason", reason);
}
if let Some(timeout_ms) = self.timeout_ms {
d.field("timeout_ms", &timeout_ms);
}
if let Some(delay_ms) = self.delay_ms {
d.field("delay_ms", &delay_ms);
}
if let Some(ref src) = self.backoff_source {
d.field("backoff_source", src);
}
d.finish()
}
}