obj-core 1.1.2

Storage engine internals for the obj embedded document database (pager, WAL, B-tree, codec, catalog).
Documentation
//! WAL append / commit / recovery tests.
//!
//! These exercise the [`Wal`] API in isolation, without going through
//! the pager. The pager-WAL integration tests live in
//! `crates/obj-core/src/pager/tests.rs`.

use tempfile::TempDir;

use crate::pager::page::{Page, PageId};
use crate::wal::{Lsn, Wal, WalConfig};

/// Builds a test page starting from [`Page::zeroed`] and stamps two marker
/// bytes into it: offset 0 receives `byte`, offset 1000 receives
/// `byte * 3` (wrapping on overflow).
fn page_with(byte: u8) -> Page {
    let mut page = Page::zeroed();
    let contents = page.as_bytes_mut();
    contents[0] = byte;
    contents[1000] = byte.wrapping_mul(3);
    page
}

/// Shorthand for constructing a [`PageId`] in tests.
///
/// # Panics
///
/// Panics with the message `"non-zero"` when `PageId::new` rejects `n`.
fn id(n: u64) -> PageId {
    let page_id = PageId::new(n);
    page_id.expect("non-zero")
}

/// A freshly created WAL reports zero committed frames and `Lsn::ONE` as
/// the next LSN to hand out.
#[test]
fn create_fresh_writes_header_only() {
    let tmp = TempDir::new().expect("tempdir");
    let wal_path = tmp.path().join("db.obj-wal");

    let fresh = Wal::create_fresh(&wal_path, WalConfig::default()).expect("create");

    assert_eq!(fresh.committed_frames(), 0);
    assert_eq!(fresh.next_lsn(), Lsn::ONE);
}

/// Committing a transaction with a single appended frame yields LSN 1 and
/// bumps the committed-frame count to one.
#[test]
fn append_and_commit_one_frame() {
    let tmp = TempDir::new().expect("tempdir");
    let wal_path = tmp.path().join("db.obj-wal");
    let mut wal = Wal::create_fresh(&wal_path, WalConfig::default()).expect("create");

    let page = page_with(0xAA);
    let mut txn = wal.begin_txn();
    txn.append(id(1), &page).expect("append");
    let committed_lsn = txn.commit().expect("commit");

    assert_eq!(committed_lsn, Lsn::new(1));
    assert_eq!(wal.committed_frames(), 1);
}

/// Four frames appended in one transaction commit as a group: the commit
/// returns LSN 4, four frames are reported committed, and the next LSN
/// is 5.
#[test]
fn group_commit_assigns_consecutive_lsns() {
    let tmp = TempDir::new().expect("tempdir");
    let wal_path = tmp.path().join("db.obj-wal");
    let mut wal = Wal::create_fresh(&wal_path, WalConfig::default()).expect("create");

    let mut txn = wal.begin_txn();
    for page_no in 1u8..=4 {
        // Page N carries marker byte N so each frame body is distinct.
        let page = page_with(page_no);
        txn.append(id(u64::from(page_no)), &page).expect("append");
    }
    let final_lsn = txn.commit().expect("commit");

    assert_eq!(final_lsn, Lsn::new(4));
    assert_eq!(wal.committed_frames(), 4);
    assert_eq!(wal.next_lsn(), Lsn::new(5));
}

/// Committing a transaction with no appended frames succeeds, returns
/// `Lsn::ZERO`, and leaves the committed-frame count at zero.
#[test]
fn empty_txn_is_noop() {
    let tmp = TempDir::new().expect("tempdir");
    let wal_path = tmp.path().join("db.obj-wal");
    let mut wal = Wal::create_fresh(&wal_path, WalConfig::default()).expect("create");

    // Begin and immediately commit without appending anything.
    let committed_lsn = wal.begin_txn().commit().expect("empty commit");

    assert_eq!(committed_lsn, Lsn::ZERO);
    assert_eq!(wal.committed_frames(), 0);
}

