use std::time::{Duration, Instant};
use noxu_sync::RwLock;
use super::phi_detector::PhiAccrualDetector;
pub struct MasterTracker {
current_master: RwLock<Option<String>>,
master_term: RwLock<u64>,
last_heartbeat: RwLock<Option<Instant>>,
heartbeat_timeout: Duration,
phi_detector: Option<PhiAccrualDetector>,
}
impl MasterTracker {
pub fn new(heartbeat_timeout: Duration) -> Self {
Self {
current_master: RwLock::new(None),
master_term: RwLock::new(0),
last_heartbeat: RwLock::new(None),
heartbeat_timeout,
phi_detector: None,
}
}
pub fn with_phi(mut self, detector: PhiAccrualDetector) -> Self {
self.phi_detector = Some(detector);
self
}
pub fn set_master(&self, name: &str, term: u64) {
*self.current_master.write() = Some(name.to_string());
*self.master_term.write() = term;
*self.last_heartbeat.write() = Some(Instant::now());
}
pub fn clear_master(&self) {
*self.current_master.write() = None;
*self.last_heartbeat.write() = None;
}
pub fn get_master(&self) -> Option<String> {
self.current_master.read().clone()
}
pub fn get_term(&self) -> u64 {
*self.master_term.read()
}
pub fn record_heartbeat(&self) {
*self.last_heartbeat.write() = Some(Instant::now());
if let Some(ref phi) = self.phi_detector {
phi.record_heartbeat();
}
}
pub fn is_master_alive(&self) -> bool {
let master = self.current_master.read();
if master.is_none() {
return false;
}
drop(master);
match &self.phi_detector {
Some(phi) => phi.is_available(),
None => {
let hb = self.last_heartbeat.read();
match *hb {
Some(t) => t.elapsed() < self.heartbeat_timeout,
None => false,
}
}
}
}
pub fn phi(&self) -> Option<f64> {
self.phi_detector.as_ref().map(|d| d.phi())
}
pub fn time_since_heartbeat(&self) -> Option<Duration> {
self.last_heartbeat.read().map(|t| t.elapsed())
}
pub fn update_master(&self, name: &str, term: u64) -> bool {
let mut current_term = self.master_term.write();
if term < *current_term {
return false;
}
*current_term = term;
*self.current_master.write() = Some(name.to_string());
*self.last_heartbeat.write() = Some(Instant::now());
true
}
pub fn heartbeat_timeout(&self) -> Duration {
self.heartbeat_timeout
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
#[test]
fn test_initial_state() {
let tracker = MasterTracker::new(Duration::from_secs(5));
assert!(tracker.get_master().is_none());
assert_eq!(tracker.get_term(), 0);
assert!(!tracker.is_master_alive());
assert!(tracker.time_since_heartbeat().is_none());
}
#[test]
fn test_set_master() {
let tracker = MasterTracker::new(Duration::from_secs(5));
tracker.set_master("node1", 1);
assert_eq!(tracker.get_master(), Some("node1".to_string()));
assert_eq!(tracker.get_term(), 1);
assert!(tracker.is_master_alive());
}
#[test]
fn test_clear_master() {
let tracker = MasterTracker::new(Duration::from_secs(5));
tracker.set_master("node1", 1);
tracker.clear_master();
assert!(tracker.get_master().is_none());
assert!(!tracker.is_master_alive());
}
#[test]
fn test_set_master_replaces_previous() {
let tracker = MasterTracker::new(Duration::from_secs(5));
tracker.set_master("node1", 1);
tracker.set_master("node2", 2);
assert_eq!(tracker.get_master(), Some("node2".to_string()));
assert_eq!(tracker.get_term(), 2);
}
#[test]
fn test_record_heartbeat() {
let tracker = MasterTracker::new(Duration::from_secs(5));
tracker.set_master("node1", 1);
let since = tracker.time_since_heartbeat().unwrap();
assert!(since < Duration::from_millis(100));
tracker.record_heartbeat();
let since2 = tracker.time_since_heartbeat().unwrap();
assert!(since2 < Duration::from_millis(100));
}
#[test]
fn test_stale_master_detection() {
let tracker = MasterTracker::new(Duration::from_millis(10));
tracker.set_master("node1", 1);
assert!(tracker.is_master_alive());
thread::sleep(Duration::from_millis(20));
assert!(!tracker.is_master_alive());
}
#[test]
fn test_heartbeat_refresh_keeps_alive() {
let tracker = MasterTracker::new(Duration::from_millis(50));
tracker.set_master("node1", 1);
thread::sleep(Duration::from_millis(30));
tracker.record_heartbeat();
assert!(tracker.is_master_alive());
thread::sleep(Duration::from_millis(30));
tracker.record_heartbeat();
assert!(tracker.is_master_alive());
}
#[test]
fn test_time_since_heartbeat_increases() {
let tracker = MasterTracker::new(Duration::from_secs(5));
tracker.set_master("node1", 1);
let t1 = tracker.time_since_heartbeat().unwrap();
thread::sleep(Duration::from_millis(10));
let t2 = tracker.time_since_heartbeat().unwrap();
assert!(t2 > t1);
}
#[test]
fn test_update_master_higher_term_accepted() {
let tracker = MasterTracker::new(Duration::from_secs(5));
tracker.set_master("node1", 1);
assert!(tracker.update_master("node2", 2));
assert_eq!(tracker.get_master(), Some("node2".to_string()));
assert_eq!(tracker.get_term(), 2);
}
#[test]
fn test_update_master_same_term_accepted() {
let tracker = MasterTracker::new(Duration::from_secs(5));
tracker.set_master("node1", 5);
assert!(tracker.update_master("node2", 5));
assert_eq!(tracker.get_master(), Some("node2".to_string()));
}
#[test]
fn test_update_master_lower_term_rejected() {
let tracker = MasterTracker::new(Duration::from_secs(5));
tracker.set_master("node1", 5);
assert!(!tracker.update_master("node2", 3));
assert_eq!(tracker.get_master(), Some("node1".to_string()));
assert_eq!(tracker.get_term(), 5);
}
#[test]
fn test_update_master_from_no_master() {
let tracker = MasterTracker::new(Duration::from_secs(5));
assert!(tracker.update_master("node1", 1));
assert_eq!(tracker.get_master(), Some("node1".to_string()));
}
#[test]
fn test_heartbeat_timeout_accessor() {
let tracker = MasterTracker::new(Duration::from_secs(42));
assert_eq!(tracker.heartbeat_timeout(), Duration::from_secs(42));
}
#[test]
fn test_no_heartbeat_means_not_alive() {
let tracker = MasterTracker::new(Duration::from_secs(5));
tracker.set_master("node1", 1);
tracker.clear_master();
assert!(!tracker.is_master_alive());
}
#[test]
fn test_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<MasterTracker>();
}
#[test]
fn test_concurrent_updates() {
use std::sync::Arc;
let tracker = Arc::new(MasterTracker::new(Duration::from_secs(5)));
let mut handles = vec![];
for i in 0..10 {
let t = Arc::clone(&tracker);
handles.push(thread::spawn(move || {
let name = format!("node{}", i);
t.update_master(&name, i as u64);
t.record_heartbeat();
t.get_master();
t.is_master_alive();
}));
}
for h in handles {
h.join().unwrap();
}
assert!(tracker.get_master().is_some());
assert!(tracker.get_term() >= 1);
}
}