use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use atomr_core::actor::Address;
use parking_lot::RwLock;
#[derive(Debug, Clone)]
pub struct PeerHeartbeat {
pub last_tick: Instant,
pub ticks: u64,
}
#[derive(Default)]
pub struct HeartbeatSender {
interval: Duration,
peers: RwLock<HashMap<String, PeerHeartbeat>>,
}
impl HeartbeatSender {
pub fn new(interval: Duration) -> Arc<Self> {
assert!(!interval.is_zero(), "heartbeat interval must be > 0");
Arc::new(Self { interval, peers: RwLock::new(HashMap::new()) })
}
pub fn interval(&self) -> Duration {
self.interval
}
pub fn add_peer(&self, addr: &Address) {
self.peers.write().insert(
addr.to_string(),
PeerHeartbeat {
last_tick: Instant::now() - self.interval, ticks: 0,
},
);
}
pub fn remove_peer(&self, addr: &Address) {
self.peers.write().remove(&addr.to_string());
}
pub fn peer_count(&self) -> usize {
self.peers.read().len()
}
pub fn due_peers(&self, now: Instant) -> Vec<Address> {
let g = self.peers.read();
g.values()
.filter_map(|hb| {
if now.duration_since(hb.last_tick) >= self.interval {
Address::parse(&_addr_round_trip(&g, hb))
} else {
None
}
})
.collect()
}
pub fn record_tick(&self, addr: &Address, now: Instant) {
let mut g = self.peers.write();
if let Some(hb) = g.get_mut(&addr.to_string()) {
hb.last_tick = now;
hb.ticks += 1;
}
}
pub fn ticks_per_peer(&self) -> Vec<(String, u64)> {
let mut v: Vec<(String, u64)> =
self.peers.read().iter().map(|(k, hb)| (k.clone(), hb.ticks)).collect();
v.sort_by(|a, b| a.0.cmp(&b.0));
v
}
}
fn _addr_round_trip(map: &HashMap<String, PeerHeartbeat>, target: &PeerHeartbeat) -> String {
for (k, v) in map {
if std::ptr::eq(v as *const PeerHeartbeat, target as *const PeerHeartbeat) {
return k.clone();
}
}
String::new()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn add_and_remove_peer() {
let s = HeartbeatSender::new(Duration::from_secs(1));
let a = Address::local("a");
s.add_peer(&a);
assert_eq!(s.peer_count(), 1);
s.remove_peer(&a);
assert_eq!(s.peer_count(), 0);
}
#[test]
fn record_tick_increments_count() {
let s = HeartbeatSender::new(Duration::from_millis(10));
let a = Address::local("a");
s.add_peer(&a);
let now = Instant::now();
s.record_tick(&a, now);
s.record_tick(&a, now);
let snap = s.ticks_per_peer();
assert_eq!(snap.len(), 1);
assert_eq!(snap[0].1, 2);
}
#[test]
fn due_peers_respects_interval() {
let s = HeartbeatSender::new(Duration::from_secs(60));
let a = Address::local("a");
s.add_peer(&a);
let now = Instant::now();
let due = s.due_peers(now);
assert_eq!(due.len(), 1);
s.record_tick(&a, now);
assert!(s.due_peers(now).is_empty());
}
}