atomr_cluster/
heartbeat_sender.rs1use std::collections::HashMap;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use atomr_core::actor::Address;
14use parking_lot::RwLock;
15
/// Heartbeat bookkeeping kept for a single tracked peer.
#[derive(Clone, Debug)]
pub struct PeerHeartbeat {
    /// Instant associated with the most recent tick for this peer.
    ///
    /// `HeartbeatSender::record_tick` overwrites it with the caller-supplied
    /// instant; `HeartbeatSender::add_peer` seeds it with a value in the past.
    pub last_tick: Instant,
    /// Number of ticks recorded for this peer since it was (re-)added.
    pub ticks: u64,
}
24
25#[derive(Default)]
27pub struct HeartbeatSender {
28 interval: Duration,
29 peers: RwLock<HashMap<String, PeerHeartbeat>>,
30}
31
32impl HeartbeatSender {
33 pub fn new(interval: Duration) -> Arc<Self> {
34 assert!(!interval.is_zero(), "heartbeat interval must be > 0");
35 Arc::new(Self { interval, peers: RwLock::new(HashMap::new()) })
36 }
37
38 pub fn interval(&self) -> Duration {
39 self.interval
40 }
41
42 pub fn add_peer(&self, addr: &Address) {
44 self.peers.write().insert(
45 addr.to_string(),
46 PeerHeartbeat {
47 last_tick: Instant::now() - self.interval, ticks: 0,
49 },
50 );
51 }
52
53 pub fn remove_peer(&self, addr: &Address) {
54 self.peers.write().remove(&addr.to_string());
55 }
56
57 pub fn peer_count(&self) -> usize {
58 self.peers.read().len()
59 }
60
61 pub fn due_peers(&self, now: Instant) -> Vec<Address> {
65 let g = self.peers.read();
66 g.values()
67 .filter_map(|hb| {
68 if now.duration_since(hb.last_tick) >= self.interval {
69 Address::parse(&_addr_round_trip(&g, hb))
70 } else {
71 None
72 }
73 })
74 .collect()
75 }
76
77 pub fn record_tick(&self, addr: &Address, now: Instant) {
80 let mut g = self.peers.write();
81 if let Some(hb) = g.get_mut(&addr.to_string()) {
82 hb.last_tick = now;
83 hb.ticks += 1;
84 }
85 }
86
87 pub fn ticks_per_peer(&self) -> Vec<(String, u64)> {
89 let mut v: Vec<(String, u64)> =
90 self.peers.read().iter().map(|(k, hb)| (k.clone(), hb.ticks)).collect();
91 v.sort_by(|a, b| a.0.cmp(&b.0));
92 v
93 }
94}
95
96fn _addr_round_trip(map: &HashMap<String, PeerHeartbeat>, target: &PeerHeartbeat) -> String {
100 for (k, v) in map {
101 if std::ptr::eq(v as *const PeerHeartbeat, target as *const PeerHeartbeat) {
102 return k.clone();
103 }
104 }
105 String::new()
106}
107
#[cfg(test)]
mod tests {
    use super::*;

    /// Adding a peer registers it; removing it empties the registry again.
    #[test]
    fn add_and_remove_peer() {
        let sender = HeartbeatSender::new(Duration::from_secs(1));
        let peer = Address::local("a");

        sender.add_peer(&peer);
        assert_eq!(sender.peer_count(), 1);

        sender.remove_peer(&peer);
        assert_eq!(sender.peer_count(), 0);
    }

    /// Every recorded tick bumps the per-peer counter by one.
    #[test]
    fn record_tick_increments_count() {
        let sender = HeartbeatSender::new(Duration::from_millis(10));
        let peer = Address::local("a");
        sender.add_peer(&peer);

        let now = Instant::now();
        for _ in 0..2 {
            sender.record_tick(&peer, now);
        }

        let counts = sender.ticks_per_peer();
        assert_eq!(counts.len(), 1);
        assert_eq!(counts[0].1, 2);
    }

    /// A freshly added peer is due immediately and stops being due once a
    /// tick has been recorded at the same instant.
    #[test]
    fn due_peers_respects_interval() {
        let sender = HeartbeatSender::new(Duration::from_secs(60));
        let peer = Address::local("a");
        sender.add_peer(&peer);

        let now = Instant::now();
        let due_before_tick = sender.due_peers(now);
        assert_eq!(due_before_tick.len(), 1);

        sender.record_tick(&peer, now);
        assert!(sender.due_peers(now).is_empty());
    }
}