use std::future::Future;
use std::sync::Arc;
use lash_core::runtime::{QueuedWorkBatchDraft, QueuedWorkClaimBoundary, QueuedWorkPayload};
use lash_core::{
DeliveryPolicy, PluginSessionSnapshot, RuntimeCommit, RuntimePersistence, RuntimeSessionState,
SlotPolicy, StoreError, ToolState, TurnInput,
};
use lash_sqlite_store::Store;
fn unique_db_path(name: &str) -> std::path::PathBuf {
let dir = std::env::temp_dir().join(format!(
"lash-storage-fixes-{name}-{}-{}",
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
));
std::fs::create_dir_all(&dir).expect("temp dir");
dir.join("session.db")
}
fn block_on<T>(future: impl Future<Output = T>) -> T {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("runtime")
.block_on(future)
}
fn commit_at(session_id: &str, expected_head_revision: Option<u64>) -> RuntimeCommit {
let state = RuntimeSessionState {
session_id: session_id.to_string(),
..RuntimeSessionState::default()
};
RuntimeCommit {
expected_head_revision,
..RuntimeCommit::persisted_state(&state, &[])
}
}
#[test]
fn head_revision_cas_holds_across_two_connections() {
let path = unique_db_path("cas");
let barrier = Arc::new(std::sync::Barrier::new(2));
let run = |path: std::path::PathBuf, barrier: Arc<std::sync::Barrier>| {
std::thread::spawn(move || {
block_on(async move {
let store = Store::open(&path).await.expect("open store");
barrier.wait();
store.commit_runtime_state(commit_at("root", Some(0))).await
})
})
};
let handle_a = run(path.clone(), Arc::clone(&barrier));
let handle_b = run(path.clone(), Arc::clone(&barrier));
let result_a = handle_a.join().expect("thread a");
let result_b = handle_b.join().expect("thread b");
let winners = [&result_a, &result_b]
.iter()
.filter(|res| res.is_ok())
.count();
let conflicts = [&result_a, &result_b]
.iter()
.filter(|res| matches!(res, Err(StoreError::HeadRevisionConflict { .. })))
.count();
assert_eq!(
winners, 1,
"exactly one connection may win the CAS, got a={result_a:?} b={result_b:?}"
);
assert_eq!(
conflicts, 1,
"the loser must observe a clean HeadRevisionConflict (not a raw busy \
error or a second success), got a={result_a:?} b={result_b:?}"
);
let store = block_on(Store::open(&path)).expect("reopen store");
let read = block_on(store.load_session(lash_core::SessionReadScope::FullGraph))
.expect("load")
.expect("session present");
assert_eq!(read.head_revision, 1);
}
#[tokio::test]
async fn gc_keeps_live_committed_checkpoint_blobs() {
let store = Store::memory().await.expect("store");
let orphan = store
.put_artifact_blob(
lash_sqlite_store::BlobArtifactDescriptor::plugin_session_snapshot(),
b"orphan-blob",
)
.await;
let state = RuntimeSessionState {
session_id: "root".to_string(),
tool_state_snapshot: Some(ToolState::default().with_generation(3)),
plugin_snapshot: Some(PluginSessionSnapshot {
plugins: Default::default(),
}),
plugin_snapshot_revision: Some(5),
execution_state_snapshot: Some(vec![0xDE, 0xAD, 0xBE, 0xEF]),
..RuntimeSessionState::default()
};
let result = store
.commit_runtime_state(RuntimeCommit {
expected_head_revision: Some(0),
..RuntimeCommit::persisted_state(&state, &[])
})
.await
.expect("commit");
let report = store.gc_unreachable().await;
assert!(
report.deleted_blob_count >= 1,
"the orphan blob should be collected, report={report:?}"
);
assert!(
store.get_blob(&orphan).await.is_none(),
"orphan blob must be collected"
);
assert!(
store.get_blob(&result.checkpoint_ref).await.is_some(),
"live checkpoint manifest must survive gc"
);
let manifest = store
.get_checkpoint(&result.checkpoint_ref)
.await
.expect("checkpoint manifest");
for blob_ref in [
manifest.tool_state_ref.as_ref(),
manifest.plugin_snapshot_ref.as_ref(),
manifest.execution_state_ref.as_ref(),
]
.into_iter()
.flatten()
{
assert!(
store.get_blob(blob_ref).await.is_some(),
"live checkpoint child blob {blob_ref} must survive gc"
);
}
}
fn exclusive_draft(session_id: &str, text: &str) -> QueuedWorkBatchDraft {
QueuedWorkBatchDraft::new(
session_id,
DeliveryPolicy::EarliestSafeBoundary,
SlotPolicy::Exclusive,
vec![QueuedWorkPayload::turn_input(TurnInput::text(text))],
)
}
#[tokio::test]
async fn second_claim_on_held_batch_is_not_won() {
let store = Store::memory().await.expect("store");
store
.enqueue_queued_work(exclusive_draft("root", "work"))
.await
.expect("enqueue");
let claim_a = store
.claim_ready_queued_work("root", "owner-a", QueuedWorkClaimBoundary::Idle, 60_000, 10)
.await
.expect("claim a")
.expect("owner a wins the only batch");
assert_eq!(claim_a.batches.len(), 1);
let claim_b = store
.claim_ready_queued_work("root", "owner-b", QueuedWorkClaimBoundary::Idle, 60_000, 10)
.await
.expect("claim b");
assert!(
claim_b.is_none(),
"a batch already held by a live claim must not be re-claimed, got {claim_b:?}"
);
store
.renew_queued_work_claim(&claim_a, 60_000)
.await
.expect("owner a can still renew its uncontended claim");
}
#[test]
fn concurrent_claims_never_double_own_a_batch() {
let path = unique_db_path("claim-race");
block_on(async {
let seed = Store::open(&path).await.expect("seed store");
seed.enqueue_queued_work(exclusive_draft("root", "work"))
.await
.expect("enqueue");
});
let barrier = Arc::new(std::sync::Barrier::new(2));
let run = |owner: &'static str, path: std::path::PathBuf, barrier: Arc<std::sync::Barrier>| {
std::thread::spawn(move || {
block_on(async move {
let store = Store::open(&path).await.expect("open store");
barrier.wait();
store
.claim_ready_queued_work(
"root",
owner,
QueuedWorkClaimBoundary::Idle,
60_000,
10,
)
.await
})
})
};
let handle_a = run("owner-a", path.clone(), Arc::clone(&barrier));
let handle_b = run("owner-b", path.clone(), Arc::clone(&barrier));
let result_a = handle_a.join().expect("thread a");
let result_b = handle_b.join().expect("thread b");
let mut winners = Vec::new();
for result in [result_a, result_b] {
match result {
Ok(Some(claim)) => winners.push(claim),
Ok(None) => {}
Err(err) => panic!("a contended claim must resolve cleanly, got error: {err:?}"),
}
}
assert!(
winners.len() <= 1,
"at most one owner may win the single batch, got {} winners",
winners.len()
);
if let Some(claim) = winners.first() {
let verify = block_on(Store::open(&path)).expect("verify store");
block_on(verify.renew_queued_work_claim(claim, 60_000))
.expect("the winning claim must actually own its batch");
}
}
#[tokio::test]
async fn unsupported_schema_error_reports_real_versions() {
let path = unique_db_path("schema");
{
let conn = rusqlite::Connection::open(&path).expect("open raw");
conn.execute_batch("CREATE TABLE legacy (id INTEGER); PRAGMA user_version = 99;")
.expect("seed legacy schema");
}
let message = match Store::open(&path).await {
Ok(_) => panic!("opening an unsupported schema must fail"),
Err(err) => err.to_string(),
};
assert!(
message.contains("99"),
"error must report the found version 99: {message}"
);
assert!(
message.contains("schema version 3") || message.contains("version 3"),
"error must report the real expected version 3: {message}"
);
assert!(
!message.contains("version 1 only"),
"error must not carry the stale 'version 1 only' text: {message}"
);
}
#[test]
fn concurrent_first_open_never_observes_version_zero_schema() {
let path = unique_db_path("concurrent-schema");
let workers = 16;
let barrier = Arc::new(std::sync::Barrier::new(workers));
let handles = (0..workers)
.map(|_| {
let path = path.clone();
let barrier = Arc::clone(&barrier);
std::thread::spawn(move || {
barrier.wait();
block_on(Store::open(&path))
.map(|_| ())
.map_err(|err| err.to_string())
})
})
.collect::<Vec<_>>();
for handle in handles {
handle
.join()
.expect("schema-open worker")
.expect("concurrent first open should succeed");
}
let conn = rusqlite::Connection::open(&path).expect("open initialized db");
let user_version: i32 = conn
.query_row("PRAGMA user_version", [], |row| row.get(0))
.expect("read user_version");
assert_eq!(user_version, 3);
}