noxu_rep/elections/
master_tracker.rs1use std::time::{Duration, Instant};
13
14use noxu_sync::RwLock;
15
16use super::phi_detector::PhiAccrualDetector;
17
18pub struct MasterTracker {
27 current_master: RwLock<Option<String>>,
29 master_term: RwLock<u64>,
31 last_heartbeat: RwLock<Option<Instant>>,
33 heartbeat_timeout: Duration,
35 phi_detector: Option<PhiAccrualDetector>,
37}
38
39impl MasterTracker {
40 pub fn new(heartbeat_timeout: Duration) -> Self {
45 Self {
46 current_master: RwLock::new(None),
47 master_term: RwLock::new(0),
48 last_heartbeat: RwLock::new(None),
49 heartbeat_timeout,
50 phi_detector: None,
51 }
52 }
53
54 pub fn with_phi(mut self, detector: PhiAccrualDetector) -> Self {
59 self.phi_detector = Some(detector);
60 self
61 }
62
63 pub fn set_master(&self, name: &str, term: u64) {
67 *self.current_master.write() = Some(name.to_string());
68 *self.master_term.write() = term;
69 *self.last_heartbeat.write() = Some(Instant::now());
70 }
71
72 pub fn clear_master(&self) {
76 *self.current_master.write() = None;
77 *self.last_heartbeat.write() = None;
78 }
79
80 pub fn get_master(&self) -> Option<String> {
82 self.current_master.read().clone()
83 }
84
85 pub fn get_term(&self) -> u64 {
87 *self.master_term.read()
88 }
89
90 pub fn record_heartbeat(&self) {
92 *self.last_heartbeat.write() = Some(Instant::now());
93 if let Some(ref phi) = self.phi_detector {
94 phi.record_heartbeat();
95 }
96 }
97
98 pub fn is_master_alive(&self) -> bool {
104 let master = self.current_master.read();
105 if master.is_none() {
106 return false;
107 }
108 drop(master);
109
110 match &self.phi_detector {
111 Some(phi) => phi.is_available(),
112 None => {
113 let hb = self.last_heartbeat.read();
114 match *hb {
115 Some(t) => t.elapsed() < self.heartbeat_timeout,
116 None => false,
117 }
118 }
119 }
120 }
121
122 pub fn phi(&self) -> Option<f64> {
125 self.phi_detector.as_ref().map(|d| d.phi())
126 }
127
128 pub fn time_since_heartbeat(&self) -> Option<Duration> {
131 self.last_heartbeat.read().map(|t| t.elapsed())
132 }
133
134 pub fn update_master(&self, name: &str, term: u64) -> bool {
143 let mut current_term = self.master_term.write();
145 if term < *current_term {
146 return false;
147 }
148
149 *current_term = term;
150 *self.current_master.write() = Some(name.to_string());
151 *self.last_heartbeat.write() = Some(Instant::now());
152
153 true
154 }
155
156 pub fn heartbeat_timeout(&self) -> Duration {
158 self.heartbeat_timeout
159 }
160}
161
162#[cfg(test)]
163mod tests {
164 use super::*;
165 use std::thread;
166
167 #[test]
170 fn test_initial_state() {
171 let tracker = MasterTracker::new(Duration::from_secs(5));
172 assert!(tracker.get_master().is_none());
173 assert_eq!(tracker.get_term(), 0);
174 assert!(!tracker.is_master_alive());
175 assert!(tracker.time_since_heartbeat().is_none());
176 }
177
178 #[test]
179 fn test_set_master() {
180 let tracker = MasterTracker::new(Duration::from_secs(5));
181 tracker.set_master("node1", 1);
182
183 assert_eq!(tracker.get_master(), Some("node1".to_string()));
184 assert_eq!(tracker.get_term(), 1);
185 assert!(tracker.is_master_alive());
186 }
187
188 #[test]
189 fn test_clear_master() {
190 let tracker = MasterTracker::new(Duration::from_secs(5));
191 tracker.set_master("node1", 1);
192 tracker.clear_master();
193
194 assert!(tracker.get_master().is_none());
195 assert!(!tracker.is_master_alive());
196 }
197
198 #[test]
199 fn test_set_master_replaces_previous() {
200 let tracker = MasterTracker::new(Duration::from_secs(5));
201 tracker.set_master("node1", 1);
202 tracker.set_master("node2", 2);
203
204 assert_eq!(tracker.get_master(), Some("node2".to_string()));
205 assert_eq!(tracker.get_term(), 2);
206 }
207
208 #[test]
211 fn test_record_heartbeat() {
212 let tracker = MasterTracker::new(Duration::from_secs(5));
213 tracker.set_master("node1", 1);
214
215 let since = tracker.time_since_heartbeat().unwrap();
217 assert!(since < Duration::from_millis(100));
218
219 tracker.record_heartbeat();
221 let since2 = tracker.time_since_heartbeat().unwrap();
222 assert!(since2 < Duration::from_millis(100));
223 }
224
225 #[test]
226 fn test_stale_master_detection() {
227 let tracker = MasterTracker::new(Duration::from_millis(10));
229 tracker.set_master("node1", 1);
230
231 assert!(tracker.is_master_alive());
232
233 thread::sleep(Duration::from_millis(20));
235 assert!(!tracker.is_master_alive());
236 }
237
238 #[test]
239 fn test_heartbeat_refresh_keeps_alive() {
240 let tracker = MasterTracker::new(Duration::from_millis(50));
241 tracker.set_master("node1", 1);
242
243 thread::sleep(Duration::from_millis(30));
244 tracker.record_heartbeat();
245 assert!(tracker.is_master_alive());
246
247 thread::sleep(Duration::from_millis(30));
248 tracker.record_heartbeat();
249 assert!(tracker.is_master_alive());
250 }
251
252 #[test]
253 fn test_time_since_heartbeat_increases() {
254 let tracker = MasterTracker::new(Duration::from_secs(5));
255 tracker.set_master("node1", 1);
256
257 let t1 = tracker.time_since_heartbeat().unwrap();
258 thread::sleep(Duration::from_millis(10));
259 let t2 = tracker.time_since_heartbeat().unwrap();
260
261 assert!(t2 > t1);
262 }
263
264 #[test]
267 fn test_update_master_higher_term_accepted() {
268 let tracker = MasterTracker::new(Duration::from_secs(5));
269 tracker.set_master("node1", 1);
270
271 assert!(tracker.update_master("node2", 2));
272 assert_eq!(tracker.get_master(), Some("node2".to_string()));
273 assert_eq!(tracker.get_term(), 2);
274 }
275
276 #[test]
277 fn test_update_master_same_term_accepted() {
278 let tracker = MasterTracker::new(Duration::from_secs(5));
279 tracker.set_master("node1", 5);
280
281 assert!(tracker.update_master("node2", 5));
284 assert_eq!(tracker.get_master(), Some("node2".to_string()));
285 }
286
287 #[test]
288 fn test_update_master_lower_term_rejected() {
289 let tracker = MasterTracker::new(Duration::from_secs(5));
290 tracker.set_master("node1", 5);
291
292 assert!(!tracker.update_master("node2", 3));
293 assert_eq!(tracker.get_master(), Some("node1".to_string()));
295 assert_eq!(tracker.get_term(), 5);
296 }
297
298 #[test]
299 fn test_update_master_from_no_master() {
300 let tracker = MasterTracker::new(Duration::from_secs(5));
301 assert!(tracker.update_master("node1", 1));
302 assert_eq!(tracker.get_master(), Some("node1".to_string()));
303 }
304
305 #[test]
308 fn test_heartbeat_timeout_accessor() {
309 let tracker = MasterTracker::new(Duration::from_secs(42));
310 assert_eq!(tracker.heartbeat_timeout(), Duration::from_secs(42));
311 }
312
313 #[test]
314 fn test_no_heartbeat_means_not_alive() {
315 let tracker = MasterTracker::new(Duration::from_secs(5));
316 tracker.set_master("node1", 1);
318 tracker.clear_master();
319 assert!(!tracker.is_master_alive());
320 }
321
322 #[test]
325 fn test_send_sync() {
326 fn assert_send_sync<T: Send + Sync>() {}
327 assert_send_sync::<MasterTracker>();
328 }
329
330 #[test]
333 fn test_concurrent_updates() {
334 use std::sync::Arc;
335
336 let tracker = Arc::new(MasterTracker::new(Duration::from_secs(5)));
337 let mut handles = vec![];
338
339 for i in 0..10 {
340 let t = Arc::clone(&tracker);
341 handles.push(thread::spawn(move || {
342 let name = format!("node{}", i);
343 t.update_master(&name, i as u64);
344 t.record_heartbeat();
345 t.get_master();
346 t.is_master_alive();
347 }));
348 }
349
350 for h in handles {
351 h.join().unwrap();
352 }
353
354 assert!(tracker.get_master().is_some());
357 assert!(tracker.get_term() >= 1);
358 }
359}