batpak 0.8.0

Event sourcing with causal graphs and caller-defined gates. Sync API, no async runtime.
Documentation
use super::*;
use crate::coordinate::DagPosition;
use crate::store::index::DiskPos;
use std::io::ErrorKind;
use tempfile::TempDir;

struct FailingRead {
    kind: ErrorKind,
}

impl std::io::Read for FailingRead {
    fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
        Err(std::io::Error::from(self.kind))
    }
}

fn test_reader() -> (Reader, TempDir) {
    let dir = TempDir::new().expect("create temp dir for reader test");
    let reader = Reader::new(
        dir.path().to_path_buf(),
        4,
        std::sync::Arc::new(crate::store::SystemClock::new()),
    );
    (reader, dir)
}

fn write_segment_bytes(dir: &TempDir, segment_id: u64, bytes: &[u8]) {
    let path = dir.path().join(segment::segment_filename(segment_id));
    std::fs::write(&path, bytes).expect("write segment bytes");
}

#[test]
fn read_frame_header_policy_treats_unexpected_eof_as_clean_end() {
    let mut reader = FailingRead {
        kind: ErrorKind::UnexpectedEof,
    };

    let result = read_frame_header_or_clean_eof(&mut reader).expect("EOF should be non-fatal");

    assert!(
        result.is_none(),
        "PROPERTY: EOF while reading the next frame header is the clean segment terminator"
    );
}

#[test]
fn read_frame_header_policy_surfaces_non_eof_io_errors() {
    let mut reader = FailingRead {
        kind: ErrorKind::PermissionDenied,
    };

    let result = read_frame_header_or_clean_eof(&mut reader);

    assert!(
        matches!(result, Err(error) if error.kind() == ErrorKind::PermissionDenied),
        "PROPERTY: non-EOF frame-header read errors must surface as I/O failures"
    );
}

#[test]
fn frame_decode_error_mapping_preserves_segment_and_offset_context() {
    fn assert_error_trait<E: std::error::Error>() {}

    assert_error_trait::<segment::FrameDecodeError>();

    let crc_error = Reader::frame_decode_error(
        7,
        42,
        segment::FrameDecodeError::CrcMismatch {
            expected: 0xAAAA_AAAA,
            actual: 0xBBBB_BBBB,
        },
    );
    assert!(
        matches!(
            crc_error,
            StoreError::CrcMismatch {
                segment_id: 7,
                offset: 42
            }
        ),
        "PROPERTY: frame CRC failures must retain exact disk position context"
    );

    let truncated_error = Reader::frame_decode_error(
        7,
        42,
        segment::FrameDecodeError::Truncated {
            expected_len: 16,
            available: 12,
        },
    );
    assert!(
        matches!(
            truncated_error,
            StoreError::CorruptSegment { segment_id: 7, ref detail }
            if detail.contains("frame at offset 42")
                && detail.contains("frame truncated: expected 16 bytes, got 12")
        ),
        "PROPERTY: structural frame decode failures must retain segment, offset, and decode reason; got {truncated_error:?}"
    );
}

#[test]
fn acquire_buffer_returns_requested_size() {
    let (reader, _dir) = test_reader();
    let buf = reader.acquire_buffer(256);
    assert_eq!(
        buf.len(),
        256,
        "ACQUIRE BUFFER: expected buffer of size 256, got {}.\n\
         Check: src/store/segment/scan.rs acquire_buffer() vec allocation.",
        buf.len()
    );
    assert!(
        buf.iter().all(|&b| b == 0),
        "ACQUIRE BUFFER: newly allocated buffer should be zero-initialized."
    );
}

#[test]
fn released_buffer_is_zero_filled_and_resized_on_next_acquire() {
    let (reader, _dir) = test_reader();

    let mut buf = reader.acquire_buffer(128);
    for byte in buf.iter_mut() {
        *byte = 0xAB;
    }
    reader.release_buffer(buf);

    let buf2 = reader.acquire_buffer(64);
    assert_eq!(
        buf2.len(),
        64,
        "PROPERTY: re-acquired buffer must match the requested size, \
         regardless of whether it came from the pool or a fresh allocation. \
         Investigate: src/store/segment/scan.rs acquire_buffer resize path."
    );
    assert!(
        buf2.iter().all(|&b| b == 0),
        "PROPERTY: re-acquired buffer must be zero-filled. A non-zero byte \
         means the previous user's data leaked into the new acquirer, \
         which is a memory-safety / information-disclosure bug. \
         Investigate: src/store/segment/scan.rs acquire_buffer fill path."
    );
}

#[test]
fn buffer_pool_does_not_grow_unboundedly() {
    let (reader, _dir) = test_reader();

    for _ in 0..100 {
        reader.release_buffer(vec![0u8; 1024]);
    }

    for i in 0..100 {
        let buf = reader.acquire_buffer(1024);
        assert_eq!(
            buf.len(),
            1024,
            "PROPERTY: buffer {i} of 100 must be the requested size."
        );
        assert!(
            buf.iter().all(|&b| b == 0),
            "PROPERTY: buffer {i} of 100 must be zero-filled."
        );
    }
}

#[test]
fn acquire_buffer_satisfies_contract_on_empty_pool() {
    let (reader, _dir) = test_reader();

    let buf = reader.acquire_buffer(512);
    assert_eq!(
        buf.len(),
        512,
        "PROPERTY: acquire_buffer on a fresh reader must return the \
         requested size. Investigate: src/store/segment/scan.rs allocation \
         path when pool is empty."
    );
    assert!(
        buf.iter().all(|&b| b == 0),
        "PROPERTY: a freshly allocated buffer must be zero-filled."
    );
}

