//! Heartbeat-based failure detection for cluster peers.
//!
//! [`FailureDetector`] tracks the last heartbeat timestamp for each peer and
//! emits [`FailureEvent`]s when peers time out or recover.
use std::collections::HashMap;
use std::time::{Duration, Instant};
use crate::error::{RaftError, RaftResult};
use crate::types::{FailureEvent, HeartbeatConfig, NodeId};
/// Per-peer tracking state maintained by the failure detector.
#[derive(Debug)]
struct PeerState {
/// When the most recent heartbeat was received from this peer.
last_heartbeat: Instant,
/// Number of consecutive heartbeat-check rounds this peer has missed;
/// reset to zero when a heartbeat is received.
missed_count: u32,
/// Whether this peer is currently considered to have failed.
is_failed: bool,
}
impl PeerState {
fn new(now: Instant) -> Self {
Self {
last_heartbeat: now,
missed_count: 0,
is_failed: false,
}
}
}
/// Heartbeat-based peer failure detector.
///
/// Call [`FailureDetector::record_heartbeat`] each time a heartbeat (or any
/// message) is received from a peer. Call
/// [`FailureDetector::check_timeouts`] periodically (e.g., once per
/// heartbeat interval) to receive a list of [`FailureEvent`]s for peers that
/// have timed out or recovered since the last check.
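///
/// # Example
///
/// A minimal driving loop (illustrative sketch; the config values and the
/// numeric peer IDs are assumptions, not recommendations):
///
/// ```ignore
/// let config = HeartbeatConfig {
///     interval_ms: 100,
///     timeout_ms: 500,
///     max_missed: 3,
/// };
/// let mut detector = FailureDetector::new(config, 1);
/// detector.track_peer(2)?;
///
/// // On every heartbeat (or other message) received from peer 2:
/// detector.record_heartbeat(2)?;
///
/// // Once per heartbeat interval:
/// for event in detector.check_timeouts()? {
///     match event {
///         FailureEvent::NodeFailed { node_id, .. } => {
///             println!("peer {node_id} failed");
///         }
///         FailureEvent::NodeRecovered { node_id } => {
///             println!("peer {node_id} recovered");
///         }
///     }
/// }
/// ```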
#[derive(Debug)]
pub struct FailureDetector {
/// Configuration for heartbeat intervals and timeouts.
config: HeartbeatConfig,
/// This node's own ID (never tracked as a peer).
self_id: NodeId,
/// Per-peer tracking state.
peers: HashMap<NodeId, PeerState>,
}
impl FailureDetector {
/// Create a new failure detector.
///
/// # Arguments
/// * `config` — heartbeat timing configuration
/// * `self_id` — this node's ID; it will never be added to peer tracking
pub fn new(config: HeartbeatConfig, self_id: NodeId) -> Self {
Self {
config,
self_id,
peers: HashMap::new(),
}
}
/// Begin tracking a new peer.
///
/// Initialises the peer with `last_heartbeat = now` so it does not
/// immediately time out.
///
/// Returns an error if `peer_id == self_id`.
pub fn track_peer(&mut self, peer_id: NodeId) -> RaftResult<()> {
if peer_id == self.self_id {
return Err(RaftError::StorageError {
message: format!(
"Cannot track self (node {}) as a peer in FailureDetector",
peer_id
),
});
}
self.peers
.entry(peer_id)
.or_insert_with(|| PeerState::new(Instant::now()));
Ok(())
}
/// Stop tracking a peer and remove its state.
pub fn remove_peer(&mut self, peer_id: NodeId) {
self.peers.remove(&peer_id);
}
/// Record that a heartbeat (or any live message) was received from `peer_id`.
///
/// Resets the missed counter and marks the peer alive. If the peer was
/// not previously tracked it is added automatically.
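///
/// Marking the peer alive here does not emit a
/// [`FailureEvent::NodeRecovered`]; recovery of a previously-failed peer is
/// reported by the next call to [`FailureDetector::check_timeouts`].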
///
/// Returns an error if `peer_id == self_id`.
pub fn record_heartbeat(&mut self, peer_id: NodeId) -> RaftResult<()> {
if peer_id == self.self_id {
return Err(RaftError::StorageError {
message: format!("Cannot record heartbeat from self (node {})", peer_id),
});
}
let state = self
.peers
.entry(peer_id)
.or_insert_with(|| PeerState::new(Instant::now()));
state.last_heartbeat = Instant::now();
state.missed_count = 0;
// Note: is_failed is intentionally NOT cleared here.
// Recovery is detected by check_timeouts() when it sees
// elapsed < timeout with is_failed still set.
Ok(())
}
/// Inspect all tracked peers and return any failure / recovery events.
///
/// A peer is declared failed once the elapsed time since its last heartbeat
/// reaches `config.timeout_ms` and its `missed_count`, which is incremented
/// once per call while the peer remains past the timeout, reaches
/// `config.max_missed`. A previously-failed peer whose elapsed time has
/// dropped back below the timeout (because a heartbeat arrived) emits a
/// [`FailureEvent::NodeRecovered`].
///
/// This method updates internal state (missed counters, failed flags) in
/// place.
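///
/// For example (values assumed purely for illustration), with
/// `timeout_ms = 50` and `max_missed = 2`, a silent peer accrues one missed
/// round on each call made after the 50 ms window has elapsed, so the
/// [`FailureEvent::NodeFailed`] event is emitted on the second such call.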
pub fn check_timeouts(&mut self) -> RaftResult<Vec<FailureEvent>> {
let now = Instant::now();
let timeout = Duration::from_millis(self.config.timeout_ms);
let max_missed = self.config.max_missed;
let mut events = Vec::new();
for (&peer_id, state) in self.peers.iter_mut() {
let elapsed = now.duration_since(state.last_heartbeat);
if elapsed >= timeout {
// Peer has not sent a heartbeat within the timeout window
state.missed_count = state.missed_count.saturating_add(1);
if state.missed_count >= max_missed && !state.is_failed {
state.is_failed = true;
events.push(FailureEvent::NodeFailed {
node_id: peer_id,
missed_count: state.missed_count,
last_seen_ago_ms: elapsed.as_millis() as u64,
});
}
} else if state.is_failed {
// Peer has recovered
state.is_failed = false;
state.missed_count = 0;
events.push(FailureEvent::NodeRecovered { node_id: peer_id });
}
}
Ok(events)
}
/// Return the IDs of all peers currently considered failed.
pub fn failed_peers(&self) -> Vec<NodeId> {
self.peers
.iter()
.filter(|(_, s)| s.is_failed)
.map(|(&id, _)| id)
.collect()
}
/// Return the IDs of all peers currently considered alive.
pub fn alive_peers(&self) -> Vec<NodeId> {
self.peers
.iter()
.filter(|(_, s)| !s.is_failed)
.map(|(&id, _)| id)
.collect()
}
/// Reset all peer states (missed counters cleared, all marked alive).
///
/// Useful when this node becomes a new leader and stale failure data
/// should be discarded.
pub fn reset_all(&mut self) {
let now = Instant::now();
for state in self.peers.values_mut() {
state.last_heartbeat = now;
state.missed_count = 0;
state.is_failed = false;
}
}
/// Return the number of currently tracked peers.
pub fn peer_count(&self) -> usize {
self.peers.len()
}
}
#[cfg(test)]
mod tests {
use std::thread;
use super::*;
fn make_detector() -> FailureDetector {
let config = HeartbeatConfig {
interval_ms: 10,
timeout_ms: 50,
max_missed: 2,
};
FailureDetector::new(config, 1)
}
#[test]
fn test_track_peer_and_record_heartbeat() {
let mut d = make_detector();
d.track_peer(2).expect("track ok");
d.record_heartbeat(2).expect("heartbeat ok");
assert_eq!(d.peer_count(), 1);
assert!(d.alive_peers().contains(&2));
}
#[test]
fn test_track_self_is_error() {
let mut d = make_detector();
let r = d.track_peer(1); // self_id is 1
assert!(r.is_err());
}
#[test]
fn test_failure_detection_on_timeout() {
let mut d = make_detector(); // timeout_ms=50, max_missed=2
d.track_peer(2).expect("track ok");
// Sleep beyond timeout
thread::sleep(Duration::from_millis(60));
// First check: missed_count=1 (< max_missed=2), no failure yet
let events = d.check_timeouts().expect("check ok");
assert!(
events.is_empty(),
"First check should not declare failure yet"
);
// Sleep again and check
thread::sleep(Duration::from_millis(60));
let events = d.check_timeouts().expect("check ok");
let failed: Vec<_> = events
.iter()
.filter(|e| matches!(e, FailureEvent::NodeFailed { .. }))
.collect();
assert!(
!failed.is_empty(),
"Should declare failure after max_missed exceeded"
);
assert!(d.failed_peers().contains(&2));
}
#[test]
fn test_recovery_after_failure() {
let mut d = make_detector();
d.track_peer(3).expect("track ok");
// Force the peer into failed state by sleeping past timeout twice
thread::sleep(Duration::from_millis(60));
d.check_timeouts().expect("check 1 ok");
thread::sleep(Duration::from_millis(60));
d.check_timeouts().expect("check 2 ok");
assert!(d.failed_peers().contains(&3), "Peer 3 should be failed");
// Send a heartbeat — peer recovers
d.record_heartbeat(3).expect("heartbeat ok");
let events = d.check_timeouts().expect("check ok");
let recovered: Vec<_> = events
.iter()
.filter(|e| matches!(e, FailureEvent::NodeRecovered { node_id: 3 }))
.collect();
assert!(!recovered.is_empty(), "Should emit NodeRecovered event");
assert!(d.alive_peers().contains(&3));
}
#[test]
fn test_reset_all_clears_failure_state() {
let mut d = make_detector();
d.track_peer(2).expect("track ok");
d.track_peer(3).expect("track ok");
thread::sleep(Duration::from_millis(60));
d.check_timeouts().expect("ok");
thread::sleep(Duration::from_millis(60));
d.check_timeouts().expect("ok");
assert!(!d.failed_peers().is_empty(), "Some peers should be failed");
d.reset_all();
assert!(
d.failed_peers().is_empty(),
"reset_all should clear all failures"
);
assert_eq!(d.alive_peers().len(), 2);
}
#[test]
fn test_auto_track_via_heartbeat() {
let mut d = make_detector();
// Peer not tracked yet, record_heartbeat should auto-track it
d.record_heartbeat(5).expect("auto-track ok");
assert_eq!(d.peer_count(), 1);
assert!(d.alive_peers().contains(&5));
}
#[test]
fn test_remove_peer() {
let mut d = make_detector();
d.track_peer(2).expect("track ok");
d.remove_peer(2);
assert_eq!(d.peer_count(), 0);
}
}