1use std::collections::HashMap;
8
/// Role recorded for a node in [`NodeInfo::role`].
///
/// The type is a plain fieldless enum, so it is `Copy` and `Hash`: values can be
/// passed around without cloning and used as keys in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NodeRole {
    /// Node registered as a leader.
    Leader,
    /// Node registered as a follower.
    Follower,
    /// Node registered as a candidate.
    Candidate,
    /// Node registered as an observer.
    Observer,
}
19
20#[derive(Debug, Clone)]
22pub struct NodeInfo {
23 pub id: String,
25 pub address: String,
27 pub role: NodeRole,
29 pub joined_at: u64,
31}
32
33impl NodeInfo {
34 pub fn new(
36 id: impl Into<String>,
37 address: impl Into<String>,
38 role: NodeRole,
39 joined_at: u64,
40 ) -> Self {
41 Self {
42 id: id.into(),
43 address: address.into(),
44 role,
45 joined_at,
46 }
47 }
48}
49
/// Liveness state tracked per node.
///
/// The type is a plain fieldless enum, so it is `Copy` and `Hash`: states can be
/// returned by value and used as keys in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NodeState {
    /// The node is considered reachable. The monitor in this file assigns this
    /// state on registration and whenever a heartbeat is recorded.
    Alive,
    /// Intermediate state between `Alive` and `Dead`.
    ///
    /// NOTE(review): the monitor code visible in this file never assigns this
    /// state; confirm whether another component does.
    Suspected,
    /// The node exceeded the heartbeat timeout. The monitor in this file
    /// assigns this state from its timeout check.
    Dead,
}
62
/// One heartbeat received from a node.
///
/// All fields have total equality, so the type derives `Eq` in addition to
/// `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HeartbeatRecord {
    /// Identifier of the node that sent the heartbeat.
    pub node_id: String,
    /// Timestamp supplied by the caller when the heartbeat was recorded.
    ///
    /// NOTE(review): the monitor compares this against a `now` value using a
    /// millisecond-named timeout, so it is presumably in milliseconds — confirm.
    pub received_at: u64,
    /// Latency reported for this heartbeat, in milliseconds (per the field name).
    pub latency_ms: u64,
}
75
76pub struct NodeMonitor {
80 nodes: HashMap<String, NodeInfo>,
81 states: HashMap<String, NodeState>,
82 heartbeats: HashMap<String, Vec<HeartbeatRecord>>,
83 timeout_ms: u64,
84 max_history: usize,
85}
86
87impl NodeMonitor {
88 pub fn new(timeout_ms: u64) -> Self {
93 Self {
94 nodes: HashMap::new(),
95 states: HashMap::new(),
96 heartbeats: HashMap::new(),
97 timeout_ms,
98 max_history: 100,
99 }
100 }
101
102 pub fn with_max_history(mut self, max_history: usize) -> Self {
104 self.max_history = max_history;
105 self
106 }
107
108 pub fn register(&mut self, node: NodeInfo) {
110 self.states.insert(node.id.clone(), NodeState::Alive);
111 self.heartbeats.entry(node.id.clone()).or_default();
112 self.nodes.insert(node.id.clone(), node);
113 }
114
115 pub fn record_heartbeat(&mut self, node_id: &str, received_at: u64, latency_ms: u64) -> bool {
120 if !self.nodes.contains_key(node_id) {
121 return false;
122 }
123 self.states.insert(node_id.to_string(), NodeState::Alive);
124 let records = self.heartbeats.entry(node_id.to_string()).or_default();
125 records.push(HeartbeatRecord {
126 node_id: node_id.to_string(),
127 received_at,
128 latency_ms,
129 });
130 if records.len() > self.max_history {
132 let drain_count = records.len() - self.max_history;
133 records.drain(..drain_count);
134 }
135 true
136 }
137
138 pub fn check_timeouts(&mut self, now: u64) -> Vec<String> {
143 let mut timed_out = Vec::new();
144 for (id, records) in &self.heartbeats {
145 let last_seen = records.last().map(|r| r.received_at).unwrap_or(0);
146 let elapsed = now.saturating_sub(last_seen);
147 if elapsed >= self.timeout_ms {
148 if self.states.get(id) != Some(&NodeState::Dead) {
149 timed_out.push(id.clone());
150 }
151 }
152 }
153 for id in &timed_out {
154 self.states.insert(id.clone(), NodeState::Dead);
155 }
156 timed_out
157 }
158
159 pub fn state(&self, node_id: &str) -> Option<&NodeState> {
161 self.states.get(node_id)
162 }
163
164 pub fn alive_nodes(&self) -> Vec<&NodeInfo> {
166 self.nodes
167 .values()
168 .filter(|n| self.states.get(&n.id) == Some(&NodeState::Alive))
169 .collect()
170 }
171
172 pub fn dead_nodes(&self) -> Vec<&NodeInfo> {
174 self.nodes
175 .values()
176 .filter(|n| self.states.get(&n.id) == Some(&NodeState::Dead))
177 .collect()
178 }
179
180 pub fn avg_latency(&self, node_id: &str) -> Option<f64> {
184 let records = self.heartbeats.get(node_id)?;
185 if records.is_empty() {
186 return None;
187 }
188 let sum: u64 = records.iter().map(|r| r.latency_ms).sum();
189 Some(sum as f64 / records.len() as f64)
190 }
191
192 pub fn node_count(&self) -> usize {
194 self.nodes.len()
195 }
196
197 pub fn remove(&mut self, node_id: &str) -> bool {
199 if self.nodes.remove(node_id).is_some() {
200 self.states.remove(node_id);
201 self.heartbeats.remove(node_id);
202 true
203 } else {
204 false
205 }
206 }
207
208 pub fn last_heartbeat(&self, node_id: &str) -> Option<&HeartbeatRecord> {
210 self.heartbeats.get(node_id)?.last()
211 }
212
213 pub fn heartbeat_count(&self, node_id: &str) -> usize {
215 self.heartbeats.get(node_id).map_or(0, |v| v.len())
216 }
217}
218
#[cfg(test)]
mod tests {
    //! Unit tests for `NodeInfo`, `NodeState`, `HeartbeatRecord` and `NodeMonitor`.

