Skip to main content

atomr_cluster/
heartbeat_sender.rs

1//! Heartbeat sender — periodically updates the failure detector
2//! with a synthetic heartbeat per peer.
3//!
4//! The sender owns the per-peer interval timer and feeds the local
5//! [`crate::HeartbeatState`] book-keeping. The actual cross-node heartbeat
6//! PDU exchange wires in once Phase 5.D's reader/writer split + Phase
7//! 6.D's gossip transport land.
8
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use atomr_core::actor::Address;
14use parking_lot::RwLock;
15
/// Book-keeping entry the sender stores for a single peer.
#[derive(Clone, Debug)]
pub struct PeerHeartbeat {
    /// Instant of the most recent tick recorded locally for this peer.
    pub last_tick: Instant,
    /// How many ticks have been recorded for this peer since it was added.
    pub ticks: u64,
}
24
/// In-memory heartbeat-sender state.
///
/// Holds the configured tick interval and one [`PeerHeartbeat`] record per
/// peer, keyed by the string form of the peer's address.
///
/// NOTE(review): the derived `Default` produces a zero `interval`, which
/// [`HeartbeatSender::new`] explicitly rejects; with a zero interval every
/// peer compares as due on every poll. Confirm whether `Default` is meant
/// to be reachable by callers.
#[derive(Default)]
pub struct HeartbeatSender {
    /// Minimum time that must elapse between two ticks for the same peer.
    interval: Duration,
    /// Per-peer records behind a read/write lock; the key is `addr.to_string()`.
    peers: RwLock<HashMap<String, PeerHeartbeat>>,
}
31
32impl HeartbeatSender {
33    pub fn new(interval: Duration) -> Arc<Self> {
34        assert!(!interval.is_zero(), "heartbeat interval must be > 0");
35        Arc::new(Self { interval, peers: RwLock::new(HashMap::new()) })
36    }
37
38    pub fn interval(&self) -> Duration {
39        self.interval
40    }
41
42    /// Add a peer to the rotation.
43    pub fn add_peer(&self, addr: &Address) {
44        self.peers.write().insert(
45            addr.to_string(),
46            PeerHeartbeat {
47                last_tick: Instant::now() - self.interval, // tick on first poll
48                ticks: 0,
49            },
50        );
51    }
52
53    pub fn remove_peer(&self, addr: &Address) {
54        self.peers.write().remove(&addr.to_string());
55    }
56
57    pub fn peer_count(&self) -> usize {
58        self.peers.read().len()
59    }
60
61    /// Run one tick — return the addresses whose `last_tick` is older
62    /// than `interval`. The caller emits a heartbeat PDU to each and
63    /// then calls [`Self::record_tick`].
64    pub fn due_peers(&self, now: Instant) -> Vec<Address> {
65        let g = self.peers.read();
66        g.values()
67            .filter_map(|hb| {
68                if now.duration_since(hb.last_tick) >= self.interval {
69                    Address::parse(&_addr_round_trip(&g, hb))
70                } else {
71                    None
72                }
73            })
74            .collect()
75    }
76
77    /// Bump the per-peer last-tick to `now` and increment the
78    /// counter. No-op if the peer is unknown.
79    pub fn record_tick(&self, addr: &Address, now: Instant) {
80        let mut g = self.peers.write();
81        if let Some(hb) = g.get_mut(&addr.to_string()) {
82            hb.last_tick = now;
83            hb.ticks += 1;
84        }
85    }
86
87    /// Snapshot of (peer-address-string, ticks-emitted).
88    pub fn ticks_per_peer(&self) -> Vec<(String, u64)> {
89        let mut v: Vec<(String, u64)> =
90            self.peers.read().iter().map(|(k, hb)| (k.clone(), hb.ticks)).collect();
91        v.sort_by(|a, b| a.0.cmp(&b.0));
92        v
93    }
94}
95
96// helper — recover the address string from a record by scanning the
97// map. We don't carry the address inside `PeerHeartbeat` to keep that
98// struct small; the map key is the canonical form.
99fn _addr_round_trip(map: &HashMap<String, PeerHeartbeat>, target: &PeerHeartbeat) -> String {
100    for (k, v) in map {
101        if std::ptr::eq(v as *const PeerHeartbeat, target as *const PeerHeartbeat) {
102            return k.clone();
103        }
104    }
105    String::new()
106}
107
#[cfg(test)]
mod tests {
    use super::*;

    /// Adding and then removing a peer is reflected in `peer_count`.
    #[test]
    fn add_and_remove_peer() {
        let sender = HeartbeatSender::new(Duration::from_secs(1));
        let peer = Address::local("a");

        sender.add_peer(&peer);
        assert_eq!(sender.peer_count(), 1);

        sender.remove_peer(&peer);
        assert_eq!(sender.peer_count(), 0);
    }

    /// Each `record_tick` call bumps the per-peer counter by one.
    #[test]
    fn record_tick_increments_count() {
        let sender = HeartbeatSender::new(Duration::from_millis(10));
        let peer = Address::local("a");
        sender.add_peer(&peer);

        let at = Instant::now();
        for _ in 0..2 {
            sender.record_tick(&peer, at);
        }

        let snapshot = sender.ticks_per_peer();
        assert_eq!(snapshot.len(), 1);
        assert_eq!(snapshot[0].1, 2);
    }

    /// A peer is due right after being added, and not again until the
    /// interval has passed since its last recorded tick.
    #[test]
    fn due_peers_respects_interval() {
        let sender = HeartbeatSender::new(Duration::from_secs(60));
        let peer = Address::local("a");
        sender.add_peer(&peer);

        // A freshly added peer has its last tick in the past, so the very
        // first poll reports it.
        let at = Instant::now();
        let first_poll = sender.due_peers(at);
        assert_eq!(first_poll.len(), 1);

        // Once a tick is recorded at the same instant, nothing is due.
        sender.record_tick(&peer, at);
        assert!(sender.due_peers(at).is_empty());
    }
}