fsys 1.1.0

Filesystem IO for Rust storage engines: journal substrate, io_uring, NVMe passthrough, atomic writes, cross-platform durability.
Documentation
//! 0.8.0 R-1 — Multi-thread concurrent-append correctness +
//! group-commit verification for the journal substrate.
//!
//! These tests exercise the lock-free append path under heavy
//! concurrent load and verify:
//!
//! 1. **No data corruption.** Bytes from one thread's record
//!    don't interleave with another's; the LSN-reservation
//!    invariant holds.
//! 2. **Group-commit coherence.** When N threads call
//!    `sync_through(lsn)` simultaneously, only one fsync runs;
//!    all callers wake up with the durability guarantee
//!    fulfilled.
//! 3. **Resume correctness.** After close-and-reopen, the
//!    journal's `next_lsn` matches the file size and previously
//!    appended records are still present byte-for-byte.

use fsys::{builder, JournalHandle, JournalReader, Lsn};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Barrier};

/// Process-wide sequence counter; each call to [`tmp_path`] consumes one
/// value so that paths built within the same process never collide.
static C: AtomicU64 = AtomicU64::new(0);

/// Builds a unique file path inside the system temp directory.
///
/// The file name combines a fixed prefix, the current process ID, a
/// per-process sequence number, and the caller-supplied `tag`. Nothing is
/// created on disk; only the path is returned.
fn tmp_path(tag: &str) -> PathBuf {
    let seq = C.fetch_add(1, Ordering::Relaxed);
    let pid = std::process::id();
    let file_name = format!("fsys_journal_concurrent_{}_{}_{tag}", pid, seq);

    let mut path = std::env::temp_dir();
    path.push(file_name);
    path
}

/// Drop guard that deletes the wrapped file path when it goes out of scope.
struct Cleanup(PathBuf);

impl Drop for Cleanup {
    /// Attempts to remove the file; the outcome is deliberately discarded so
    /// that a missing file (or any other removal error) never panics in drop.
    fn drop(&mut self) {
        let Cleanup(target) = &*self;
        let _ignored = std::fs::remove_file(target);
    }
}

/// Hammers the journal with fixed-size records from many threads and then
/// verifies nothing was lost, duplicated, or interleaved.
///
/// Each record carries a `T<tid>-R<index>` tag, zero-padded to
/// `RECORD_SIZE`. After all writers finish the test checks:
///
/// 1. the on-disk size equals threads × records × (payload + frame overhead);
/// 2. every record decodes via `JournalReader` and has the expected length;
/// 3. the reader ends in `CleanEnd`;
/// 4. every *expected* tag is present exactly once (and no others exist).
#[test]
fn concurrent_appends_produce_no_data_corruption() {
    const THREADS: usize = 16;
    const RECORDS_PER_THREAD: usize = 1000;
    const RECORD_SIZE: usize = 64;
    // Per-record framing cost: 12 bytes (magic + length + crc32c).
    const FRAME_OVERHEAD: usize = 12;
    const TOTAL_RECORDS: usize = THREADS * RECORDS_PER_THREAD;

    let path = tmp_path("no_corruption");
    let _g = Cleanup(path.clone());
    let fs = builder().build().expect("handle");
    let log = Arc::new(fs.journal(&path).expect("journal"));

    let barrier = Arc::new(Barrier::new(THREADS));
    let mut joins = Vec::with_capacity(THREADS);
    for tid in 0..THREADS {
        let log = Arc::clone(&log);
        let barrier = Arc::clone(&barrier);
        joins.push(std::thread::spawn(move || {
            // All threads wait at the barrier — start hammering
            // simultaneously to maximise contention.
            barrier.wait();
            let mut record = vec![0u8; RECORD_SIZE];
            for i in 0..RECORDS_PER_THREAD {
                // Encode thread ID + record index in the record so the
                // read-back phase can identify exactly which append it was.
                // The tag has a fixed width, so overwriting the prefix never
                // leaves stale bytes from the previous iteration.
                let tag = format!("T{tid:03}-R{i:06}");
                let bytes = tag.as_bytes();
                record[..bytes.len()].copy_from_slice(bytes);
                let _lsn = log.append(&record).expect("append");
            }
        }));
    }
    for j in joins {
        j.join().expect("thread join");
    }

    // Final sync so the file metadata reflects every append.
    let final_lsn = log.next_lsn();
    log.sync_through(final_lsn).expect("final sync");

    // Total file size must be exactly payload + frame overhead per record.
    let expected_bytes = TOTAL_RECORDS * (RECORD_SIZE + FRAME_OVERHEAD);
    let actual_bytes = std::fs::metadata(&path).expect("stat").len() as usize;
    assert_eq!(
        actual_bytes, expected_bytes,
        "file size mismatch — concurrent appends interleaved or lost data"
    );

    // Replay via JournalReader — every record must decode, and we tally how
    // often each tag shows up.
    let mut reader = JournalReader::open(&path).expect("open reader");
    let mut tag_counts: std::collections::HashMap<String, usize> =
        std::collections::HashMap::with_capacity(TOTAL_RECORDS);
    for record in reader.iter() {
        let record = record.expect("record decode");
        assert_eq!(
            record.payload.len(),
            RECORD_SIZE,
            "every record payload should be RECORD_SIZE bytes"
        );
        // The tag is the payload up to the first zero padding byte.
        let trimmed = record
            .payload
            .iter()
            .position(|&b| b == 0)
            .map(|p| &record.payload[..p])
            .unwrap_or(&record.payload);
        let tag = std::str::from_utf8(trimmed).expect("utf-8 tag");
        *tag_counts.entry(tag.to_string()).or_insert(0) += 1;
    }
    assert_eq!(
        reader.tail_state(),
        fsys::JournalTailState::CleanEnd,
        "tail should be clean after reader iteration"
    );

    // The number of distinct tags must match the number of appends; combined
    // with the per-tag lookup below this rules out unexpected tags as well.
    assert_eq!(
        tag_counts.len(),
        TOTAL_RECORDS,
        "expected every distinct tag to appear once; saw {} distinct tags",
        tag_counts.len()
    );

    // Look up each tag we *wrote* rather than iterating the tags we *saw*:
    // iterating the map can only detect duplicates, never a missing record.
    for tid in 0..THREADS {
        for i in 0..RECORDS_PER_THREAD {
            let tag = format!("T{tid:03}-R{i:06}");
            let count = tag_counts.get(&tag).copied().unwrap_or(0);
            assert_eq!(
                count, 1,
                "tag {tag} appeared {count} times — duplicate or missing record"
            );
        }
    }
}