    use super::*;

    /// Builds a follower node with a fixed address and `joined_at == 0`.
    fn follower(id: &str) -> NodeInfo {
        NodeInfo::new(id, "127.0.0.1:7000", NodeRole::Follower, 0)
    }

    /// Builds a monitor with a 5000 ms timeout.
    fn monitor() -> NodeMonitor {
        NodeMonitor::new(5000)
    }

    // --- registration ---

    #[test]
    fn test_new_empty() {
        let m = monitor();
        assert_eq!(m.node_count(), 0);
    }

    #[test]
    fn test_register_increments_count() {
        let mut m = monitor();
        m.register(follower("n1"));
        assert_eq!(m.node_count(), 1);
    }

    #[test]
    fn test_register_multiple_nodes() {
        let mut m = monitor();
        m.register(follower("n1"));
        m.register(follower("n2"));
        m.register(follower("n3"));
        assert_eq!(m.node_count(), 3);
    }

    #[test]
    fn test_register_overwrites_same_id() {
        let mut m = monitor();
        m.register(NodeInfo::new("n1", "addr1", NodeRole::Follower, 0));
        m.register(NodeInfo::new("n1", "addr2", NodeRole::Leader, 100));
        assert_eq!(m.node_count(), 1);
        assert_eq!(m.nodes["n1"].address, "addr2");
        assert_eq!(m.nodes["n1"].role, NodeRole::Leader);
        assert_eq!(m.nodes["n1"].joined_at, 100);
    }

    #[test]
    fn test_new_node_state_is_alive() {
        let mut m = monitor();
        m.register(follower("n1"));
        assert_eq!(m.state("n1"), Some(&NodeState::Alive));
    }

    // --- recording heartbeats ---

    #[test]
    fn test_record_heartbeat_returns_true_for_known_node() {
        let mut m = monitor();
        m.register(follower("n1"));
        assert!(m.record_heartbeat("n1", 1000, 5));
    }

    #[test]
    fn test_record_heartbeat_returns_false_for_unknown_node() {
        let mut m = monitor();
        assert!(!m.record_heartbeat("unknown", 1000, 5));
        // Nothing may be recorded for a node that was never registered.
        assert_eq!(m.heartbeat_count("unknown"), 0);
        assert_eq!(m.state("unknown"), None);
    }

