Skip to main content

actionqueue_actor/
heartbeat.rs

1//! Per-actor heartbeat tracking and timeout detection.
2
3use std::collections::HashMap;
4
5use actionqueue_core::actor::HeartbeatPolicy;
6use actionqueue_core::ids::ActorId;
7use tracing;
8
9/// Per-actor heartbeat state.
10struct HeartbeatState {
11    last_heartbeat_at: u64,
12    policy: HeartbeatPolicy,
13}
14
15/// Tracks last heartbeat timestamps and detects timeout conditions.
16///
17/// The dispatch loop calls [`check_timeouts`](HeartbeatMonitor::check_timeouts)
18/// once per tick to find actors whose heartbeat has expired.
19#[derive(Default)]
20pub struct HeartbeatMonitor {
21    heartbeats: HashMap<ActorId, HeartbeatState>,
22}
23
24impl HeartbeatMonitor {
25    /// Creates an empty monitor.
26    pub fn new() -> Self {
27        Self::default()
28    }
29
30    /// Records actor registration with initial heartbeat timestamp.
31    pub fn record_registration(&mut self, actor_id: ActorId, policy: HeartbeatPolicy, now: u64) {
32        tracing::debug!(
33            %actor_id,
34            timeout_secs = policy.timeout_secs(),
35            "heartbeat tracking started"
36        );
37        self.heartbeats.insert(actor_id, HeartbeatState { last_heartbeat_at: now, policy });
38    }
39
40    /// Records a heartbeat for an active actor.
41    pub fn record_heartbeat(&mut self, actor_id: ActorId, timestamp: u64) {
42        if let Some(state) = self.heartbeats.get_mut(&actor_id) {
43            tracing::trace!(%actor_id, timestamp, "heartbeat recorded");
44            state.last_heartbeat_at = timestamp;
45        }
46    }
47
48    /// Removes tracking for an actor (called on deregistration).
49    pub fn remove(&mut self, actor_id: ActorId) {
50        self.heartbeats.remove(&actor_id);
51    }
52
53    /// Returns actor IDs whose heartbeat has timed out at `now`.
54    ///
55    /// An actor times out when `now >= last_heartbeat_at + timeout_secs`.
56    pub fn check_timeouts(&self, now: u64) -> Vec<ActorId> {
57        let timed_out: Vec<ActorId> = self
58            .heartbeats
59            .iter()
60            .filter(|(_, state)| {
61                let deadline = state.last_heartbeat_at.saturating_add(state.policy.timeout_secs());
62                now >= deadline
63            })
64            .map(|(&id, _)| id)
65            .collect();
66        for &actor_id in &timed_out {
67            tracing::warn!(%actor_id, now, "actor heartbeat timeout detected");
68        }
69        timed_out
70    }
71
72    /// Returns `true` if the actor's heartbeat is within the timeout window.
73    pub fn is_alive(&self, actor_id: ActorId, now: u64) -> bool {
74        self.heartbeats.get(&actor_id).is_some_and(|state| {
75            let deadline = state.last_heartbeat_at.saturating_add(state.policy.timeout_secs());
76            now < deadline
77        })
78    }
79}
80
81#[cfg(test)]
82mod tests {
83    use actionqueue_core::actor::HeartbeatPolicy;
84    use actionqueue_core::ids::ActorId;
85
86    use super::HeartbeatMonitor;
87
88    fn policy(interval_secs: u64) -> HeartbeatPolicy {
89        HeartbeatPolicy::new(interval_secs, 3)
90    }
91
92    #[test]
93    fn actor_alive_within_window() {
94        let mut monitor = HeartbeatMonitor::new();
95        let id = ActorId::new();
96        monitor.record_registration(id, policy(10), 1000);
97        // timeout = 10 * 3 = 30 secs. At t=1029, still alive.
98        assert!(monitor.is_alive(id, 1029));
99    }
100
101    #[test]
102    fn actor_times_out_after_deadline() {
103        let mut monitor = HeartbeatMonitor::new();
104        let id = ActorId::new();
105        monitor.record_registration(id, policy(10), 1000);
106        // timeout = 30. At t=1030, timed out.
107        let timed_out = monitor.check_timeouts(1030);
108        assert!(timed_out.contains(&id));
109        assert!(!monitor.is_alive(id, 1030));
110    }
111
112    #[test]
113    fn heartbeat_resets_timeout() {
114        let mut monitor = HeartbeatMonitor::new();
115        let id = ActorId::new();
116        monitor.record_registration(id, policy(10), 1000);
117        monitor.record_heartbeat(id, 1025);
118        // New deadline: 1025 + 30 = 1055. At t=1054, alive.
119        assert!(monitor.is_alive(id, 1054));
120        assert!(monitor.check_timeouts(1054).is_empty());
121    }
122
123    #[test]
124    fn remove_stops_tracking() {
125        let mut monitor = HeartbeatMonitor::new();
126        let id = ActorId::new();
127        monitor.record_registration(id, policy(10), 1000);
128        monitor.remove(id);
129        assert!(!monitor.is_alive(id, 1030));
130        assert!(monitor.check_timeouts(9999).is_empty());
131    }
132}