use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
use crate::events::{Event, EventKind};
#[derive(Debug, Clone)]
struct TaskState {
last_seq: u64,
alive: bool,
}
pub(crate) struct AliveTracker {
state: RwLock<HashMap<Arc<str>, TaskState>>,
}
impl AliveTracker {
pub fn new() -> Self {
Self {
state: RwLock::new(HashMap::new()),
}
}
pub async fn update(&self, ev: &Event) -> bool {
let Some(name) = ev.task.as_deref() else {
return false;
};
let mut map = self.state.write().await;
if matches!(ev.kind, EventKind::TaskRemoved) {
return match map.get(name) {
Some(state) if ev.seq > state.last_seq => {
map.remove(name);
true
}
_ => false,
};
}
let entry = if let Some(existing) = map.get_mut(name) {
existing
} else {
map.entry(Arc::from(name)).or_insert(TaskState {
last_seq: 0,
alive: false,
})
};
if ev.seq <= entry.last_seq {
return false;
}
let next_alive = match ev.kind {
EventKind::TaskStarting => true,
EventKind::TaskStopped
| EventKind::TaskFailed
| EventKind::ActorExhausted
| EventKind::ActorDead => false,
_ => entry.alive,
};
let changed = next_alive != entry.alive;
entry.alive = next_alive;
entry.last_seq = ev.seq;
changed
}
pub async fn snapshot(&self) -> Vec<Arc<str>> {
let state = self.state.read().await;
let mut alive: Vec<Arc<str>> = state
.iter()
.filter(|(_, ts)| ts.alive)
.map(|(name, _)| name.clone())
.collect();
alive.sort_unstable();
alive
}
pub async fn is_alive(&self, name: &str) -> bool {
self.state
.read()
.await
.get(name)
.map(|ts| ts.alive)
.unwrap_or(false)
}
}
#[cfg(test)]
mod tests {
use super::*;
fn ev(kind: EventKind, task: &str, seq: u64) -> Event {
let mut e = Event::new(kind).with_task(task);
e.seq = seq;
e
}
#[tokio::test]
async fn task_starting_sets_alive() {
let tracker = AliveTracker::new();
let changed = tracker.update(&ev(EventKind::TaskStarting, "t1", 1)).await;
assert!(
changed,
"first TaskStarting should change alive from false to true"
);
assert!(tracker.is_alive("t1").await);
}
#[tokio::test]
async fn task_stopped_sets_dead() {
let tracker = AliveTracker::new();
tracker.update(&ev(EventKind::TaskStarting, "t1", 1)).await;
let changed = tracker.update(&ev(EventKind::TaskStopped, "t1", 2)).await;
assert!(changed);
assert!(!tracker.is_alive("t1").await);
}
#[tokio::test]
async fn stale_event_rejected() {
let tracker = AliveTracker::new();
tracker.update(&ev(EventKind::TaskStopped, "t1", 100)).await;
let changed = tracker.update(&ev(EventKind::TaskStarting, "t1", 99)).await;
assert!(!changed, "stale event must not change state");
assert!(
!tracker.is_alive("t1").await,
"task must remain dead after stale event"
);
}
#[tokio::test]
async fn equal_seq_rejected() {
let tracker = AliveTracker::new();
tracker.update(&ev(EventKind::TaskStopped, "t1", 50)).await;
let changed = tracker.update(&ev(EventKind::TaskStarting, "t1", 50)).await;
assert!(!changed);
assert!(!tracker.is_alive("t1").await);
}
#[tokio::test]
async fn task_removed_deletes_entry() {
let tracker = AliveTracker::new();
tracker.update(&ev(EventKind::TaskStarting, "t1", 1)).await;
assert!(tracker.is_alive("t1").await);
let changed = tracker.update(&ev(EventKind::TaskRemoved, "t1", 2)).await;
assert!(changed, "TaskRemoved should report change");
assert!(!tracker.is_alive("t1").await);
tracker.update(&ev(EventKind::TaskStarting, "t1", 3)).await;
assert!(tracker.is_alive("t1").await, "fresh entry after removal");
}
#[tokio::test]
async fn stale_task_removed_ignored() {
let tracker = AliveTracker::new();
tracker.update(&ev(EventKind::TaskStarting, "t1", 10)).await;
let changed = tracker.update(&ev(EventKind::TaskRemoved, "t1", 5)).await;
assert!(!changed);
assert!(tracker.is_alive("t1").await, "task must remain alive");
}
#[tokio::test]
async fn task_removed_for_unknown_task() {
let tracker = AliveTracker::new();
let changed = tracker
.update(&ev(EventKind::TaskRemoved, "ghost", 1))
.await;
assert!(!changed, "removing unknown task is a no-op");
}
#[tokio::test]
async fn snapshot_returns_alive_sorted() {
let tracker = AliveTracker::new();
tracker
.update(&ev(EventKind::TaskStarting, "charlie", 1))
.await;
tracker
.update(&ev(EventKind::TaskStarting, "alpha", 2))
.await;
tracker
.update(&ev(EventKind::TaskStarting, "bravo", 3))
.await;
tracker
.update(&ev(EventKind::TaskStopped, "bravo", 4))
.await;
let alive = tracker.snapshot().await;
let names: Vec<&str> = alive.iter().map(|a| &**a).collect();
assert_eq!(names, vec!["alpha", "charlie"]);
}
#[tokio::test]
async fn is_alive_unknown_returns_false() {
let tracker = AliveTracker::new();
assert!(!tracker.is_alive("nonexistent").await);
}
#[tokio::test]
async fn event_without_task_name_ignored() {
let tracker = AliveTracker::new();
let mut e = Event::new(EventKind::TaskStarting);
e.seq = 1;
let changed = tracker.update(&e).await;
assert!(!changed);
assert!(tracker.snapshot().await.is_empty());
}
#[tokio::test]
async fn non_lifecycle_event_advances_seq_but_keeps_alive() {
let tracker = AliveTracker::new();
tracker.update(&ev(EventKind::TaskStarting, "t1", 1)).await;
let changed = tracker
.update(&ev(EventKind::BackoffScheduled, "t1", 2))
.await;
assert!(!changed, "non-lifecycle event should not change alive flag");
assert!(tracker.is_alive("t1").await);
let changed = tracker.update(&ev(EventKind::TaskStopped, "t1", 1)).await;
assert!(!changed, "seq=1 is stale after seq=2");
assert!(tracker.is_alive("t1").await);
}
#[tokio::test]
async fn all_death_events_set_alive_false() {
for kind in [
EventKind::TaskStopped,
EventKind::TaskFailed,
EventKind::ActorExhausted,
EventKind::ActorDead,
] {
let tracker = AliveTracker::new();
tracker.update(&ev(EventKind::TaskStarting, "t", 1)).await;
let changed = tracker.update(&ev(kind, "t", 2)).await;
assert!(changed, "{kind:?} should set alive=false");
assert!(
!tracker.is_alive("t").await,
"{kind:?} should make task dead"
);
}
}
}