lash-sqlite-store 0.1.0-alpha.42

SQLite-backed session store for the lash agent runtime.
Documentation
//! Regression tests for the storage review fixes:
//!
//! * head-revision CAS holds across two independent connections to the same
//!   file database (the `BEGIN IMMEDIATE` fix),
//! * a contended queued-work claim reports not-claimed instead of a false
//!   success (the rows-affected check),
//! * a poisoned connection mutex recovers instead of bricking the store,
//! * the unsupported-schema error reports the real expected/found versions,
//! * concurrent first opens do not expose a schema-version-0 store,
//! * `gc_unreachable` never panics on a corrupt rooted manifest and keeps
//!   every blob in that conservative case.

use std::future::Future;
use std::sync::Arc;

use lash_core::runtime::{QueuedWorkBatchDraft, QueuedWorkClaimBoundary, QueuedWorkPayload};
use lash_core::{
    DeliveryPolicy, PluginSessionSnapshot, RuntimeCommit, RuntimePersistence, RuntimeSessionState,
    SlotPolicy, StoreError, ToolState, TurnInput,
};
use lash_sqlite_store::Store;

fn unique_db_path(name: &str) -> std::path::PathBuf {
    let dir = std::env::temp_dir().join(format!(
        "lash-storage-fixes-{name}-{}-{}",
        std::process::id(),
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos()
    ));
    std::fs::create_dir_all(&dir).expect("temp dir");
    dir.join("session.db")
}

fn block_on<T>(future: impl Future<Output = T>) -> T {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime")
        .block_on(future)
}

fn commit_at(session_id: &str, expected_head_revision: Option<u64>) -> RuntimeCommit {
    let state = RuntimeSessionState {
        session_id: session_id.to_string(),
        ..RuntimeSessionState::default()
    };
    RuntimeCommit {
        expected_head_revision,
        ..RuntimeCommit::persisted_state(&state, &[])
    }
}

// Finding 1: the head-revision compare-and-set must serialize across two
// independent connections to the *same* file database. Two threads, each with
// its own connection, both read head revision 0 and then commit with
// `expected_head_revision = Some(0)` as concurrently as a barrier can arrange.
// Under `BEGIN IMMEDIATE` the second writer blocks on the busy timeout, then
// reads the now-bumped revision and returns a clean `HeadRevisionConflict`;
// exactly one commit applies and the persisted head ends at revision 1. Under
// the old `BEGIN DEFERRED` both reads ran on a shared snapshot, letting the
// losing writer either double-apply or fail with a raw busy error instead of a
// clean conflict.
#[test]
fn head_revision_cas_holds_across_two_connections() {
    let path = unique_db_path("cas");

    let barrier = Arc::new(std::sync::Barrier::new(2));
    let run = |path: std::path::PathBuf, barrier: Arc<std::sync::Barrier>| {
        std::thread::spawn(move || {
            block_on(async move {
                let store = Store::open(&path).await.expect("open store");
                barrier.wait();
                store.commit_runtime_state(commit_at("root", Some(0))).await
            })
        })
    };

    let handle_a = run(path.clone(), Arc::clone(&barrier));
    let handle_b = run(path.clone(), Arc::clone(&barrier));
    let result_a = handle_a.join().expect("thread a");
    let result_b = handle_b.join().expect("thread b");

    let winners = [&result_a, &result_b]
        .iter()
        .filter(|res| res.is_ok())
        .count();
    let conflicts = [&result_a, &result_b]
        .iter()
        .filter(|res| matches!(res, Err(StoreError::HeadRevisionConflict { .. })))
        .count();
    assert_eq!(
        winners, 1,
        "exactly one connection may win the CAS, got a={result_a:?} b={result_b:?}"
    );
    assert_eq!(
        conflicts, 1,
        "the loser must observe a clean HeadRevisionConflict (not a raw busy \
         error or a second success), got a={result_a:?} b={result_b:?}"
    );

    // The persisted head must reflect exactly one applied commit.
    let store = block_on(Store::open(&path)).expect("reopen store");
    let read = block_on(store.load_session(lash_core::SessionReadScope::FullGraph))
        .expect("load")
        .expect("session present");
    assert_eq!(read.head_revision, 1);
}

