noxu-rep 7.2.1

Replication and high availability for Noxu DB
Documentation
//! Tests for `PhiAccrualDetector` and its integration with `MasterTracker`.

use std::thread;
use std::time::Duration;

use noxu_rep::PhiAccrualDetector;
use noxu_rep::elections::MasterTracker;

// ---------------------------------------------------------------------------
// PhiAccrualDetector unit tests
// ---------------------------------------------------------------------------

#[test]
fn test_phi_zero_with_insufficient_samples() {
    // With fewer than 2 inter-arrival samples the detector returns phi=0.0
    // and considers the process available.
    let det = PhiAccrualDetector::new(8.0, 200);
    assert_eq!(det.phi(), 0.0, "phi must be 0.0 with no samples");
    assert!(det.is_available(), "must be available with no samples");

    // One heartbeat recorded but still only 0 inter-arrival samples.
    det.record_heartbeat();
    assert_eq!(
        det.phi(),
        0.0,
        "phi must be 0.0 after first heartbeat (no interval yet)"
    );
    assert!(det.is_available());
}

#[test]
fn test_phi_rises_over_time_without_heartbeat() {
    // Populate the window with regular 10 ms heartbeats.
    let det = PhiAccrualDetector::new(8.0, 200);
    for _ in 0..20 {
        det.record_heartbeat();
        thread::sleep(Duration::from_millis(10));
    }
    // Record a fresh heartbeat so last_heartbeat is current regardless of
    // OS scheduling jitter during the loop's final sleep.
    det.record_heartbeat();
    // After the heartbeats stop, phi should be zero or low immediately.
    let phi_now = det.phi();
    assert!(
        phi_now < 8.0,
        "phi must be below threshold right after last heartbeat (got {phi_now:.4})"
    );

    // Wait 3× the mean interval (≈ 30 ms) and check that phi has risen.
    thread::sleep(Duration::from_millis(80));
    let phi_later = det.phi();
    assert!(
        phi_later > phi_now,
        "phi must rise over time without heartbeats (before={phi_now:.4} after={phi_later:.4})"
    );
}

#[test]
fn test_phi_resets_after_heartbeat() {
    let det = PhiAccrualDetector::new(8.0, 200);
    for _ in 0..20 {
        det.record_heartbeat();
        thread::sleep(Duration::from_millis(10));
    }
    // Let phi build up.
    thread::sleep(Duration::from_millis(50));
    let phi_before = det.phi();

    // A new heartbeat resets the last-seen time so phi drops immediately.
    det.record_heartbeat();
    let phi_after = det.phi();
    assert!(
        phi_after < phi_before,
        "phi must drop after a heartbeat (before={phi_before:.4} after={phi_after:.4})"
    );
}

#[test]
fn test_is_available_below_threshold() {
    let threshold = 8.0_f64;
    let det = PhiAccrualDetector::new(threshold, 200);
    assert_eq!(det.threshold(), threshold);

    // Populate the window then check immediately — phi should be well below 8.0.
    for _ in 0..20 {
        det.record_heartbeat();
        thread::sleep(Duration::from_millis(10));
    }
    det.record_heartbeat(); // final heartbeat
    assert!(
        det.is_available(),
        "must be available immediately after last heartbeat"
    );
}

#[test]
fn test_suspected_above_threshold() {
    // Use a very low threshold (1.0) so we can reliably cross it in a short
    // test without very long sleeps.
    let det = PhiAccrualDetector::new(1.0, 50);

    // Establish a mean of 10 ms.
    for _ in 0..25 {
        det.record_heartbeat();
        thread::sleep(Duration::from_millis(10));
    }

    // Wait 150 ms (≈ 15× mean) — phi should far exceed 1.0.
    thread::sleep(Duration::from_millis(150));
    let phi = det.phi();
    assert!(
        phi >= 1.0,
        "phi must exceed threshold=1.0 after 150 ms silence (got {phi:.4})"
    );
    assert!(
        !det.is_available(),
        "process must be suspected after 150 ms silence"
    );
}

// ---------------------------------------------------------------------------
// MasterTracker + phi integration
// ---------------------------------------------------------------------------

