noxu-log 7.0.0

Log-structured storage engine for Noxu DB
Documentation
// Copyright (C) 2024-2025 Greg Burd.  Licensed under either of the
// Apache License, Version 2.0 or the MIT license, at your option.
// See LICENSE-APACHE and LICENSE-MIT at the root of this repository.
// SPDX-License-Identifier: Apache-2.0 OR MIT

//! Shuttle concurrency-permutation tests for the `FsyncManager` group-commit
//! (leader/waiter) protocol (DST Milestone 2, Phase 2a).
//!
//! The whole file compiles to nothing unless built with `--cfg noxu_shuttle`,
//! so the default `cargo test` and every production build are unaffected.
//! Under the cfg, `FsyncManager`'s `Mutex`/`Condvar`/atomics resolve (through
//! `noxu_util::dst_sync`) to shuttle-instrumented primitives, so shuttle's
//! scheduler explores the leader/waiter/piggyback interleavings of the *real*
//! group-commit code.
//!
//! # A known limitation: this protocol's liveness depends on a timeout
//!
//! shuttle found a real, latent lost-wakeup in the group-commit hand-off, and
//! shuttle's model cannot prove liveness around it.  Concretely:
//!
//!   * When a leader finishes it calls `wakeup_one()` on the *next* waiter
//!     cohort to designate a new leader.  `wakeup_one` is a bare
//!     `Condvar::notify_one`; unlike the completion path (`wakeup_all`) it does
//!     **not** set a "signal pending" atomic.  If that `notify_one` lands
//!     before the next waiter reaches `wait_for_event`, the notification is
//!     lost (a `notify` with no waiter is a no-op).
//!   * Similarly, a waiter woken as `DoLeaderFsync` that finds
//!     `work_in_progress` already set does its *own* fsync but does **not**
//!     wake the rest of its cohort, orphaning them.
//!
//! In production both cases are recovered by `LOG_FSYNC_TIMEOUT` (default
//! 500 ms): the orphaned waiter's `Condvar::wait_timeout` eventually times out
//! and it performs its own fsync (`DoTimeoutFsync`).  The commit is never lost
//! — it is at worst delayed by the timeout.
//!
//! shuttle's `Condvar::wait_timeout` **never times out** (it is a hard block
//! until an explicit notify), so it cannot model that recovery: the orphaned
//! waiter blocks forever and shuttle reports a deadlock.  This is a real
//! property of the protocol (its liveness *does* rely on the timeout), not a
//! harness artifact — but it means a `check_random` over the full protocol
//! cannot be a green gate the way the notify-driven `DaemonManager` shutdown
//! test ([`shuttle_daemon_shutdown`]) can.
//!
//! Two things follow, and this file encodes both:
//!
//!   1. [`shuttle_catches_the_lost_wakeup`] (a normal, PASSING test) proves the
//!      shuttle harness *does* detect the orphan — i.e. the gate is not blind.
//!      It runs the real protocol under shuttle and asserts that shuttle
//!      reports a deadlock, so if the protocol were ever made timeout-free the
//!      test would (correctly) start failing and prompt a re-think.
//!   2. [`fsync_coalescing_and_coverage_hold`] (marked `#[ignore]`) carries the
//!      full safety oracle (coverage / coalescing / `FsyncedNeverDecreases`)
//!      ready to run the moment the hand-off is made timeout-independent; until
//!      then it is skipped because it hits the timeout-liveness deadlock above.
//!
//! # Running
//!
//! ```sh
//! RUSTFLAGS="--cfg noxu_shuttle" cargo test -p noxu-log --test shuttle_fsync_manager
//! ```
#![cfg(noxu_shuttle)]

use std::sync::atomic::Ordering;

use noxu_log::fsync_manager::FsyncManager;
use noxu_util::dst_invariants::{
    assert_durable_covers_commit, assert_fsynced_never_decreases,
};
use shuttle::sync::Arc;
use shuttle::sync::atomic::{AtomicU64, AtomicUsize};

/// Number of interleavings shuttle explores per test.
const ITERATIONS: usize = 5_000;