// Finding 5: a checkpoint committed through the real `commit_runtime_state`
// path carries tool / plugin / execution snapshot blobs. `gc_unreachable` must
// treat that live checkpoint's child blobs as reachable and keep them, while
// still collecting genuinely orphaned blobs — and it must never panic inside
// the commit while doing so.
#[tokio::test]
async fn gc_keeps_live_committed_checkpoint_blobs() {
    let store = Store::memory().await.expect("store");
    let orphan = store
        .put_artifact_blob(
            lash_sqlite_store::BlobArtifactDescriptor::plugin_session_snapshot(),
            b"orphan-blob",
        )
        .await;

    let state = RuntimeSessionState {
        session_id: "root".to_string(),
        tool_state_snapshot: Some(ToolState::default().with_generation(3)),
        plugin_snapshot: Some(PluginSessionSnapshot {
            plugins: Default::default(),
        }),
        plugin_snapshot_revision: Some(5),
        execution_state_snapshot: Some(vec![0xDE, 0xAD, 0xBE, 0xEF]),
        ..RuntimeSessionState::default()
    };
    let result = store
        .commit_runtime_state(RuntimeCommit {
            expected_head_revision: Some(0),
            ..RuntimeCommit::persisted_state(&state, &[])
        })
        .await
        .expect("commit");

    let report = store.gc_unreachable().await;
    assert!(
        report.deleted_blob_count >= 1,
        "the orphan blob should be collected, report={report:?}"
    );
    assert!(
        store.get_blob(&orphan).await.is_none(),
        "orphan blob must be collected"
    );

    // The live committed checkpoint manifest and every snapshot it references
    // must survive GC.
    assert!(
        store.get_blob(&result.checkpoint_ref).await.is_some(),
        "live checkpoint manifest must survive gc"
    );
    let manifest = store
        .get_checkpoint(&result.checkpoint_ref)
        .await
        .expect("checkpoint manifest");
    for blob_ref in [
        manifest.tool_state_ref.as_ref(),
        manifest.plugin_snapshot_ref.as_ref(),
        manifest.execution_state_ref.as_ref(),
    ]
    .into_iter()
    .flatten()
    {
        assert!(
            store.get_blob(blob_ref).await.is_some(),
            "live checkpoint child blob {blob_ref} must survive gc"
        );
    }
}

fn exclusive_draft(session_id: &str, text: &str) -> QueuedWorkBatchDraft {
    QueuedWorkBatchDraft::new(
        session_id,
        DeliveryPolicy::EarliestSafeBoundary,
        SlotPolicy::Exclusive,
        vec![QueuedWorkPayload::turn_input(TurnInput::text(text))],
    )
}

// Finding 2 (sequential): when a batch is already held by a live claim, a
// second claim must not "succeed" with a claim that doesn't actually own the
// row. Owner A claims the only ready batch; owner B then asks to claim and must
// get `None`.
#[tokio::test]
async fn second_claim_on_held_batch_is_not_won() {
    let store = Store::memory().await.expect("store");
    store
        .enqueue_queued_work(exclusive_draft("root", "work"))
        .await
        .expect("enqueue");

    let claim_a = store
        .claim_ready_queued_work("root", "owner-a", QueuedWorkClaimBoundary::Idle, 60_000, 10)
        .await
        .expect("claim a")
        .expect("owner a wins the only batch");
    assert_eq!(claim_a.batches.len(), 1);

    let claim_b = store
        .claim_ready_queued_work("root", "owner-b", QueuedWorkClaimBoundary::Idle, 60_000, 10)
        .await
        .expect("claim b");
    assert!(
        claim_b.is_none(),
        "a batch already held by a live claim must not be re-claimed, got {claim_b:?}"
    );

    // Owner A's claim is still the live, renewable one.
    store
        .renew_queued_work_claim(&claim_a, 60_000)
        .await
        .expect("owner a can still renew its uncontended claim");
}