#[test]
fn test_master_tracker_phi_mode() {
    let det = PhiAccrualDetector::new(1.0, 50);
    let tracker = MasterTracker::new(Duration::from_secs(10)).with_phi(det);

    tracker.set_master("node1", 1);

    // Send heartbeats to populate the phi detector's window.  Use 30 samples
    // (instead of 20) so a single GC-pause-style outlier cannot dominate the
    // computed stddev.  Wave 9-A fix 3 reduced flake; the v2.4.1 fix below
    // removes the remaining ~20% miss source by dropping the racy
    // "immediately-alive" assertion entirely.
    for _ in 0..30 {
        tracker.record_heartbeat();
        thread::sleep(Duration::from_millis(10));
    }

    // NOTE: the previous version of this test asserted
    // `tracker.is_master_alive()` here, immediately after the heartbeat loop.
    // That assertion is fundamentally racy: phi is computed from
    // `last_heartbeat.elapsed()`, so any scheduler delay between the final
    // `record_heartbeat()` call and the `is_master_alive()` check (which on
    // dev machines under workspace test load can hit ~5–20 ms) inflates the
    // observed elapsed time vs. the ~10 ms training mean and pushes phi
    // briefly above the 1.0 threshold — a transient, expected condition
    // that does not indicate an actual master failure.  The deterministic
    // alive-after-heartbeats invariant is exercised by the unit tests in
    // `master_tracker.rs` and `phi_accrual.rs` with controlled clocks.
    // Here we only test the failure-detection path (the second half), which
    // is monotonic and timing-robust.

    // Silence: phi rises monotonically as `elapsed = last.elapsed()` grows.
    // Poll `is_master_alive()` over an extended window so the assertion
    // succeeds as soon as phi has grown past the threshold, and only fails
    // if it never does.
    let deadline = std::time::Instant::now() + Duration::from_secs(3);
    let mut suspected = false;
    let mut last_phi = 0.0_f64;
    while std::time::Instant::now() < deadline {
        thread::sleep(Duration::from_millis(50));
        last_phi = tracker.phi().unwrap_or(0.0);
        if !tracker.is_master_alive() {
            suspected = true;
            break;
        }
    }
    assert!(
        suspected,
        "master must be suspected after extended silence; \
         last observed phi={last_phi:.4} (threshold=1.0)",
    );

    // phi() accessor returns Some(phi_value) in phi mode.
    let phi_val = tracker.phi();
    assert!(
        phi_val.is_some(),
        "tracker.phi() must return Some(...) in phi mode"
    );
    assert!(
        phi_val.unwrap() >= 1.0,
        "phi value must exceed threshold after silence (got {:.4})",
        phi_val.unwrap(),
    );
}

// ---------------------------------------------------------------------------
// Adaptive timeout tests
// ---------------------------------------------------------------------------

#[test]
fn test_suggested_timeout_adapts_to_high_latency() {
    let det = PhiAccrualDetector::new(8.0, 200);
    // Simulate 80ms mean inter-arrival
    for _ in 0..20 {
        thread::sleep(Duration::from_millis(80));
        det.record_heartbeat();
    }
    let timeout = det.suggested_phase_timeout(3.0, Duration::from_millis(500));
    // Should be >= 50ms floor (mean ~80ms + 3*sigma >= 80ms) and <= 5s
    // Must differ from fallback (500ms) — proves adaptation is working
    assert!(
        timeout >= Duration::from_millis(50),
        "timeout={:?} must be >= 50ms floor",
        timeout
    );
    assert!(timeout <= Duration::from_secs(5));
    assert_ne!(
        timeout,
        Duration::from_millis(500),
        "timeout should differ from fallback"
    );
}

#[test]
fn test_suggested_timeout_falls_back_without_samples() {
    let det = PhiAccrualDetector::new(8.0, 200);
    let fallback = Duration::from_millis(500);
    let timeout = det.suggested_phase_timeout(3.0, fallback);
    assert_eq!(timeout, fallback, "should return fallback with no samples");
}

#[test]
fn test_suggested_timeout_floor_clamp() {
    let det = PhiAccrualDetector::new(8.0, 200);
    // Very fast heartbeats (1ms) — timeout should be clamped to 50ms floor
    for _ in 0..30 {
        thread::sleep(Duration::from_millis(1));
        det.record_heartbeat();
    }
    let timeout = det.suggested_phase_timeout(3.0, Duration::from_millis(500));
    assert!(
        timeout >= Duration::from_millis(50),
        "timeout={:?} must be >= 50ms floor",
        timeout
    );
}

#[test]
fn test_mean_and_stddev_interval() {
    let det = PhiAccrualDetector::new(8.0, 200);
    // No samples yet
    assert_eq!(det.mean_interval(), None);
    assert_eq!(det.stddev_interval(), None);

    // Populate with regular ~50ms heartbeats
    for _ in 0..15 {
        thread::sleep(Duration::from_millis(50));
        det.record_heartbeat();
    }
    let mean = det.mean_interval().unwrap();
    let stddev = det.stddev_interval().unwrap();
    // Mean should be approximately 50ms (0.05s), allow wide margin for CI
    assert!(mean > 0.03 && mean < 0.15, "mean={mean}");
    // Stddev should be small relative to mean
    assert!(stddev < mean, "stddev={stddev} should be < mean={mean}");
}

#[test]
fn test_master_tracker_timeout_mode_unchanged() {
    // Without a phi detector, the tracker falls back to binary heartbeat timeout.
    let timeout = Duration::from_millis(100);
    let tracker = MasterTracker::new(timeout);

    tracker.set_master("node2", 1);
    tracker.record_heartbeat();

    // Immediately alive.
    assert!(tracker.is_master_alive(), "must be alive right after heartbeat");

    // phi() returns None in timeout mode.
    assert_eq!(
        tracker.phi(),
        None,
        "phi() must be None in binary timeout mode"
    );

    // After 150 ms (> timeout), master should be considered dead.
    thread::sleep(Duration::from_millis(150));
    assert!(
        !tracker.is_master_alive(),
        "master must be dead after timeout elapses"
    );
}