/// Many threads each append one record and immediately `sync_through` it.
///
/// With group commit working correctly, every caller must return
/// successfully (no deadlock, no error) and observe `synced_lsn` at or
/// beyond its own LSN. After all threads finish, `synced_lsn` must cover the
/// highest LSN any thread was handed.
#[test]
fn group_commit_coalesces_concurrent_sync_calls() {
    const THREADS: usize = 32;
    let path = tmp_path("group_commit");
    let _g = Cleanup(path.clone());
    let fs = builder().build().expect("handle");
    let log = Arc::new(fs.journal(&path).expect("journal"));

    // Release all workers together so their sync_through calls actually
    // overlap; without this, threads run as they are spawned and the calls
    // are mostly sequential. The wait happens before any fallible call, so a
    // panicking worker can never leave the others stuck at the barrier.
    let barrier = Arc::new(Barrier::new(THREADS));
    let mut joins = Vec::with_capacity(THREADS);
    for tid in 0..THREADS {
        let log = Arc::clone(&log);
        let barrier = Arc::clone(&barrier);
        joins.push(std::thread::spawn(move || {
            barrier.wait();
            let lsn = log.append(format!("T{tid:03}").as_bytes()).expect("append");
            log.sync_through(lsn).expect("sync_through");
            // Once sync_through returns, synced_lsn must cover our record.
            assert!(
                log.synced_lsn() >= lsn,
                "synced_lsn is behind the LSN this thread just synced through"
            );
            lsn
        }));
    }

    // Join every worker (propagating any panic) and keep the highest LSN.
    let max_lsn = joins
        .into_iter()
        .map(|j| j.join().expect("join"))
        .max()
        .unwrap_or(Lsn::ZERO);
    assert!(
        log.synced_lsn() >= max_lsn,
        "synced_lsn is behind the highest LSN handed to any thread"
    );
}

/// Writes three records, closes the journal, reopens it, appends a fourth,
/// and checks that LSNs resume where they left off and that all four
/// payloads read back in order.
#[test]
fn close_and_reopen_resumes_correctly() {
    let path = tmp_path("reopen");
    let _g = Cleanup(path.clone());
    let fs = builder().build().expect("handle");

    // Session one: append, sync, remember where the journal ended, close.
    let size_at_close: u64 = {
        let first_session = fs.journal(&path).expect("journal1");
        let _ = first_session.append(b"first record").expect("a1");
        let _ = first_session.append(b"second record").expect("a2");
        let third_lsn = first_session.append(b"third").expect("a3");
        first_session.sync_through(third_lsn).expect("sync");
        let end = first_session.next_lsn().as_u64();
        first_session.close().expect("close");
        end
    };

    // Session two: both cursors must pick up exactly at the recorded end.
    let resumed = fs.journal(&path).expect("journal2");
    assert_eq!(resumed.next_lsn().as_u64(), size_at_close);
    assert_eq!(resumed.synced_lsn().as_u64(), size_at_close);

    // A new append must land beyond the previous end.
    let fourth_lsn = resumed.append(b"fourth").expect("a4");
    assert!(fourth_lsn.as_u64() > size_at_close);
    resumed.close().expect("close2");

    // Read everything back through JournalReader and compare payloads.
    let mut reader = JournalReader::open(&path).expect("open reader");
    let mut payloads: Vec<Vec<u8>> = Vec::new();
    for entry in reader.iter() {
        payloads.push(entry.expect("record").payload);
    }

    let expected_raw: [&[u8]; 4] = [b"first record", b"second record", b"third", b"fourth"];
    let expected: Vec<Vec<u8>> = expected_raw.iter().map(|p| p.to_vec()).collect();
    assert_eq!(payloads, expected);
    assert_eq!(reader.tail_state(), fsys::JournalTailState::CleanEnd);
}

