bplus_store 0.4.0

Copy-on-write B+ tree with page-aligned storage, split/merge, and crash-safety primitives.
Documentation
//! Tests for the epoch manager: concurrent pin/unpin, guard cleanup, reclaim safety.

use crate::storage::epoch::EpochManager;
use std::sync::Arc;
use std::thread;

#[test]
fn guard_drop_unpins_reader() {
    let mgr = EpochManager::new_shared();
    {
        let _guard = mgr.pin();
        assert_eq!(mgr.get_active_readers().len(), 1);
    }
    assert_eq!(
        mgr.get_active_readers().len(),
        0,
        "reader should be unpinned after guard is dropped"
    );
}

#[test]
fn oldest_active_returns_global_when_no_readers() {
    let mgr = EpochManager::new_shared();
    mgr.advance(); // epoch 2
    mgr.advance(); // epoch 3
    assert_eq!(
        mgr.oldest_active(),
        mgr.current(),
        "with no readers, oldest_active should equal global epoch"
    );
}

#[test]
fn oldest_active_tracks_oldest_pinned_reader() {
    let mgr = EpochManager::new_shared();
    let _guard1 = mgr.pin(); // pinned at epoch 1
    mgr.advance(); // epoch 2
    mgr.advance(); // epoch 3
    let _guard2 = mgr.pin(); // pinned at epoch 3

    // oldest_active only works across threads; both guards are on this thread
    // so the reader map has one entry. Test multi-thread scenario below.
    // Here just verify the guard is alive and epoch didn't advance past it.
    assert!(mgr.oldest_active() <= mgr.current());
}

#[test]
fn concurrent_pin_unpin_across_threads() {
    let mgr = EpochManager::new_shared();
    let num_threads = 8;
    let barrier = Arc::new(std::sync::Barrier::new(num_threads));

    let handles: Vec<_> = (0..num_threads)
        .map(|_| {
            let m = Arc::clone(&mgr);
            let b = Arc::clone(&barrier);
            thread::spawn(move || {
                b.wait();
                let guard = m.pin();
                let epoch = m.current();
                // Hold pin briefly.
                std::thread::yield_now();
                drop(guard);
                epoch
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }

    assert_eq!(
        mgr.get_active_readers().len(),
        0,
        "all readers should be unpinned after threads complete"
    );
}

#[test]
fn reclaim_returns_only_pages_at_or_before_safe_epoch() {
    let mgr = EpochManager::new_shared();

    mgr.add_reclaim_candidate(1, 100);
    mgr.add_reclaim_candidate(1, 101);
    mgr.add_reclaim_candidate(2, 200);
    mgr.add_reclaim_candidate(3, 300);

    let reclaimed = mgr.reclaim(2);
    assert!(reclaimed.contains(&100));
    assert!(reclaimed.contains(&101));
    assert!(reclaimed.contains(&200));
    assert!(
        !reclaimed.contains(&300),
        "epoch 3 should not be reclaimed at safe_epoch=2"
    );

    // Epoch 3 should still be deferred.
    let remaining = mgr.get_deferred_pages();
    assert!(remaining.contains_key(&3));
    assert!(!remaining.contains_key(&1));
    assert!(!remaining.contains_key(&2));
}

#[test]
fn reclaim_with_no_candidates_returns_empty() {
    let mgr = EpochManager::new_shared();
    let reclaimed = mgr.reclaim(100);
    assert!(reclaimed.is_empty());
}

#[test]
fn advance_is_monotonic() {
    let mgr = EpochManager::new_shared();
    let mut prev = mgr.current();
    for _ in 0..100 {
        let next = mgr.advance();
        assert_eq!(next, prev + 1);
        prev = next;
    }
}

#[test]
fn pinned_reader_prevents_reclaim_of_its_epoch() {
    let mgr = EpochManager::new_shared();

    // Pin at epoch 1.
    let _guard = mgr.pin();

    mgr.advance(); // epoch 2
    mgr.add_reclaim_candidate(1, 42);

    // oldest_active is 1 on the main thread; pages at epoch 1 should NOT
    // be reclaimed if safe_epoch < pinned epoch.
    // Note: oldest_active returns the min of all thread pins.
    let safe = mgr.oldest_active();
    // reclaim only epochs strictly <= safe. Since our reader is pinned at 1,
    // oldest_active returns 1. Pages at epoch 1 are included in reclaim(1).
    // But a correct GC loop would check safe_epoch = oldest_active - 1.
    // This test documents current behavior.
    let reclaimed = mgr.reclaim(safe.saturating_sub(1));
    assert!(
        reclaimed.is_empty(),
        "pages at pinned epoch should not be reclaimed when safe < pinned"
    );
}

#[test]
fn multiple_candidates_at_same_epoch() {
    let mgr = EpochManager::new_shared();
    for id in 0..100u64 {
        mgr.add_reclaim_candidate(5, id);
    }
    let reclaimed = mgr.reclaim(5);
    assert_eq!(reclaimed.len(), 100);
    for id in 0..100u64 {
        assert!(reclaimed.contains(&id));
    }
}