/// After one committed frame, `reset_after_checkpoint` must change the
/// salt and return the WAL to its fresh state (zero committed frames,
/// next LSN back at `Lsn::ONE`).
#[test]
fn reset_after_checkpoint_rotates_salt_and_truncates() {
    let tmp = TempDir::new().expect("tempdir");
    let wal_path = tmp.path().join("db.obj-wal");
    let mut wal = Wal::create_fresh(&wal_path, WalConfig::default()).expect("create");
    let salt_before_reset = wal.salt();

    // Put one committed frame in the log so the reset has something to discard.
    let mut txn = wal.begin_txn();
    txn.append(id(1), &page_with(0xAA)).expect("append");
    txn.commit().expect("commit");

    wal.reset_after_checkpoint().expect("reset");

    assert_ne!(wal.salt(), salt_before_reset);
    assert_eq!(wal.committed_frames(), 0);
    assert_eq!(wal.next_lsn(), Lsn::ONE);
}

// --- Recovery tests (issue #15) ---------------------------------------

/// Two committed two-frame transactions followed by a partial trailing
/// frame: recovery must report all four committed frames, expose the
/// latest committed body for every page, and ignore the torn tail.
#[test]
fn recover_two_committed_txns_with_torn_tail() {
    let dir = TempDir::new().expect("tempdir");
    let path = dir.path().join("db.obj-wal");
    let salt = {
        let mut wal = Wal::create_fresh(&path, WalConfig::default()).expect("create");
        // Txn 1: page 1 = 0xAA, page 2 = 0xBB.
        let mut t = wal.begin_txn();
        t.append(id(1), &page_with(0xAA)).expect("a1");
        t.append(id(2), &page_with(0xBB)).expect("a2");
        t.commit().expect("commit 1");
        // Txn 2: page 1 = 0xCC (overwrite), page 3 = 0xDD.
        let mut t = wal.begin_txn();
        t.append(id(1), &page_with(0xCC)).expect("a3");
        t.append(id(3), &page_with(0xDD)).expect("a4");
        t.commit().expect("commit 2");
        wal.salt()
    };
    // Simulate a torn tail: append 100 stray bytes after the four
    // committed frames. That is shorter than a full frame, so the
    // would-be fifth frame cannot be read.
    {
        use std::fs::OpenOptions;
        use std::io::Write as _;
        let mut f = OpenOptions::new()
            .append(true)
            .open(&path)
            .expect("open wal append");
        f.write_all(&[0xFFu8; 100]).expect("torn tail bytes");
    }
    let recovered =
        Wal::open_for_recovery(&path, salt, WalConfig::default().size_limit).expect("recover");
    assert_eq!(recovered.committed_frames, 4);
    // Expected first body byte per page: page 1 was overwritten by txn 2
    // (0xCC); pages 2 and 3 were each written once.
    for (page_no, expected, label) in [(1u64, 0xCCu8, "p1"), (2, 0xBB, "p2"), (3, 0xDD, "p3")] {
        let page = recovered.view.get(&id(page_no)).expect(label);
        assert_eq!(page.as_bytes()[0], expected);
    }
}

/// Recovering a WAL that was created but never appended to yields zero
/// committed frames and an empty page view.
#[test]
fn recover_empty_wal_with_header_only() {
    let tmp = TempDir::new().expect("tempdir");
    let wal_path = tmp.path().join("db.obj-wal");

    // Create the WAL only to learn its salt; the handle is dropped at the
    // end of this statement, before recovery opens the file.
    let salt = Wal::create_fresh(&wal_path, WalConfig::default())
        .expect("create")
        .salt();

    let size_limit = WalConfig::default().size_limit;
    let recovered = Wal::open_for_recovery(&wal_path, salt, size_limit).expect("recover");

    assert_eq!(recovered.committed_frames, 0);
    assert!(recovered.view.is_empty());
}

/// Recovery against a path where no WAL file exists succeeds and reports
/// zero committed frames and an empty page view.
#[test]
fn recover_no_wal_file_is_empty() {
    let tmp = TempDir::new().expect("tempdir");
    // Nothing is ever created at this path.
    let missing_path = tmp.path().join("absent.obj-wal");
    let size_limit = WalConfig::default().size_limit;

    let recovered =
        Wal::open_for_recovery(&missing_path, 0xCAFE_BABE, size_limit).expect("recover");

    assert_eq!(recovered.committed_frames, 0);
    assert!(recovered.view.is_empty());
}