    #[test]
    fn test_record_heartbeat_sets_alive() {
        let mut m = NodeMonitor::new(1000);
        m.register(follower("n1"));
        // Force the node into the dead state, then revive it with a heartbeat.
        m.states.insert("n1".to_string(), NodeState::Dead);
        m.record_heartbeat("n1", 1000, 5);
        assert_eq!(m.state("n1"), Some(&NodeState::Alive));
    }

    #[test]
    fn test_record_heartbeat_increments_history() {
        let mut m = monitor();
        m.register(follower("n1"));
        m.record_heartbeat("n1", 1000, 5);
        m.record_heartbeat("n1", 2000, 6);
        assert_eq!(m.heartbeat_count("n1"), 2);
    }

    // --- timeout detection ---

    #[test]
    fn test_check_timeouts_no_timeout_within_window() {
        let mut m = NodeMonitor::new(5000);
        m.register(follower("n1"));
        m.record_heartbeat("n1", 1000, 5);
        // 4999 ms elapsed: one short of the timeout.
        let timed_out = m.check_timeouts(5999);
        assert!(timed_out.is_empty());
        assert_eq!(m.state("n1"), Some(&NodeState::Alive));
    }

    #[test]
    fn test_check_timeouts_marks_dead() {
        let mut m = NodeMonitor::new(5000);
        m.register(follower("n1"));
        m.record_heartbeat("n1", 1000, 5);
        // 5001 ms elapsed: past the timeout.
        let timed_out = m.check_timeouts(6001);
        assert!(timed_out.contains(&"n1".to_string()));
        assert_eq!(m.state("n1"), Some(&NodeState::Dead));
    }

    #[test]
    fn test_check_timeouts_no_heartbeat_marks_dead() {
        let mut m = NodeMonitor::new(5000);
        m.register(follower("n1"));
        // Without any heartbeat the node is measured from time 0.
        let timed_out = m.check_timeouts(5000);
        assert!(timed_out.contains(&"n1".to_string()));
    }

    #[test]
    fn test_check_timeouts_already_dead_not_returned_again() {
        let mut m = NodeMonitor::new(5000);
        m.register(follower("n1"));
        let timed_out1 = m.check_timeouts(6000);
        assert!(timed_out1.contains(&"n1".to_string()));
        let timed_out2 = m.check_timeouts(7000);
        assert!(!timed_out2.contains(&"n1".to_string()));
        assert_eq!(m.state("n1"), Some(&NodeState::Dead));
    }

    // --- alive / dead listings ---

    #[test]
    fn test_alive_nodes_all_alive() {
        let mut m = monitor();
        m.register(follower("n1"));
        m.register(follower("n2"));
        assert_eq!(m.alive_nodes().len(), 2);
    }

    #[test]
    fn test_dead_nodes_empty_initially() {
        let mut m = monitor();
        m.register(follower("n1"));
        assert_eq!(m.dead_nodes().len(), 0);
    }

    #[test]
    fn test_alive_dead_after_timeout() {
        let mut m = NodeMonitor::new(1000);
        m.register(follower("n1"));
        m.register(follower("n2"));
        m.record_heartbeat("n1", 0, 5);
        m.record_heartbeat("n2", 0, 5);
        m.check_timeouts(1001);
        assert_eq!(m.dead_nodes().len(), 2);
        assert_eq!(m.alive_nodes().len(), 0);
    }

    // --- average latency ---

    #[test]
    fn test_avg_latency_none_for_no_history() {
        let mut m = monitor();
        m.register(follower("n1"));
        assert!(m.avg_latency("n1").is_none());
    }

    #[test]
    fn test_avg_latency_single_record() {
        let mut m = monitor();
        m.register(follower("n1"));
        m.record_heartbeat("n1", 1000, 10);
        assert_eq!(m.avg_latency("n1"), Some(10.0));
    }

    #[test]
    fn test_avg_latency_multiple_records() {
        let mut m = monitor();
        m.register(follower("n1"));
        m.record_heartbeat("n1", 1000, 10);
        m.record_heartbeat("n1", 2000, 20);
        m.record_heartbeat("n1", 3000, 30);
        assert!((m.avg_latency("n1").unwrap() - 20.0).abs() < 0.001);
    }

    #[test]
    fn test_avg_latency_unknown_node_none() {
        let m = monitor();
        assert!(m.avg_latency("unknown").is_none());
    }

    // --- removal ---