/// PASSING: prove the shuttle harness detects the group-commit lost-wakeup.
///
/// This is the "the oracle can fail" proof for the FsyncManager target: we run
/// the real leader/waiter protocol under shuttle and assert that *some*
/// interleaving is caught as a deadlock (the timeout-masked orphan described in
/// the module docs).  `shuttle::check_random` panics on the first failing
/// schedule, so we catch that panic and require it to be the deadlock.  If the
/// protocol were ever made timeout-independent this test would stop seeing the
/// deadlock and fail — a deliberate tripwire that forces us to promote
/// [`fsync_coalescing_and_coverage_hold`] out of `#[ignore]`.
#[test]
fn shuttle_catches_the_lost_wakeup() {
    let caught = std::panic::catch_unwind(|| {
        shuttle::check_random(
            || {
                let mgr = Arc::new(FsyncManager::new(0, 0));
                let handles: Vec<_> = (0..2)
                    .map(|_| {
                        let mgr = Arc::clone(&mgr);
                        shuttle::thread::spawn(move || {
                            let _ = mgr.flush_and_sync(|| Ok(0));
                        })
                    })
                    .collect();
                for h in handles {
                    h.join().unwrap();
                }
            },
            ITERATIONS,
        );
    });
    assert!(
        caught.is_err(),
        "shuttle should have caught the timeout-masked group-commit orphan; \
         if the hand-off is now timeout-independent, un-ignore \
         fsync_coalescing_and_coverage_hold and delete this tripwire"
    );
}

/// SAFETY ORACLE (currently `#[ignore]` — see module docs): every committer's
/// returned durable watermark covers its own LSN, coalescing holds, and the
/// durable watermark never regresses, under every interleaving.
///
/// Ignored because the group-commit hand-off's liveness depends on
/// `LOG_FSYNC_TIMEOUT`, which shuttle cannot model (it hits the deadlock proved
/// by [`shuttle_catches_the_lost_wakeup`]).  Kept complete and ready to run the
/// moment the hand-off is made timeout-independent.
#[test]
#[ignore = "group-commit liveness depends on fsync_timeout; shuttle cannot \
            model timeouts (see module docs). Enable once the hand-off is \
            timeout-independent."]
fn fsync_coalescing_and_coverage_hold() {
    shuttle::check_random(
        || {
            const N: usize = 3;
            let mgr = Arc::new(FsyncManager::new(0, 0));
            let next_lsn = Arc::new(AtomicU64::new(1));
            let snap_lsn = Arc::new(AtomicU64::new(0));
            let flushed_lsn = Arc::new(AtomicU64::new(0));
            let fsync_execs = Arc::new(AtomicUsize::new(0));

            let handles: Vec<_> = (0..N)
                .map(|_| {
                    let mgr = Arc::clone(&mgr);
                    let next_lsn = Arc::clone(&next_lsn);
                    let snap_lsn = Arc::clone(&snap_lsn);
                    let flushed_lsn = Arc::clone(&flushed_lsn);
                    let fsync_execs = Arc::clone(&fsync_execs);
                    shuttle::thread::spawn(move || {
                        let my_lsn = next_lsn.fetch_add(1, Ordering::SeqCst);
                        bump_max(&snap_lsn, my_lsn);

                        let flushed_lsn2 = Arc::clone(&flushed_lsn);
                        let snap_lsn2 = Arc::clone(&snap_lsn);
                        let execs2 = Arc::clone(&fsync_execs);
                        let durable = mgr
                            .flush_and_sync(move || {
                                execs2.fetch_add(1, Ordering::SeqCst);
                                let covered = snap_lsn2.load(Ordering::SeqCst);
                                let old = flushed_lsn2.load(Ordering::SeqCst);
                                let newv = covered.max(old);
                                flushed_lsn2.store(newv, Ordering::SeqCst);
                                assert_fsynced_never_decreases(old, newv);
                                Ok(covered)
                            })
                            .expect("no fault injected: fsync must succeed");

                        assert_durable_covers_commit(durable.as_u64(), my_lsn);
                        let global = flushed_lsn.load(Ordering::SeqCst);
                        assert_durable_covers_commit(global, my_lsn);
                    })
                })
                .collect();

            for h in handles {
                h.join().unwrap();
            }

            let execs = fsync_execs.load(Ordering::SeqCst);
            assert!(
                execs >= 1 && execs <= N,
                "fsync executions {execs} out of range 1..={N}: a redundant \
                 (double) fsync or a missing one indicates a coalescing bug"
            );
            let final_flushed = flushed_lsn.load(Ordering::SeqCst);
            let highest = next_lsn.load(Ordering::SeqCst) - 1;
            assert!(
                final_flushed >= highest,
                "final durable watermark {final_flushed} < highest commit \
                 LSN {highest}: a committed write was left unsynced"
            );
        },
        ITERATIONS,
    );
}

/// Advance `cell` to at least `v` (a lock-free max).
fn bump_max(cell: &AtomicU64, v: u64) {
    let mut cur = cell.load(Ordering::SeqCst);
    while cur < v {
        match cell.compare_exchange(cur, v, Ordering::SeqCst, Ordering::SeqCst)
        {
            Ok(_) => break,
            Err(a) => cur = a,
        }
    }
}