/// A WAL holding a committed frame is recovered with a salt other than the
/// one it was written with: the log counts as a stale generation, so
/// recovery reports zero committed frames and an empty view.
#[test]
fn recover_stale_salt_is_treated_as_empty() {
    let tmp = TempDir::new().expect("tempdir");
    let wal_path = tmp.path().join("db.obj-wal");

    // Write one committed frame, then drop the WAL handle.
    {
        let mut wal = Wal::create_fresh(&wal_path, WalConfig::default()).expect("create");
        let mut txn = wal.begin_txn();
        txn.append(id(1), &page_with(0xAA)).expect("a");
        txn.commit().expect("commit");
    }

    // Ask recovery to expect a salt that is not the one the WAL was
    // created with.
    let size_limit = WalConfig::default().size_limit;
    let recovered = Wal::open_for_recovery(&wal_path, 0xDEAD_BEEF, size_limit).expect("recover");

    assert_eq!(recovered.committed_frames, 0);
    assert!(recovered.view.is_empty());
}

/// Corruption in a frame that lies *past* the last intact commit marker is
/// torn-tail noise: recovery silently discards it and returns everything
/// up to that last intact commit.
#[test]
fn recover_corrupted_tail_after_last_commit_truncates() {
    use std::fs::OpenOptions;
    use std::io::{Seek, SeekFrom, Write as _};

    // On-disk sizes used to locate the byte to damage.
    const WAL_HEADER_LEN: u64 = 64;
    const FRAME_LEN: u64 = 4160;
    const FRAME_HEADER_LEN: u64 = 64;
    const BODY_BYTE: u64 = 50;

    let tmp = TempDir::new().expect("tempdir");
    let wal_path = tmp.path().join("db.obj-wal");

    let salt = {
        let mut wal = Wal::create_fresh(&wal_path, WalConfig::default()).expect("create");

        let mut first = wal.begin_txn();
        first.append(id(1), &page_with(0xAA)).expect("a1");
        first.commit().expect("commit 1");

        // This second commit is the one damaged below. With its frame
        // broken, the damage sits beyond the first commit marker, i.e.
        // in torn-tail territory.
        let mut second = wal.begin_txn();
        second.append(id(2), &page_with(0xBB)).expect("a2");
        second.commit().expect("commit 2");

        wal.salt()
    };

    // Overwrite one byte inside the second frame's body: skip the WAL
    // header, the whole first frame, and the second frame's header.
    {
        let target = WAL_HEADER_LEN + FRAME_LEN + FRAME_HEADER_LEN + BODY_BYTE;
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .open(&wal_path)
            .expect("open wal rw");
        file.seek(SeekFrom::Start(target)).expect("seek");
        file.write_all(&[0xAB]).expect("corrupt");
    }

    let size_limit = WalConfig::default().size_limit;
    let recovered = Wal::open_for_recovery(&wal_path, salt, size_limit).expect("recover");

    // Only the first commit survives; page 2 from the damaged frame must
    // not appear in the view.
    assert_eq!(recovered.committed_frames, 1);
    assert!(recovered.view.contains_key(&id(1)));
    assert!(!recovered.view.contains_key(&id(2)));
}

/// With a size limit that leaves room for one frame but not two, the first
/// append succeeds and the second must return an error.
#[test]
fn wal_size_limit_rejects_overflow() {
    let tmp = TempDir::new().expect("tempdir");
    let wal_path = tmp.path().join("db.obj-wal");

    // 4096 + 4160 = 8256 bytes, intended as "header + 1 frame".
    // NOTE(review): the recovery tests in this file use a 64-byte WAL
    // header and 4160-byte frames, so one frame ends at 4224 and two
    // frames would end at 8384. The limit still falls between the two,
    // but confirm whether 4096 was meant to be the header size.
    let tight_config = WalConfig {
        size_limit: 4096 + 4160,
        ..WalConfig::default()
    };
    let mut wal = Wal::create_fresh(&wal_path, tight_config).expect("create");

    let mut txn = wal.begin_txn();
    txn.append(id(1), &page_with(0x11)).expect("first append");
    let second_append = txn.append(id(2), &page_with(0x22));

    assert!(
        second_append.is_err(),
        "second append must hit the size limit"
    );
}

