use super::*;
/// Selects which attachment-manifest check the conformance suite runs against a backend.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AttachmentManifestConformance {
/// Run the check that recorded intents stay listed until they are committed or forgotten.
Persistent,
/// Run the check that manifest calls succeed but no intent rows are ever listed.
Noop,
}
/// Expectations a backend declares before the conformance suite is run against it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RuntimePersistenceConformance {
/// Which attachment-manifest check to run for this backend.
pub attachment_manifest: AttachmentManifestConformance,
/// The durability tier the backend's `durability_tier()` is asserted to report.
pub durability_tier: crate::DurabilityTier,
}
impl RuntimePersistenceConformance {
pub const fn persistent_attachment_manifest(durability_tier: crate::DurabilityTier) -> Self {
Self {
attachment_manifest: AttachmentManifestConformance::Persistent,
durability_tier,
}
}
pub const fn noop_attachment_manifest(durability_tier: crate::DurabilityTier) -> Self {
Self {
attachment_manifest: AttachmentManifestConformance::Noop,
durability_tier,
}
}
}
impl Default for RuntimePersistenceConformance {
    /// Defaults to a persistent attachment manifest with the `Durable` tier.
    fn default() -> Self {
        Self::persistent_attachment_manifest(crate::DurabilityTier::Durable)
    }
}
/// Runs the conformance suite against stores produced by `make`, expecting a persistent
/// attachment manifest and a declared durability tier of `Inline`.
///
/// `make` is invoked once per individual check.
pub async fn runtime_persistence<F>(make: F)
where
    F: Fn() -> Arc<dyn RuntimePersistence>,
{
    let expectations = RuntimePersistenceConformance::persistent_attachment_manifest(
        crate::DurabilityTier::Inline,
    );
    runtime_persistence_with_options(make, expectations).await;
}
/// Runs the conformance suite against the `open` handle of each pair produced by `make`,
/// expecting a persistent attachment manifest and the `Durable` tier, and then runs the
/// reopen-survival check on one further pair.
pub async fn runtime_persistence_reopenable<F>(make: F)
where
    F: Fn() -> ReopenableRuntimePersistence,
{
    let expectations = RuntimePersistenceConformance::persistent_attachment_manifest(
        crate::DurabilityTier::Durable,
    );
    // The shared suite only needs the primary handle of each pair.
    runtime_persistence_with_options(|| make().open, expectations).await;

    let reopenable = make();
    runtime_persistence_survives_reopen(reopenable).await;
}
pub async fn runtime_persistence_with_options<F>(make: F, options: RuntimePersistenceConformance)
where
F: Fn() -> Arc<dyn RuntimePersistence>,
{
runtime_persistence_reports_declared_durability(make(), options.durability_tier).await;
commit_increments_head_and_round_trips_agent_frames(make()).await;
commit_rejects_a_different_session_id(make()).await;
load_hydrates_checkpoint_and_usage(make()).await;
active_path_read_scope_selects_only_requested_ancestry(make()).await;
match options.attachment_manifest {
AttachmentManifestConformance::Persistent => {
attachment_manifest_records_intent_and_commit_stamps(make()).await;
}
AttachmentManifestConformance::Noop => {
noop_attachment_manifest_is_explicit_and_empty(make()).await;
}
}
queued_work_source_keys_are_idempotent_and_list_ordered(make()).await;
queued_work_cancel_removes_only_unclaimed_batches(make()).await;
queued_work_exact_claim_uses_selected_batch_ids(make()).await;
queued_work_claims_respect_boundaries_renewal_and_abandon(make()).await;
queued_work_respects_availability_limits_exclusivity_reclaim_and_sessions(make()).await;
queued_work_join_groups_by_delivery_policy_and_merge_key(make()).await;
queued_work_completion_is_lease_guarded(make()).await;
queued_wake_delivery_is_source_key_idempotent_and_claimed_once(make()).await;
queue_completion_and_turn_commit_stamp_are_atomic(make()).await;
session_metadata_round_trips(make()).await;
tombstone_vacuum_and_gc_are_minimally_consistent(make()).await;
final_commit_stamp_is_idempotent_and_conflicts_on_changed_hash(make()).await;
}
/// Builds a queued-work draft for `session_id` whose only payload is a text turn input
/// carrying `text`, using the given delivery and slot policies.
pub fn queued_turn_input_draft(
    session_id: &str,
    text: &str,
    delivery_policy: DeliveryPolicy,
    slot_policy: SlotPolicy,
) -> QueuedWorkBatchDraft {
    let payload = QueuedWorkPayload::turn_input(TurnInput::text(text));
    QueuedWorkBatchDraft::new(session_id, delivery_policy, slot_policy, vec![payload])
}
/// File-local shorthand used by the queue checks; forwards every argument unchanged to
/// [`queued_turn_input_draft`].
fn queued_draft(
session_id: &str,
text: &str,
delivery_policy: DeliveryPolicy,
slot_policy: SlotPolicy,
) -> QueuedWorkBatchDraft {
queued_turn_input_draft(session_id, text, delivery_policy, slot_policy)
}
/// Returns the text of the first input item of the batch's first work item.
///
/// Yields `None` when the batch has no items, when the first payload is not a turn
/// input, when the turn input has no items, or when its first item is an image reference.
fn queued_batch_text(batch: &QueuedWorkBatch) -> Option<&str> {
    let head = batch.items.first()?;
    let input = match &head.payload {
        QueuedWorkPayload::TurnInput { input } => input,
        QueuedWorkPayload::ProcessWake { .. } | QueuedWorkPayload::SessionCommand { .. } => {
            return None;
        }
    };
    match input.items.first()? {
        crate::InputItem::Text { text } => Some(text.as_str()),
        crate::InputItem::ImageRef { .. } => None,
    }
}
/// Builds a session node named `id`, optionally parented to `parent`, whose payload is a
/// typed `"conformance"` protocol event carrying `{ "node": id }`.
///
/// The timestamp is fixed at the Unix epoch so fixtures are deterministic.
fn sample_session_node(id: &str, parent: Option<&str>) -> SessionNodeRecord {
    let protocol_event = ProtocolEvent::typed("conformance", serde_json::json!({ "node": id }))
        .expect("protocol event");
    let payload = SessionNodePayload::Event {
        event: crate::SessionEventRecord::Protocol(protocol_event),
    };
    SessionNodeRecord {
        node_id: id.to_owned(),
        parent_node_id: parent.map(str::to_owned),
        caused_by: None,
        agent_frame_id: None,
        timestamp: String::from("1970-01-01T00:00:00Z"),
        payload,
    }
}
/// Builds an attachment intent for attachment `id` in session `"root"`, with canonical
/// URI `sha256:<id>` and a fixed intent timestamp of 100 ms.
fn attachment_intent(id: &str) -> AttachmentIntent {
    let attachment_id = AttachmentId::new(id.to_owned());
    let canonical_uri = format!("sha256:{id}");
    AttachmentIntent {
        attachment_id,
        session_id: String::from("root"),
        canonical_uri,
        intent_at_epoch_ms: 100,
    }
}
/// Commits a session holding two agent frames and an execution-state snapshot, then
/// checks that a full-graph load returns both frames, the current frame id, the custom
/// frame reason, and the snapshot bytes (on the frame and on the checkpoint).
async fn commit_increments_head_and_round_trips_agent_frames(store: Arc<dyn RuntimePersistence>) {
let mut state = RuntimeSessionState {
session_id: "root".to_string(),
policy: SessionPolicy {
model: ModelSpec::from_token_limits("gpt-5.4-mini", None, 200_000, None)
.expect("valid model spec"),
..SessionPolicy::default()
},
..RuntimeSessionState::default()
};
state.ensure_agent_frame_initialized();
// Capture the initial frame's id and assignment so the second frame can chain to it.
let previous_frame_id = state.current_agent_frame_id.clone();
let assignment = state
.current_agent_frame()
.expect("initial frame")
.assignment
.clone();
let custom_reason = AgentFrameReason::new("plan_mode");
state.append_agent_frame(AgentFrameRecord::new(
"frame-2".to_string(),
"root".to_string(),
Some(previous_frame_id),
custom_reason.clone(),
None,
assignment,
ProtocolTurnOptions::default(),
));
state.set_execution_state_snapshot(Some(b"frame-vm".to_vec()));
store
.commit_runtime_state(RuntimeCommit::persisted_state(&state, &[]))
.await
.expect("commit runtime state");
let read = store
.load_session(SessionReadScope::FullGraph)
.await
.expect("load session")
.expect("session read");
assert_eq!(read.current_agent_frame_id, "frame-2");
assert_eq!(read.agent_frames.len(), 2);
let current = read
.agent_frames
.iter()
.find(|frame| frame.frame_id == "frame-2")
.expect("current frame");
assert_eq!(current.reason, custom_reason);
// The snapshot must be readable both from the current frame and from the checkpoint.
assert_eq!(
current.execution_state_snapshot.as_deref(),
Some(&b"frame-vm"[..])
);
assert_eq!(
read.checkpoint
.as_ref()
.and_then(|checkpoint| checkpoint.execution_state.as_deref()),
Some(&b"frame-vm"[..])
);
}
/// Commits session `"alpha"` and then checks that a commit for session `"beta"` against
/// the same store is rejected.
async fn commit_rejects_a_different_session_id(store: Arc<dyn RuntimePersistence>) {
    // Default state that differs only in its session id.
    let state_for = |session_id: &str| RuntimeSessionState {
        session_id: session_id.to_string(),
        ..RuntimeSessionState::default()
    };

    let bound = state_for("alpha");
    store
        .commit_runtime_state(RuntimeCommit::persisted_state(&bound, &[]))
        .await
        .expect("first commit binds the session");

    let foreign = state_for("beta");
    let outcome = store
        .commit_runtime_state(RuntimeCommit::persisted_state(&foreign, &[]))
        .await;
    assert!(
        outcome.is_err(),
        "a single-session store must reject a commit for a different session id"
    );
}
/// Commits a session with a tool-state snapshot, a plugin snapshot revision, and one
/// token-ledger entry, then checks that a full-graph load returns the session id, the
/// tool-state generation, the plugin revision, and the ledger entry.
async fn load_hydrates_checkpoint_and_usage(store: Arc<dyn RuntimePersistence>) {
    let session = RuntimeSessionState {
        session_id: "hydrated".to_string(),
        tool_state_snapshot: Some(ToolState::default().with_generation(9)),
        plugin_snapshot_revision: Some(12),
        plugin_snapshot: Some(PluginSessionSnapshot {
            plugins: Default::default(),
        }),
        ..RuntimeSessionState::default()
    };
    let ledger = [TokenLedgerEntry {
        source: "turn".to_string(),
        model: "mock-model".to_string(),
        usage: TokenUsage {
            input_tokens: 11,
            output_tokens: 7,
            cached_input_tokens: 3,
            reasoning_tokens: 5,
        },
    }];

    store
        .commit_runtime_state(RuntimeCommit::persisted_state(&session, &ledger))
        .await
        .expect("commit");

    let loaded = store
        .load_session(SessionReadScope::FullGraph)
        .await
        .expect("load")
        .expect("session");

    let checkpoint = loaded.checkpoint.expect("checkpoint");
    assert_eq!(loaded.session_id, "hydrated");

    let tool_state = checkpoint.tool_state.expect("dynamic snapshot");
    assert_eq!(tool_state.generation(), 9);
    assert_eq!(checkpoint.plugin_snapshot_revision, Some(12));

    let recorded_usage = &loaded.token_ledger;
    assert_eq!(recorded_usage.len(), 1);
    assert_eq!(recorded_usage[0].usage.input_tokens, 11);
}
/// Asserts that the store's `durability_tier()` equals `expected_tier`.
async fn runtime_persistence_reports_declared_durability(
    store: Arc<dyn RuntimePersistence>,
    expected_tier: crate::DurabilityTier,
) {
    let declared_tier = store.durability_tier();
    assert_eq!(
        declared_tier, expected_tier,
        "runtime persistence conformance must pin the backend's declared durability tier"
    );
}
/// Commits a four-node graph with two branches under one root and checks the three read
/// scopes: `FullGraph` returns every node, `ActivePath` without a leaf follows the
/// persisted leaf, and `ActivePath` with an explicit leaf follows that leaf's ancestry.
async fn active_path_read_scope_selects_only_requested_ancestry(
store: Arc<dyn RuntimePersistence>,
) {
// Shape: root-node -> left-node -> left-leaf, and root-node -> right-leaf.
// The persisted leaf is "left-leaf".
let graph = crate::SessionGraph::from_nodes(
vec![
sample_session_node("root-node", None),
sample_session_node("left-node", Some("root-node")),
sample_session_node("left-leaf", Some("left-node")),
sample_session_node("right-leaf", Some("root-node")),
],
Some("left-leaf".to_string()),
);
let state = RuntimeSessionState {
session_id: "branchy".to_string(),
session_graph: graph,
graph_replace_required: true,
..RuntimeSessionState::default()
};
store
.commit_runtime_state(RuntimeCommit::persisted_state(&state, &[]))
.await
.expect("commit branchy graph");
let full = store
.load_session(SessionReadScope::FullGraph)
.await
.expect("load full graph")
.expect("full graph exists");
// Node order is asserted too: it must match the order the nodes were committed in.
assert_eq!(
full.graph
.nodes
.iter()
.map(|node| node.node_id.as_str())
.collect::<Vec<_>>(),
vec!["root-node", "left-node", "left-leaf", "right-leaf"],
"FullGraph must retain every non-tombstoned branch"
);
let persisted_leaf_path = store
.load_session(SessionReadScope::ActivePath { leaf_node_id: None })
.await
.expect("load persisted active path")
.expect("active path exists");
assert_eq!(
persisted_leaf_path
.graph
.nodes
.iter()
.map(|node| node.node_id.as_str())
.collect::<Vec<_>>(),
vec!["root-node", "left-node", "left-leaf"],
"ActivePath with no explicit leaf must use the persisted leaf and hide sibling branches"
);
assert_eq!(
persisted_leaf_path.graph.leaf_node_id.as_deref(),
Some("left-leaf")
);
let explicit_right_path = store
.load_session(SessionReadScope::ActivePath {
leaf_node_id: Some("right-leaf".to_string()),
})
.await
.expect("load explicit active path")
.expect("explicit active path exists");
assert_eq!(
explicit_right_path
.graph
.nodes
.iter()
.map(|node| node.node_id.as_str())
.collect::<Vec<_>>(),
vec!["root-node", "right-leaf"],
"ActivePath with an explicit leaf must select that ancestry, not the persisted leaf"
);
// The returned graph must report the requested leaf, not the persisted one.
assert_eq!(
explicit_right_path.graph.leaf_node_id.as_deref(),
Some("right-leaf")
);
}
/// Records three attachment intents and checks how they leave the uncommitted list:
/// one is committed out of band via `commit_refs`, one is committed as part of a runtime
/// commit, and the remaining orphan is removed with `forget`.
///
/// The first listing is checked for both its length and for the presence of every
/// recorded id, so a backend cannot pass by returning three unrelated rows.
async fn attachment_manifest_records_intent_and_commit_stamps(store: Arc<dyn RuntimePersistence>) {
    let committed_by_runtime = AttachmentId::new("runtime-commit".to_string());
    let committed_out_of_band = AttachmentId::new("manual-commit".to_string());
    let orphan = AttachmentId::new("orphan".to_string());

    for id in [&committed_by_runtime, &committed_out_of_band, &orphan] {
        store
            .record_intent(attachment_intent(id.as_str()))
            .expect("record attachment intent");
    }

    let uncommitted = store
        .list_uncommitted(200)
        .expect("list uncommitted attachment intents");
    assert_eq!(uncommitted.len(), 3);
    // Order is not part of the contract here, so check membership rather than position.
    for id in [&committed_by_runtime, &committed_out_of_band, &orphan] {
        assert!(
            uncommitted.iter().any(|row| row.attachment_id == *id),
            "recorded intent {:?} must be listed as uncommitted",
            id.as_str()
        );
    }

    store
        .commit_refs("root", std::slice::from_ref(&committed_out_of_band))
        .expect("commit attachment ref out of band");

    let state = RuntimeSessionState {
        session_id: "root".to_string(),
        ..RuntimeSessionState::default()
    };
    store
        .commit_runtime_state(
            RuntimeCommit::persisted_state(&state, &[])
                .with_committed_attachments([committed_by_runtime.clone()]),
        )
        .await
        .expect("runtime commit stamps attachment manifest");

    // Only the orphan should remain, and it must still carry no commit stamp.
    let still_uncommitted = store
        .list_uncommitted(200)
        .expect("list remaining uncommitted attachments");
    assert_eq!(still_uncommitted.len(), 1);
    assert_eq!(still_uncommitted[0].attachment_id, orphan);
    assert!(still_uncommitted[0].committed_at_epoch_ms.is_none());

    store.forget(&orphan).expect("forget orphan attachment");
    assert!(
        store
            .list_uncommitted(200)
            .expect("list after forget")
            .is_empty()
    );
}
/// Checks a no-op attachment manifest: `record_intent`, `commit_refs`, and `forget` must
/// all succeed, and `list_uncommitted` must stay empty after an intent was recorded.
async fn noop_attachment_manifest_is_explicit_and_empty(store: Arc<dyn RuntimePersistence>) {
    let attachment = AttachmentId::new("noop".to_string());

    let intent = attachment_intent(attachment.as_str());
    store
        .record_intent(intent)
        .expect("noop record intent succeeds");

    let refs = std::slice::from_ref(&attachment);
    store
        .commit_refs("root", refs)
        .expect("noop commit refs succeeds");

    let retained = store.list_uncommitted(200).expect("noop list uncommitted");
    assert!(
        retained.is_empty(),
        "declared no-op attachment manifests must not retain intent rows"
    );

    store.forget(&attachment).expect("noop forget succeeds");
}
/// Checks that enqueueing twice with the same source key returns the originally stored
/// batch, and that listing a session returns only that session's batches in enqueue order.
async fn queued_work_source_keys_are_idempotent_and_list_ordered(
store: Arc<dyn RuntimePersistence>,
) {
let first = store
.enqueue_queued_work(
queued_draft(
"root",
"first",
DeliveryPolicy::EarliestSafeBoundary,
SlotPolicy::Join,
)
.with_source_key("source:first"),
)
.await
.expect("enqueue first batch");
// Same source key, deliberately different text: the store must not overwrite the original.
let replay = store
.enqueue_queued_work(
queued_draft(
"root",
"different replay payload",
DeliveryPolicy::EarliestSafeBoundary,
SlotPolicy::Join,
)
.with_source_key("source:first"),
)
.await
.expect("replay first batch");
let second = store
.enqueue_queued_work(queued_draft(
"root",
"second",
DeliveryPolicy::EarliestSafeBoundary,
SlotPolicy::Exclusive,
))
.await
.expect("enqueue second batch");
// A batch for another session, which must not appear in the "root" listing below.
store
.enqueue_queued_work(queued_draft(
"other",
"other session",
DeliveryPolicy::EarliestSafeBoundary,
SlotPolicy::Exclusive,
))
.await
.expect("enqueue other session");
assert_eq!(
first.batch_id, replay.batch_id,
"replaying a source key must return the original batch"
);
assert_eq!(first.items[0].item_id, replay.items[0].item_id);
assert_eq!(
queued_batch_text(&replay),
Some("first"),
"source-key replay must return the original stored payload, not the replay attempt"
);
let listed = store
.list_queued_work("root")
.await
.expect("list queued work");
assert_eq!(
listed
.iter()
.map(|batch| batch.batch_id.as_str())
.collect::<Vec<_>>(),
vec![first.batch_id.as_str(), second.batch_id.as_str()]
);
assert!(listed[0].enqueue_seq < listed[1].enqueue_seq);
}
/// Checks cancellation: an unclaimed batch can be cancelled and is returned, an actively
/// claimed batch cannot be cancelled, and an abandoned claim makes its batch cancellable
/// again.
async fn queued_work_cancel_removes_only_unclaimed_batches(store: Arc<dyn RuntimePersistence>) {
// Part 1: cancelling an unclaimed batch returns it and removes it from the queue.
let cancellable = store
.enqueue_queued_work(queued_draft(
"root",
"cancel me",
DeliveryPolicy::AfterCurrentTurnCommit,
SlotPolicy::Exclusive,
))
.await
.expect("enqueue cancellable batch");
let cancelled = store
.cancel_queued_work_batch("root", &cancellable.batch_id)
.await
.expect("cancel unclaimed batch")
.expect("unclaimed batch is returned");
assert_eq!(cancelled.batch_id, cancellable.batch_id);
assert_eq!(queued_batch_text(&cancelled), Some("cancel me"));
assert!(
store
.list_queued_work("root")
.await
.expect("list after cancellation")
.is_empty(),
"cancelled batches must be removed from the durable queue"
);
// Part 2: a batch under an active claim is hidden from the pending list, still present
// in the raw list, and cannot be cancelled.
let claimed = store
.enqueue_queued_work(queued_draft(
"root",
"claimed",
DeliveryPolicy::AfterCurrentTurnCommit,
SlotPolicy::Exclusive,
))
.await
.expect("enqueue claimed batch");
let claim = store
.claim_ready_queued_work("root", "owner", QueuedWorkClaimBoundary::Idle, 60_000, 1)
.await
.expect("claim batch")
.expect("claim exists");
assert_eq!(claim.batches[0].batch_id, claimed.batch_id);
assert!(
store
.list_pending_queued_work("root")
.await
.expect("list pending during active claim")
.is_empty(),
"active claims must disappear from user-editable queue snapshots"
);
assert_eq!(
store
.list_queued_work("root")
.await
.expect("raw durable list during active claim")
.len(),
1,
"claimed batches remain durable until their claim is completed"
);
assert!(
store
.cancel_queued_work_batch("root", &claimed.batch_id)
.await
.expect("cancel active claim")
.is_none(),
"actively claimed batches must not be cancelled"
);
// Part 3: abandoning the claim returns the batch to the pending list and makes it
// cancellable again.
store
.abandon_queued_work_claim(&claim)
.await
.expect("abandon claim");
assert_eq!(
store
.list_pending_queued_work("root")
.await
.expect("list pending after abandoned claim")
.len(),
1,
"abandoned claims become user-editable queue work again"
);
assert!(
store
.cancel_queued_work_batch("root", &claimed.batch_id)
.await
.expect("cancel abandoned claim")
.is_some(),
"abandoned batches become cancellable again"
);
}
/// Checks claiming by explicit batch ids: a claim that would skip an earlier batch
/// yields nothing and leaves the queue untouched, while a claim for the head batch
/// succeeds and leaves only the later batch pending.
async fn queued_work_exact_claim_uses_selected_batch_ids(store: Arc<dyn RuntimePersistence>) {
let first = store
.enqueue_queued_work(queued_draft(
"root",
"first",
DeliveryPolicy::AfterCurrentTurnCommit,
SlotPolicy::Exclusive,
))
.await
.expect("enqueue first batch");
let second = store
.enqueue_queued_work(queued_draft(
"root",
"second",
DeliveryPolicy::AfterCurrentTurnCommit,
SlotPolicy::Exclusive,
))
.await
.expect("enqueue second batch");
// Asking for `second` while `first` is still queued must return `None`, not an error.
assert!(
store
.claim_ready_queued_work_by_batch_ids(
"root",
"owner",
QueuedWorkClaimBoundary::Idle,
60_000,
std::slice::from_ref(&second.batch_id),
)
.await
.expect("claim out-of-order exact batch")
.is_none(),
"exact claims must not skip earlier durable queue work"
);
// The rejected claim must not have leased or reordered anything.
assert_eq!(
store
.list_pending_queued_work("root")
.await
.expect("list after rejected exact claim")
.iter()
.map(|batch| batch.batch_id.as_str())
.collect::<Vec<_>>(),
vec![first.batch_id.as_str(), second.batch_id.as_str()]
);
let claim = store
.claim_ready_queued_work_by_batch_ids(
"root",
"owner",
QueuedWorkClaimBoundary::Idle,
60_000,
std::slice::from_ref(&first.batch_id),
)
.await
.expect("claim first exact batch")
.expect("first exact claim exists");
assert_eq!(
claim
.batches
.iter()
.map(|batch| batch.batch_id.as_str())
.collect::<Vec<_>>(),
vec![first.batch_id.as_str()]
);
assert_eq!(
store
.list_pending_queued_work("root")
.await
.expect("list pending after exact claim")
.iter()
.map(|batch| batch.batch_id.as_str())
.collect::<Vec<_>>(),
vec![second.batch_id.as_str()]
);
}
/// Checks how claim boundaries interact with delivery policies, that reclaiming abandoned
/// work advances the fencing token, and that renewing a claim keeps its identity.
async fn queued_work_claims_respect_boundaries_renewal_and_abandon(
store: Arc<dyn RuntimePersistence>,
) {
// Queue order: an after-current-turn-commit batch at the head, then an
// earliest-safe-boundary batch.
let after_commit = store
.enqueue_queued_work(queued_draft(
"root",
"after current commit",
DeliveryPolicy::AfterCurrentTurnCommit,
SlotPolicy::Exclusive,
))
.await
.expect("enqueue after-commit work");
let earliest = store
.enqueue_queued_work(queued_draft(
"root",
"earliest",
DeliveryPolicy::EarliestSafeBoundary,
SlotPolicy::Exclusive,
))
.await
.expect("enqueue earliest work");
assert!(
store
.claim_ready_queued_work(
"root",
"owner-a",
QueuedWorkClaimBoundary::ActiveTurnCheckpoint,
60_000,
10,
)
.await
.expect("checkpoint claim")
.is_none(),
"after-current-commit work at the queue head must wait for the idle boundary"
);
let idle_claim = store
.claim_ready_queued_work("root", "owner-a", QueuedWorkClaimBoundary::Idle, 60_000, 10)
.await
.expect("idle claim")
.expect("idle claim exists");
assert_eq!(idle_claim.batches.len(), 1);
assert_eq!(idle_claim.batches[0].batch_id, after_commit.batch_id);
// With the head batch leased, a checkpoint-boundary claim can now take the next batch.
let checkpoint_claim = store
.claim_ready_queued_work(
"root",
"owner-b",
QueuedWorkClaimBoundary::ActiveTurnCheckpoint,
60_000,
10,
)
.await
.expect("checkpoint claim after head is leased")
.expect("checkpoint claim exists");
assert_eq!(checkpoint_claim.batches[0].batch_id, earliest.batch_id);
store
.abandon_queued_work_claim(&idle_claim)
.await
.expect("abandon idle claim");
let reclaimed = store
.claim_ready_queued_work("root", "owner-c", QueuedWorkClaimBoundary::Idle, 60_000, 10)
.await
.expect("reclaim abandoned work")
.expect("reclaimed work exists");
assert_eq!(reclaimed.batches[0].batch_id, after_commit.batch_id);
assert!(
reclaimed.fencing_token > idle_claim.fencing_token,
"reclaiming abandoned work must advance the fencing token"
);
// Renewal must keep the claim id, lease token, and batches; the expiry may only move forward.
let renewed = store
.renew_queued_work_claim(&reclaimed, 60_000)
.await
.expect("renew queued work claim");
assert_eq!(renewed.claim_id, reclaimed.claim_id);
assert_eq!(renewed.lease_token, reclaimed.lease_token);
assert_eq!(renewed.batches[0].batch_id, reclaimed.batches[0].batch_id);
assert!(renewed.expires_at_epoch_ms >= reclaimed.expires_at_epoch_ms);
}
/// Checks several claim rules in sequence, using separate session ids for the later
/// scenarios: not-yet-available work is skipped, an exclusive batch is claimed alone,
/// sessions do not see each other's work, a claim taken with a zero TTL can be reclaimed
/// with a higher fencing token, and `max_batches` caps a join claim.
async fn queued_work_respects_availability_limits_exclusivity_reclaim_and_sessions(
store: Arc<dyn RuntimePersistence>,
) {
// Scenario 1 (sessions "root" and "other"): availability, exclusivity, session isolation.
// NOTE(review): 4_102_444_800_000 looks like 2100-01-01T00:00:00Z in Unix epoch
// milliseconds, i.e. far in the future — confirm the unit against `with_available_at_ms`.
store
.enqueue_queued_work(
queued_draft(
"root",
"not ready",
DeliveryPolicy::EarliestSafeBoundary,
SlotPolicy::Exclusive,
)
.with_available_at_ms(4_102_444_800_000),
)
.await
.expect("enqueue unavailable work");
let exclusive = store
.enqueue_queued_work(queued_draft(
"root",
"exclusive",
DeliveryPolicy::EarliestSafeBoundary,
SlotPolicy::Exclusive,
))
.await
.expect("enqueue exclusive work");
let joined = store
.enqueue_queued_work(
queued_draft(
"root",
"joined",
DeliveryPolicy::EarliestSafeBoundary,
SlotPolicy::Join,
)
.with_merge_key(MergeKey::Group("root".to_string())),
)
.await
.expect("enqueue joined work");
let other = store
.enqueue_queued_work(queued_draft(
"other",
"other session",
DeliveryPolicy::EarliestSafeBoundary,
SlotPolicy::Exclusive,
))
.await
.expect("enqueue other session work");
let claim = store
.claim_ready_queued_work("root", "owner-a", QueuedWorkClaimBoundary::Idle, 60_000, 10)
.await
.expect("claim root")
.expect("root claim");
assert_eq!(
claim
.batches
.iter()
.map(|batch| batch.batch_id.as_str())
.collect::<Vec<_>>(),
vec![exclusive.batch_id.as_str()],
"an exclusive batch must claim alone and unavailable earlier work must be skipped"
);
let next_root = store
.claim_ready_queued_work("root", "owner-b", QueuedWorkClaimBoundary::Idle, 60_000, 10)
.await
.expect("claim joined")
.expect("joined claim");
assert_eq!(next_root.batches[0].batch_id, joined.batch_id);
let other_claim = store
.claim_ready_queued_work(
"other",
"owner-c",
QueuedWorkClaimBoundary::Idle,
60_000,
10,
)
.await
.expect("claim other")
.expect("other claim");
assert_eq!(
other_claim.batches[0].batch_id, other.batch_id,
"claiming one session must not consume queued work from another session"
);
// Scenario 2 (session "reclaim"): a claim taken with a TTL of 0 must be reclaimable
// by another owner, with a strictly larger fencing token.
let reclaimed_source = store
.enqueue_queued_work(queued_draft(
"reclaim",
"expired claim",
DeliveryPolicy::EarliestSafeBoundary,
SlotPolicy::Exclusive,
))
.await
.expect("enqueue reclaim work");
let expired = store
.claim_ready_queued_work("reclaim", "owner-a", QueuedWorkClaimBoundary::Idle, 0, 1)
.await
.expect("claim with zero ttl")
.expect("expired claim");
let reclaimed = store
.claim_ready_queued_work(
"reclaim",
"owner-b",
QueuedWorkClaimBoundary::Idle,
60_000,
1,
)
.await
.expect("reclaim expired")
.expect("reclaimed expired claim");
assert_eq!(reclaimed.batches[0].batch_id, reclaimed_source.batch_id);
assert!(
reclaimed.fencing_token > expired.fencing_token,
"reclaiming an expired queued-work claim must bump the fencing token"
);
// Scenario 3 (session "limited"): three joinable batches share one merge key; a claim
// limited to 2 batches takes the first two and the next claim takes the third.
let limited_first = store
.enqueue_queued_work(
queued_draft(
"limited",
"one",
DeliveryPolicy::EarliestSafeBoundary,
SlotPolicy::Join,
)
.with_merge_key(MergeKey::Group("limited".to_string())),
)
.await
.expect("enqueue limited one");
let limited_second = store
.enqueue_queued_work(
queued_draft(
"limited",
"two",
DeliveryPolicy::EarliestSafeBoundary,
SlotPolicy::Join,
)
.with_merge_key(MergeKey::Group("limited".to_string())),
)
.await
.expect("enqueue limited two");
let limited_third = store
.enqueue_queued_work(
queued_draft(
"limited",
"three",
DeliveryPolicy::EarliestSafeBoundary,
SlotPolicy::Join,
)
.with_merge_key(MergeKey::Group("limited".to_string())),
)
.await
.expect("enqueue limited three");
let limited = store
.claim_ready_queued_work("limited", "owner", QueuedWorkClaimBoundary::Idle, 60_000, 2)
.await
.expect("limited claim")
.expect("limited claim exists");
assert_eq!(
limited
.batches
.iter()
.map(|batch| batch.batch_id.as_str())
.collect::<Vec<_>>(),
vec![
limited_first.batch_id.as_str(),
limited_second.batch_id.as_str()
],
"max_batches must cap a join claim"
);
let remaining = store
.claim_ready_queued_work(
"limited",
"owner-next",
QueuedWorkClaimBoundary::Idle,
60_000,
10,
)
.await
.expect("remaining claim")
.expect("remaining claim exists");
assert_eq!(remaining.batches[0].batch_id, limited_third.batch_id);
}
/// Checks that a join claim groups only adjacent batches sharing both delivery policy and
/// merge key: two group-"a" batches are claimed together, then the group-"b" batch, then
/// the group-"a" batch that uses a different delivery policy.
async fn queued_work_join_groups_by_delivery_policy_and_merge_key(
store: Arc<dyn RuntimePersistence>,
) {
let first = store
.enqueue_queued_work(
queued_draft(
"root",
"group a one",
DeliveryPolicy::EarliestSafeBoundary,
SlotPolicy::Join,
)
.with_merge_key(MergeKey::Group("a".to_string())),
)
.await
.expect("enqueue group a one");
let second = store
.enqueue_queued_work(
queued_draft(
"root",
"group a two",
DeliveryPolicy::EarliestSafeBoundary,
SlotPolicy::Join,
)
.with_merge_key(MergeKey::Group("a".to_string())),
)
.await
.expect("enqueue group a two");
// Same delivery policy as the first two batches, but a different merge key.
let different_merge = store
.enqueue_queued_work(
queued_draft(
"root",
"group b",
DeliveryPolicy::EarliestSafeBoundary,
SlotPolicy::Join,
)
.with_merge_key(MergeKey::Group("b".to_string())),
)
.await
.expect("enqueue group b");
// Same merge key as the first two batches, but a different delivery policy.
let different_delivery = store
.enqueue_queued_work(
queued_draft(
"root",
"after commit",
DeliveryPolicy::AfterCurrentTurnCommit,
SlotPolicy::Join,
)
.with_merge_key(MergeKey::Group("a".to_string())),
)
.await
.expect("enqueue after-commit");
let first_claim = store
.claim_ready_queued_work("root", "owner-a", QueuedWorkClaimBoundary::Idle, 60_000, 10)
.await
.expect("claim first group")
.expect("first group claim");
assert_eq!(
first_claim
.batches
.iter()
.map(|batch| batch.batch_id.as_str())
.collect::<Vec<_>>(),
vec![first.batch_id.as_str(), second.batch_id.as_str()],
"join claims must group only adjacent batches with the same delivery policy and merge key"
);
let second_claim = store
.claim_ready_queued_work("root", "owner-b", QueuedWorkClaimBoundary::Idle, 60_000, 10)
.await
.expect("claim second group")
.expect("second group claim");
assert_eq!(second_claim.batches[0].batch_id, different_merge.batch_id);
let third_claim = store
.claim_ready_queued_work("root", "owner-c", QueuedWorkClaimBoundary::Idle, 60_000, 10)
.await
.expect("claim third group")
.expect("third group claim");
assert_eq!(third_claim.batches[0].batch_id, different_delivery.batch_id);
}
/// Checks that completing a queue claim through a runtime commit requires the claim's
/// real lease token: a completion with a tampered token fails with
/// `StoreError::QueuedWorkClaimExpired` and keeps the queued work, while the genuine
/// completion succeeds and empties the queue.
async fn queued_work_completion_is_lease_guarded(store: Arc<dyn RuntimePersistence>) {
let first = store
.enqueue_queued_work(
queued_draft(
"root",
"join one",
DeliveryPolicy::EarliestSafeBoundary,
SlotPolicy::Join,
)
.with_merge_key(MergeKey::Group("joined".to_string())),
)
.await
.expect("enqueue first joined batch");
let second = store
.enqueue_queued_work(
queued_draft(
"root",
"join two",
DeliveryPolicy::EarliestSafeBoundary,
SlotPolicy::Join,
)
.with_merge_key(MergeKey::Group("joined".to_string())),
)
.await
.expect("enqueue second joined batch");
let claim = store
.claim_ready_queued_work("root", "owner-a", QueuedWorkClaimBoundary::Idle, 60_000, 10)
.await
.expect("claim joined batches")
.expect("joined claim exists");
assert_eq!(
claim
.batches
.iter()
.map(|batch| batch.batch_id.as_str())
.collect::<Vec<_>>(),
vec![first.batch_id.as_str(), second.batch_id.as_str()]
);
// Corrupt the lease token so the completion no longer matches the live claim.
let mut stale_completion = claim.completion();
stale_completion.lease_token.push_str(":stale");
let state = RuntimeSessionState {
session_id: "root".to_string(),
..RuntimeSessionState::default()
};
let err = store
.commit_runtime_state(
RuntimeCommit::persisted_state(&state, &[]).completing_queue_claim(stale_completion),
)
.await
.expect_err("stale queued-work completion must fail");
assert!(matches!(err, StoreError::QueuedWorkClaimExpired { .. }));
// Both claimed batches must still be in the queue after the rejected completion.
assert_eq!(
store
.list_queued_work("root")
.await
.expect("stale completion preserves queued work")
.len(),
2
);
store
.commit_runtime_state(
RuntimeCommit::persisted_state(&state, &[]).completing_queue_claim(claim.completion()),
)
.await
.expect("valid queued-work completion commits");
assert!(
store
.list_queued_work("root")
.await
.expect("valid completion clears queued work")
.is_empty()
);
}
/// Checks that a commit carrying both a turn-commit stamp and a queue-claim completion is
/// all-or-nothing: with a tampered lease token nothing is persisted and the queue is
/// kept; with the genuine completion the session is stored and the queue is cleared; and
/// retrying the same stamped commit returns the same head revision and checkpoint ref.
async fn queue_completion_and_turn_commit_stamp_are_atomic(store: Arc<dyn RuntimePersistence>) {
let batch = store
.enqueue_queued_work(queued_draft(
"root",
"atomic queue",
DeliveryPolicy::EarliestSafeBoundary,
SlotPolicy::Exclusive,
))
.await
.expect("enqueue queue batch");
let claim = store
.claim_ready_queued_work(
"root",
"queue-owner",
QueuedWorkClaimBoundary::Idle,
60_000,
1,
)
.await
.expect("claim queue")
.expect("queue claim");
assert_eq!(claim.batches[0].batch_id, batch.batch_id);
let state = RuntimeSessionState {
session_id: "root".to_string(),
turn_index: 41,
..RuntimeSessionState::default()
};
// The stamp is built from the hash of the commit it will be attached to.
let base_commit = RuntimeCommit::persisted_state(&state, &[]);
let commit_hash = base_commit.turn_commit_hash().expect("turn commit hash");
let turn_commit = RuntimeTurnCommitStamp::new("root", "turn-atomic", commit_hash.clone());
let mut stale_queue_completion = claim.completion();
stale_queue_completion.lease_token.push_str(":stale");
let err = store
.commit_runtime_state(
base_commit
.clone()
.with_turn_commit(turn_commit.clone())
.completing_queue_claim(stale_queue_completion),
)
.await
.expect_err("stale queue completion must reject the whole final commit");
assert!(matches!(err, StoreError::QueuedWorkClaimExpired { .. }));
// No session was committed to this store before, so a rejected commit must leave
// `load_session` returning `None`.
assert!(
store
.load_session(SessionReadScope::FullGraph)
.await
.expect("load after rejected atomic commit")
.is_none(),
"rejected queue completion must not persist session state"
);
assert_eq!(
store
.list_queued_work("root")
.await
.expect("list after rejected atomic commit")
.len(),
1,
"rejected queue completion must preserve queued work"
);
let first = store
.commit_runtime_state(
base_commit
.clone()
.with_turn_commit(turn_commit.clone())
.completing_queue_claim(claim.completion()),
)
.await
.expect("valid final commit clears queue and records the turn stamp atomically");
// Retry with an equal stamp and the same completion, after the claim was already
// consumed by the commit above.
let retry = store
.commit_runtime_state(
base_commit
.with_turn_commit(RuntimeTurnCommitStamp::new(
"root",
"turn-atomic",
commit_hash,
))
.completing_queue_claim(claim.completion()),
)
.await
.expect("same final turn commit stamp retries idempotently");
assert_eq!(retry.head_revision, first.head_revision);
assert_eq!(retry.checkpoint_ref, first.checkpoint_ref);
assert!(
store
.load_session(SessionReadScope::FullGraph)
.await
.expect("load after accepted atomic commit")
.is_some()
);
assert!(
store
.list_queued_work("root")
.await
.expect("list after accepted atomic commit")
.is_empty()
);
}
/// Saves a `SessionMeta` record and checks that every field is returned unchanged by
/// `load_session_meta`.
async fn session_metadata_round_trips(store: Arc<dyn RuntimePersistence>) {
    let expected = SessionMeta {
        session_id: "root".to_string(),
        session_name: "Conformance Root".to_string(),
        created_at: "2026-06-02T00:00:00Z".to_string(),
        model: "gpt-5.4-mini".to_string(),
        cwd: Some("/tmp/lash-conformance".to_string()),
        relation: SessionRelation::Root,
    };

    let saved = store.save_session_meta(expected.clone()).await;
    saved.expect("save session meta");

    let actual = store
        .load_session_meta()
        .await
        .expect("load session meta")
        .expect("session meta present");

    // Compared field by field, in declaration order.
    assert_eq!(actual.session_id, expected.session_id);
    assert_eq!(actual.session_name, expected.session_name);
    assert_eq!(actual.created_at, expected.created_at);
    assert_eq!(actual.model, expected.model);
    assert_eq!(actual.cwd, expected.cwd);
    assert_eq!(actual.relation, expected.relation);
}
/// Commits a two-node graph, tombstones the child node, and checks that the node is
/// hidden from `load_node` and from full-graph loads. Then checks that `vacuum` reports
/// at most one removed node and that `gc_unreachable` succeeds.
async fn tombstone_vacuum_and_gc_are_minimally_consistent(store: Arc<dyn RuntimePersistence>) {
    let graph = crate::SessionGraph::from_nodes(
        vec![
            sample_session_node("node-live", None),
            sample_session_node("node-delete", Some("node-live")),
        ],
        Some("node-delete".to_string()),
    );
    let state = RuntimeSessionState {
        session_id: "root".to_string(),
        session_graph: graph,
        graph_replace_required: true,
        head_revision: None,
        ..RuntimeSessionState::default()
    };
    store
        .commit_runtime_state(RuntimeCommit::persisted_state(&state, &[]))
        .await
        .expect("commit graph");

    // The node is visible before it is tombstoned.
    assert!(
        store
            .load_node("node-delete")
            .await
            .expect("load node before tombstone")
            .is_some()
    );

    let doomed = ["node-delete".to_string()];
    store
        .tombstone_nodes(&doomed)
        .await
        .expect("tombstone node");

    let visible_after_tombstone = store
        .load_node("node-delete")
        .await
        .expect("load node after tombstone")
        .is_some();
    assert!(
        !visible_after_tombstone,
        "tombstoned nodes must be hidden from direct loads"
    );

    let read = store
        .load_session(SessionReadScope::FullGraph)
        .await
        .expect("load graph after tombstone")
        .expect("session after tombstone");
    let tombstoned_node_hidden = read
        .graph
        .nodes
        .iter()
        .all(|node| node.node_id != "node-delete");
    assert!(
        tombstoned_node_hidden,
        "tombstoned nodes must be hidden from session graph loads"
    );

    // Only one node was tombstoned, so at most one removal can be reported.
    let vacuum = store.vacuum().await.expect("vacuum");
    assert!(
        vacuum.removed_node_count <= 1,
        "vacuum must report only rows removed by this call, got {vacuum:?}"
    );

    let gc_outcome = store.gc_unreachable().await;
    gc_outcome.expect("gc_unreachable should be safe to call");
}
/// Conformance check: data written through `factory.open` is visible through
/// `factory.reopen`.
///
/// Writes four kinds of records via the `open` handle (session meta, a runtime
/// state carrying a tool-state snapshot with generation 77, one queued work batch,
/// and one attachment intent) and then reads each of them back via the `reopen`
/// handle, asserting that the values match.
///
/// Panics on the first violated expectation.
async fn runtime_persistence_survives_reopen(factory: ReopenableRuntimePersistence) {
    // ---- Write phase: everything goes through the `open` handle. ----
    let meta = SessionMeta {
        session_id: "root".to_string(),
        session_name: "Durable Root".to_string(),
        created_at: "2026-06-02T00:00:00Z".to_string(),
        model: "gpt-5.4-mini".to_string(),
        cwd: Some("/tmp/lash-reopen".to_string()),
        relation: SessionRelation::Root,
    };
    // Only the name is compared after reopening, so keep just that around.
    let expected_session_name = meta.session_name.clone();
    factory
        .open
        .save_session_meta(meta)
        .await
        .expect("save meta");

    let state = RuntimeSessionState {
        session_id: "root".to_string(),
        tool_state_snapshot: Some(ToolState::default().with_generation(77)),
        ..RuntimeSessionState::default()
    };
    factory
        .open
        .commit_runtime_state(RuntimeCommit::persisted_state(&state, &[]))
        .await
        .expect("commit state");

    let draft = queued_draft(
        "root",
        "survives reopen",
        DeliveryPolicy::EarliestSafeBoundary,
        SlotPolicy::Exclusive,
    )
    .with_source_key("reopen:queued");
    let enqueued = factory
        .open
        .enqueue_queued_work(draft)
        .await
        .expect("enqueue queued work");

    let attachment_id = AttachmentId::new("reopen-attachment".to_string());
    let intent = AttachmentIntent {
        attachment_id: attachment_id.clone(),
        session_id: "root".to_string(),
        canonical_uri: "sha256:reopen-attachment".to_string(),
        intent_at_epoch_ms: 100,
    };
    factory
        .open
        .record_intent(intent)
        .expect("record attachment intent");

    // ---- Read phase: everything comes back through the `reopen` handle. ----
    let reopened_meta = factory
        .reopen
        .load_session_meta()
        .await
        .expect("load reopened meta")
        .expect("reopened meta");
    assert_eq!(reopened_meta.session_name, expected_session_name);

    let reopened = factory
        .reopen
        .load_session(SessionReadScope::FullGraph)
        .await
        .expect("load reopened state")
        .expect("reopened state");
    assert_eq!(reopened.session_id, "root");
    let reopened_generation = reopened
        .checkpoint
        .as_ref()
        .and_then(|checkpoint| checkpoint.tool_state.as_ref())
        .map(|tool_state| tool_state.generation());
    assert_eq!(reopened_generation, Some(77));

    let reopened_queue = factory
        .reopen
        .list_queued_work("root")
        .await
        .expect("list reopened queue");
    assert_eq!(reopened_queue.len(), 1);
    let only_batch = &reopened_queue[0];
    assert_eq!(only_batch.batch_id, enqueued.batch_id);
    assert_eq!(queued_batch_text(only_batch), Some("survives reopen"));

    let reopened_intents = factory
        .reopen
        .list_uncommitted(200)
        .expect("list reopened attachment intents");
    let intent_survived = reopened_intents
        .iter()
        .any(|intent| intent.attachment_id == attachment_id);
    assert!(
        intent_survived,
        "attachment intent rows must survive reopening a durable store"
    );
}
/// Conformance check: enqueueing the same process wake twice yields one queued
/// batch, which can be claimed and then completed by a runtime commit.
///
/// Steps:
/// 1. Enqueue a wake draft, then enqueue an identical one; both calls must return
///    the same `batch_id` and the queue must hold exactly one entry.
/// 2. Claim ready work for `root`; the claim must contain one batch with one
///    `QueuedWorkPayload::ProcessWake` item.
/// 3. Commit a runtime state that completes the claim; the queue must then be empty.
///
/// Panics on the first violated expectation.
async fn queued_wake_delivery_is_source_key_idempotent_and_claimed_once(
    store: Arc<dyn RuntimePersistence>,
) {
    const SESSION: &str = "root";

    let event_invocation = RuntimeInvocation {
        scope: RuntimeScope::new("root"),
        subject: RuntimeSubject::ProcessEvent {
            process_id: "process-1".to_string(),
            sequence: 7,
            event_type: "process.wake".to_string(),
        },
        caused_by: None,
        replay: None,
    };
    let wake = ProcessWakeDelivery {
        wake_id: "wake-1".to_string(),
        target_session_id: "root".to_string(),
        target_scope_id: ProcessScopeId::new("session:root"),
        process_id: "process-1".to_string(),
        sequence: 7,
        event_type: "process.wake".to_string(),
        event_invocation,
        process_caused_by: None,
        dedupe_key: "wake-dedupe-1".to_string(),
        input: "wake payload".to_string(),
        created_at_ms: 1,
    };

    // Enqueue the same wake twice; the second call is the replay.
    let first = store
        .enqueue_queued_work(crate::process_wake_batch_draft(wake.clone()))
        .await
        .expect("enqueue wake");
    let replay = store
        .enqueue_queued_work(crate::process_wake_batch_draft(wake))
        .await
        .expect("replay wake enqueue");
    assert_eq!(
        first.batch_id, replay.batch_id,
        "wake source-key replay must return the original queued batch"
    );

    let queued_after_replay = store
        .list_queued_work(SESSION)
        .await
        .expect("list queued wakes");
    assert_eq!(
        queued_after_replay.len(),
        1,
        "replayed wake must not create a second queued delivery"
    );

    // Claim the single delivery and check its shape.
    let claim = store
        .claim_ready_queued_work(
            SESSION,
            "wake-owner",
            QueuedWorkClaimBoundary::Idle,
            60_000,
            10,
        )
        .await
        .expect("claim wake")
        .expect("wake claim");
    assert_eq!(claim.batches.len(), 1);
    assert_eq!(claim.batches[0].items.len(), 1);
    assert!(matches!(
        claim.batches[0].items[0].payload,
        QueuedWorkPayload::ProcessWake { .. }
    ));

    // Complete the claim as part of a runtime commit.
    let state = RuntimeSessionState {
        session_id: "root".to_string(),
        ..RuntimeSessionState::default()
    };
    let completing_commit =
        RuntimeCommit::persisted_state(&state, &[]).completing_queue_claim(claim.completion());
    store
        .commit_runtime_state(completing_commit)
        .await
        .expect("wake delivery completion commits");

    let remaining = store
        .list_queued_work(SESSION)
        .await
        .expect("list after wake completion");
    assert!(
        remaining.is_empty(),
        "completed wake delivery must be removed exactly once"
    );
}
/// Conformance check for turn-commit stamps on `commit_runtime_state`.
///
/// Verifies three things:
/// 1. Committing the same stamped commit twice succeeds both times and returns the
///    same `head_revision` and `checkpoint_ref`.
/// 2. `turn_commit_hash()` is the same whether or not `expected_head_revision` is
///    set on the commit.
/// 3. Reusing the same session and turn id with a different commit hash fails with
///    `StoreError::RuntimeTurnCommitConflict`.
///
/// Panics on the first violated expectation.
async fn final_commit_stamp_is_idempotent_and_conflicts_on_changed_hash(
    store: Arc<dyn RuntimePersistence>,
) {
    const SESSION: &str = "root";
    const TURN_ID: &str = "provider-turn";

    let state = RuntimeSessionState {
        session_id: "root".to_string(),
        ..RuntimeSessionState::default()
    };

    // Hash the unstamped commit first, then attach a stamp carrying that hash.
    let unstamped = RuntimeCommit::persisted_state(&state, &[]);
    let turn_commit_hash = unstamped.turn_commit_hash().expect("turn commit hash");
    let stamp = RuntimeTurnCommitStamp::new(SESSION, TURN_ID, turn_commit_hash.clone());
    let commit = unstamped.with_turn_commit(stamp);

    // (1) Same stamped commit, submitted twice.
    let first = store
        .commit_runtime_state(commit.clone())
        .await
        .expect("host-replayed final commit does not require a Lash lease");
    let retry = store
        .commit_runtime_state(commit)
        .await
        .expect("same host-replayed final commit retries idempotently");
    assert_eq!(retry.head_revision, first.head_revision);
    assert_eq!(retry.checkpoint_ref, first.checkpoint_ref);

    // (2) Setting the expected head revision must leave the hash untouched.
    let mut retry_from_new_head = RuntimeCommit::persisted_state(&state, &[]);
    retry_from_new_head.expected_head_revision = Some(first.head_revision);
    let retry_hash = retry_from_new_head
        .turn_commit_hash()
        .expect("retry commit hash");
    assert_eq!(
        retry_hash, turn_commit_hash,
        "turn commit identity must not depend on the optimistic CAS revision"
    );

    // (3) Same turn id, different state and hash.
    let changed_state = RuntimeSessionState {
        session_id: "root".to_string(),
        turn_index: 1,
        ..RuntimeSessionState::default()
    };
    let changed = RuntimeCommit::persisted_state(&changed_state, &[]);
    let changed_hash = changed.turn_commit_hash().expect("changed commit hash");
    let conflicting =
        changed.with_turn_commit(RuntimeTurnCommitStamp::new(SESSION, TURN_ID, changed_hash));
    let err = store
        .commit_runtime_state(conflicting)
        .await
        .expect_err("same provider turn id with a different commit hash must conflict");
    assert!(matches!(err, StoreError::RuntimeTurnCommitConflict { .. }));
}