use std::collections::HashMap;
use actionqueue_core::actor::HeartbeatPolicy;
use actionqueue_core::ids::ActorId;
use tracing;
struct HeartbeatState {
last_heartbeat_at: u64,
policy: HeartbeatPolicy,
}
#[derive(Default)]
pub struct HeartbeatMonitor {
heartbeats: HashMap<ActorId, HeartbeatState>,
}
impl HeartbeatMonitor {
pub fn new() -> Self {
Self::default()
}
pub fn record_registration(&mut self, actor_id: ActorId, policy: HeartbeatPolicy, now: u64) {
tracing::debug!(
%actor_id,
timeout_secs = policy.timeout_secs(),
"heartbeat tracking started"
);
self.heartbeats.insert(actor_id, HeartbeatState { last_heartbeat_at: now, policy });
}
pub fn record_heartbeat(&mut self, actor_id: ActorId, timestamp: u64) {
if let Some(state) = self.heartbeats.get_mut(&actor_id) {
tracing::trace!(%actor_id, timestamp, "heartbeat recorded");
state.last_heartbeat_at = timestamp;
}
}
pub fn remove(&mut self, actor_id: ActorId) {
self.heartbeats.remove(&actor_id);
}
pub fn check_timeouts(&self, now: u64) -> Vec<ActorId> {
let timed_out: Vec<ActorId> = self
.heartbeats
.iter()
.filter(|(_, state)| {
let deadline = state.last_heartbeat_at.saturating_add(state.policy.timeout_secs());
now >= deadline
})
.map(|(&id, _)| id)
.collect();
for &actor_id in &timed_out {
tracing::warn!(%actor_id, now, "actor heartbeat timeout detected");
}
timed_out
}
pub fn is_alive(&self, actor_id: ActorId, now: u64) -> bool {
self.heartbeats.get(&actor_id).is_some_and(|state| {
let deadline = state.last_heartbeat_at.saturating_add(state.policy.timeout_secs());
now < deadline
})
}
}
#[cfg(test)]
mod tests {
use actionqueue_core::actor::HeartbeatPolicy;
use actionqueue_core::ids::ActorId;
use super::HeartbeatMonitor;
fn policy(interval_secs: u64) -> HeartbeatPolicy {
HeartbeatPolicy::new(interval_secs, 3)
}
#[test]
fn actor_alive_within_window() {
let mut monitor = HeartbeatMonitor::new();
let id = ActorId::new();
monitor.record_registration(id, policy(10), 1000);
assert!(monitor.is_alive(id, 1029));
}
#[test]
fn actor_times_out_after_deadline() {
let mut monitor = HeartbeatMonitor::new();
let id = ActorId::new();
monitor.record_registration(id, policy(10), 1000);
let timed_out = monitor.check_timeouts(1030);
assert!(timed_out.contains(&id));
assert!(!monitor.is_alive(id, 1030));
}
#[test]
fn heartbeat_resets_timeout() {
let mut monitor = HeartbeatMonitor::new();
let id = ActorId::new();
monitor.record_registration(id, policy(10), 1000);
monitor.record_heartbeat(id, 1025);
assert!(monitor.is_alive(id, 1054));
assert!(monitor.check_timeouts(1054).is_empty());
}
#[test]
fn remove_stops_tracking() {
let mut monitor = HeartbeatMonitor::new();
let id = ActorId::new();
monitor.record_registration(id, policy(10), 1000);
monitor.remove(id);
assert!(!monitor.is_alive(id, 1030));
assert!(monitor.check_timeouts(9999).is_empty());
}
}