// --- Two-pass recovery tests (issue #21) ------------------------------

/// A damaged byte in the body of the FIRST committed transaction's frame
/// must be reported as `Error::WalCorruption` rather than being silently
/// truncated at the bad CRC. Before #21 the recovery walk stopped at the
/// first bad CRC and the second transaction was lost without notice; the
/// two-pass walk detects the mid-WAL CRC mismatch and refuses to guess.
#[test]
fn recover_corrupted_first_frame_surfaces_wal_corruption() {
    use std::fs::OpenOptions;
    use std::io::{Seek, SeekFrom, Write as _};

    let tmp = TempDir::new().expect("tempdir");
    let wal_path = tmp.path().join("db.obj-wal");

    let salt = {
        let mut wal = Wal::create_fresh(&wal_path, WalConfig::default()).expect("create");

        // First single-frame commit.
        let mut first = wal.begin_txn();
        first.append(id(1), &page_with(0xAA)).expect("a1");
        first.commit().expect("commit 1");

        // Second single-frame commit. Frame 1 now sits between the WAL
        // header and a later valid commit marker (frame 2).
        let mut second = wal.begin_txn();
        second.append(id(2), &page_with(0xBB)).expect("a2");
        second.commit().expect("commit 2");

        wal.salt()
    };

    // Damage byte 50 of the first frame's body: 64-byte WAL header, then
    // the 64-byte frame header, then 50 bytes into the body.
    {
        let target = 64u64 + 64u64 + 50u64;
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .open(&wal_path)
            .expect("open wal rw");
        file.seek(SeekFrom::Start(target)).expect("seek");
        file.write_all(&[0xAB]).expect("corrupt");
    }

    let size_limit = WalConfig::default().size_limit;
    let err = Wal::open_for_recovery(&wal_path, salt, size_limit)
        .expect_err("must surface WalCorruption");

    match &err {
        // The first frame begins right after the WAL header.
        crate::Error::WalCorruption { frame_offset } => assert_eq!(*frame_offset, 64),
        other => panic!("expected WalCorruption, got {other:?}"),
    }
}

/// Counterpart to the mid-WAL corruption case: junk bytes that land *past*
/// the last commit marker are torn-tail noise. Recovery drops them
/// silently and returns both committed transactions intact.
#[test]
fn recover_torn_tail_byte_past_last_commit_recovers_cleanly() {
    use std::fs::OpenOptions;
    use std::io::Write as _;

    let tmp = TempDir::new().expect("tempdir");
    let wal_path = tmp.path().join("db.obj-wal");

    let salt = {
        let mut wal = Wal::create_fresh(&wal_path, WalConfig::default()).expect("create");

        let mut first = wal.begin_txn();
        first.append(id(1), &page_with(0xAA)).expect("a1");
        first.commit().expect("commit 1");

        let mut second = wal.begin_txn();
        second.append(id(2), &page_with(0xBB)).expect("a2");
        second.commit().expect("commit 2");

        wal.salt()
    };

    // Tack 200 junk bytes onto the end of the file, after frame 2's commit
    // marker. Before this write the file is header + 2 frames
    // (64 + 8320 = 8384 bytes).
    {
        let mut file = OpenOptions::new()
            .append(true)
            .open(&wal_path)
            .expect("open wal append");
        file.write_all(&[0xFFu8; 200]).expect("append torn bytes");
    }

    let size_limit = WalConfig::default().size_limit;
    let recovered = Wal::open_for_recovery(&wal_path, salt, size_limit).expect("recover");

    assert_eq!(recovered.committed_frames, 2);
    assert!(recovered.view.contains_key(&id(1)));
    assert!(recovered.view.contains_key(&id(2)));
}