/// Verifies that `synced_lsn` never decreases under concurrent appends and
/// syncs.
///
/// Three checks are made:
///
/// * after `sync_through(lsn)` returns, `synced_lsn >= lsn`;
/// * within a single thread, successive `synced_lsn` reads never go
///   backwards;
/// * after all workers have been joined, `synced_lsn` is at least the
///   highest value any worker ever observed.
///
/// A cross-thread comparison against the shared maximum *during* the run is
/// deliberately not asserted: another thread may publish a newer reading
/// between this thread's read and its `fetch_max`, which would look like a
/// regression without being one.
#[test]
fn synced_lsn_is_monotonic_under_concurrent_load() {
    const THREADS: usize = 8;
    const ROUNDS: usize = 100;
    let path = tmp_path("monotonic");
    let _g = Cleanup(path.clone());
    let fs = builder().build().expect("handle");
    let log = Arc::new(fs.journal(&path).expect("journal"));

    // Highest synced_lsn reading seen by any worker.
    let observed_max = Arc::new(AtomicU64::new(0));
    let mut joins = Vec::with_capacity(THREADS);
    for _ in 0..THREADS {
        let log = Arc::clone(&log);
        let observed_max = Arc::clone(&observed_max);
        joins.push(std::thread::spawn(move || {
            // Previous synced_lsn reading made by *this* thread.
            let mut last_seen = 0u64;
            for _ in 0..ROUNDS {
                let lsn = log.append(b"x").expect("append");
                log.sync_through(lsn).expect("sync");
                let now = log.synced_lsn().as_u64();

                // Durability: our own record must be covered. Other threads
                // may have pushed synced_lsn further; that is fine.
                assert!(
                    now >= lsn.as_u64(),
                    "synced_lsn {now} is behind the LSN {} we just synced through",
                    lsn.as_u64()
                );
                // Monotonicity: a later read by the same thread can never be
                // lower than an earlier one.
                assert!(
                    now >= last_seen,
                    "synced_lsn regressed from {last_seen} to {now}"
                );
                last_seen = now;

                observed_max.fetch_max(now, Ordering::SeqCst);
            }
        }));
    }
    for j in joins {
        j.join().expect("join");
    }

    // Every worker reading happened before its join, so the value read now
    // must be at least as large as the largest of them.
    let final_synced = log.synced_lsn().as_u64();
    let highest_seen = observed_max.load(Ordering::SeqCst);
    assert!(
        final_synced >= highest_seen,
        "final synced_lsn {final_synced} is below a previously observed value {highest_seen}"
    );
}

/// Single-threaded sanity check: once an append has been synced, the
/// on-disk file size equals the LSN returned by that append.
///
/// Fifty records with payload lengths 17 through 66 bytes are written so the
/// invariant is exercised across varying frame sizes. Per the test's own
/// notes, the LSN advance includes the 12-byte per-record frame overhead.
#[test]
fn append_returns_lsn_equal_to_file_size_after_each_op() {
    let path = tmp_path("size_invariant");
    let _g = Cleanup(path.clone());
    let fs = builder().build().expect("handle");
    let log = fs.journal(&path).expect("journal");

    // `i` is the iteration index (0..50); `payload_len` is 17 + i.
    for (i, payload_len) in (17usize..67).enumerate() {
        let payload = vec![0xABu8; payload_len];
        let end_lsn = log.append(&payload).expect("append");
        // sync_through makes sure the write is reflected in file metadata
        // before we stat the file.
        log.sync_through(end_lsn).expect("sync");
        let on_disk = std::fs::metadata(&path).expect("stat").len();
        assert_eq!(
            on_disk,
            end_lsn.as_u64(),
            "after sync_through, file size should equal LSN (= sum of framed record sizes) at iter {i}"
        );
    }

    // References `JournalHandle::close` without calling it.
    // NOTE(review): presumably this only keeps the `JournalHandle` import in
    // use — confirm whether the journal was meant to be closed here.
    let _ = JournalHandle::close;
}