    #[test]
    fn test_remove_existing_returns_true() {
        let mut m = monitor();
        m.register(follower("n1"));
        assert!(m.remove("n1"));
    }

    #[test]
    fn test_remove_decrements_count() {
        let mut m = monitor();
        m.register(follower("n1"));
        m.remove("n1");
        assert_eq!(m.node_count(), 0);
    }

    #[test]
    fn test_remove_missing_returns_false() {
        let mut m = monitor();
        assert!(!m.remove("nobody"));
    }

    #[test]
    fn test_remove_clears_state() {
        let mut m = monitor();
        m.register(follower("n1"));
        m.record_heartbeat("n1", 1000, 5);
        m.remove("n1");
        assert_eq!(m.state("n1"), None);
        assert_eq!(m.heartbeat_count("n1"), 0);
        assert!(m.last_heartbeat("n1").is_none());
    }

    // --- roles ---

    #[test]
    fn test_leader_role_preserved() {
        let mut m = monitor();
        m.register(NodeInfo::new(
            "leader",
            "10.0.0.1:7000",
            NodeRole::Leader,
            0,
        ));
        assert_eq!(m.nodes["leader"].role, NodeRole::Leader);
    }

    #[test]
    fn test_observer_role_preserved() {
        let mut m = monitor();
        m.register(NodeInfo::new("obs", "10.0.0.2:7000", NodeRole::Observer, 0));
        assert_eq!(m.nodes["obs"].role, NodeRole::Observer);
    }

    // --- last heartbeat ---

    #[test]
    fn test_last_heartbeat_none_if_no_history() {
        let mut m = monitor();
        m.register(follower("n1"));
        assert!(m.last_heartbeat("n1").is_none());
    }

    #[test]
    fn test_last_heartbeat_returns_most_recent() {
        let mut m = monitor();
        m.register(follower("n1"));
        m.record_heartbeat("n1", 1000, 5);
        m.record_heartbeat("n1", 2000, 7);
        let hb = m.last_heartbeat("n1").expect("should have record");
        assert_eq!(hb.received_at, 2000);
        assert_eq!(hb.latency_ms, 7);
    }

    // --- heartbeat count and history cap ---

    #[test]
    fn test_heartbeat_count_zero_initially() {
        let mut m = monitor();
        m.register(follower("n1"));
        assert_eq!(m.heartbeat_count("n1"), 0);
    }

    #[test]
    fn test_heartbeat_count_increments() {
        let mut m = monitor();
        m.register(follower("n1"));
        m.record_heartbeat("n1", 1000, 5);
        m.record_heartbeat("n1", 2000, 5);
        assert_eq!(m.heartbeat_count("n1"), 2);
    }

    #[test]
    fn test_max_history_trims_records() {
        let mut m = NodeMonitor::new(5000).with_max_history(3);
        m.register(follower("n1"));
        for i in 0..10u64 {
            m.record_heartbeat("n1", i * 100, 5);
        }
        assert_eq!(m.heartbeat_count("n1"), 3);
        // Trimming must drop the oldest records and keep the newest ones.
        let kept: Vec<u64> = m.heartbeats["n1"].iter().map(|r| r.received_at).collect();
        assert_eq!(kept, vec![700, 800, 900]);
    }

    // --- plain data types ---

    #[test]
    fn test_node_info_fields() {
        let n = NodeInfo::new("id1", "10.0.0.1:7000", NodeRole::Candidate, 42);
        assert_eq!(n.id, "id1");
        assert_eq!(n.address, "10.0.0.1:7000");
        assert_eq!(n.role, NodeRole::Candidate);
        assert_eq!(n.joined_at, 42);
    }

    #[test]
    fn test_node_state_suspected() {
        let state = NodeState::Suspected;
        assert_eq!(state, NodeState::Suspected);
        assert_ne!(state, NodeState::Alive);
        assert_ne!(state, NodeState::Dead);
    }

    #[test]
    fn test_heartbeat_record_fields() {
        let hb = HeartbeatRecord {
            node_id: "n1".to_string(),
            received_at: 999,
            latency_ms: 12,
        };
        assert_eq!(hb.node_id, "n1");
        assert_eq!(hb.received_at, 999);
        assert_eq!(hb.latency_ms, 12);
    }

    // --- edge cases ---

