actionqueue_actor/
heartbeat.rs1use std::collections::HashMap;
4
5use actionqueue_core::actor::HeartbeatPolicy;
6use actionqueue_core::ids::ActorId;
7use tracing;
8
9struct HeartbeatState {
11 last_heartbeat_at: u64,
12 policy: HeartbeatPolicy,
13}
14
15#[derive(Default)]
20pub struct HeartbeatMonitor {
21 heartbeats: HashMap<ActorId, HeartbeatState>,
22}
23
24impl HeartbeatMonitor {
25 pub fn new() -> Self {
27 Self::default()
28 }
29
30 pub fn record_registration(&mut self, actor_id: ActorId, policy: HeartbeatPolicy, now: u64) {
32 tracing::debug!(
33 %actor_id,
34 timeout_secs = policy.timeout_secs(),
35 "heartbeat tracking started"
36 );
37 self.heartbeats.insert(actor_id, HeartbeatState { last_heartbeat_at: now, policy });
38 }
39
40 pub fn record_heartbeat(&mut self, actor_id: ActorId, timestamp: u64) {
42 if let Some(state) = self.heartbeats.get_mut(&actor_id) {
43 tracing::trace!(%actor_id, timestamp, "heartbeat recorded");
44 state.last_heartbeat_at = timestamp;
45 }
46 }
47
48 pub fn remove(&mut self, actor_id: ActorId) {
50 self.heartbeats.remove(&actor_id);
51 }
52
53 pub fn check_timeouts(&self, now: u64) -> Vec<ActorId> {
57 let timed_out: Vec<ActorId> = self
58 .heartbeats
59 .iter()
60 .filter(|(_, state)| {
61 let deadline = state.last_heartbeat_at.saturating_add(state.policy.timeout_secs());
62 now >= deadline
63 })
64 .map(|(&id, _)| id)
65 .collect();
66 for &actor_id in &timed_out {
67 tracing::warn!(%actor_id, now, "actor heartbeat timeout detected");
68 }
69 timed_out
70 }
71
72 pub fn is_alive(&self, actor_id: ActorId, now: u64) -> bool {
74 self.heartbeats.get(&actor_id).is_some_and(|state| {
75 let deadline = state.last_heartbeat_at.saturating_add(state.policy.timeout_secs());
76 now < deadline
77 })
78 }
79}
80
#[cfg(test)]
mod tests {
    use actionqueue_core::actor::HeartbeatPolicy;
    use actionqueue_core::ids::ActorId;

    use super::HeartbeatMonitor;

    /// Builds the policy used by every test.
    ///
    /// The assertions below flip between 1029 and 1030 for an actor registered
    /// at 1000 with `policy(10)`, i.e. they expect a 30-unit timeout —
    /// presumably `interval_secs * 3`; confirm against `HeartbeatPolicy`.
    fn policy(interval_secs: u64) -> HeartbeatPolicy {
        HeartbeatPolicy::new(interval_secs, 3)
    }

    /// Returns a monitor tracking one fresh actor registered at t = 1000 with
    /// `policy(10)`, together with that actor's id.
    fn tracked_actor() -> (HeartbeatMonitor, ActorId) {
        let mut monitor = HeartbeatMonitor::new();
        let id = ActorId::new();
        monitor.record_registration(id, policy(10), 1000);
        (monitor, id)
    }

    #[test]
    fn actor_alive_within_window() {
        let (monitor, id) = tracked_actor();

        // One tick before the expected deadline of 1030.
        assert!(monitor.is_alive(id, 1029));
    }

    #[test]
    fn actor_times_out_after_deadline() {
        let (monitor, id) = tracked_actor();

        // The deadline itself already counts as timed out.
        let timed_out = monitor.check_timeouts(1030);

        assert!(timed_out.contains(&id));
        assert!(!monitor.is_alive(id, 1030));
    }

    #[test]
    fn heartbeat_resets_timeout() {
        let (mut monitor, id) = tracked_actor();

        // A heartbeat at 1025 should move the expected deadline to 1055.
        monitor.record_heartbeat(id, 1025);

        assert!(monitor.is_alive(id, 1054));
        assert!(monitor.check_timeouts(1054).is_empty());
    }

    #[test]
    fn remove_stops_tracking() {
        let (mut monitor, id) = tracked_actor();

        monitor.remove(id);

        // An untracked actor is neither alive nor reported as timed out.
        assert!(!monitor.is_alive(id, 1030));
        assert!(monitor.check_timeouts(9999).is_empty());
    }
}