/// 0.9.6 audit H-8 — concurrent stress with a thread-count
/// ladder.
///
/// Pre-0.9.6, concurrent-append tests ran at fixed thread counts
/// (8 / 16). Race conditions, lock contention, and false-sharing
/// effects are thread-count-dependent — a bug at N=32 might be
/// invisible at N=8. This test sweeps the full ladder so a
/// regression at any depth is caught in CI.
///
/// Verifies the same byte-corruption + LSN-monotonicity
/// invariants as `concurrent_appends_produce_no_data_corruption`
/// but at each of `[1, 2, 4, 8, 16, 32]` threads. The 1-thread
/// case is the regression check for the lock-free single-writer
/// path; the higher counts catch contention-related races.
///
/// For every rung of the ladder the test checks that:
///
/// * each thread's successive `append` calls return strictly increasing
///   LSNs;
/// * after a final sync, the file size equals the highest LSN returned;
/// * every record read back is `RECORD_SIZE` bytes of a single, valid
///   thread-ID byte;
/// * the total and per-thread record counts match what was written.
#[test]
fn concurrent_appends_thread_count_ladder() {
    const RECORDS_PER_THREAD: usize = 250;
    const RECORD_SIZE: usize = 32;

    for &thread_count in &[1usize, 2, 4, 8, 16, 32] {
        let path = tmp_path(&format!("ladder_{thread_count}"));
        let _g = Cleanup(path.clone());

        let fs = builder().build().expect("handle");
        let log = Arc::new(fs.journal(&path).expect("journal"));
        let barrier = Arc::new(Barrier::new(thread_count));

        let mut handles = Vec::with_capacity(thread_count);
        for tid in 0..thread_count {
            let log = Arc::clone(&log);
            let barrier = Arc::clone(&barrier);
            handles.push(std::thread::spawn(move || {
                // The payload is the thread ID repeated; the ladder tops out
                // at 32 threads, so the narrowing cast cannot truncate.
                let payload = vec![tid as u8; RECORD_SIZE];
                // Line all writers up so they start appending together.
                let _ = barrier.wait();
                let mut last_lsn = Lsn::ZERO;
                for _ in 0..RECORDS_PER_THREAD {
                    let lsn = log.append(&payload).expect("append");
                    // LSN monotonicity as seen by one writer: every append
                    // must land beyond this thread's previous one.
                    assert!(
                        lsn > last_lsn,
                        "thread_count={thread_count}, tid={tid}: append LSN did not advance"
                    );
                    last_lsn = lsn;
                }
                last_lsn
            }));
        }

        // Collect each thread's last LSN.
        let mut last_lsns: Vec<Lsn> = Vec::with_capacity(thread_count);
        for h in handles {
            last_lsns.push(h.join().expect("thread join"));
        }

        // Final sync_through on the highest LSN observed by any
        // thread.
        let high = last_lsns.iter().copied().max().unwrap_or(Lsn::ZERO);
        log.sync_through(high).expect("final sync");

        // Verify invariants:
        // 1. File size == high LSN (every byte accounted for).
        let actual_size = std::fs::metadata(&path).expect("stat").len();
        assert_eq!(
            actual_size,
            high.as_u64(),
            "thread_count={thread_count}: file size should equal high LSN"
        );

        // 2. Reading every record back yields exactly
        //    thread_count × RECORDS_PER_THREAD records, each
        //    RECORD_SIZE bytes of one thread ID byte repeated.
        let mut reader = JournalReader::open(&path).expect("reader open");
        let mut record_count = 0usize;
        let mut per_thread_count = vec![0usize; thread_count];
        for rec in reader.iter() {
            let r = rec.expect("read record");
            assert_eq!(
                r.payload.len(),
                RECORD_SIZE,
                "thread_count={thread_count}: every record should be RECORD_SIZE bytes"
            );
            // The first byte identifies the writer; it must be a thread ID
            // that actually exists on this rung before we index with it.
            let tid = r.payload[0] as usize;
            assert!(
                tid < thread_count,
                "thread_count={thread_count}: record payload byte {tid} >= thread_count {thread_count}"
            );
            // All bytes in this record should be the same thread ID byte.
            assert!(
                r.payload.iter().all(|&b| b == tid as u8),
                "thread_count={thread_count}: record payload not uniform (interleaving bug?)"
            );
            per_thread_count[tid] += 1;
            record_count += 1;
        }
        assert_eq!(
            record_count,
            thread_count * RECORDS_PER_THREAD,
            "thread_count={thread_count}: missing records"
        );
        for (tid, count) in per_thread_count.iter().enumerate() {
            assert_eq!(
                *count, RECORDS_PER_THREAD,
                "thread_count={thread_count}, tid={tid}: per-thread record count mismatch"
            );
        }
    }
}