// Finding 2 (concurrent): two owners on two connections race for the same
// single ready batch. The claim is read-then-write, so without the
// rows-affected check (and the `BEGIN IMMEDIATE` that serializes the read with
// the write) both could believe they won. At most one claim may succeed, and a
// successful claim must actually own the batch (provable by renewing it).
#[test]
fn concurrent_claims_never_double_own_a_batch() {
    let path = unique_db_path("claim-race");
    block_on(async {
        let seed = Store::open(&path).await.expect("seed store");
        seed.enqueue_queued_work(exclusive_draft("root", "work"))
            .await
            .expect("enqueue");
    });

    let barrier = Arc::new(std::sync::Barrier::new(2));
    let run = |owner: &'static str, path: std::path::PathBuf, barrier: Arc<std::sync::Barrier>| {
        std::thread::spawn(move || {
            block_on(async move {
                let store = Store::open(&path).await.expect("open store");
                barrier.wait();
                store
                    .claim_ready_queued_work(
                        "root",
                        owner,
                        QueuedWorkClaimBoundary::Idle,
                        60_000,
                        10,
                    )
                    .await
            })
        })
    };

    let handle_a = run("owner-a", path.clone(), Arc::clone(&barrier));
    let handle_b = run("owner-b", path.clone(), Arc::clone(&barrier));
    let result_a = handle_a.join().expect("thread a");
    let result_b = handle_b.join().expect("thread b");

    let mut winners = Vec::new();
    for result in [result_a, result_b] {
        match result {
            Ok(Some(claim)) => winners.push(claim),
            Ok(None) => {}
            Err(err) => panic!("a contended claim must resolve cleanly, got error: {err:?}"),
        }
    }
    assert!(
        winners.len() <= 1,
        "at most one owner may win the single batch, got {} winners",
        winners.len()
    );
    if let Some(claim) = winners.first() {
        // A successful claim must really own the batch: renewing it must update
        // exactly the claimed batch.
        let verify = block_on(Store::open(&path)).expect("verify store");
        block_on(verify.renew_queued_work_claim(claim, 60_000))
            .expect("the winning claim must actually own its batch");
    }
}

// Finding 7: opening a database stamped with an unsupported schema version must
// report the actual expected and found versions, not a stale "version 1 only".
#[tokio::test]
async fn unsupported_schema_error_reports_real_versions() {
    let path = unique_db_path("schema");
    {
        let conn = rusqlite::Connection::open(&path).expect("open raw");
        // Create a user object and stamp a bogus, unsupported user_version so
        // the store's open path takes the reject branch.
        conn.execute_batch("CREATE TABLE legacy (id INTEGER); PRAGMA user_version = 99;")
            .expect("seed legacy schema");
    }

    let message = match Store::open(&path).await {
        Ok(_) => panic!("opening an unsupported schema must fail"),
        Err(err) => err.to_string(),
    };
    assert!(
        message.contains("99"),
        "error must report the found version 99: {message}"
    );
    assert!(
        message.contains("schema version 3") || message.contains("version 3"),
        "error must report the real expected version 3: {message}"
    );
    assert!(
        !message.contains("version 1 only"),
        "error must not carry the stale 'version 1 only' text: {message}"
    );
}

#[test]
fn concurrent_first_open_never_observes_version_zero_schema() {
    let path = unique_db_path("concurrent-schema");
    let workers = 16;
    let barrier = Arc::new(std::sync::Barrier::new(workers));
    let handles = (0..workers)
        .map(|_| {
            let path = path.clone();
            let barrier = Arc::clone(&barrier);
            std::thread::spawn(move || {
                barrier.wait();
                block_on(Store::open(&path))
                    .map(|_| ())
                    .map_err(|err| err.to_string())
            })
        })
        .collect::<Vec<_>>();

    for handle in handles {
        handle
            .join()
            .expect("schema-open worker")
            .expect("concurrent first open should succeed");
    }
    let conn = rusqlite::Connection::open(&path).expect("open initialized db");
    let user_version: i32 = conn
        .query_row("PRAGMA user_version", [], |row| row.get(0))
        .expect("read user_version");
    assert_eq!(user_version, 3);
}