    #[test]
    fn test_avg_latency_zero_latency() {
        let mut m = monitor();
        m.register(follower("n1"));
        m.record_heartbeat("n1", 1000, 0);
        assert_eq!(m.avg_latency("n1"), Some(0.0));
    }

    #[test]
    fn test_check_timeouts_at_exact_boundary() {
        let mut m = NodeMonitor::new(5000);
        m.register(follower("n1"));
        m.record_heartbeat("n1", 1000, 5);
        // Exactly 5000 ms elapsed: the comparison is inclusive.
        let timed_out = m.check_timeouts(6000);
        assert!(timed_out.contains(&"n1".to_string()));
    }

    #[test]
    fn test_multiple_registrations_independent() {
        let mut m = monitor();
        m.register(NodeInfo::new("a", "addr_a", NodeRole::Leader, 0));
        m.register(NodeInfo::new("b", "addr_b", NodeRole::Follower, 0));
        assert_eq!(m.node_count(), 2);
        assert!(m.state("a").is_some());
        assert!(m.state("b").is_some());
        // A heartbeat for one node must not touch the other node's history.
        m.record_heartbeat("a", 10, 1);
        assert_eq!(m.heartbeat_count("a"), 1);
        assert_eq!(m.heartbeat_count("b"), 0);
    }

    #[test]
    fn test_remove_then_re_register() {
        let mut m = monitor();
        m.register(follower("n1"));
        m.record_heartbeat("n1", 1000, 5);
        m.remove("n1");
        m.register(NodeInfo::new("n1", "new_addr", NodeRole::Leader, 100));
        assert_eq!(m.node_count(), 1);
        assert_eq!(m.nodes["n1"].address, "new_addr");
        // Removal discards the history, so the re-registered node starts empty.
        assert_eq!(m.heartbeat_count("n1"), 0);
        assert_eq!(m.state("n1"), Some(&NodeState::Alive));
    }

    #[test]
    fn test_alive_nodes_excludes_dead() {
        let mut m = NodeMonitor::new(1000);
        m.register(follower("n1"));
        m.register(follower("n2"));
        m.record_heartbeat("n1", 0, 5);
        m.record_heartbeat("n2", 1000, 5);
        // n1 is 1001 ms stale (dead); n2 is only 1 ms stale (alive).
        m.check_timeouts(1001);
        let alive: Vec<&str> = m.alive_nodes().iter().map(|n| n.id.as_str()).collect();
        assert_eq!(alive, vec!["n2"]);
        let dead: Vec<&str> = m.dead_nodes().iter().map(|n| n.id.as_str()).collect();
        assert_eq!(dead, vec!["n1"]);
    }

    #[test]
    fn test_check_timeouts_partial() {
        let mut m = NodeMonitor::new(5000);
        m.register(follower("n1"));
        m.register(follower("n2"));
        m.record_heartbeat("n1", 1000, 5);
        m.record_heartbeat("n2", 5000, 5);
        // n1 is 5001 ms stale; n2 is only 1001 ms stale.
        let timed_out = m.check_timeouts(6001);
        assert!(timed_out.contains(&"n1".to_string()));
        assert!(!timed_out.contains(&"n2".to_string()));
    }

    // --- unknown nodes ---

    #[test]
    fn test_heartbeat_count_unknown_node() {
        let m = monitor();
        assert_eq!(m.heartbeat_count("ghost"), 0);
    }

    #[test]
    fn test_state_unknown_node_none() {
        let m = monitor();
        assert_eq!(m.state("unknown"), None);
    }

    #[test]
    fn test_last_heartbeat_unknown_node_none() {
        let m = monitor();
        assert_eq!(m.last_heartbeat("unknown"), None);
    }

    // --- miscellaneous ---

    #[test]
    fn test_node_monitor_new_timeout() {
        let m = NodeMonitor::new(3000);
        assert_eq!(m.timeout_ms, 3000);
    }

    #[test]
    fn test_record_heartbeat_for_multiple_nodes() {
        let mut m = monitor();
        m.register(follower("n1"));
        m.register(follower("n2"));
        assert!(m.record_heartbeat("n1", 1000, 5));
        assert!(m.record_heartbeat("n2", 2000, 10));
        assert_eq!(m.heartbeat_count("n1"), 1);
        assert_eq!(m.heartbeat_count("n2"), 1);
    }
}