Skip to main content

noxu_rep/elections/
master_tracker.rs

//! Master tracking.
//!
//! This module maintains knowledge of the current master and its liveness
//! based on heartbeats.
//!
//! The tracker is consulted by replicas to determine:
//! - Who the current master is.
//! - Whether the master is still alive (heartbeat within timeout).
//! - Whether a new election result (with a term at least as high as the
//!   current one) should supersede the current master.
11
12use std::time::{Duration, Instant};
13
14use noxu_sync::RwLock;
15
16use super::phi_detector::PhiAccrualDetector;
17
18/// Tracks the current known master of the replication group.
19///
20/// All methods are safe to call concurrently from multiple threads. Reads use
21/// a shared lock; writes use an exclusive lock.
22///
23/// When a [`PhiAccrualDetector`] is attached (via [`MasterTracker::with_phi`]),
24/// `is_master_alive` uses the continuous φ suspicion level instead of the
25/// binary heartbeat timeout.  This adapts automatically to network jitter.
26pub struct MasterTracker {
27    /// Name of the current master, if known.
28    current_master: RwLock<Option<String>>,
29    /// Term in which the current master was elected.
30    master_term: RwLock<u64>,
31    /// Time of the last heartbeat from the master.
32    last_heartbeat: RwLock<Option<Instant>>,
33    /// Maximum time between heartbeats before the master is considered dead.
34    heartbeat_timeout: Duration,
35    /// Optional phi accrual failure detector.
36    phi_detector: Option<PhiAccrualDetector>,
37}
38
39impl MasterTracker {
40    /// Create a new tracker with the given heartbeat timeout.
41    ///
42    /// Uses binary heartbeat timeout for liveness detection.  Call
43    /// [`with_phi`](Self::with_phi) to switch to phi accrual detection.
44    pub fn new(heartbeat_timeout: Duration) -> Self {
45        Self {
46            current_master: RwLock::new(None),
47            master_term: RwLock::new(0),
48            last_heartbeat: RwLock::new(None),
49            heartbeat_timeout,
50            phi_detector: None,
51        }
52    }
53
54    /// Attach a phi accrual failure detector.
55    ///
56    /// When attached, [`is_master_alive`](Self::is_master_alive) returns
57    /// `phi_detector.is_available()` instead of the binary timeout check.
58    pub fn with_phi(mut self, detector: PhiAccrualDetector) -> Self {
59        self.phi_detector = Some(detector);
60        self
61    }
62
63    /// Set the current master and its term unconditionally.
64    ///
65    /// Also records a heartbeat at the current time.
66    pub fn set_master(&self, name: &str, term: u64) {
67        *self.current_master.write() = Some(name.to_string());
68        *self.master_term.write() = term;
69        *self.last_heartbeat.write() = Some(Instant::now());
70    }
71
72    /// Clear the current master.
73    ///
74    /// After this call, [`get_master`](Self::get_master) returns `None`.
75    pub fn clear_master(&self) {
76        *self.current_master.write() = None;
77        *self.last_heartbeat.write() = None;
78    }
79
80    /// Returns the name of the current master, if known.
81    pub fn get_master(&self) -> Option<String> {
82        self.current_master.read().clone()
83    }
84
85    /// Returns the term of the current master.
86    pub fn get_term(&self) -> u64 {
87        *self.master_term.read()
88    }
89
90    /// Record a heartbeat from the master at the current time.
91    pub fn record_heartbeat(&self) {
92        *self.last_heartbeat.write() = Some(Instant::now());
93        if let Some(ref phi) = self.phi_detector {
94            phi.record_heartbeat();
95        }
96    }
97
98    /// Returns `true` if a master is set and it is still considered alive.
99    ///
100    /// When a phi detector is configured, uses the continuous suspicion level
101    /// (`phi.is_available()`).  Otherwise falls back to binary heartbeat
102    /// timeout (`elapsed < heartbeat_timeout`).
103    pub fn is_master_alive(&self) -> bool {
104        let master = self.current_master.read();
105        if master.is_none() {
106            return false;
107        }
108        drop(master);
109
110        match &self.phi_detector {
111            Some(phi) => phi.is_available(),
112            None => {
113                let hb = self.last_heartbeat.read();
114                match *hb {
115                    Some(t) => t.elapsed() < self.heartbeat_timeout,
116                    None => false,
117                }
118            }
119        }
120    }
121
122    /// Returns the current phi suspicion value, or `None` if no phi detector
123    /// is configured.
124    pub fn phi(&self) -> Option<f64> {
125        self.phi_detector.as_ref().map(|d| d.phi())
126    }
127
128    /// Returns the duration since the last heartbeat, or `None` if no
129    /// heartbeat has been recorded.
130    pub fn time_since_heartbeat(&self) -> Option<Duration> {
131        self.last_heartbeat.read().map(|t| t.elapsed())
132    }
133
134    /// Update the master only if `term` is greater than or equal to the
135    /// current term.
136    ///
137    /// This ensures that stale election results (from older terms) cannot
138    /// overwrite a more recent master.
139    ///
140    /// Returns `true` if the master was updated, `false` if the update was
141    /// rejected due to a stale term.
142    pub fn update_master(&self, name: &str, term: u64) -> bool {
143        // Take write locks to perform the check-and-set atomically.
144        let mut current_term = self.master_term.write();
145        if term < *current_term {
146            return false;
147        }
148
149        *current_term = term;
150        *self.current_master.write() = Some(name.to_string());
151        *self.last_heartbeat.write() = Some(Instant::now());
152
153        true
154    }
155
156    /// Returns the configured heartbeat timeout.
157    pub fn heartbeat_timeout(&self) -> Duration {
158        self.heartbeat_timeout
159    }
160}
161
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    // --- Basic set / get / clear ---

    #[test]
    fn test_initial_state() {
        let tracker = MasterTracker::new(Duration::from_secs(5));
        assert!(tracker.get_master().is_none());
        assert_eq!(tracker.get_term(), 0);
        assert!(!tracker.is_master_alive());
        assert!(tracker.time_since_heartbeat().is_none());
    }

    #[test]
    fn test_set_master() {
        let tracker = MasterTracker::new(Duration::from_secs(5));
        tracker.set_master("node1", 1);

        assert_eq!(tracker.get_master(), Some("node1".to_string()));
        assert_eq!(tracker.get_term(), 1);
        assert!(tracker.is_master_alive());
    }

    #[test]
    fn test_clear_master() {
        let tracker = MasterTracker::new(Duration::from_secs(5));
        tracker.set_master("node1", 1);
        tracker.clear_master();

        assert!(tracker.get_master().is_none());
        assert!(!tracker.is_master_alive());
    }

    #[test]
    fn test_clear_master_preserves_term() {
        let tracker = MasterTracker::new(Duration::from_secs(5));
        tracker.set_master("node1", 5);
        tracker.clear_master();

        assert_eq!(tracker.get_term(), 5);
        assert!(tracker.time_since_heartbeat().is_none());

        // A stale election result must still be rejected after clearing.
        assert!(!tracker.update_master("node2", 3));
        assert!(tracker.get_master().is_none());
    }

    #[test]
    fn test_set_master_replaces_previous() {
        let tracker = MasterTracker::new(Duration::from_secs(5));
        tracker.set_master("node1", 1);
        tracker.set_master("node2", 2);

        assert_eq!(tracker.get_master(), Some("node2".to_string()));
        assert_eq!(tracker.get_term(), 2);
    }

    // --- Heartbeat ---

    #[test]
    fn test_record_heartbeat() {
        let tracker = MasterTracker::new(Duration::from_secs(5));
        tracker.set_master("node1", 1);

        // Heartbeat was set by set_master.
        let since = tracker.time_since_heartbeat().unwrap();
        assert!(since < Duration::from_millis(100));

        // Record another heartbeat.
        tracker.record_heartbeat();
        let since2 = tracker.time_since_heartbeat().unwrap();
        assert!(since2 < Duration::from_millis(100));
    }

    #[test]
    fn test_heartbeat_without_master_is_not_alive() {
        let tracker = MasterTracker::new(Duration::from_secs(5));
        tracker.record_heartbeat();

        // The heartbeat is recorded, but liveness requires a known master.
        assert!(tracker.time_since_heartbeat().is_some());
        assert!(!tracker.is_master_alive());
    }

    #[test]
    fn test_stale_master_detection() {
        // Short timeout so the master becomes stale quickly, but generous
        // enough that the immediate "alive" check does not flake on a
        // loaded machine.
        let tracker = MasterTracker::new(Duration::from_millis(50));
        tracker.set_master("node1", 1);

        assert!(tracker.is_master_alive());

        // Wait well past the timeout.
        thread::sleep(Duration::from_millis(100));
        assert!(!tracker.is_master_alive());
    }

    #[test]
    fn test_heartbeat_refresh_keeps_alive() {
        let tracker = MasterTracker::new(Duration::from_millis(100));
        tracker.set_master("node1", 1);

        thread::sleep(Duration::from_millis(60));
        tracker.record_heartbeat();
        thread::sleep(Duration::from_millis(60));

        // ~120ms since set_master exceeds the timeout, so the master is only
        // alive because the refresh at ~60ms restarted the clock.
        assert!(tracker.is_master_alive());
    }

    #[test]
    fn test_time_since_heartbeat_increases() {
        let tracker = MasterTracker::new(Duration::from_secs(5));
        tracker.set_master("node1", 1);

        let t1 = tracker.time_since_heartbeat().unwrap();
        thread::sleep(Duration::from_millis(10));
        let t2 = tracker.time_since_heartbeat().unwrap();

        assert!(t2 > t1);
    }

    // --- Term ordering ---

    #[test]
    fn test_update_master_higher_term_accepted() {
        let tracker = MasterTracker::new(Duration::from_secs(5));
        tracker.set_master("node1", 1);

        assert!(tracker.update_master("node2", 2));
        assert_eq!(tracker.get_master(), Some("node2".to_string()));
        assert_eq!(tracker.get_term(), 2);
    }

    #[test]
    fn test_update_master_same_term_accepted() {
        let tracker = MasterTracker::new(Duration::from_secs(5));
        tracker.set_master("node1", 5);

        // Same term: the update is accepted (could be a re-election in the
        // same term or a late notification).
        assert!(tracker.update_master("node2", 5));
        assert_eq!(tracker.get_master(), Some("node2".to_string()));
    }

    #[test]
    fn test_update_master_lower_term_rejected() {
        let tracker = MasterTracker::new(Duration::from_secs(5));
        tracker.set_master("node1", 5);

        assert!(!tracker.update_master("node2", 3));
        // Master unchanged.
        assert_eq!(tracker.get_master(), Some("node1".to_string()));
        assert_eq!(tracker.get_term(), 5);
    }

    #[test]
    fn test_update_master_from_no_master() {
        let tracker = MasterTracker::new(Duration::from_secs(5));
        assert!(tracker.update_master("node1", 1));
        assert_eq!(tracker.get_master(), Some("node1".to_string()));
    }

    // --- Misc ---

    #[test]
    fn test_heartbeat_timeout_accessor() {
        let tracker = MasterTracker::new(Duration::from_secs(42));
        assert_eq!(tracker.heartbeat_timeout(), Duration::from_secs(42));
    }

    #[test]
    fn test_no_heartbeat_means_not_alive() {
        let tracker = MasterTracker::new(Duration::from_secs(5));
        // Master is cleared (no heartbeat).
        tracker.set_master("node1", 1);
        tracker.clear_master();
        assert!(!tracker.is_master_alive());
    }

    // --- Send + Sync ---

    #[test]
    fn test_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<MasterTracker>();
    }

    // --- Concurrent access ---

    #[test]
    fn test_concurrent_updates() {
        use std::sync::Arc;

        let tracker = Arc::new(MasterTracker::new(Duration::from_secs(5)));
        let mut handles = vec![];

        for i in 0..10 {
            let t = Arc::clone(&tracker);
            handles.push(thread::spawn(move || {
                let name = format!("node{}", i);
                t.update_master(&name, i as u64);
                t.record_heartbeat();
                t.get_master();
                t.is_master_alive();
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        // update_master is an atomic check-and-set. The highest term (9)
        // is accepted whenever it runs, and every later lower-term update
        // is rejected, so the final state is deterministic.
        assert_eq!(tracker.get_master(), Some("node9".to_string()));
        assert_eq!(tracker.get_term(), 9);
    }
}