1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
//! Tests for `PhiAccrualDetector` and its integration with `MasterTracker`.
use std::thread;
use std::time::Duration;
use noxu_rep::PhiAccrualDetector;
use noxu_rep::elections::MasterTracker;
// ---------------------------------------------------------------------------
// PhiAccrualDetector unit tests
// ---------------------------------------------------------------------------
#[test]
fn test_phi_zero_with_insufficient_samples() {
// With fewer than 2 inter-arrival samples the detector returns phi=0.0
// and considers the process available.
let det = PhiAccrualDetector::new(8.0, 200);
assert_eq!(det.phi(), 0.0, "phi must be 0.0 with no samples");
assert!(det.is_available(), "must be available with no samples");
// One heartbeat recorded but still only 0 inter-arrival samples.
det.record_heartbeat();
assert_eq!(
det.phi(),
0.0,
"phi must be 0.0 after first heartbeat (no interval yet)"
);
assert!(det.is_available());
}
#[test]
fn test_phi_rises_over_time_without_heartbeat() {
// Populate the window with regular 10 ms heartbeats.
let det = PhiAccrualDetector::new(8.0, 200);
for _ in 0..20 {
det.record_heartbeat();
thread::sleep(Duration::from_millis(10));
}
// Record a fresh heartbeat so last_heartbeat is current regardless of
// OS scheduling jitter during the loop's final sleep.
det.record_heartbeat();
// After the heartbeats stop, phi should be zero or low immediately.
let phi_now = det.phi();
assert!(
phi_now < 8.0,
"phi must be below threshold right after last heartbeat (got {phi_now:.4})"
);
// Wait 3× the mean interval (≈ 30 ms) and check that phi has risen.
thread::sleep(Duration::from_millis(80));
let phi_later = det.phi();
assert!(
phi_later > phi_now,
"phi must rise over time without heartbeats (before={phi_now:.4} after={phi_later:.4})"
);
}
#[test]
fn test_phi_resets_after_heartbeat() {
let det = PhiAccrualDetector::new(8.0, 200);
for _ in 0..20 {
det.record_heartbeat();
thread::sleep(Duration::from_millis(10));
}
// Let phi build up.
thread::sleep(Duration::from_millis(50));
let phi_before = det.phi();
// A new heartbeat resets the last-seen time so phi drops immediately.
det.record_heartbeat();
let phi_after = det.phi();
assert!(
phi_after < phi_before,
"phi must drop after a heartbeat (before={phi_before:.4} after={phi_after:.4})"
);
}
#[test]
fn test_is_available_below_threshold() {
let threshold = 8.0_f64;
let det = PhiAccrualDetector::new(threshold, 200);
assert_eq!(det.threshold(), threshold);
// Populate the window then check immediately — phi should be well below 8.0.
for _ in 0..20 {
det.record_heartbeat();
thread::sleep(Duration::from_millis(10));
}
det.record_heartbeat(); // final heartbeat
assert!(
det.is_available(),
"must be available immediately after last heartbeat"
);
}
#[test]
fn test_suspected_above_threshold() {
// Use a very low threshold (1.0) so we can reliably cross it in a short
// test without very long sleeps.
let det = PhiAccrualDetector::new(1.0, 50);
// Establish a mean of 10 ms.
for _ in 0..25 {
det.record_heartbeat();
thread::sleep(Duration::from_millis(10));
}
// Wait 150 ms (≈ 15× mean) — phi should far exceed 1.0.
thread::sleep(Duration::from_millis(150));
let phi = det.phi();
assert!(
phi >= 1.0,
"phi must exceed threshold=1.0 after 150 ms silence (got {phi:.4})"
);
assert!(
!det.is_available(),
"process must be suspected after 150 ms silence"
);
}
// ---------------------------------------------------------------------------
// MasterTracker + phi integration
// ---------------------------------------------------------------------------
#[test]
fn test_master_tracker_phi_mode() {
let det = PhiAccrualDetector::new(1.0, 50);
let tracker = MasterTracker::new(Duration::from_secs(10)).with_phi(det);
tracker.set_master("node1", 1);
// Send heartbeats to populate the phi detector's window. Use 30 samples
// (instead of 20) so a single GC-pause-style outlier cannot dominate the
// computed stddev. Wave 9-A fix 3 reduced flake; the v2.4.1 fix below
// removes the remaining ~20% miss source by dropping the racy
// "immediately-alive" assertion entirely.
for _ in 0..30 {
tracker.record_heartbeat();
thread::sleep(Duration::from_millis(10));
}
// NOTE: the previous version of this test asserted
// `tracker.is_master_alive()` here, immediately after the heartbeat loop.
// That assertion is fundamentally racy: phi is computed from
// `last_heartbeat.elapsed()`, so any scheduler delay between the final
// `record_heartbeat()` call and the `is_master_alive()` check (which on
// dev machines under workspace test load can hit ~5–20 ms) inflates the
// observed elapsed time vs. the ~10 ms training mean and pushes phi
// briefly above the 1.0 threshold — a transient, expected condition
// that does not indicate an actual master failure. The deterministic
// alive-after-heartbeats invariant is exercised by the unit tests in
// `master_tracker.rs` and `phi_accrual.rs` with controlled clocks.
// Here we only test the failure-detection path (the second half), which
// is monotonic and timing-robust.
// Silence: phi rises monotonically as `elapsed = last.elapsed()` grows.
// Poll `is_master_alive()` over an extended window so the assertion
// succeeds as soon as phi has grown past the threshold, and only fails
// if it never does.
let deadline = std::time::Instant::now() + Duration::from_secs(3);
let mut suspected = false;
let mut last_phi = 0.0_f64;
while std::time::Instant::now() < deadline {
thread::sleep(Duration::from_millis(50));
last_phi = tracker.phi().unwrap_or(0.0);
if !tracker.is_master_alive() {
suspected = true;
break;
}
}
assert!(
suspected,
"master must be suspected after extended silence; \
last observed phi={last_phi:.4} (threshold=1.0)",
);
// phi() accessor returns Some(phi_value) in phi mode.
let phi_val = tracker.phi();
assert!(
phi_val.is_some(),
"tracker.phi() must return Some(...) in phi mode"
);
assert!(
phi_val.unwrap() >= 1.0,
"phi value must exceed threshold after silence (got {:.4})",
phi_val.unwrap(),
);
}
// ---------------------------------------------------------------------------
// Adaptive timeout tests
// ---------------------------------------------------------------------------
#[test]
fn test_suggested_timeout_adapts_to_high_latency() {
let det = PhiAccrualDetector::new(8.0, 200);
// Simulate 80ms mean inter-arrival
for _ in 0..20 {
thread::sleep(Duration::from_millis(80));
det.record_heartbeat();
}
let timeout = det.suggested_phase_timeout(3.0, Duration::from_millis(500));
// Should be >= 50ms floor (mean ~80ms + 3*sigma >= 80ms) and <= 5s
// Must differ from fallback (500ms) — proves adaptation is working
assert!(
timeout >= Duration::from_millis(50),
"timeout={:?} must be >= 50ms floor",
timeout
);
assert!(timeout <= Duration::from_secs(5));
assert_ne!(
timeout,
Duration::from_millis(500),
"timeout should differ from fallback"
);
}
#[test]
fn test_suggested_timeout_falls_back_without_samples() {
let det = PhiAccrualDetector::new(8.0, 200);
let fallback = Duration::from_millis(500);
let timeout = det.suggested_phase_timeout(3.0, fallback);
assert_eq!(timeout, fallback, "should return fallback with no samples");
}
#[test]
fn test_suggested_timeout_floor_clamp() {
let det = PhiAccrualDetector::new(8.0, 200);
// Very fast heartbeats (1ms) — timeout should be clamped to 50ms floor
for _ in 0..30 {
thread::sleep(Duration::from_millis(1));
det.record_heartbeat();
}
let timeout = det.suggested_phase_timeout(3.0, Duration::from_millis(500));
assert!(
timeout >= Duration::from_millis(50),
"timeout={:?} must be >= 50ms floor",
timeout
);
}
#[test]
fn test_mean_and_stddev_interval() {
let det = PhiAccrualDetector::new(8.0, 200);
// No samples yet
assert_eq!(det.mean_interval(), None);
assert_eq!(det.stddev_interval(), None);
// Populate with regular ~50ms heartbeats
for _ in 0..15 {
thread::sleep(Duration::from_millis(50));
det.record_heartbeat();
}
let mean = det.mean_interval().unwrap();
let stddev = det.stddev_interval().unwrap();
// Mean should be approximately 50ms (0.05s), allow wide margin for CI
assert!(mean > 0.03 && mean < 0.15, "mean={mean}");
// Stddev should be small relative to mean
assert!(stddev < mean, "stddev={stddev} should be < mean={mean}");
}
#[test]
fn test_master_tracker_timeout_mode_unchanged() {
// Without a phi detector, the tracker falls back to binary heartbeat timeout.
let timeout = Duration::from_millis(100);
let tracker = MasterTracker::new(timeout);
tracker.set_master("node2", 1);
tracker.record_heartbeat();
// Immediately alive.
assert!(tracker.is_master_alive(), "must be alive right after heartbeat");
// phi() returns None in timeout mode.
assert_eq!(
tracker.phi(),
None,
"phi() must be None in binary timeout mode"
);
// After 150 ms (> timeout), master should be considered dead.
thread::sleep(Duration::from_millis(150));
assert!(
!tracker.is_master_alive(),
"master must be dead after timeout elapses"
);
}