#[test]
fn buffer_pool_retains_at_most_sixteen_released_buffers() {
    let (reader, _dir) = test_reader();

    for _ in 0..17 {
        reader.release_buffer(vec![0u8; 32]);
    }

    let retained = reader.buffer_pool.lock().len();
    assert_eq!(
        retained, 16,
        "PROPERTY: release_buffer must cap the internal pool at exactly 16 buffers; \
         retaining a seventeenth buffer weakens the bounded-memory contract"
    );
}

#[test]
fn batch_marker_payload_decode_ignores_marker_payload_bytes() {
    let header = EventHeader::new(
        1,
        1,
        None,
        1,
        DagPosition::root(),
        0,
        EventKind::SYSTEM_BATCH_BEGIN,
    );
    let event = Event {
        header,
        payload: vec![0xC1],
        hash_chain: Some(HashChain::default()),
    };
    let frame = FramePayload {
        event,
        entity: "entity:batch-marker".to_owned(),
        scope: "scope:test".to_owned(),
        receipt_extensions: BTreeMap::new(),
    };
    let encoded = crate::encoding::to_bytes(&frame).expect("encode batch marker frame");

    let decoded = Reader::decode_frame_payload_value(&encoded)
        .expect("batch marker payload bytes are ignored by value decode");

    assert_eq!(
        decoded.event.payload,
        serde_json::Value::Null,
        "PROPERTY: SYSTEM_BATCH_BEGIN/COMMIT markers carry count semantics in the header; \
         value decoding must not deserialize their raw marker payload bytes"
    );
}

#[test]
fn set_active_segment_advances_the_sealed_cutoff() {
    let (reader, _dir) = test_reader();

    reader.set_active_segment(7);

    assert_eq!(reader.active_segment_id(), 7);
    assert!(
        reader.is_sealed(6),
        "PROPERTY: segments older than the configured active segment must be treated as sealed"
    );
    assert!(
        !reader.is_sealed(7),
        "PROPERTY: the configured active segment itself must stay writable/non-sealed"
    );
    assert!(
        !reader.is_sealed(8),
        "PROPERTY: future segment ids must not be treated as sealed before rotation reaches them"
    );
}

#[test]
fn read_active_frame_into_reads_the_full_requested_slice() {
    let (reader, dir) = test_reader();
    write_segment_bytes(&dir, 0, b"0123456789abcdef");

    let pos = DiskPos::new(0, 3, 5);
    let mut buf = [0u8; 5];
    reader
        .read_active_frame_into(&pos, &mut buf)
        .expect("read active bytes");

    assert_eq!(
        &buf, b"34567",
        "PROPERTY: active-segment reads must advance until the caller's buffer is fully populated"
    );
}

#[test]
fn checked_frame_range_rejects_overflow_and_oversized_lengths() {
    assert!(Reader::checked_frame_range(1, u64::MAX, 16, 1024).is_err());
    assert!(Reader::checked_frame_len(1, 4).is_err());
    assert!(
        Reader::checked_frame_len(
            1,
            u32::try_from(FRAME_HEADER_BYTES).expect("frame header size fits u32")
        )
        .is_ok(),
        "PROPERTY: a frame length exactly equal to the frame header size is the minimum valid empty-payload frame"
    );
    assert!(Reader::checked_frame_len(
        1,
        u32::try_from(FRAME_HEADER_BYTES + segment::MAX_FRAME_PAYLOAD)
            .expect("max frame length fits u32")
    )
    .is_ok());
    assert!(Reader::checked_frame_len(
        1,
        u32::try_from(FRAME_HEADER_BYTES + segment::MAX_FRAME_PAYLOAD + 1)
            .expect("one-past-max frame length fits u32")
    )
    .is_err());
    assert!(Reader::checked_frame_len(1, u32::MAX).is_err());
}

#[test]
fn payload_len_exceeds_max_respects_the_exact_boundary() {
    assert!(
        !Reader::payload_len_exceeds_max(segment::MAX_FRAME_PAYLOAD),
        "PROPERTY: a frame exactly at MAX_FRAME_PAYLOAD remains valid"
    );
    assert!(
        Reader::payload_len_exceeds_max(segment::MAX_FRAME_PAYLOAD + 1),
        "PROPERTY: a frame one byte past MAX_FRAME_PAYLOAD must stop scan/recovery before allocation"
    );
}

#[test]
fn checked_batch_count_rejects_vacuous_or_implausible_counts() {
    assert!(Reader::checked_batch_count(1, 0, 0).is_err());
    assert!(Reader::checked_batch_count(1, 0, MAX_BATCH_RECOVERY_ITEMS + 1).is_err());
    assert_eq!(
        Reader::checked_batch_count(1, 0, MAX_BATCH_RECOVERY_ITEMS)
            .expect("max batch count remains valid"),
        MAX_BATCH_RECOVERY_ITEMS,
        "PROPERTY: the exact MAX_BATCH_RECOVERY_ITEMS boundary is allowed"
    );
    assert_eq!(
        Reader::checked_batch_count(1, 0, 3).expect("valid batch count"),
        3
    );
}

#[test]
fn required_index_hash_chain_rejects_missing_chain_for_data_event() {
    let event = IndexScanEvent {
        header: EventHeader::new(
            1,
            1,
            None,
            1,
            crate::coordinate::DagPosition::root(),
            0,
            EventKind::DATA,
        ),
        _payload: serde::de::IgnoredAny,
        hash_chain: None,
    };

    let err = Reader::required_index_hash_chain(&event, 7, 99).expect_err("missing hash chain");
    assert!(
        matches!(
            err,
            StoreError::CorruptSegment { segment_id: 7, ref detail }
            if detail.contains("missing hash_chain")
        ),
        "PROPERTY: missing hash_chain must surface as CorruptSegment with the expected detail, got {err:?}"
    );
}