batpak 0.3.0

Event sourcing with causal graphs and policy gates. Sync API, zero async.
Documentation
use super::*;
use tempfile::TempDir;

/// Assembles a `Store` for tests whose writer handle is wired to the caller-supplied
/// command channel `tx` instead of a real writer thread.
///
/// The store's reader and config both point at a freshly created temporary directory.
/// That `TempDir` is returned next to the store so the caller decides when it is dropped.
fn test_store_with_writer(tx: flume::Sender<writer::WriterCommand>) -> (Store, TempDir) {
    let tmp = TempDir::new().expect("temp dir");
    let subscriber_list = Arc::new(writer::SubscriberList::new());

    // Components are built one at a time, in the same order the struct fields are listed.
    let store_index = Arc::new(index::StoreIndex::new());
    let segment_reader = Arc::new(reader::Reader::new(tmp.path().to_path_buf(), 4));
    let writer_handle = writer::WriterHandle::from_parts_for_test(tx, subscriber_list);
    let store_config = Arc::new(StoreConfig::new(tmp.path().to_path_buf()));

    (
        Store {
            index: store_index,
            reader: segment_reader,
            cache: Box::new(NoCache),
            writer: writer_handle,
            config: store_config,
            // Enabled so that dropping the store exercises its shutdown-on-drop path.
            should_shutdown_on_drop: true,
            _state: std::marker::PhantomData,
        },
        tmp,
    )
}

/// `Store::sync` must report `StoreError::WriterCrashed` when the receiving end of the
/// writer command channel has already been dropped.
#[test]
fn sync_reports_writer_crash_when_channel_is_closed() {
    let (command_tx, command_rx) = flume::bounded(1);
    // Dropping the only receiver leaves the sender disconnected before the store is built.
    drop(command_rx);
    let (store, _tmp) = test_store_with_writer(command_tx);

    let outcome = Store::sync(&store);

    assert!(
        matches!(outcome, Err(StoreError::WriterCrashed)),
        "PROPERTY: Store::sync must surface WriterCrashed when the writer channel is disconnected.\n\
         Investigate: src/store/mod.rs Store::sync and src/store/maintenance.rs sync.\n\
         Common causes: sync() returning success without contacting the writer, disconnected sends being ignored."
    );
}

/// Dropping a `Store` without calling `close()` must still deliver a
/// `WriterCommand::Shutdown` to whatever is listening on the writer channel.
///
/// A stand-in listener thread waits for one command. If it is a `Shutdown`, the thread
/// acknowledges it and then signals the test over a second channel.
#[test]
fn drop_sends_shutdown_to_writer_thread() {
    let (command_tx, command_rx) = flume::bounded(1);
    let (seen_tx, seen_rx): (flume::Sender<()>, flume::Receiver<()>) = flume::bounded(1);

    let listener = move || {
        if let Ok(writer::WriterCommand::Shutdown { respond }) = command_rx.recv() {
            // Reply first so the dropping side is not left waiting, then notify the test.
            let _ = respond.send(Ok(()));
            let _ = seen_tx.send(());
        }
    };
    let _listener = std::thread::Builder::new()
        .name("store-drop-shutdown-test".into())
        .spawn(listener)
        .expect("spawn shutdown listener");

    let (store, _tmp) = test_store_with_writer(command_tx);
    drop(store);

    let wait_limit = std::time::Duration::from_millis(500);
    let notified = seen_rx.recv_timeout(wait_limit);

    assert!(
        notified.is_ok(),
        "PROPERTY: dropping Store without close() must send a Shutdown command to the writer for best-effort draining.\n\
         Investigate: src/store/mod.rs Drop for Store.\n\
         Common causes: Drop body removed, shutdown command not sent, or shutdown path returning before notifying the writer."
    );
}

/// `checked_payload_len` must report a length of 4 for a four-element payload.
#[test]
fn checked_payload_len_returns_exact_serialized_length() {
    let reported = checked_payload_len(&[1, 2, 3, 4]).expect("payload len");

    assert_eq!(
        reported,
        4,
        "PROPERTY: checked_payload_len must preserve the exact payload byte length.\n\
         Investigate: src/store/contracts.rs checked_payload_len.\n\
         Common causes: helper replaced with a constant, off-by-one, or truncated length conversion."
    );
}

/// `now_us` must return a positive value, and a later call made after a short sleep
/// must return a strictly larger one.
#[test]
fn now_us_moves_forward_over_real_time() {
    use std::{thread, time::Duration};

    let earlier = now_us();
    // Pause briefly so the two samples are separated by real elapsed time.
    thread::sleep(Duration::from_millis(2));
    let later = now_us();

    assert!(
        earlier > 0,
        "PROPERTY: now_us must return a positive microsecond timestamp since the Unix epoch."
    );
    assert!(
        later > earlier,
        "PROPERTY: now_us must advance as wall-clock time moves forward.\n\
         Investigate: src/store/config.rs now_us.\n\
         Common causes: helper replaced with a constant or non-monotonic sentinel."
    );
}