#[async_trait::async_trait]
impl RuntimePersistence for PostgresSessionStore {
fn durability_tier(&self) -> DurabilityTier {
DurabilityTier::Durable
}
async fn load_session(
&self,
scope: SessionReadScope,
) -> Result<Option<PersistedSessionRead>, StoreError> {
let Some(session_id) = self.selected_session_id().await? else {
return Ok(None);
};
let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
let Some(meta) = load_session_head_meta_tx(&mut tx, &session_id, false).await? else {
return Ok(None);
};
tx.commit().await.map_err(store_sqlx_error)?;
let leaf_node_id = match &scope {
SessionReadScope::FullGraph => meta.leaf_node_id.clone(),
SessionReadScope::ActivePath { leaf_node_id } => {
leaf_node_id.clone().or_else(|| meta.leaf_node_id.clone())
}
};
let graph = load_graph(
&self.pool,
&session_id,
leaf_node_id.clone(),
matches!(scope, SessionReadScope::ActivePath { .. }),
)
.await?;
let checkpoint = match meta.checkpoint_ref.as_ref() {
Some(blob_ref) => get_checkpoint(&self.pool, blob_ref).await,
None => None,
};
Ok(Some(PersistedSessionRead {
session_id: meta.session_id,
head_revision: meta.head_revision,
config: meta.config,
agent_frames: meta.agent_frames,
current_agent_frame_id: meta.current_agent_frame_id,
graph,
checkpoint_ref: meta.checkpoint_ref,
checkpoint,
token_ledger: merge_token_ledger_entries(
load_usage_deltas(&self.pool, &session_id).await,
),
}))
}
async fn load_node(&self, node_id: &str) -> Result<Option<SessionNodeRecord>, StoreError> {
let json: Option<String> = if let Some(session_id) = &self.session_id {
sqlx::query_scalar(
"SELECT node_json FROM lash_graph_nodes
WHERE session_id = $1 AND node_id = $2 AND tombstoned = FALSE",
)
.bind(session_id)
.bind(node_id)
.fetch_optional(&self.pool)
.await
.map_err(store_sqlx_error)?
} else {
sqlx::query_scalar(
"SELECT node_json FROM lash_graph_nodes
WHERE node_id = $1 AND tombstoned = FALSE
ORDER BY session_id ASC
LIMIT 1",
)
.bind(node_id)
.fetch_optional(&self.pool)
.await
.map_err(store_sqlx_error)?
};
json.map(|json| store_decode_json(&json, "session graph node"))
.transpose()
}
async fn commit_runtime_state(
&self,
commit: RuntimeCommit,
) -> Result<RuntimeCommitResult, StoreError> {
let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
let existing = load_session_head_meta_tx(&mut tx, &commit.session_id, false).await?;
if let Some(bound_session_id) = existing.as_ref().map(|meta| meta.session_id.as_str())
&& bound_session_id != commit.session_id
{
return Err(StoreError::SessionBindingMismatch {
bound_session_id: bound_session_id.to_string(),
attempted_session_id: commit.session_id,
});
}
let effective_binding = self
.session_id
.clone()
.or_else(|| self.bound_session.get().cloned());
if let Some(bound_session_id) = &effective_binding
&& commit.session_id != *bound_session_id
{
return Err(StoreError::SessionBindingMismatch {
bound_session_id: bound_session_id.clone(),
attempted_session_id: commit.session_id,
});
}
if self.session_id.is_none() {
let _ = self.bound_session.set(commit.session_id.clone());
}
if let Some(completed) = &commit.turn_commit {
if completed.session_id != commit.session_id {
return Err(StoreError::RuntimeTurnCommitConflict {
session_id: completed.session_id.clone(),
turn_id: completed.turn_id.clone(),
});
}
let prior = sqlx::query(
"SELECT turn_commit_hash, result_json
FROM lash_runtime_turn_commits
WHERE session_id = $1 AND turn_id = $2",
)
.bind(&completed.session_id)
.bind(&completed.turn_id)
.fetch_optional(&mut *tx)
.await
.map_err(store_sqlx_error)?;
if let Some(row) = prior {
let hash: String = row.get(0);
let result_json: String = row.get(1);
if hash == completed.turn_commit_hash {
return store_decode_json(&result_json, "runtime turn commit result");
}
return Err(StoreError::RuntimeTurnCommitConflict {
session_id: completed.session_id.clone(),
turn_id: completed.turn_id.clone(),
});
}
}
let actual_revision = existing.as_ref().map_or(0, |meta| meta.head_revision);
if commit.expected_head_revision.is_some()
&& commit.expected_head_revision != Some(actual_revision)
{
return Err(StoreError::HeadRevisionConflict {
expected: commit.expected_head_revision,
actual: actual_revision,
});
}
for completed in &commit.completed_queue_claims {
if completed.session_id != commit.session_id {
return Err(StoreError::QueuedWorkClaimExpired {
session_id: completed.session_id.clone(),
claim_id: completed.claim_id.clone(),
});
}
ensure_queued_work_completion_tx(&mut tx, completed).await?;
}
let (checkpoint_ref, manifest) = put_checkpoint_tx(&mut tx, &commit.checkpoint).await?;
for entry in &commit.usage_deltas {
sqlx::query("INSERT INTO lash_usage_deltas (session_id, entry_json) VALUES ($1, $2)")
.bind(&commit.session_id)
.bind(encode_json(entry))
.execute(&mut *tx)
.await
.map_err(store_sqlx_error)?;
}
let leaf_node_id = match &commit.graph {
GraphCommitDelta::Unchanged { leaf_node_id } => leaf_node_id.clone(),
GraphCommitDelta::Append {
nodes,
leaf_node_id,
} => {
for node in nodes {
sqlx::query(
"INSERT INTO lash_graph_nodes (session_id, node_id, node_json)
VALUES ($1, $2, $3)
ON CONFLICT (session_id, node_id) DO UPDATE SET
node_json = EXCLUDED.node_json,
tombstoned = FALSE",
)
.bind(&commit.session_id)
.bind(&node.node_id)
.bind(encode_json(node))
.execute(&mut *tx)
.await
.map_err(store_sqlx_error)?;
}
leaf_node_id.clone()
}
GraphCommitDelta::ReplaceFull(graph) => {
sqlx::query("DELETE FROM lash_graph_nodes WHERE session_id = $1")
.bind(&commit.session_id)
.execute(&mut *tx)
.await
.map_err(store_sqlx_error)?;
for node in &graph.nodes {
sqlx::query(
"INSERT INTO lash_graph_nodes (session_id, node_id, node_json)
VALUES ($1, $2, $3)",
)
.bind(&commit.session_id)
.bind(&node.node_id)
.bind(encode_json(node))
.execute(&mut *tx)
.await
.map_err(store_sqlx_error)?;
}
graph.leaf_node_id.clone()
}
};
let graph_node_count: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM lash_graph_nodes WHERE session_id = $1 AND tombstoned = FALSE",
)
.bind(&commit.session_id)
.fetch_one(&mut *tx)
.await
.map_err(store_sqlx_error)?;
let next_revision = actual_revision + 1;
let meta = SessionHeadMeta {
session_id: commit.session_id.clone(),
head_revision: next_revision,
config: commit.config.clone(),
agent_frames: commit.agent_frames.clone(),
current_agent_frame_id: commit.current_agent_frame_id.clone(),
checkpoint_ref: Some(checkpoint_ref.clone()),
leaf_node_id,
graph_node_count: graph_node_count as usize,
token_ledger: Vec::new(),
};
let head_write = sqlx::query(
"INSERT INTO lash_sessions (session_id, head_revision, head_json, checkpoint_ref)
VALUES ($1, $2, $3, $4)
ON CONFLICT (session_id) DO UPDATE SET
head_revision = EXCLUDED.head_revision,
head_json = EXCLUDED.head_json,
checkpoint_ref = EXCLUDED.checkpoint_ref
WHERE lash_sessions.head_revision = $5",
)
.bind(&commit.session_id)
.bind(next_revision as i64)
.bind(encode_json(&meta))
.bind(checkpoint_ref.as_str())
.bind(actual_revision as i64)
.execute(&mut *tx)
.await;
let head_write = match head_write {
Ok(result) => result,
Err(err) if is_contention_error(&err) => {
return Err(StoreError::HeadRevisionConflict {
expected: commit.expected_head_revision.or(Some(actual_revision)),
actual: actual_revision,
});
}
Err(err) => return Err(store_sqlx_error(err)),
};
if head_write.rows_affected() == 0 {
let actual_now = sqlx::query_scalar::<_, i64>(
"SELECT head_revision FROM lash_sessions WHERE session_id = $1",
)
.bind(&commit.session_id)
.fetch_optional(&mut *tx)
.await
.map_err(store_sqlx_error)?
.map_or(actual_revision, |revision| revision as u64);
return Err(StoreError::HeadRevisionConflict {
expected: commit.expected_head_revision.or(Some(actual_revision)),
actual: actual_now,
});
}
for completed in &commit.completed_queue_claims {
for batch_id in &completed.batch_ids {
sqlx::query(
"DELETE FROM lash_queued_work_batches
WHERE session_id = $1 AND batch_id = $2 AND claim_id = $3 AND claim_token = $4",
)
.bind(&completed.session_id)
.bind(batch_id)
.bind(&completed.claim_id)
.bind(&completed.lease_token)
.execute(&mut *tx)
.await
.map_err(store_sqlx_error)?;
}
}
commit_attachment_refs_tx(
&mut tx,
&commit.session_id,
&commit.committed_attachment_ids,
)
.await?;
let result = RuntimeCommitResult {
head_revision: next_revision,
checkpoint_ref,
manifest,
};
if let Some(completed) = &commit.turn_commit {
sqlx::query(
"INSERT INTO lash_runtime_turn_commits (
session_id, turn_id, turn_commit_hash, result_json, committed_at_ms
)
VALUES ($1, $2, $3, $4, $5)",
)
.bind(&completed.session_id)
.bind(&completed.turn_id)
.bind(&completed.turn_commit_hash)
.bind(encode_json(&result))
.bind(current_epoch_ms() as i64)
.execute(&mut *tx)
.await
.map_err(store_sqlx_error)?;
}
tx.commit().await.map_err(store_sqlx_error)?;
Ok(result)
}
async fn enqueue_queued_work(
&self,
batch: QueuedWorkBatchDraft,
) -> Result<QueuedWorkBatch, StoreError> {
let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
if let Some(source_key) = batch.source_key.as_deref() {
let existing_id: Option<String> = sqlx::query_scalar(
"SELECT batch_id FROM lash_queued_work_batches
WHERE session_id = $1 AND source_key = $2",
)
.bind(&batch.session_id)
.bind(source_key)
.fetch_optional(&mut *tx)
.await
.map_err(store_sqlx_error)?;
if let Some(batch_id) = existing_id {
let existing = load_queued_batch(&mut tx, &batch_id)
.await?
.ok_or_else(|| {
StoreError::Backend("queued work source row disappeared".to_string())
})?;
tx.commit().await.map_err(store_sqlx_error)?;
return Ok(existing);
}
}
let now = current_epoch_ms();
let batch_id = derive_batch_id(&batch.session_id, batch.source_key.as_deref(), now, None);
let row = sqlx::query_scalar::<_, i64>(
"INSERT INTO lash_queued_work_batches (
batch_id, session_id, source_key, delivery_policy, slot_policy,
merge_key_json, available_at_ms, enqueued_at_ms
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING enqueue_seq",
)
.bind(&batch_id)
.bind(&batch.session_id)
.bind(&batch.source_key)
.bind(batch.delivery_policy.as_str())
.bind(batch.slot_policy.as_str())
.bind(encode_json(&batch.merge_key))
.bind(batch.available_at_ms as i64)
.bind(now as i64)
.fetch_one(&mut *tx)
.await
.map_err(store_sqlx_error)?;
for (index, payload) in batch.payloads.iter().enumerate() {
let item_id = format!("{batch_id}:item:{index}");
sqlx::query(
"INSERT INTO lash_queued_work_items (batch_id, item_index, item_id, payload_json)
VALUES ($1, $2, $3, $4)",
)
.bind(&batch_id)
.bind(index as i32)
.bind(item_id)
.bind(encode_json(payload))
.execute(&mut *tx)
.await
.map_err(store_sqlx_error)?;
}
let queued = load_queued_batch(&mut tx, &batch_id)
.await?
.ok_or_else(|| StoreError::Backend("queued work insert disappeared".to_string()))?;
debug_assert_eq!(queued.enqueue_seq, row as u64);
tx.commit().await.map_err(store_sqlx_error)?;
Ok(queued)
}
async fn claim_ready_queued_work(
&self,
session_id: &str,
owner_id: &str,
boundary: QueuedWorkClaimBoundary,
lease_ttl_ms: u64,
max_batches: usize,
) -> Result<Option<QueuedWorkClaim>, StoreError> {
if max_batches == 0 {
return Ok(None);
}
let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
let now = current_epoch_ms();
let rows = sqlx::query(
"SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
claim_fencing_token
FROM lash_queued_work_batches
WHERE session_id = $1
AND available_at_ms <= $2
AND (claim_token IS NULL OR claim_expires_at_ms <= $2)
ORDER BY enqueue_seq ASC
LIMIT $3
FOR UPDATE SKIP LOCKED",
)
.bind(session_id)
.bind(now as i64)
.bind(claim_scan_limit(max_batches))
.fetch_all(&mut *tx)
.await
.map_err(store_sqlx_error)?;
let mut selected = Vec::new();
for row in rows {
selected.push(queued_batch_row(row)?);
}
let candidates = selected
.iter()
.map(|row| ClaimCandidate {
enqueue_seq: row.enqueue_seq,
claim_fencing_token: row.claim_fencing_token,
delivery_policy: row.delivery_policy,
slot_policy: row.slot_policy,
merge_key: row.merge_key.clone(),
})
.collect::<Vec<_>>();
let selected_len = select_claim_prefix(&candidates, boundary, max_batches);
if selected_len == 0 {
tx.commit().await.map_err(store_sqlx_error)?;
return Ok(None);
}
selected.truncate(selected_len);
let lease =
QueuedWorkClaimLease::derive(&candidates[0], session_id, owner_id, now, lease_ttl_ms);
for row in &selected {
let changed = sqlx::query(
"UPDATE lash_queued_work_batches
SET claim_id = $3,
claim_owner_id = $4,
claim_token = $5,
claim_fencing_token = claim_fencing_token + 1,
claim_claimed_at_ms = $6,
claim_expires_at_ms = $7
WHERE session_id = $1
AND batch_id = $2
AND (claim_token IS NULL OR claim_expires_at_ms <= $6)",
)
.bind(session_id)
.bind(&row.batch_id)
.bind(&lease.claim_id)
.bind(owner_id)
.bind(&lease.lease_token)
.bind(now as i64)
.bind(lease.expires_at_epoch_ms as i64)
.execute(&mut *tx)
.await
.map_err(store_sqlx_error)?
.rows_affected();
if changed == 0 {
tx.rollback().await.map_err(store_sqlx_error)?;
return Ok(None);
}
}
let mut batches = Vec::new();
for row in selected {
batches.push(queued_work_batch_from_row(&mut tx, row).await?);
}
tx.commit().await.map_err(store_sqlx_error)?;
Ok(Some(QueuedWorkClaim {
session_id: session_id.to_string(),
claim_id: lease.claim_id,
owner_id: owner_id.to_string(),
lease_token: lease.lease_token,
fencing_token: lease.fencing_token,
claimed_at_epoch_ms: lease.claimed_at_epoch_ms,
expires_at_epoch_ms: lease.expires_at_epoch_ms,
batches,
}))
}
async fn renew_queued_work_claim(
&self,
claim: &QueuedWorkClaim,
lease_ttl_ms: u64,
) -> Result<QueuedWorkClaim, StoreError> {
let expires_at = current_epoch_ms().saturating_add(lease_ttl_ms);
let changed = sqlx::query(
"UPDATE lash_queued_work_batches
SET claim_expires_at_ms = $4
WHERE session_id = $1 AND claim_id = $2 AND claim_token = $3",
)
.bind(&claim.session_id)
.bind(&claim.claim_id)
.bind(&claim.lease_token)
.bind(expires_at as i64)
.execute(&self.pool)
.await
.map_err(store_sqlx_error)?
.rows_affected();
renewed_claim(claim, changed as usize, expires_at)
}
async fn abandon_queued_work_claim(&self, claim: &QueuedWorkClaim) -> Result<(), StoreError> {
sqlx::query(
"UPDATE lash_queued_work_batches
SET claim_id = NULL,
claim_owner_id = NULL,
claim_token = NULL,
claim_claimed_at_ms = 0,
claim_expires_at_ms = 0
WHERE session_id = $1 AND claim_id = $2 AND claim_token = $3",
)
.bind(&claim.session_id)
.bind(&claim.claim_id)
.bind(&claim.lease_token)
.execute(&self.pool)
.await
.map_err(store_sqlx_error)?;
Ok(())
}
async fn cancel_queued_work_batch(
&self,
session_id: &str,
batch_id: &str,
) -> Result<Option<QueuedWorkBatch>, StoreError> {
let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
let now = current_epoch_ms();
let row = sqlx::query(
"SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
claim_fencing_token
FROM lash_queued_work_batches
WHERE session_id = $1
AND batch_id = $2
AND (claim_token IS NULL OR claim_expires_at_ms <= $3)
FOR UPDATE",
)
.bind(session_id)
.bind(batch_id)
.bind(now as i64)
.fetch_optional(&mut *tx)
.await
.map_err(store_sqlx_error)?;
let Some(row) = row else {
tx.commit().await.map_err(store_sqlx_error)?;
return Ok(None);
};
let batch = queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?;
sqlx::query("DELETE FROM lash_queued_work_batches WHERE batch_id = $1")
.bind(batch_id)
.execute(&mut *tx)
.await
.map_err(store_sqlx_error)?;
tx.commit().await.map_err(store_sqlx_error)?;
Ok(Some(batch))
}
async fn list_queued_work(&self, session_id: &str) -> Result<Vec<QueuedWorkBatch>, StoreError> {
let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
let rows = sqlx::query(
"SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
claim_fencing_token
FROM lash_queued_work_batches
WHERE session_id = $1
ORDER BY enqueue_seq ASC",
)
.bind(session_id)
.fetch_all(&mut *tx)
.await
.map_err(store_sqlx_error)?;
let mut batches = Vec::new();
for row in rows {
batches.push(queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?);
}
tx.commit().await.map_err(store_sqlx_error)?;
Ok(batches)
}
async fn list_pending_queued_work(
&self,
session_id: &str,
) -> Result<Vec<QueuedWorkBatch>, StoreError> {
let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
let now = current_epoch_ms();
let rows = sqlx::query(
"SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
claim_fencing_token
FROM lash_queued_work_batches
WHERE session_id = $1
AND (claim_token IS NULL OR claim_expires_at_ms <= $2)
ORDER BY enqueue_seq ASC",
)
.bind(session_id)
.bind(now as i64)
.fetch_all(&mut *tx)
.await
.map_err(store_sqlx_error)?;
let mut batches = Vec::new();
for row in rows {
batches.push(queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?);
}
tx.commit().await.map_err(store_sqlx_error)?;
Ok(batches)
}
async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError> {
sqlx::query(
"INSERT INTO lash_session_meta (session_id, meta_json)
VALUES ($1, $2)
ON CONFLICT (session_id) DO UPDATE SET meta_json = EXCLUDED.meta_json",
)
.bind(&meta.session_id)
.bind(encode_json(&meta))
.execute(&self.pool)
.await
.map_err(store_sqlx_error)?;
Ok(())
}
async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError> {
let json: Option<String> = if let Some(session_id) = &self.session_id {
sqlx::query_scalar("SELECT meta_json FROM lash_session_meta WHERE session_id = $1")
.bind(session_id)
.fetch_optional(&self.pool)
.await
.map_err(store_sqlx_error)?
} else {
sqlx::query_scalar(
"SELECT meta_json FROM lash_session_meta ORDER BY session_id ASC LIMIT 1",
)
.fetch_optional(&self.pool)
.await
.map_err(store_sqlx_error)?
};
json.map(|json| store_decode_json(&json, "session meta"))
.transpose()
}
async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError> {
for id in ids {
if let Some(session_id) = &self.session_id {
sqlx::query(
"UPDATE lash_graph_nodes
SET tombstoned = TRUE
WHERE session_id = $1 AND node_id = $2",
)
.bind(session_id)
.bind(id)
.execute(&self.pool)
.await
.map_err(store_sqlx_error)?;
} else {
sqlx::query(
"UPDATE lash_graph_nodes
SET tombstoned = TRUE
WHERE node_id = $1",
)
.bind(id)
.execute(&self.pool)
.await
.map_err(store_sqlx_error)?;
}
}
Ok(())
}
async fn vacuum(&self) -> Result<VacuumReport, StoreError> {
let removed = if let Some(session_id) = &self.session_id {
sqlx::query("DELETE FROM lash_graph_nodes WHERE session_id = $1 AND tombstoned = TRUE")
.bind(session_id)
.execute(&self.pool)
.await
.map_err(store_sqlx_error)?
.rows_affected()
} else {
sqlx::query("DELETE FROM lash_graph_nodes WHERE tombstoned = TRUE")
.execute(&self.pool)
.await
.map_err(store_sqlx_error)?
.rows_affected()
};
Ok(VacuumReport {
removed_node_count: removed as usize,
})
}
async fn gc_unreachable(&self) -> Result<GcReport, StoreError> {
Ok(GcReport::default())
}
}