/// A multi-frame transaction whose non-commit frame has been damaged, with
/// a later intact commit after it. Pass 1 still locates the trailing
/// commit marker of the second transaction; pass 2 walks the prefix and
/// trips over the bad CRC on the first transaction's first frame, so
/// recovery must fail with `Error::WalCorruption`.
#[test]
fn recover_mid_wal_crc_in_multi_frame_txn_surfaces_corruption() {
    use std::fs::OpenOptions;
    use std::io::{Seek, SeekFrom, Write as _};

    let tmp = TempDir::new().expect("tempdir");
    let wal_path = tmp.path().join("db.obj-wal");

    let salt = {
        let mut wal = Wal::create_fresh(&wal_path, WalConfig::default()).expect("create");

        // Transaction 1 spans two frames.
        let mut first = wal.begin_txn();
        first.append(id(1), &page_with(0xAA)).expect("a1");
        first.append(id(2), &page_with(0xBB)).expect("a2");
        first.commit().expect("commit 1");

        // Transaction 2 is a single frame; its commit marker is the
        // anchor pass 1 will find.
        let mut second = wal.begin_txn();
        second.append(id(3), &page_with(0xCC)).expect("a3");
        second.commit().expect("commit 2");

        wal.salt()
    };

    // Damage byte 200 of the FIRST frame's body (WAL header 64 + frame
    // header 64 + 200).
    {
        let target = 64u64 + 64u64 + 200u64;
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .open(&wal_path)
            .expect("open wal rw");
        file.seek(SeekFrom::Start(target)).expect("seek");
        file.write_all(&[0x55]).expect("corrupt");
    }

    let size_limit = WalConfig::default().size_limit;
    let err = Wal::open_for_recovery(&wal_path, salt, size_limit)
        .expect_err("must surface WalCorruption");

    assert!(matches!(err, crate::Error::WalCorruption { .. }));
}

/// #85: commit one multi-frame transaction that CONTAINS a page-0 (header)
/// frame, then recover it. Shows that the `is_header` bool together with
/// the reused frame scratch still emits the header frame with
/// `wire_page_id == 0` (recovered into `Recovered.header`) while ordinary
/// frames are recovered into `Recovered.view` — in other words, the
/// allocation-hygiene refactor keeps the on-disk semantics unchanged.
#[test]
fn recover_multi_frame_txn_including_header_frame() {
    let tmp = TempDir::new().expect("tempdir");
    let wal_path = tmp.path().join("db.obj-wal");

    let salt = {
        let mut wal = Wal::create_fresh(&wal_path, WalConfig::default()).expect("create");

        // Single transaction: page 1, a header (page-0) update, then
        // page 2 as the commit marker. Placing the header frame in the
        // MIDDLE exercises the index-aligned `is_header` lookup.
        let mut txn = wal.begin_txn();
        txn.append(id(1), &page_with(0xAA)).expect("a1");
        txn.append_header(&page_with(0x11)).expect("hdr");
        txn.append(id(2), &page_with(0xBB)).expect("a2");
        txn.commit().expect("commit");

        wal.salt()
    };

    let size_limit = WalConfig::default().size_limit;
    let recovered = Wal::open_for_recovery(&wal_path, salt, size_limit).expect("recover");

    // Every one of the three frames counts as committed.
    assert_eq!(recovered.committed_frames, 3);

    // Ordinary pages show up in the view with their marker bytes intact.
    let first_page = recovered.view.get(&id(1)).expect("p1");
    assert_eq!(first_page.as_bytes()[0], 0xAA);
    let second_page = recovered.view.get(&id(2)).expect("p2");
    assert_eq!(second_page.as_bytes()[0], 0xBB);

    // The header frame was recovered with `page_id == 0` into its own
    // slot, not into the view under a stand-in id.
    let recovered_header = recovered.header.expect("header recovered");
    assert_eq!(recovered_header.as_bytes()[0], 0x11);

    // The stand-in `PageId(1)` used internally for the header frame must
    // not have overwritten page 1's body in the view.
    let first_page_again = recovered.view.get(&id(1)).expect("p1 again");
    assert_eq!(first_page_again.as_bytes()[0], 0xAA);
}