Skip to main content

lash_postgres_store/
lib.rs

1//! PostgreSQL durable storage for Lash.
2//!
3//! One [`PostgresStorage`] owns a shared [`sqlx::PgPool`] and creates durable
4//! implementations for the runtime session store, process registry, host-event
5//! store, Lashlang artifact store, and attachment manifest.
6
7use std::collections::HashSet;
8use std::sync::{Arc, OnceLock};
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use lash_core::runtime::{
12    ProcessHandleGrantEntry, QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim,
13    QueuedWorkClaimBoundary, QueuedWorkCompletion, QueuedWorkItem,
14};
15use lash_core::store::queued_work::{
16    ClaimCandidate, QueuedWorkClaimLease, claim_scan_limit, derive_batch_id, renewed_claim,
17    select_claim_prefix,
18};
19use lash_core::store::{
20    GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
21    RuntimeCommitResult, SessionCheckpoint, SessionHeadMeta,
22};
23use lash_core::{
24    AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
25    DeliveryPolicy, DurabilityTier, GcReport, MergeKey, ProcessAwaitOutput, ProcessEvent,
26    ProcessEventAppendRequest, ProcessEventAppendResult, ProcessExternalRef,
27    ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease, ProcessLeaseCompletion,
28    ProcessRecord, ProcessRegistration, ProcessRegistry, ProcessScope, RuntimePersistence,
29    SessionMeta, SessionNodeRecord, SessionReadScope, SessionStoreCreateRequest,
30    SessionStoreFactory, SlotPolicy, StoreError, TokenLedgerEntry, VacuumReport,
31};
32use lash_core::{
33    HostEventOccurrenceRecord, HostEventOccurrenceRequest, HostEventStore, PluginError,
34    TriggerDeliveryReservation, TriggerSubscriptionDraft, TriggerSubscriptionFilter,
35    TriggerSubscriptionRecord,
36};
37use sha2::{Digest, Sha256};
38use sqlx::postgres::{PgPool, PgPoolOptions, PgRow};
39use sqlx::{Executor, Row};
40
/// Component key under which this crate records its schema version in the
/// `lash_schema_versions` table.
const SCHEMA_COMPONENT: &str = "lash-postgres-store";
/// Schema version this build expects; `ensure_schema` rejects a database that
/// records a different version for [`SCHEMA_COMPONENT`].
const SCHEMA_VERSION: i32 = 1;
/// Local alias for `lash_core::PROCESS_LEASE_SCHEMA_VERSION`.
const PROCESS_LEASE_SCHEMA_VERSION: u32 = lash_core::PROCESS_LEASE_SCHEMA_VERSION;
44
/// Entry point for the Postgres-backed stores.
///
/// Holds the shared connection pool and hands out store handles that each carry
/// a clone of it. Cloning this value clones the pool handle, not the connections.
#[derive(Clone)]
pub struct PostgresStorage {
    /// Shared `sqlx` connection pool used by every store created from this value.
    pool: PgPool,
}
49
/// Factory that creates session-bound [`PostgresSessionStore`] handles over the
/// shared pool.
#[derive(Clone)]
pub struct PostgresSessionStoreFactory {
    /// Shared `sqlx` connection pool handed to every store this factory creates.
    pool: PgPool,
}
54
/// Session-store handle backed by Postgres.
///
/// A handle is either explicitly bound to one session (`session_id` is `Some`)
/// or unbound (`session_id` is `None`), in which case `bound_session` records the
/// session it latches onto.
#[derive(Clone)]
pub struct PostgresSessionStore {
    /// Shared `sqlx` connection pool.
    pool: PgPool,
    /// Explicit session binding for handles created via the factory.
    session_id: Option<String>,
    /// In-memory bind-on-first-commit for an *unbound* handle. A session-store
    /// handle commits to exactly one session; an unbound handle latches the first
    /// session it commits and rejects others (Postgres is multi-session per
    /// database, so this can't be inferred from a singleton head row the way the
    /// single-file SQLite store does). Shared across clones via `Arc`.
    bound_session: Arc<OnceLock<String>>,
}
67
/// Process-registry handle backed by Postgres.
#[derive(Clone)]
pub struct PostgresProcessRegistry {
    /// Shared `sqlx` connection pool.
    pool: PgPool,
    /// In-process notifier shared by all clones of this registry handle.
    /// NOTE(review): how it is signalled/awaited is defined outside this view.
    notify: Arc<tokio::sync::Notify>,
}
73
/// Host-event store handle backed by Postgres.
#[derive(Clone)]
pub struct PostgresHostEventStore {
    /// Shared `sqlx` connection pool.
    pool: PgPool,
}
78
/// Lashlang artifact store handle backed by Postgres.
#[derive(Clone)]
pub struct PostgresLashlangArtifactStore {
    /// Shared `sqlx` connection pool.
    pool: PgPool,
}
83
/// Connection-pool and per-connection timeout knobs for [`PostgresStorage`].
///
/// Session commits use **optimistic CAS** on the head (`UPDATE … WHERE
/// head_revision = expected`), not a held `SELECT … FOR UPDATE`, so concurrent
/// writers never pin a pool connection while blocked on a lock. `lock_timeout` is
/// defense in depth: it caps how long the single CAS write may wait on the head
/// row's lock before erroring (surfaced as a retryable conflict), so a pathological
/// burst can never starve the pool.
///
/// For the `Option` fields, `None` means the setting is not applied at all: the
/// pool-builder setter is not called (so the builder's own default stays in
/// place) and no `SET` statement is issued for the Postgres timeouts.
#[derive(Clone, Debug)]
pub struct PostgresStoreConfig {
    /// Maximum pooled connections. Default 16.
    pub max_connections: u32,
    /// Minimum idle connections kept warm. Default 0.
    pub min_connections: u32,
    /// How long `acquire` waits for a free connection before erroring. Default 30s.
    pub acquire_timeout: Duration,
    /// Close a connection after this idle period. Default 10m.
    pub idle_timeout: Option<Duration>,
    /// Recycle a connection after this lifetime. Default 30m.
    pub max_lifetime: Option<Duration>,
    /// Postgres `lock_timeout` applied to every connection. Default 10s.
    /// Sent to the server in whole milliseconds, never less than 1.
    pub lock_timeout: Option<Duration>,
    /// Postgres `statement_timeout` applied to every connection. Default 30s — a
    /// backstop so a wedged query can never hold a connection indefinitely.
    /// Sent to the server in whole milliseconds, never less than 1.
    pub statement_timeout: Option<Duration>,
}
110
111impl Default for PostgresStoreConfig {
112    fn default() -> Self {
113        Self {
114            max_connections: 16,
115            min_connections: 0,
116            acquire_timeout: Duration::from_secs(30),
117            idle_timeout: Some(Duration::from_secs(600)),
118            max_lifetime: Some(Duration::from_secs(1800)),
119            lock_timeout: Some(Duration::from_secs(10)),
120            statement_timeout: Some(Duration::from_secs(30)),
121        }
122    }
123}
124
125impl PostgresStorage {
126    /// Connect with [`PostgresStoreConfig::default`] pool/timeout settings.
127    pub async fn connect(database_url: &str) -> Result<Self, StoreError> {
128        Self::connect_with(database_url, PostgresStoreConfig::default()).await
129    }
130
131    /// Connect with explicit pool sizing and per-connection timeouts.
132    pub async fn connect_with(
133        database_url: &str,
134        config: PostgresStoreConfig,
135    ) -> Result<Self, StoreError> {
136        let lock_ms = config.lock_timeout.map(|d| d.as_millis().max(1) as u64);
137        let statement_ms = config
138            .statement_timeout
139            .map(|d| d.as_millis().max(1) as u64);
140        let mut options = PgPoolOptions::new()
141            .max_connections(config.max_connections)
142            .min_connections(config.min_connections)
143            .acquire_timeout(config.acquire_timeout);
144        if let Some(timeout) = config.idle_timeout {
145            options = options.idle_timeout(timeout);
146        }
147        if let Some(timeout) = config.max_lifetime {
148            options = options.max_lifetime(timeout);
149        }
150        let pool = options
151            .after_connect(move |conn, _meta| {
152                Box::pin(async move {
153                    if let Some(ms) = lock_ms {
154                        conn.execute(format!("SET lock_timeout = {ms}").as_str())
155                            .await?;
156                    }
157                    if let Some(ms) = statement_ms {
158                        conn.execute(format!("SET statement_timeout = {ms}").as_str())
159                            .await?;
160                    }
161                    Ok(())
162                })
163            })
164            .connect(database_url)
165            .await
166            .map_err(store_sqlx_error)?;
167        ensure_schema(&pool).await?;
168        Ok(Self { pool })
169    }
170
171    pub fn from_pool(pool: PgPool) -> Self {
172        Self { pool }
173    }
174
175    pub fn pool(&self) -> &PgPool {
176        &self.pool
177    }
178
179    pub fn session_store_factory(&self) -> PostgresSessionStoreFactory {
180        PostgresSessionStoreFactory {
181            pool: self.pool.clone(),
182        }
183    }
184
185    pub fn session_store(&self, session_id: impl Into<String>) -> PostgresSessionStore {
186        PostgresSessionStore {
187            pool: self.pool.clone(),
188            session_id: Some(session_id.into()),
189            bound_session: Arc::new(OnceLock::new()),
190        }
191    }
192
193    pub fn unbound_session_store(&self) -> PostgresSessionStore {
194        PostgresSessionStore {
195            pool: self.pool.clone(),
196            session_id: None,
197            bound_session: Arc::new(OnceLock::new()),
198        }
199    }
200
201    pub fn process_registry(&self) -> PostgresProcessRegistry {
202        PostgresProcessRegistry {
203            pool: self.pool.clone(),
204            notify: Arc::new(tokio::sync::Notify::new()),
205        }
206    }
207
208    pub fn host_event_store(&self) -> PostgresHostEventStore {
209        PostgresHostEventStore {
210            pool: self.pool.clone(),
211        }
212    }
213
214    pub fn lashlang_artifact_store(&self) -> PostgresLashlangArtifactStore {
215        PostgresLashlangArtifactStore {
216            pool: self.pool.clone(),
217        }
218    }
219}
220
221impl PostgresSessionStoreFactory {
222    pub fn new(storage: &PostgresStorage) -> Self {
223        storage.session_store_factory()
224    }
225}
226
227impl PostgresSessionStore {
228    pub fn unbound(storage: &PostgresStorage) -> Self {
229        storage.unbound_session_store()
230    }
231
232    async fn selected_session_id(&self) -> Result<Option<String>, StoreError> {
233        if let Some(session_id) = &self.session_id {
234            return Ok(Some(session_id.clone()));
235        }
236        sqlx::query_scalar("SELECT session_id FROM lash_sessions ORDER BY session_id ASC LIMIT 1")
237            .fetch_optional(&self.pool)
238            .await
239            .map_err(store_sqlx_error)
240    }
241}
242
/// Create every table, index and sequence this crate uses (if missing) and
/// check the recorded schema version.
///
/// All work happens in one transaction that first takes the transaction-scoped
/// advisory lock `pg_advisory_xact_lock(715421, 907001)`. The DDL uses
/// `IF NOT EXISTS` throughout, so re-running it against an initialized database
/// creates nothing new.
///
/// Version handling for [`SCHEMA_COMPONENT`] in `lash_schema_versions`:
/// - no row: insert [`SCHEMA_VERSION`];
/// - row equal to [`SCHEMA_VERSION`]: nothing to do;
/// - any other value: return an error without committing. No migration is
///   attempted.
///
/// # Errors
///
/// Returns [`StoreError::Backend`] on any SQL failure or on a version mismatch.
async fn ensure_schema(pool: &PgPool) -> Result<(), StoreError> {
    let mut tx = pool.begin().await.map_err(store_sqlx_error)?;
    // Held until the transaction ends (commit, or rollback on early return).
    tx.execute("SELECT pg_advisory_xact_lock(715421, 907001)")
        .await
        .map_err(store_sqlx_error)?;
    // One multi-statement batch containing the whole schema.
    tx.execute(
        r#"
        CREATE TABLE IF NOT EXISTS lash_schema_versions (
            component TEXT PRIMARY KEY,
            version INTEGER NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_blobs (
            hash TEXT PRIMARY KEY,
            content BYTEA NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_sessions (
            session_id TEXT PRIMARY KEY,
            head_revision BIGINT NOT NULL DEFAULT 0,
            head_json TEXT NOT NULL,
            checkpoint_ref TEXT
        );

        CREATE TABLE IF NOT EXISTS lash_graph_nodes (
            session_id TEXT NOT NULL,
            seq BIGSERIAL,
            node_id TEXT NOT NULL,
            node_json TEXT NOT NULL,
            tombstoned BOOLEAN NOT NULL DEFAULT FALSE,
            PRIMARY KEY (session_id, node_id)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_graph_nodes_seq
            ON lash_graph_nodes(session_id, seq);

        CREATE TABLE IF NOT EXISTS lash_usage_deltas (
            seq BIGSERIAL PRIMARY KEY,
            session_id TEXT NOT NULL,
            entry_json TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_session_meta (
            session_id TEXT PRIMARY KEY,
            meta_json TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_runtime_turn_commits (
            session_id TEXT NOT NULL,
            turn_id TEXT NOT NULL,
            turn_commit_hash TEXT NOT NULL,
            result_json TEXT NOT NULL,
            committed_at_ms BIGINT NOT NULL,
            PRIMARY KEY (session_id, turn_id)
        );

        CREATE TABLE IF NOT EXISTS lash_queued_work_batches (
            enqueue_seq BIGSERIAL PRIMARY KEY,
            batch_id TEXT NOT NULL UNIQUE,
            session_id TEXT NOT NULL,
            source_key TEXT,
            delivery_policy TEXT NOT NULL,
            slot_policy TEXT NOT NULL,
            merge_key_json TEXT NOT NULL,
            available_at_ms BIGINT NOT NULL,
            enqueued_at_ms BIGINT NOT NULL,
            claim_id TEXT,
            claim_owner_id TEXT,
            claim_token TEXT,
            claim_fencing_token BIGINT NOT NULL DEFAULT 0,
            claim_claimed_at_ms BIGINT NOT NULL DEFAULT 0,
            claim_expires_at_ms BIGINT NOT NULL DEFAULT 0,
            UNIQUE (session_id, source_key)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_queued_work_ready
            ON lash_queued_work_batches(session_id, available_at_ms, enqueue_seq);

        CREATE TABLE IF NOT EXISTS lash_queued_work_items (
            batch_id TEXT NOT NULL REFERENCES lash_queued_work_batches(batch_id) ON DELETE CASCADE,
            item_index INTEGER NOT NULL,
            item_id TEXT NOT NULL,
            payload_json TEXT NOT NULL,
            PRIMARY KEY (batch_id, item_index)
        );

        CREATE TABLE IF NOT EXISTS lash_attachment_manifest (
            attachment_id TEXT PRIMARY KEY,
            session_id TEXT NOT NULL,
            canonical_uri TEXT NOT NULL,
            intent_at_ms BIGINT NOT NULL,
            committed_at_ms BIGINT
        );
        CREATE INDEX IF NOT EXISTS idx_lash_attachment_manifest_uncommitted
            ON lash_attachment_manifest(committed_at_ms)
            WHERE committed_at_ms IS NULL;

        CREATE TABLE IF NOT EXISTS lash_processes (
            process_id TEXT PRIMARY KEY,
            registration_hash TEXT NOT NULL,
            owner_scope_id TEXT NOT NULL,
            host_profile_id TEXT NOT NULL,
            created_at_ms BIGINT NOT NULL,
            updated_at_ms BIGINT NOT NULL,
            status TEXT NOT NULL,
            record_json TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_lash_processes_status
            ON lash_processes(status);

        CREATE TABLE IF NOT EXISTS lash_process_events (
            process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            sequence BIGINT NOT NULL,
            event_type TEXT NOT NULL,
            payload_hash TEXT NOT NULL,
            idempotency_key TEXT,
            occurred_at_ms BIGINT NOT NULL,
            event_json TEXT NOT NULL,
            PRIMARY KEY (process_id, sequence)
        );
        CREATE UNIQUE INDEX IF NOT EXISTS idx_lash_process_events_key
            ON lash_process_events(process_id, idempotency_key)
            WHERE idempotency_key IS NOT NULL;

        CREATE TABLE IF NOT EXISTS lash_process_wake_acks (
            process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            sequence BIGINT NOT NULL,
            PRIMARY KEY (process_id, sequence)
        );

        CREATE TABLE IF NOT EXISTS lash_process_handle_grants (
            session_id TEXT NOT NULL,
            scope_id TEXT NOT NULL,
            process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            descriptor_json TEXT NOT NULL,
            PRIMARY KEY (scope_id, process_id)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_process_handle_grants_session
            ON lash_process_handle_grants(session_id);
        CREATE INDEX IF NOT EXISTS idx_lash_process_handle_grants_process
            ON lash_process_handle_grants(process_id);

        CREATE TABLE IF NOT EXISTS lash_process_leases (
            process_id TEXT PRIMARY KEY REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            lease_owner_id TEXT,
            lease_token TEXT,
            lease_fencing_token BIGINT NOT NULL DEFAULT 0,
            lease_claimed_at_ms BIGINT NOT NULL DEFAULT 0,
            lease_expires_at_ms BIGINT NOT NULL DEFAULT 0
        );

        CREATE SEQUENCE IF NOT EXISTS lash_host_event_subscription_seq;
        CREATE TABLE IF NOT EXISTS lash_host_event_trigger_subscriptions (
            subscription_id TEXT PRIMARY KEY,
            session_id TEXT NOT NULL,
            handle TEXT NOT NULL,
            source_type TEXT NOT NULL,
            source_key TEXT NOT NULL,
            enabled BOOLEAN NOT NULL,
            created_at_ms BIGINT NOT NULL,
            updated_at_ms BIGINT NOT NULL,
            record_json TEXT NOT NULL,
            UNIQUE(session_id, handle)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_host_event_subscriptions_source
            ON lash_host_event_trigger_subscriptions(source_type, source_key, enabled);

        CREATE TABLE IF NOT EXISTS lash_host_event_occurrences (
            occurrence_id TEXT PRIMARY KEY,
            idempotency_key TEXT NOT NULL UNIQUE,
            request_hash TEXT NOT NULL,
            source_type TEXT NOT NULL,
            source_key TEXT NOT NULL,
            occurred_at_ms BIGINT NOT NULL,
            record_json TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_host_event_deliveries (
            occurrence_id TEXT NOT NULL REFERENCES lash_host_event_occurrences(occurrence_id) ON DELETE CASCADE,
            subscription_id TEXT NOT NULL REFERENCES lash_host_event_trigger_subscriptions(subscription_id) ON DELETE CASCADE,
            process_id TEXT NOT NULL,
            created_at_ms BIGINT NOT NULL,
            PRIMARY KEY (occurrence_id, subscription_id)
        );

        CREATE TABLE IF NOT EXISTS lash_lashlang_artifacts (
            module_ref TEXT PRIMARY KEY,
            artifact_bytes BYTEA NOT NULL
        );
        "#,
    )
    .await
    .map_err(store_sqlx_error)?;

    // Read the version recorded for this component, inside the same transaction.
    let existing: Option<i32> =
        sqlx::query_scalar("SELECT version FROM lash_schema_versions WHERE component = $1")
            .bind(SCHEMA_COMPONENT)
            .fetch_optional(&mut *tx)
            .await
            .map_err(store_sqlx_error)?;
    match existing {
        // Already at the expected version.
        Some(version) if version == SCHEMA_VERSION => {}
        // Any other recorded version is refused; returning here drops `tx`
        // without committing.
        Some(version) => {
            return Err(StoreError::Backend(format!(
                "Postgres schema component `{SCHEMA_COMPONENT}` has version {version}, expected {SCHEMA_VERSION}"
            )));
        }
        // First initialization: record the version this build created.
        None => {
            sqlx::query("INSERT INTO lash_schema_versions (component, version) VALUES ($1, $2)")
                .bind(SCHEMA_COMPONENT)
                .bind(SCHEMA_VERSION)
                .execute(&mut *tx)
                .await
                .map_err(store_sqlx_error)?;
        }
    }
    tx.commit().await.map_err(store_sqlx_error)
}
459
/// Milliseconds elapsed since the Unix epoch according to the system clock.
///
/// Yields 0 when the clock reports a time before the epoch.
fn current_epoch_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_millis() as u64)
}
466
/// Current time rendered as `unix:<seconds since the Unix epoch>`.
///
/// Renders `unix:0` when the clock reports a time before the epoch.
fn current_timestamp_string() -> String {
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs());
    format!("unix:{secs}")
}
473
474fn store_sqlx_error(err: sqlx::Error) -> StoreError {
475    StoreError::Backend(err.to_string())
476}
477
478/// Postgres SQLSTATEs that signal transient write contention rather than a hard
479/// failure: serialization failure, deadlock, and lock-acquisition timeout. On the
480/// session head these all mean "a concurrent committer got there first" — i.e. a
481/// revision conflict the caller should reload-and-retry, not a backend error.
482fn is_contention_error(err: &sqlx::Error) -> bool {
483    matches!(
484        err.as_database_error().and_then(|db| db.code()).as_deref(),
485        Some("40001" | "40P01" | "55P03")
486    )
487}
488
489fn plugin_sqlx_error(err: sqlx::Error) -> PluginError {
490    PluginError::Session(err.to_string())
491}
492
493fn process_decode_error(err: serde_json::Error) -> PluginError {
494    PluginError::Session(format!("failed to decode process registry row: {err}"))
495}
496
497fn store_decode_json<T: serde::de::DeserializeOwned>(
498    json: &str,
499    what: &str,
500) -> Result<T, StoreError> {
501    serde_json::from_str(json)
502        .map_err(|err| StoreError::Backend(format!("failed to decode {what}: {err}")))
503}
504
505fn encode_json<T: serde::Serialize>(value: &T) -> String {
506    serde_json::to_string(value).expect("persisted state should serialize")
507}
508
509fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
510    let mut buf = Vec::with_capacity(1024);
511    rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
512    buf
513}
514
515fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
516    rmp_serde::from_slice(bytes).ok()
517}
518
519fn block_on_detached<T: Send + 'static>(
520    future: impl std::future::Future<Output = T> + Send + 'static,
521) -> T {
522    std::thread::spawn(move || {
523        tokio::runtime::Builder::new_current_thread()
524            .enable_all()
525            .build()
526            .expect("postgres manifest runtime")
527            .block_on(future)
528    })
529    .join()
530    .expect("postgres manifest thread")
531}
532
533fn merge_token_ledger_entries(entries: Vec<TokenLedgerEntry>) -> Vec<TokenLedgerEntry> {
534    let mut merged = Vec::<TokenLedgerEntry>::new();
535    for entry in entries {
536        if entry.usage.total() == 0 {
537            continue;
538        }
539        if let Some(existing) = merged
540            .iter_mut()
541            .find(|existing| existing.source == entry.source && existing.model == entry.model)
542        {
543            existing.usage.input_tokens += entry.usage.input_tokens;
544            existing.usage.output_tokens += entry.usage.output_tokens;
545            existing.usage.cached_input_tokens += entry.usage.cached_input_tokens;
546            existing.usage.reasoning_tokens += entry.usage.reasoning_tokens;
547        } else {
548            merged.push(entry);
549        }
550    }
551    merged
552}
553
/// Serialized form of a session checkpoint as stored in a blob: the checkpoint
/// manifest plus the optional hydrated payloads that accompany it.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct SessionCheckpointEnvelope {
    /// The checkpoint manifest (turn state, payload refs, snapshot revision).
    manifest: SessionCheckpoint,
    /// Hydrated tool state, when the checkpoint carried one.
    tool_state: Option<lash_core::ToolState>,
    /// Hydrated plugin snapshot, when the checkpoint carried one.
    plugin_snapshot: Option<lash_core::PluginSessionSnapshot>,
    /// Raw execution-state bytes, when the checkpoint carried them.
    execution_state: Option<Vec<u8>>,
}
561
562async fn put_blob_tx(
563    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
564    content: &[u8],
565) -> Result<BlobRef, StoreError> {
566    let hash = format!("{:x}", Sha256::digest(content));
567    sqlx::query(
568        "INSERT INTO lash_blobs (hash, content)
569         VALUES ($1, $2)
570         ON CONFLICT (hash) DO NOTHING",
571    )
572    .bind(&hash)
573    .bind(content)
574    .execute(&mut **tx)
575    .await
576    .map_err(store_sqlx_error)?;
577    Ok(BlobRef(hash))
578}
579
580async fn get_blob(pool: &PgPool, blob_ref: &BlobRef) -> Option<Vec<u8>> {
581    sqlx::query_scalar::<_, Vec<u8>>("SELECT content FROM lash_blobs WHERE hash = $1")
582        .bind(blob_ref.as_str())
583        .fetch_optional(pool)
584        .await
585        .ok()
586        .flatten()
587}
588
589async fn put_checkpoint_tx(
590    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
591    checkpoint: &HydratedSessionCheckpoint,
592) -> Result<(BlobRef, SessionCheckpoint), StoreError> {
593    let manifest = SessionCheckpoint {
594        turn_state: checkpoint.turn_state.clone(),
595        tool_state_ref: checkpoint.tool_state_ref.clone(),
596        plugin_snapshot_ref: checkpoint.plugin_snapshot_ref.clone(),
597        plugin_snapshot_revision: checkpoint.plugin_snapshot_revision,
598        execution_state_ref: checkpoint.execution_state_ref.clone(),
599    };
600    let envelope = SessionCheckpointEnvelope {
601        manifest: manifest.clone(),
602        tool_state: checkpoint.tool_state.clone(),
603        plugin_snapshot: checkpoint.plugin_snapshot.clone(),
604        execution_state: checkpoint.execution_state.clone(),
605    };
606    let bytes = encode_msgpack(&envelope);
607    let checkpoint_ref = put_blob_tx(tx, &bytes).await?;
608    Ok((checkpoint_ref, manifest))
609}
610
611async fn get_checkpoint(pool: &PgPool, blob_ref: &BlobRef) -> Option<HydratedSessionCheckpoint> {
612    let bytes = get_blob(pool, blob_ref).await?;
613    let envelope: SessionCheckpointEnvelope = decode_msgpack(&bytes)?;
614    Some(HydratedSessionCheckpoint {
615        turn_state: envelope.manifest.turn_state,
616        tool_state_ref: envelope.manifest.tool_state_ref,
617        tool_state: envelope.tool_state,
618        plugin_snapshot_ref: envelope.manifest.plugin_snapshot_ref,
619        plugin_snapshot: envelope.plugin_snapshot,
620        plugin_snapshot_revision: envelope.manifest.plugin_snapshot_revision,
621        execution_state_ref: envelope.manifest.execution_state_ref,
622        execution_state: envelope.execution_state,
623    })
624}
625
626async fn load_session_head_meta_tx(
627    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
628    session_id: &str,
629    for_update: bool,
630) -> Result<Option<SessionHeadMeta>, StoreError> {
631    let sql = if for_update {
632        "SELECT head_json, head_revision FROM lash_sessions WHERE session_id = $1 FOR UPDATE"
633    } else {
634        "SELECT head_json, head_revision FROM lash_sessions WHERE session_id = $1"
635    };
636    let row = sqlx::query(sql)
637        .bind(session_id)
638        .fetch_optional(&mut **tx)
639        .await
640        .map_err(store_sqlx_error)?;
641    let Some(row) = row else {
642        return Ok(None);
643    };
644    let head_json: String = row.get(0);
645    let head_revision: i64 = row.get(1);
646    let mut meta: SessionHeadMeta = store_decode_json(&head_json, "session head")?;
647    meta.head_revision = head_revision as u64;
648    Ok(Some(meta))
649}
650
651async fn load_usage_deltas(pool: &PgPool, session_id: &str) -> Vec<TokenLedgerEntry> {
652    let rows = sqlx::query(
653        "SELECT entry_json FROM lash_usage_deltas WHERE session_id = $1 ORDER BY seq ASC",
654    )
655    .bind(session_id)
656    .fetch_all(pool)
657    .await
658    .unwrap_or_default();
659    rows.into_iter()
660        .filter_map(|row| {
661            let json: String = row.get(0);
662            serde_json::from_str(&json).ok()
663        })
664        .collect()
665}
666
667async fn load_graph(
668    pool: &PgPool,
669    session_id: &str,
670    leaf_node_id: Option<String>,
671    active_path: bool,
672) -> Result<lash_core::SessionGraph, StoreError> {
673    let rows = sqlx::query(
674        "SELECT node_json FROM lash_graph_nodes
675         WHERE session_id = $1 AND tombstoned = FALSE
676         ORDER BY seq ASC",
677    )
678    .bind(session_id)
679    .fetch_all(pool)
680    .await
681    .map_err(store_sqlx_error)?;
682    let mut nodes = Vec::<SessionNodeRecord>::new();
683    for row in rows {
684        let json: String = row.get(0);
685        nodes.push(store_decode_json(&json, "session graph node")?);
686    }
687    if active_path && let Some(leaf) = leaf_node_id.clone() {
688        let wanted = active_path_node_ids(&nodes, &leaf);
689        nodes.retain(|node| wanted.contains(&node.node_id));
690    }
691    Ok(lash_core::SessionGraph::from_nodes(nodes, leaf_node_id))
692}
693
694fn active_path_node_ids(nodes: &[SessionNodeRecord], leaf_node_id: &str) -> HashSet<String> {
695    let mut parent_by_id = std::collections::BTreeMap::new();
696    for node in nodes {
697        parent_by_id.insert(node.node_id.clone(), node.parent_node_id.clone());
698    }
699    let mut wanted = HashSet::new();
700    let mut cursor = Some(leaf_node_id.to_string());
701    while let Some(node_id) = cursor {
702        if !wanted.insert(node_id.clone()) {
703            break;
704        }
705        cursor = parent_by_id.get(&node_id).cloned().flatten();
706    }
707    wanted
708}
709
710async fn commit_attachment_refs_tx(
711    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
712    session_id: &str,
713    attachment_ids: &[AttachmentId],
714) -> Result<(), StoreError> {
715    if attachment_ids.is_empty() {
716        return Ok(());
717    }
718    let now = current_epoch_ms() as i64;
719    for id in attachment_ids {
720        sqlx::query(
721            "UPDATE lash_attachment_manifest
722             SET committed_at_ms = COALESCE(committed_at_ms, $1)
723             WHERE attachment_id = $2 AND session_id = $3",
724        )
725        .bind(now)
726        .bind(id.as_str())
727        .bind(session_id)
728        .execute(&mut **tx)
729        .await
730        .map_err(store_sqlx_error)?;
731    }
732    Ok(())
733}
734
735impl AttachmentManifest for PostgresSessionStore {
736    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError> {
737        let pool = self.pool.clone();
738        block_on_detached(async move {
739            sqlx::query(
740                "INSERT INTO lash_attachment_manifest (
741                    attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
742                 )
743                 VALUES ($1, $2, $3, $4, NULL)
744                 ON CONFLICT (attachment_id) DO UPDATE SET
745                    session_id = EXCLUDED.session_id,
746                    canonical_uri = EXCLUDED.canonical_uri,
747                    intent_at_ms = EXCLUDED.intent_at_ms",
748            )
749            .bind(intent.attachment_id.as_str())
750            .bind(intent.session_id)
751            .bind(intent.canonical_uri)
752            .bind(intent.intent_at_epoch_ms as i64)
753            .execute(&pool)
754            .await
755            .map(|_| ())
756            .map_err(store_sqlx_error)
757        })
758    }
759
760    fn commit_refs(
761        &self,
762        session_id: &str,
763        attachment_ids: &[AttachmentId],
764    ) -> Result<(), StoreError> {
765        let pool = self.pool.clone();
766        let session_id = session_id.to_string();
767        let attachment_ids = attachment_ids.to_vec();
768        block_on_detached(async move {
769            let mut tx = pool.begin().await.map_err(store_sqlx_error)?;
770            commit_attachment_refs_tx(&mut tx, &session_id, &attachment_ids).await?;
771            tx.commit().await.map_err(store_sqlx_error)
772        })
773    }
774
775    fn list_uncommitted(
776        &self,
777        older_than_epoch_ms: u64,
778    ) -> Result<Vec<AttachmentManifestEntry>, StoreError> {
779        let pool = self.pool.clone();
780        block_on_detached(async move {
781            let rows = sqlx::query(
782                "SELECT attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
783                 FROM lash_attachment_manifest
784                 WHERE committed_at_ms IS NULL AND intent_at_ms <= $1
785                 ORDER BY attachment_id ASC",
786            )
787            .bind(older_than_epoch_ms as i64)
788            .fetch_all(&pool)
789            .await
790            .map_err(store_sqlx_error)?;
791            Ok(rows
792                .into_iter()
793                .map(|row| AttachmentManifestEntry {
794                    attachment_id: AttachmentId::new(row.get::<String, _>(0)),
795                    session_id: row.get(1),
796                    canonical_uri: row.get(2),
797                    intent_at_epoch_ms: row.get::<i64, _>(3) as u64,
798                    committed_at_epoch_ms: row.get::<Option<i64>, _>(4).map(|value| value as u64),
799                })
800                .collect())
801        })
802    }
803
804    fn forget(&self, attachment_id: &AttachmentId) -> Result<(), StoreError> {
805        let pool = self.pool.clone();
806        let attachment_id = attachment_id.to_string();
807        block_on_detached(async move {
808            sqlx::query("DELETE FROM lash_attachment_manifest WHERE attachment_id = $1")
809                .bind(attachment_id)
810                .execute(&pool)
811                .await
812                .map(|_| ())
813                .map_err(store_sqlx_error)
814        })
815    }
816}
817
818#[async_trait::async_trait]
819impl SessionStoreFactory for PostgresSessionStoreFactory {
820    fn durability_tier(&self) -> DurabilityTier {
821        DurabilityTier::Durable
822    }
823
824    async fn create_store(
825        &self,
826        request: &SessionStoreCreateRequest,
827    ) -> Result<Arc<dyn RuntimePersistence>, String> {
828        let store = PostgresSessionStore {
829            pool: self.pool.clone(),
830            session_id: Some(request.session_id.clone()),
831            bound_session: Arc::new(OnceLock::new()),
832        };
833        if store
834            .load_session_meta()
835            .await
836            .map_err(|err| err.to_string())?
837            .is_none()
838        {
839            store
840                .save_session_meta(SessionMeta {
841                    session_id: request.session_id.clone(),
842                    session_name: request.session_id.clone(),
843                    created_at: current_timestamp_string(),
844                    model: request.policy.model.id.clone(),
845                    cwd: std::env::current_dir()
846                        .ok()
847                        .and_then(|path| path.to_str().map(str::to_string)),
848                    relation: request.relation.clone(),
849                })
850                .await
851                .map_err(|err| err.to_string())?;
852        }
853        Ok(Arc::new(store))
854    }
855
856    async fn delete_session(&self, session_id: &str) -> Result<(), String> {
857        let mut tx = self.pool.begin().await.map_err(|err| err.to_string())?;
858        for sql in [
859            "DELETE FROM lash_queued_work_items WHERE batch_id IN (SELECT batch_id FROM lash_queued_work_batches WHERE session_id = $1)",
860            "DELETE FROM lash_queued_work_batches WHERE session_id = $1",
861            "DELETE FROM lash_usage_deltas WHERE session_id = $1",
862            "DELETE FROM lash_graph_nodes WHERE session_id = $1",
863            "DELETE FROM lash_runtime_turn_commits WHERE session_id = $1",
864            "DELETE FROM lash_session_meta WHERE session_id = $1",
865            "DELETE FROM lash_sessions WHERE session_id = $1",
866            "DELETE FROM lash_attachment_manifest WHERE session_id = $1",
867        ] {
868            sqlx::query(sql)
869                .bind(session_id)
870                .execute(&mut *tx)
871                .await
872                .map_err(|err| err.to_string())?;
873        }
874        tx.commit().await.map_err(|err| err.to_string())
875    }
876}
877
878#[derive(Clone, Debug)]
879struct QueuedBatchRow {
880    enqueue_seq: u64,
881    batch_id: String,
882    session_id: String,
883    source_key: Option<String>,
884    delivery_policy: DeliveryPolicy,
885    slot_policy: SlotPolicy,
886    merge_key: MergeKey,
887    available_at_ms: u64,
888    enqueued_at_ms: u64,
889    claim_fencing_token: u64,
890}
891
892fn queued_batch_row(row: PgRow) -> Result<QueuedBatchRow, StoreError> {
893    let delivery_policy =
894        DeliveryPolicy::from_wire_str(row.get::<String, _>("delivery_policy").as_str())
895            .ok_or_else(|| {
896                StoreError::Backend("invalid queued work delivery policy".to_string())
897            })?;
898    let slot_policy = SlotPolicy::from_wire_str(row.get::<String, _>("slot_policy").as_str())
899        .ok_or_else(|| StoreError::Backend("invalid queued work slot policy".to_string()))?;
900    let merge_json: String = row.get("merge_key_json");
901    Ok(QueuedBatchRow {
902        enqueue_seq: row.get::<i64, _>("enqueue_seq") as u64,
903        batch_id: row.get("batch_id"),
904        session_id: row.get("session_id"),
905        source_key: row.get("source_key"),
906        delivery_policy,
907        slot_policy,
908        merge_key: store_decode_json(&merge_json, "queued work merge key")?,
909        available_at_ms: row.get::<i64, _>("available_at_ms") as u64,
910        enqueued_at_ms: row.get::<i64, _>("enqueued_at_ms") as u64,
911        claim_fencing_token: row.get::<i64, _>("claim_fencing_token") as u64,
912    })
913}
914
915async fn load_queued_batch(
916    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
917    batch_id: &str,
918) -> Result<Option<QueuedWorkBatch>, StoreError> {
919    let row = sqlx::query(
920        "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
921                slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
922                claim_fencing_token
923         FROM lash_queued_work_batches
924         WHERE batch_id = $1",
925    )
926    .bind(batch_id)
927    .fetch_optional(&mut **tx)
928    .await
929    .map_err(store_sqlx_error)?;
930    let Some(row) = row else {
931        return Ok(None);
932    };
933    let row = queued_batch_row(row)?;
934    queued_work_batch_from_row(tx, row).await.map(Some)
935}
936
937async fn queued_work_batch_from_row(
938    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
939    row: QueuedBatchRow,
940) -> Result<QueuedWorkBatch, StoreError> {
941    let item_rows = sqlx::query(
942        "SELECT item_id, payload_json
943         FROM lash_queued_work_items
944         WHERE batch_id = $1
945         ORDER BY item_index ASC",
946    )
947    .bind(&row.batch_id)
948    .fetch_all(&mut **tx)
949    .await
950    .map_err(store_sqlx_error)?;
951    let mut items = Vec::new();
952    for item in item_rows {
953        let payload_json: String = item.get(1);
954        items.push(QueuedWorkItem {
955            item_id: item.get(0),
956            payload: store_decode_json(&payload_json, "queued work payload")?,
957        });
958    }
959    Ok(QueuedWorkBatch {
960        batch_id: row.batch_id,
961        session_id: row.session_id,
962        enqueue_seq: row.enqueue_seq,
963        source_key: row.source_key,
964        delivery_policy: row.delivery_policy,
965        slot_policy: row.slot_policy,
966        merge_key: row.merge_key,
967        available_at_ms: row.available_at_ms,
968        enqueued_at_ms: row.enqueued_at_ms,
969        items,
970    })
971}
972
973async fn ensure_queued_work_completion_tx(
974    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
975    completed: &QueuedWorkCompletion,
976) -> Result<(), StoreError> {
977    for batch_id in &completed.batch_ids {
978        let exists: Option<i64> = sqlx::query_scalar(
979            "SELECT 1::BIGINT FROM lash_queued_work_batches
980             WHERE session_id = $1
981               AND batch_id = $2
982               AND claim_id = $3
983               AND claim_token = $4
984             LIMIT 1",
985        )
986        .bind(&completed.session_id)
987        .bind(batch_id)
988        .bind(&completed.claim_id)
989        .bind(&completed.lease_token)
990        .fetch_optional(&mut **tx)
991        .await
992        .map_err(store_sqlx_error)?;
993        if exists.is_none() {
994            return Err(StoreError::QueuedWorkClaimExpired {
995                session_id: completed.session_id.clone(),
996                claim_id: completed.claim_id.clone(),
997            });
998        }
999    }
1000    Ok(())
1001}
1002
1003#[async_trait::async_trait]
1004impl RuntimePersistence for PostgresSessionStore {
1005    fn durability_tier(&self) -> DurabilityTier {
1006        DurabilityTier::Durable
1007    }
1008
1009    async fn load_session(
1010        &self,
1011        scope: SessionReadScope,
1012    ) -> Result<Option<PersistedSessionRead>, StoreError> {
1013        let Some(session_id) = self.selected_session_id().await? else {
1014            return Ok(None);
1015        };
1016        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1017        let Some(meta) = load_session_head_meta_tx(&mut tx, &session_id, false).await? else {
1018            return Ok(None);
1019        };
1020        tx.commit().await.map_err(store_sqlx_error)?;
1021        let leaf_node_id = match &scope {
1022            SessionReadScope::FullGraph => meta.leaf_node_id.clone(),
1023            SessionReadScope::ActivePath { leaf_node_id } => {
1024                leaf_node_id.clone().or_else(|| meta.leaf_node_id.clone())
1025            }
1026        };
1027        let graph = load_graph(
1028            &self.pool,
1029            &session_id,
1030            leaf_node_id.clone(),
1031            matches!(scope, SessionReadScope::ActivePath { .. }),
1032        )
1033        .await?;
1034        let checkpoint = match meta.checkpoint_ref.as_ref() {
1035            Some(blob_ref) => get_checkpoint(&self.pool, blob_ref).await,
1036            None => None,
1037        };
1038        Ok(Some(PersistedSessionRead {
1039            session_id: meta.session_id,
1040            head_revision: meta.head_revision,
1041            config: meta.config,
1042            agent_frames: meta.agent_frames,
1043            current_agent_frame_id: meta.current_agent_frame_id,
1044            graph,
1045            checkpoint_ref: meta.checkpoint_ref,
1046            checkpoint,
1047            token_ledger: merge_token_ledger_entries(
1048                load_usage_deltas(&self.pool, &session_id).await,
1049            ),
1050        }))
1051    }
1052
1053    async fn load_node(&self, node_id: &str) -> Result<Option<SessionNodeRecord>, StoreError> {
1054        let json: Option<String> = if let Some(session_id) = &self.session_id {
1055            sqlx::query_scalar(
1056                "SELECT node_json FROM lash_graph_nodes
1057                 WHERE session_id = $1 AND node_id = $2 AND tombstoned = FALSE",
1058            )
1059            .bind(session_id)
1060            .bind(node_id)
1061            .fetch_optional(&self.pool)
1062            .await
1063            .map_err(store_sqlx_error)?
1064        } else {
1065            sqlx::query_scalar(
1066                "SELECT node_json FROM lash_graph_nodes
1067                 WHERE node_id = $1 AND tombstoned = FALSE
1068                 ORDER BY session_id ASC
1069                 LIMIT 1",
1070            )
1071            .bind(node_id)
1072            .fetch_optional(&self.pool)
1073            .await
1074            .map_err(store_sqlx_error)?
1075        };
1076        json.map(|json| store_decode_json(&json, "session graph node"))
1077            .transpose()
1078    }
1079
1080    async fn commit_runtime_state(
1081        &self,
1082        commit: RuntimeCommit,
1083    ) -> Result<RuntimeCommitResult, StoreError> {
1084        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1085        // Read the head WITHOUT a lock; the conditional CAS write below is the
1086        // serialization point (optimistic concurrency), so no pessimistic
1087        // `FOR UPDATE` lock is held across the rest of this transaction.
1088        let existing = load_session_head_meta_tx(&mut tx, &commit.session_id, false).await?;
1089        if let Some(bound_session_id) = existing.as_ref().map(|meta| meta.session_id.as_str())
1090            && bound_session_id != commit.session_id
1091        {
1092            return Err(StoreError::SessionBindingMismatch {
1093                bound_session_id: bound_session_id.to_string(),
1094                attempted_session_id: commit.session_id,
1095            });
1096        }
1097        // A session-store handle commits to exactly one session. An explicit
1098        // binding (`self.session_id`) is authoritative; otherwise the handle binds
1099        // to the first session it commits and rejects any other thereafter.
1100        let effective_binding = self
1101            .session_id
1102            .clone()
1103            .or_else(|| self.bound_session.get().cloned());
1104        if let Some(bound_session_id) = &effective_binding
1105            && commit.session_id != *bound_session_id
1106        {
1107            return Err(StoreError::SessionBindingMismatch {
1108                bound_session_id: bound_session_id.clone(),
1109                attempted_session_id: commit.session_id,
1110            });
1111        }
1112        if self.session_id.is_none() {
1113            let _ = self.bound_session.set(commit.session_id.clone());
1114        }
1115        if let Some(completed) = &commit.turn_commit {
1116            if completed.session_id != commit.session_id {
1117                return Err(StoreError::RuntimeTurnCommitConflict {
1118                    session_id: completed.session_id.clone(),
1119                    turn_id: completed.turn_id.clone(),
1120                });
1121            }
1122            let prior = sqlx::query(
1123                "SELECT turn_commit_hash, result_json
1124                 FROM lash_runtime_turn_commits
1125                 WHERE session_id = $1 AND turn_id = $2",
1126            )
1127            .bind(&completed.session_id)
1128            .bind(&completed.turn_id)
1129            .fetch_optional(&mut *tx)
1130            .await
1131            .map_err(store_sqlx_error)?;
1132            if let Some(row) = prior {
1133                let hash: String = row.get(0);
1134                let result_json: String = row.get(1);
1135                if hash == completed.turn_commit_hash {
1136                    return store_decode_json(&result_json, "runtime turn commit result");
1137                }
1138                return Err(StoreError::RuntimeTurnCommitConflict {
1139                    session_id: completed.session_id.clone(),
1140                    turn_id: completed.turn_id.clone(),
1141                });
1142            }
1143        }
1144        let actual_revision = existing.as_ref().map_or(0, |meta| meta.head_revision);
1145        if commit.expected_head_revision.is_some()
1146            && commit.expected_head_revision != Some(actual_revision)
1147        {
1148            return Err(StoreError::HeadRevisionConflict {
1149                expected: commit.expected_head_revision,
1150                actual: actual_revision,
1151            });
1152        }
1153        for completed in &commit.completed_queue_claims {
1154            if completed.session_id != commit.session_id {
1155                return Err(StoreError::QueuedWorkClaimExpired {
1156                    session_id: completed.session_id.clone(),
1157                    claim_id: completed.claim_id.clone(),
1158                });
1159            }
1160            ensure_queued_work_completion_tx(&mut tx, completed).await?;
1161        }
1162        let (checkpoint_ref, manifest) = put_checkpoint_tx(&mut tx, &commit.checkpoint).await?;
1163        for entry in &commit.usage_deltas {
1164            sqlx::query("INSERT INTO lash_usage_deltas (session_id, entry_json) VALUES ($1, $2)")
1165                .bind(&commit.session_id)
1166                .bind(encode_json(entry))
1167                .execute(&mut *tx)
1168                .await
1169                .map_err(store_sqlx_error)?;
1170        }
1171        let leaf_node_id = match &commit.graph {
1172            GraphCommitDelta::Unchanged { leaf_node_id } => leaf_node_id.clone(),
1173            GraphCommitDelta::Append {
1174                nodes,
1175                leaf_node_id,
1176            } => {
1177                for node in nodes {
1178                    sqlx::query(
1179                        "INSERT INTO lash_graph_nodes (session_id, node_id, node_json)
1180                         VALUES ($1, $2, $3)
1181                         ON CONFLICT (session_id, node_id) DO UPDATE SET
1182                            node_json = EXCLUDED.node_json,
1183                            tombstoned = FALSE",
1184                    )
1185                    .bind(&commit.session_id)
1186                    .bind(&node.node_id)
1187                    .bind(encode_json(node))
1188                    .execute(&mut *tx)
1189                    .await
1190                    .map_err(store_sqlx_error)?;
1191                }
1192                leaf_node_id.clone()
1193            }
1194            GraphCommitDelta::ReplaceFull(graph) => {
1195                sqlx::query("DELETE FROM lash_graph_nodes WHERE session_id = $1")
1196                    .bind(&commit.session_id)
1197                    .execute(&mut *tx)
1198                    .await
1199                    .map_err(store_sqlx_error)?;
1200                for node in &graph.nodes {
1201                    sqlx::query(
1202                        "INSERT INTO lash_graph_nodes (session_id, node_id, node_json)
1203                         VALUES ($1, $2, $3)",
1204                    )
1205                    .bind(&commit.session_id)
1206                    .bind(&node.node_id)
1207                    .bind(encode_json(node))
1208                    .execute(&mut *tx)
1209                    .await
1210                    .map_err(store_sqlx_error)?;
1211                }
1212                graph.leaf_node_id.clone()
1213            }
1214        };
1215        let graph_node_count: i64 = sqlx::query_scalar(
1216            "SELECT COUNT(*) FROM lash_graph_nodes WHERE session_id = $1 AND tombstoned = FALSE",
1217        )
1218        .bind(&commit.session_id)
1219        .fetch_one(&mut *tx)
1220        .await
1221        .map_err(store_sqlx_error)?;
1222        let next_revision = actual_revision + 1;
1223        let meta = SessionHeadMeta {
1224            session_id: commit.session_id.clone(),
1225            head_revision: next_revision,
1226            config: commit.config.clone(),
1227            agent_frames: commit.agent_frames.clone(),
1228            current_agent_frame_id: commit.current_agent_frame_id.clone(),
1229            checkpoint_ref: Some(checkpoint_ref.clone()),
1230            leaf_node_id,
1231            graph_node_count: graph_node_count as usize,
1232            token_ledger: Vec::new(),
1233        };
1234        // Optimistic CAS on the head revision. The `WHERE head_revision = $5`
1235        // guard makes the write succeed only if no concurrent committer moved the
1236        // head since our unlocked read above. A brand-new session inserts (no
1237        // conflict); an existing one updates only when the revision still matches.
1238        let head_write = sqlx::query(
1239            "INSERT INTO lash_sessions (session_id, head_revision, head_json, checkpoint_ref)
1240             VALUES ($1, $2, $3, $4)
1241             ON CONFLICT (session_id) DO UPDATE SET
1242                head_revision = EXCLUDED.head_revision,
1243                head_json = EXCLUDED.head_json,
1244                checkpoint_ref = EXCLUDED.checkpoint_ref
1245             WHERE lash_sessions.head_revision = $5",
1246        )
1247        .bind(&commit.session_id)
1248        .bind(next_revision as i64)
1249        .bind(encode_json(&meta))
1250        .bind(checkpoint_ref.as_str())
1251        .bind(actual_revision as i64)
1252        .execute(&mut *tx)
1253        .await;
1254        let head_write = match head_write {
1255            Ok(result) => result,
1256            Err(err) if is_contention_error(&err) => {
1257                // The head row is contended by a concurrent committer (lock
1258                // timeout / serialization failure / deadlock). That is a conflict,
1259                // not an opaque backend error: surface it so the caller reloads
1260                // and retries. The tx is now aborted; returning drops it.
1261                return Err(StoreError::HeadRevisionConflict {
1262                    expected: commit.expected_head_revision.or(Some(actual_revision)),
1263                    actual: actual_revision,
1264                });
1265            }
1266            Err(err) => return Err(store_sqlx_error(err)),
1267        };
1268        if head_write.rows_affected() == 0 {
1269            // A concurrent commit won the race: the head no longer matches the
1270            // revision we read. Re-read the now-current revision for an accurate
1271            // report, then drop `tx` (auto-rollback), discarding this attempt's
1272            // node/usage writes; the caller reloads and retries.
1273            let actual_now = sqlx::query_scalar::<_, i64>(
1274                "SELECT head_revision FROM lash_sessions WHERE session_id = $1",
1275            )
1276            .bind(&commit.session_id)
1277            .fetch_optional(&mut *tx)
1278            .await
1279            .map_err(store_sqlx_error)?
1280            .map_or(actual_revision, |revision| revision as u64);
1281            return Err(StoreError::HeadRevisionConflict {
1282                expected: commit.expected_head_revision.or(Some(actual_revision)),
1283                actual: actual_now,
1284            });
1285        }
1286        for completed in &commit.completed_queue_claims {
1287            for batch_id in &completed.batch_ids {
1288                sqlx::query(
1289                    "DELETE FROM lash_queued_work_batches
1290                     WHERE session_id = $1 AND batch_id = $2 AND claim_id = $3 AND claim_token = $4",
1291                )
1292                .bind(&completed.session_id)
1293                .bind(batch_id)
1294                .bind(&completed.claim_id)
1295                .bind(&completed.lease_token)
1296                .execute(&mut *tx)
1297                .await
1298                .map_err(store_sqlx_error)?;
1299            }
1300        }
1301        commit_attachment_refs_tx(
1302            &mut tx,
1303            &commit.session_id,
1304            &commit.committed_attachment_ids,
1305        )
1306        .await?;
1307        let result = RuntimeCommitResult {
1308            head_revision: next_revision,
1309            checkpoint_ref,
1310            manifest,
1311        };
1312        if let Some(completed) = &commit.turn_commit {
1313            sqlx::query(
1314                "INSERT INTO lash_runtime_turn_commits (
1315                    session_id, turn_id, turn_commit_hash, result_json, committed_at_ms
1316                 )
1317                 VALUES ($1, $2, $3, $4, $5)",
1318            )
1319            .bind(&completed.session_id)
1320            .bind(&completed.turn_id)
1321            .bind(&completed.turn_commit_hash)
1322            .bind(encode_json(&result))
1323            .bind(current_epoch_ms() as i64)
1324            .execute(&mut *tx)
1325            .await
1326            .map_err(store_sqlx_error)?;
1327        }
1328        tx.commit().await.map_err(store_sqlx_error)?;
1329        Ok(result)
1330    }
1331
1332    async fn enqueue_queued_work(
1333        &self,
1334        batch: QueuedWorkBatchDraft,
1335    ) -> Result<QueuedWorkBatch, StoreError> {
1336        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1337        if let Some(source_key) = batch.source_key.as_deref() {
1338            let existing_id: Option<String> = sqlx::query_scalar(
1339                "SELECT batch_id FROM lash_queued_work_batches
1340                 WHERE session_id = $1 AND source_key = $2",
1341            )
1342            .bind(&batch.session_id)
1343            .bind(source_key)
1344            .fetch_optional(&mut *tx)
1345            .await
1346            .map_err(store_sqlx_error)?;
1347            if let Some(batch_id) = existing_id {
1348                let existing = load_queued_batch(&mut tx, &batch_id)
1349                    .await?
1350                    .ok_or_else(|| {
1351                        StoreError::Backend("queued work source row disappeared".to_string())
1352                    })?;
1353                tx.commit().await.map_err(store_sqlx_error)?;
1354                return Ok(existing);
1355            }
1356        }
1357        let now = current_epoch_ms();
1358        let batch_id = derive_batch_id(&batch.session_id, batch.source_key.as_deref(), now, None);
1359        let row = sqlx::query_scalar::<_, i64>(
1360            "INSERT INTO lash_queued_work_batches (
1361                batch_id, session_id, source_key, delivery_policy, slot_policy,
1362                merge_key_json, available_at_ms, enqueued_at_ms
1363             )
1364             VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
1365             RETURNING enqueue_seq",
1366        )
1367        .bind(&batch_id)
1368        .bind(&batch.session_id)
1369        .bind(&batch.source_key)
1370        .bind(batch.delivery_policy.as_str())
1371        .bind(batch.slot_policy.as_str())
1372        .bind(encode_json(&batch.merge_key))
1373        .bind(batch.available_at_ms as i64)
1374        .bind(now as i64)
1375        .fetch_one(&mut *tx)
1376        .await
1377        .map_err(store_sqlx_error)?;
1378        for (index, payload) in batch.payloads.iter().enumerate() {
1379            let item_id = format!("{batch_id}:item:{index}");
1380            sqlx::query(
1381                "INSERT INTO lash_queued_work_items (batch_id, item_index, item_id, payload_json)
1382                 VALUES ($1, $2, $3, $4)",
1383            )
1384            .bind(&batch_id)
1385            .bind(index as i32)
1386            .bind(item_id)
1387            .bind(encode_json(payload))
1388            .execute(&mut *tx)
1389            .await
1390            .map_err(store_sqlx_error)?;
1391        }
1392        let queued = load_queued_batch(&mut tx, &batch_id)
1393            .await?
1394            .ok_or_else(|| StoreError::Backend("queued work insert disappeared".to_string()))?;
1395        debug_assert_eq!(queued.enqueue_seq, row as u64);
1396        tx.commit().await.map_err(store_sqlx_error)?;
1397        Ok(queued)
1398    }
1399
1400    async fn claim_ready_queued_work(
1401        &self,
1402        session_id: &str,
1403        owner_id: &str,
1404        boundary: QueuedWorkClaimBoundary,
1405        lease_ttl_ms: u64,
1406        max_batches: usize,
1407    ) -> Result<Option<QueuedWorkClaim>, StoreError> {
1408        if max_batches == 0 {
1409            return Ok(None);
1410        }
1411        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1412        let now = current_epoch_ms();
1413        let rows = sqlx::query(
1414            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1415                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1416                    claim_fencing_token
1417             FROM lash_queued_work_batches
1418             WHERE session_id = $1
1419               AND available_at_ms <= $2
1420               AND (claim_token IS NULL OR claim_expires_at_ms <= $2)
1421             ORDER BY enqueue_seq ASC
1422             LIMIT $3
1423             FOR UPDATE SKIP LOCKED",
1424        )
1425        .bind(session_id)
1426        .bind(now as i64)
1427        .bind(claim_scan_limit(max_batches))
1428        .fetch_all(&mut *tx)
1429        .await
1430        .map_err(store_sqlx_error)?;
1431        let mut selected = Vec::new();
1432        for row in rows {
1433            selected.push(queued_batch_row(row)?);
1434        }
1435        let candidates = selected
1436            .iter()
1437            .map(|row| ClaimCandidate {
1438                enqueue_seq: row.enqueue_seq,
1439                claim_fencing_token: row.claim_fencing_token,
1440                delivery_policy: row.delivery_policy,
1441                slot_policy: row.slot_policy,
1442                merge_key: row.merge_key.clone(),
1443            })
1444            .collect::<Vec<_>>();
1445        let selected_len = select_claim_prefix(&candidates, boundary, max_batches);
1446        if selected_len == 0 {
1447            tx.commit().await.map_err(store_sqlx_error)?;
1448            return Ok(None);
1449        }
1450        selected.truncate(selected_len);
1451        let lease =
1452            QueuedWorkClaimLease::derive(&candidates[0], session_id, owner_id, now, lease_ttl_ms);
1453        for row in &selected {
1454            let changed = sqlx::query(
1455                "UPDATE lash_queued_work_batches
1456                 SET claim_id = $3,
1457                     claim_owner_id = $4,
1458                     claim_token = $5,
1459                     claim_fencing_token = claim_fencing_token + 1,
1460                     claim_claimed_at_ms = $6,
1461                     claim_expires_at_ms = $7
1462                 WHERE session_id = $1
1463                   AND batch_id = $2
1464                   AND (claim_token IS NULL OR claim_expires_at_ms <= $6)",
1465            )
1466            .bind(session_id)
1467            .bind(&row.batch_id)
1468            .bind(&lease.claim_id)
1469            .bind(owner_id)
1470            .bind(&lease.lease_token)
1471            .bind(now as i64)
1472            .bind(lease.expires_at_epoch_ms as i64)
1473            .execute(&mut *tx)
1474            .await
1475            .map_err(store_sqlx_error)?
1476            .rows_affected();
1477            if changed == 0 {
1478                tx.rollback().await.map_err(store_sqlx_error)?;
1479                return Ok(None);
1480            }
1481        }
1482        let mut batches = Vec::new();
1483        for row in selected {
1484            batches.push(queued_work_batch_from_row(&mut tx, row).await?);
1485        }
1486        tx.commit().await.map_err(store_sqlx_error)?;
1487        Ok(Some(QueuedWorkClaim {
1488            session_id: session_id.to_string(),
1489            claim_id: lease.claim_id,
1490            owner_id: owner_id.to_string(),
1491            lease_token: lease.lease_token,
1492            fencing_token: lease.fencing_token,
1493            claimed_at_epoch_ms: lease.claimed_at_epoch_ms,
1494            expires_at_epoch_ms: lease.expires_at_epoch_ms,
1495            batches,
1496        }))
1497    }
1498
1499    async fn renew_queued_work_claim(
1500        &self,
1501        claim: &QueuedWorkClaim,
1502        lease_ttl_ms: u64,
1503    ) -> Result<QueuedWorkClaim, StoreError> {
1504        let expires_at = current_epoch_ms().saturating_add(lease_ttl_ms);
1505        let changed = sqlx::query(
1506            "UPDATE lash_queued_work_batches
1507             SET claim_expires_at_ms = $4
1508             WHERE session_id = $1 AND claim_id = $2 AND claim_token = $3",
1509        )
1510        .bind(&claim.session_id)
1511        .bind(&claim.claim_id)
1512        .bind(&claim.lease_token)
1513        .bind(expires_at as i64)
1514        .execute(&self.pool)
1515        .await
1516        .map_err(store_sqlx_error)?
1517        .rows_affected();
1518        renewed_claim(claim, changed as usize, expires_at)
1519    }
1520
1521    async fn abandon_queued_work_claim(&self, claim: &QueuedWorkClaim) -> Result<(), StoreError> {
1522        sqlx::query(
1523            "UPDATE lash_queued_work_batches
1524             SET claim_id = NULL,
1525                 claim_owner_id = NULL,
1526                 claim_token = NULL,
1527                 claim_claimed_at_ms = 0,
1528                 claim_expires_at_ms = 0
1529             WHERE session_id = $1 AND claim_id = $2 AND claim_token = $3",
1530        )
1531        .bind(&claim.session_id)
1532        .bind(&claim.claim_id)
1533        .bind(&claim.lease_token)
1534        .execute(&self.pool)
1535        .await
1536        .map_err(store_sqlx_error)?;
1537        Ok(())
1538    }
1539
1540    async fn cancel_queued_work_batch(
1541        &self,
1542        session_id: &str,
1543        batch_id: &str,
1544    ) -> Result<Option<QueuedWorkBatch>, StoreError> {
1545        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1546        let now = current_epoch_ms();
1547        let row = sqlx::query(
1548            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1549                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1550                    claim_fencing_token
1551             FROM lash_queued_work_batches
1552             WHERE session_id = $1
1553               AND batch_id = $2
1554               AND (claim_token IS NULL OR claim_expires_at_ms <= $3)
1555             FOR UPDATE",
1556        )
1557        .bind(session_id)
1558        .bind(batch_id)
1559        .bind(now as i64)
1560        .fetch_optional(&mut *tx)
1561        .await
1562        .map_err(store_sqlx_error)?;
1563        let Some(row) = row else {
1564            tx.commit().await.map_err(store_sqlx_error)?;
1565            return Ok(None);
1566        };
1567        let batch = queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?;
1568        sqlx::query("DELETE FROM lash_queued_work_batches WHERE batch_id = $1")
1569            .bind(batch_id)
1570            .execute(&mut *tx)
1571            .await
1572            .map_err(store_sqlx_error)?;
1573        tx.commit().await.map_err(store_sqlx_error)?;
1574        Ok(Some(batch))
1575    }
1576
1577    async fn list_queued_work(&self, session_id: &str) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1578        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1579        let rows = sqlx::query(
1580            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1581                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1582                    claim_fencing_token
1583             FROM lash_queued_work_batches
1584             WHERE session_id = $1
1585             ORDER BY enqueue_seq ASC",
1586        )
1587        .bind(session_id)
1588        .fetch_all(&mut *tx)
1589        .await
1590        .map_err(store_sqlx_error)?;
1591        let mut batches = Vec::new();
1592        for row in rows {
1593            batches.push(queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?);
1594        }
1595        tx.commit().await.map_err(store_sqlx_error)?;
1596        Ok(batches)
1597    }
1598
1599    async fn list_pending_queued_work(
1600        &self,
1601        session_id: &str,
1602    ) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1603        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1604        let now = current_epoch_ms();
1605        let rows = sqlx::query(
1606            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1607                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1608                    claim_fencing_token
1609             FROM lash_queued_work_batches
1610             WHERE session_id = $1
1611               AND (claim_token IS NULL OR claim_expires_at_ms <= $2)
1612             ORDER BY enqueue_seq ASC",
1613        )
1614        .bind(session_id)
1615        .bind(now as i64)
1616        .fetch_all(&mut *tx)
1617        .await
1618        .map_err(store_sqlx_error)?;
1619        let mut batches = Vec::new();
1620        for row in rows {
1621            batches.push(queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?);
1622        }
1623        tx.commit().await.map_err(store_sqlx_error)?;
1624        Ok(batches)
1625    }
1626
1627    async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError> {
1628        sqlx::query(
1629            "INSERT INTO lash_session_meta (session_id, meta_json)
1630             VALUES ($1, $2)
1631             ON CONFLICT (session_id) DO UPDATE SET meta_json = EXCLUDED.meta_json",
1632        )
1633        .bind(&meta.session_id)
1634        .bind(encode_json(&meta))
1635        .execute(&self.pool)
1636        .await
1637        .map_err(store_sqlx_error)?;
1638        Ok(())
1639    }
1640
1641    async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError> {
1642        let json: Option<String> = if let Some(session_id) = &self.session_id {
1643            sqlx::query_scalar("SELECT meta_json FROM lash_session_meta WHERE session_id = $1")
1644                .bind(session_id)
1645                .fetch_optional(&self.pool)
1646                .await
1647                .map_err(store_sqlx_error)?
1648        } else {
1649            sqlx::query_scalar(
1650                "SELECT meta_json FROM lash_session_meta ORDER BY session_id ASC LIMIT 1",
1651            )
1652            .fetch_optional(&self.pool)
1653            .await
1654            .map_err(store_sqlx_error)?
1655        };
1656        json.map(|json| store_decode_json(&json, "session meta"))
1657            .transpose()
1658    }
1659
1660    async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError> {
1661        for id in ids {
1662            if let Some(session_id) = &self.session_id {
1663                sqlx::query(
1664                    "UPDATE lash_graph_nodes
1665                     SET tombstoned = TRUE
1666                     WHERE session_id = $1 AND node_id = $2",
1667                )
1668                .bind(session_id)
1669                .bind(id)
1670                .execute(&self.pool)
1671                .await
1672                .map_err(store_sqlx_error)?;
1673            } else {
1674                sqlx::query(
1675                    "UPDATE lash_graph_nodes
1676                     SET tombstoned = TRUE
1677                     WHERE node_id = $1",
1678                )
1679                .bind(id)
1680                .execute(&self.pool)
1681                .await
1682                .map_err(store_sqlx_error)?;
1683            }
1684        }
1685        Ok(())
1686    }
1687
1688    async fn vacuum(&self) -> Result<VacuumReport, StoreError> {
1689        let removed = if let Some(session_id) = &self.session_id {
1690            sqlx::query("DELETE FROM lash_graph_nodes WHERE session_id = $1 AND tombstoned = TRUE")
1691                .bind(session_id)
1692                .execute(&self.pool)
1693                .await
1694                .map_err(store_sqlx_error)?
1695                .rows_affected()
1696        } else {
1697            sqlx::query("DELETE FROM lash_graph_nodes WHERE tombstoned = TRUE")
1698                .execute(&self.pool)
1699                .await
1700                .map_err(store_sqlx_error)?
1701                .rows_affected()
1702        };
1703        Ok(VacuumReport {
1704            removed_node_count: removed as usize,
1705        })
1706    }
1707
1708    async fn gc_unreachable(&self) -> Result<GcReport, StoreError> {
1709        Ok(GcReport::default())
1710    }
1711}
1712
1713fn process_status_label(record: &ProcessRecord) -> &'static str {
1714    record.status.label()
1715}
1716
1717async fn load_process_tx(
1718    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1719    process_id: &str,
1720) -> Result<Option<ProcessRecord>, PluginError> {
1721    let json: Option<String> = sqlx::query_scalar(
1722        "SELECT record_json
1723             FROM lash_processes
1724             WHERE process_id = $1
1725             FOR UPDATE",
1726    )
1727    .bind(process_id)
1728    .fetch_optional(&mut **tx)
1729    .await
1730    .map_err(plugin_sqlx_error)?;
1731    json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
1732        .transpose()
1733}
1734
1735async fn load_process(
1736    pool: &PgPool,
1737    process_id: &str,
1738) -> Result<Option<ProcessRecord>, PluginError> {
1739    let json: Option<String> =
1740        sqlx::query_scalar("SELECT record_json FROM lash_processes WHERE process_id = $1")
1741            .bind(process_id)
1742            .fetch_optional(pool)
1743            .await
1744            .map_err(plugin_sqlx_error)?;
1745    json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
1746        .transpose()
1747}
1748
1749async fn save_process_tx(
1750    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1751    record: &ProcessRecord,
1752) -> Result<(), PluginError> {
1753    sqlx::query(
1754        "UPDATE lash_processes
1755         SET updated_at_ms = $2, status = $3, record_json = $4
1756         WHERE process_id = $1",
1757    )
1758    .bind(&record.id)
1759    .bind(record.updated_at_ms as i64)
1760    .bind(process_status_label(record))
1761    .bind(serde_json::to_string(record).map_err(process_decode_error)?)
1762    .execute(&mut **tx)
1763    .await
1764    .map_err(plugin_sqlx_error)?;
1765    Ok(())
1766}
1767
1768async fn load_event_by_key_tx(
1769    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1770    process_id: &str,
1771    replay_key: &str,
1772) -> Result<Option<(String, ProcessEvent)>, PluginError> {
1773    let row = sqlx::query(
1774        "SELECT payload_hash, event_json
1775         FROM lash_process_events
1776         WHERE process_id = $1 AND idempotency_key = $2",
1777    )
1778    .bind(process_id)
1779    .bind(replay_key)
1780    .fetch_optional(&mut **tx)
1781    .await
1782    .map_err(plugin_sqlx_error)?;
1783    row.map(|row| {
1784        let hash: String = row.get(0);
1785        let json: String = row.get(1);
1786        serde_json::from_str(&json)
1787            .map(|event| (hash, event))
1788            .map_err(process_decode_error)
1789    })
1790    .transpose()
1791}
1792
1793async fn load_process_lease_tx(
1794    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1795    process_id: &str,
1796) -> Result<Option<ProcessLease>, PluginError> {
1797    let row = sqlx::query(
1798        "SELECT lease_owner_id, lease_token, lease_fencing_token,
1799                lease_claimed_at_ms, lease_expires_at_ms
1800         FROM lash_process_leases
1801         WHERE process_id = $1",
1802    )
1803    .bind(process_id)
1804    .fetch_optional(&mut **tx)
1805    .await
1806    .map_err(plugin_sqlx_error)?;
1807    let Some(row) = row else {
1808        return Ok(None);
1809    };
1810    let owner_id: Option<String> = row.get(0);
1811    let lease_token: Option<String> = row.get(1);
1812    let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
1813        return Ok(None);
1814    };
1815    Ok(Some(ProcessLease {
1816        schema_version: PROCESS_LEASE_SCHEMA_VERSION,
1817        process_id: process_id.to_string(),
1818        owner_id,
1819        lease_token,
1820        fencing_token: row.get::<i64, _>(2) as u64,
1821        claimed_at_epoch_ms: row.get::<i64, _>(3) as u64,
1822        expires_at_epoch_ms: row.get::<i64, _>(4) as u64,
1823    }))
1824}
1825
1826fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> PluginError {
1827    PluginError::Session(format!(
1828        "process `{process_id}` is already leased by `{}` until {}",
1829        current.owner_id, current.expires_at_epoch_ms
1830    ))
1831}
1832
1833fn process_lease_expired(process_id: &str) -> PluginError {
1834    PluginError::Session(format!(
1835        "process lease for `{process_id}` is missing or expired"
1836    ))
1837}
1838
1839fn guard_lease(current: Option<&ProcessLease>, lease_token: &str, now: u64) -> bool {
1840    current
1841        .map(|current| current.lease_token == lease_token && current.expires_at_epoch_ms > now)
1842        .unwrap_or(false)
1843}
1844
1845async fn list_grants_for_scope(
1846    pool: &PgPool,
1847    owner_scope: &ProcessScope,
1848    live_only: bool,
1849) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
1850    let status_clause = if live_only {
1851        "AND p.status = 'running'"
1852    } else {
1853        ""
1854    };
1855    let sql = format!(
1856        "SELECT g.process_id, g.descriptor_json, p.record_json
1857         FROM lash_process_handle_grants g
1858         JOIN lash_processes p ON p.process_id = g.process_id
1859         WHERE g.scope_id = $1 {status_clause}
1860         ORDER BY g.process_id ASC"
1861    );
1862    let rows = sqlx::query(&sql)
1863        .bind(owner_scope.id().as_str())
1864        .fetch_all(pool)
1865        .await
1866        .map_err(plugin_sqlx_error)?;
1867    let mut entries = Vec::new();
1868    for row in rows {
1869        let process_id: String = row.get(0);
1870        let descriptor_json: String = row.get(1);
1871        let record_json: String = row.get(2);
1872        let descriptor: ProcessHandleDescriptor =
1873            serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
1874        let record: ProcessRecord =
1875            serde_json::from_str(&record_json).map_err(process_decode_error)?;
1876        entries.push((
1877            ProcessHandleGrant {
1878                session_id: owner_scope.session_id.clone(),
1879                process_id,
1880                descriptor,
1881            },
1882            record,
1883        ));
1884    }
1885    Ok(entries)
1886}
1887
1888#[async_trait::async_trait]
1889impl ProcessRegistry for PostgresProcessRegistry {
1890    fn durability_tier(&self) -> DurabilityTier {
1891        DurabilityTier::Durable
1892    }
1893
1894    async fn register_process(
1895        &self,
1896        registration: ProcessRegistration,
1897    ) -> Result<ProcessRecord, PluginError> {
1898        let (registration, registration_hash) =
1899            lash_core::runtime::prepare_process_registration(registration)?;
1900        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1901        if let Some(existing) = load_process_tx(&mut tx, &registration.id).await? {
1902            if existing.registration_hash == registration_hash {
1903                tx.commit().await.map_err(plugin_sqlx_error)?;
1904                return Ok(existing);
1905            }
1906            return Err(PluginError::Session(format!(
1907                "process `{}` registration hash conflict: existing {}, new {}",
1908                registration.id, existing.registration_hash, registration_hash
1909            )));
1910        }
1911        let now = current_epoch_ms();
1912        let record =
1913            ProcessRecord::from_prepared_registration(registration, registration_hash, now);
1914        let record_json = serde_json::to_string(&record).map_err(process_decode_error)?;
1915        sqlx::query(
1916            "INSERT INTO lash_processes (
1917                process_id, registration_hash, owner_scope_id, host_profile_id,
1918                created_at_ms, updated_at_ms, status, record_json
1919             )
1920             VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
1921        )
1922        .bind(&record.id)
1923        .bind(&record.registration_hash)
1924        .bind(record.owner_scope_id().as_str())
1925        .bind(record.host_profile_id())
1926        .bind(record.created_at_ms as i64)
1927        .bind(record.updated_at_ms as i64)
1928        .bind(process_status_label(&record))
1929        .bind(record_json)
1930        .execute(&mut *tx)
1931        .await
1932        .map_err(plugin_sqlx_error)?;
1933        tx.commit().await.map_err(plugin_sqlx_error)?;
1934        Ok(record)
1935    }
1936
1937    async fn set_external_ref(
1938        &self,
1939        process_id: &str,
1940        external_ref: ProcessExternalRef,
1941    ) -> Result<ProcessRecord, PluginError> {
1942        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1943        let mut record = load_process_tx(&mut tx, process_id)
1944            .await?
1945            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
1946        record.external_ref = Some(external_ref);
1947        record.updated_at_ms = current_epoch_ms();
1948        save_process_tx(&mut tx, &record).await?;
1949        tx.commit().await.map_err(plugin_sqlx_error)?;
1950        Ok(record)
1951    }
1952
1953    async fn grant_handle(
1954        &self,
1955        owner_scope: &ProcessScope,
1956        process_id: &str,
1957        descriptor: ProcessHandleDescriptor,
1958    ) -> Result<ProcessHandleGrant, PluginError> {
1959        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1960        if load_process_tx(&mut tx, process_id).await?.is_none() {
1961            return Err(PluginError::Session(format!(
1962                "unknown process `{process_id}`"
1963            )));
1964        }
1965        sqlx::query(
1966            "INSERT INTO lash_process_handle_grants (session_id, scope_id, process_id, descriptor_json)
1967             VALUES ($1, $2, $3, $4)
1968             ON CONFLICT (scope_id, process_id) DO UPDATE SET
1969                session_id = EXCLUDED.session_id,
1970                descriptor_json = EXCLUDED.descriptor_json",
1971        )
1972        .bind(&owner_scope.session_id)
1973        .bind(owner_scope.id().as_str())
1974        .bind(process_id)
1975        .bind(serde_json::to_string(&descriptor).map_err(process_decode_error)?)
1976        .execute(&mut *tx)
1977        .await
1978        .map_err(plugin_sqlx_error)?;
1979        tx.commit().await.map_err(plugin_sqlx_error)?;
1980        Ok(ProcessHandleGrant {
1981            session_id: owner_scope.session_id.clone(),
1982            process_id: process_id.to_string(),
1983            descriptor,
1984        })
1985    }
1986
1987    async fn revoke_handle(
1988        &self,
1989        owner_scope: &ProcessScope,
1990        process_id: &str,
1991    ) -> Result<(), PluginError> {
1992        sqlx::query(
1993            "DELETE FROM lash_process_handle_grants WHERE scope_id = $1 AND process_id = $2",
1994        )
1995        .bind(owner_scope.id().as_str())
1996        .bind(process_id)
1997        .execute(&self.pool)
1998        .await
1999        .map_err(plugin_sqlx_error)?;
2000        Ok(())
2001    }
2002
2003    async fn transfer_handle_grants(
2004        &self,
2005        from_scope: &ProcessScope,
2006        to_scope: &ProcessScope,
2007        process_ids: &[String],
2008    ) -> Result<(), PluginError> {
2009        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2010        for process_id in process_ids {
2011            let descriptor_json: Option<String> = sqlx::query_scalar(
2012                "SELECT descriptor_json FROM lash_process_handle_grants
2013                 WHERE scope_id = $1 AND process_id = $2",
2014            )
2015            .bind(from_scope.id().as_str())
2016            .bind(process_id)
2017            .fetch_optional(&mut *tx)
2018            .await
2019            .map_err(plugin_sqlx_error)?;
2020            let Some(descriptor_json) = descriptor_json else {
2021                return Err(PluginError::Session(format!(
2022                    "process handle `{process_id}` is not granted to session `{}`",
2023                    from_scope.session_id
2024                )));
2025            };
2026            sqlx::query(
2027                "DELETE FROM lash_process_handle_grants WHERE scope_id = $1 AND process_id = $2",
2028            )
2029            .bind(from_scope.id().as_str())
2030            .bind(process_id)
2031            .execute(&mut *tx)
2032            .await
2033            .map_err(plugin_sqlx_error)?;
2034            sqlx::query(
2035                "INSERT INTO lash_process_handle_grants (session_id, scope_id, process_id, descriptor_json)
2036                 VALUES ($1, $2, $3, $4)
2037                 ON CONFLICT (scope_id, process_id) DO UPDATE SET
2038                    session_id = EXCLUDED.session_id,
2039                    descriptor_json = EXCLUDED.descriptor_json",
2040            )
2041            .bind(&to_scope.session_id)
2042            .bind(to_scope.id().as_str())
2043            .bind(process_id)
2044            .bind(descriptor_json)
2045            .execute(&mut *tx)
2046            .await
2047            .map_err(plugin_sqlx_error)?;
2048        }
2049        tx.commit().await.map_err(plugin_sqlx_error)
2050    }
2051
2052    async fn list_handle_grants(
2053        &self,
2054        owner_scope: &ProcessScope,
2055    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
2056        list_grants_for_scope(&self.pool, owner_scope, false).await
2057    }
2058
2059    async fn list_live_handle_grants(
2060        &self,
2061        owner_scope: &ProcessScope,
2062    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
2063        list_grants_for_scope(&self.pool, owner_scope, true).await
2064    }
2065
2066    async fn has_handle_grant(
2067        &self,
2068        owner_scope: &ProcessScope,
2069        process_id: &str,
2070    ) -> Result<bool, PluginError> {
2071        let exists: Option<i64> = sqlx::query_scalar(
2072            "SELECT 1::BIGINT FROM lash_process_handle_grants
2073             WHERE scope_id = $1 AND process_id = $2
2074             LIMIT 1",
2075        )
2076        .bind(owner_scope.id().as_str())
2077        .bind(process_id)
2078        .fetch_optional(&self.pool)
2079        .await
2080        .map_err(plugin_sqlx_error)?;
2081        Ok(exists.is_some())
2082    }
2083
2084    async fn handle_grants_for_process(
2085        &self,
2086        process_id: &str,
2087    ) -> Result<Vec<ProcessHandleGrant>, PluginError> {
2088        if load_process(&self.pool, process_id).await?.is_none() {
2089            return Err(PluginError::Session(format!(
2090                "unknown process `{process_id}`"
2091            )));
2092        }
2093        let rows = sqlx::query(
2094            "SELECT session_id, descriptor_json
2095             FROM lash_process_handle_grants
2096             WHERE process_id = $1
2097             ORDER BY session_id ASC, scope_id ASC",
2098        )
2099        .bind(process_id)
2100        .fetch_all(&self.pool)
2101        .await
2102        .map_err(plugin_sqlx_error)?;
2103        let mut grants = Vec::new();
2104        for row in rows {
2105            let descriptor_json: String = row.get(1);
2106            grants.push(ProcessHandleGrant {
2107                session_id: row.get(0),
2108                process_id: process_id.to_string(),
2109                descriptor: serde_json::from_str(&descriptor_json).map_err(process_decode_error)?,
2110            });
2111        }
2112        Ok(grants)
2113    }
2114
2115    async fn delete_session_process_state(
2116        &self,
2117        session_id: &str,
2118    ) -> Result<lash_core::ProcessSessionDeleteReport, PluginError> {
2119        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2120        let rows = sqlx::query(
2121            "SELECT g.process_id, p.record_json
2122             FROM lash_process_handle_grants g
2123             JOIN lash_processes p ON p.process_id = g.process_id
2124             WHERE g.session_id = $1
2125             ORDER BY g.process_id ASC",
2126        )
2127        .bind(session_id)
2128        .fetch_all(&mut *tx)
2129        .await
2130        .map_err(plugin_sqlx_error)?;
2131        let mut removed = Vec::new();
2132        for row in rows {
2133            let process_id: String = row.get(0);
2134            let record_json: String = row.get(1);
2135            let record: ProcessRecord =
2136                serde_json::from_str(&record_json).map_err(process_decode_error)?;
2137            removed.push((process_id, record));
2138        }
2139        let revoked = sqlx::query("DELETE FROM lash_process_handle_grants WHERE session_id = $1")
2140            .bind(session_id)
2141            .execute(&mut *tx)
2142            .await
2143            .map_err(plugin_sqlx_error)?
2144            .rows_affected() as usize;
2145        let mut cancel_process_ids = Vec::new();
2146        let mut preserved_process_ids = Vec::new();
2147        for (process_id, record) in removed {
2148            if record.is_terminal() {
2149                continue;
2150            }
2151            let remaining: i64 = sqlx::query_scalar(
2152                "SELECT COUNT(*) FROM lash_process_handle_grants WHERE process_id = $1",
2153            )
2154            .bind(&process_id)
2155            .fetch_one(&mut *tx)
2156            .await
2157            .map_err(plugin_sqlx_error)?;
2158            if remaining == 0 {
2159                cancel_process_ids.push(process_id);
2160            } else {
2161                preserved_process_ids.push(process_id);
2162            }
2163        }
2164        tx.commit().await.map_err(plugin_sqlx_error)?;
2165        cancel_process_ids.sort();
2166        cancel_process_ids.dedup();
2167        preserved_process_ids.sort();
2168        preserved_process_ids.dedup();
2169        Ok(lash_core::ProcessSessionDeleteReport {
2170            session_id: session_id.to_string(),
2171            revoked_handle_count: revoked,
2172            deleted_wake_count: 0,
2173            cancel_process_ids,
2174            preserved_process_ids,
2175        })
2176    }
2177
2178    async fn append_event(
2179        &self,
2180        process_id: &str,
2181        request: ProcessEventAppendRequest,
2182    ) -> Result<ProcessEventAppendResult, PluginError> {
2183        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2184        let mut record = load_process_tx(&mut tx, process_id)
2185            .await?
2186            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
2187        let replay_lookup =
2188            if let Some(replay_key) = request.replay.as_ref().map(|r| r.key.as_str()) {
2189                load_event_by_key_tx(&mut tx, process_id, replay_key).await?
2190            } else {
2191                None
2192            };
2193        let sequence: i64 = sqlx::query_scalar(
2194            "SELECT COALESCE(MAX(sequence), 0) + 1 FROM lash_process_events WHERE process_id = $1",
2195        )
2196        .bind(process_id)
2197        .fetch_one(&mut *tx)
2198        .await
2199        .map_err(plugin_sqlx_error)?;
2200        let occurred_at_ms = current_epoch_ms();
2201        let prepared = lash_core::runtime::prepare_process_event_append(
2202            &record,
2203            request,
2204            sequence as u64,
2205            replay_lookup,
2206            occurred_at_ms,
2207        )?;
2208        if prepared.replayed {
2209            let repaired = if let Some(status) = prepared.status_update.clone() {
2210                record.status = status;
2211                record.updated_at_ms = prepared.occurred_at_ms;
2212                save_process_tx(&mut tx, &record).await?;
2213                true
2214            } else {
2215                false
2216            };
2217            tx.commit().await.map_err(plugin_sqlx_error)?;
2218            if repaired {
2219                self.notify.notify_waiters();
2220            }
2221            return Ok(ProcessEventAppendResult {
2222                event: prepared.event,
2223                wake_delivery: prepared.wake_delivery,
2224            });
2225        }
2226        let event = prepared.event;
2227        sqlx::query(
2228            "INSERT INTO lash_process_events (
2229                process_id, sequence, event_type, payload_hash, idempotency_key,
2230                occurred_at_ms, event_json
2231             )
2232             VALUES ($1, $2, $3, $4, $5, $6, $7)",
2233        )
2234        .bind(process_id)
2235        .bind(sequence)
2236        .bind(event.event_type.as_str())
2237        .bind(&prepared.payload_hash)
2238        .bind(event.invocation.replay_key())
2239        .bind(prepared.occurred_at_ms as i64)
2240        .bind(serde_json::to_string(&event).map_err(process_decode_error)?)
2241        .execute(&mut *tx)
2242        .await
2243        .map_err(plugin_sqlx_error)?;
2244        if let Some(status) = prepared.status_update.clone() {
2245            record.status = status;
2246        }
2247        record.updated_at_ms = prepared.occurred_at_ms;
2248        save_process_tx(&mut tx, &record).await?;
2249        tx.commit().await.map_err(plugin_sqlx_error)?;
2250        self.notify.notify_waiters();
2251        Ok(ProcessEventAppendResult {
2252            event,
2253            wake_delivery: prepared.wake_delivery,
2254        })
2255    }
2256
2257    async fn events_after(
2258        &self,
2259        process_id: &str,
2260        after_sequence: u64,
2261    ) -> Result<Vec<ProcessEvent>, PluginError> {
2262        if load_process(&self.pool, process_id).await?.is_none() {
2263            return Err(PluginError::Session(format!(
2264                "unknown process `{process_id}`"
2265            )));
2266        }
2267        let rows = sqlx::query(
2268            "SELECT event_json FROM lash_process_events
2269             WHERE process_id = $1 AND sequence > $2
2270             ORDER BY sequence ASC",
2271        )
2272        .bind(process_id)
2273        .bind(after_sequence as i64)
2274        .fetch_all(&self.pool)
2275        .await
2276        .map_err(plugin_sqlx_error)?;
2277        let mut events = Vec::new();
2278        for row in rows {
2279            let json: String = row.get(0);
2280            events.push(serde_json::from_str(&json).map_err(process_decode_error)?);
2281        }
2282        Ok(events)
2283    }
2284
2285    async fn wake_events_after(
2286        &self,
2287        process_id: &str,
2288        after_sequence: u64,
2289    ) -> Result<Vec<ProcessEvent>, PluginError> {
2290        let rows = sqlx::query("SELECT sequence FROM lash_process_wake_acks WHERE process_id = $1")
2291            .bind(process_id)
2292            .fetch_all(&self.pool)
2293            .await
2294            .map_err(plugin_sqlx_error)?;
2295        let acked = rows
2296            .into_iter()
2297            .map(|row| row.get::<i64, _>(0) as u64)
2298            .collect::<HashSet<_>>();
2299        Ok(self
2300            .events_after(process_id, after_sequence)
2301            .await?
2302            .into_iter()
2303            .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
2304            .collect())
2305    }
2306
2307    async fn wait_event_after(
2308        &self,
2309        process_id: &str,
2310        event_type: &str,
2311        after_sequence: u64,
2312    ) -> Result<ProcessEvent, PluginError> {
2313        loop {
2314            if let Some(event) = self
2315                .events_after(process_id, after_sequence)
2316                .await?
2317                .into_iter()
2318                .find(|event| event.event_type == event_type)
2319            {
2320                return Ok(event);
2321            }
2322            tokio::select! {
2323                _ = self.notify.notified() => {}
2324                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
2325            }
2326        }
2327    }
2328
2329    async fn await_process(&self, process_id: &str) -> Result<ProcessAwaitOutput, PluginError> {
2330        loop {
2331            let record = load_process(&self.pool, process_id)
2332                .await?
2333                .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
2334            if let Some(await_output) = record.status.await_output() {
2335                return Ok(await_output.clone());
2336            }
2337            tokio::select! {
2338                _ = self.notify.notified() => {}
2339                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
2340            }
2341        }
2342    }
2343
2344    async fn complete_process(
2345        &self,
2346        process_id: &str,
2347        await_output: ProcessAwaitOutput,
2348    ) -> Result<ProcessRecord, PluginError> {
2349        let event_type = match await_output.terminal_state() {
2350            lash_core::ProcessTerminalState::Completed => "process.completed",
2351            lash_core::ProcessTerminalState::Failed => "process.failed",
2352            lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
2353        };
2354        self.append_event(
2355            process_id,
2356            ProcessEventAppendRequest::new(
2357                event_type,
2358                serde_json::json!({ "await_output": await_output }),
2359            )
2360            .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
2361        )
2362        .await?;
2363        load_process(&self.pool, process_id).await?.ok_or_else(|| {
2364            PluginError::Session(format!(
2365                "unknown process `{process_id}` after terminal event"
2366            ))
2367        })
2368    }
2369
2370    async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
2371        load_process(&self.pool, process_id).await.ok().flatten()
2372    }
2373
2374    async fn ack_wake(&self, process_id: &str, sequence: u64) -> Result<(), PluginError> {
2375        if load_process(&self.pool, process_id).await?.is_none() {
2376            return Err(PluginError::Session(format!(
2377                "unknown process `{process_id}`"
2378            )));
2379        }
2380        sqlx::query(
2381            "INSERT INTO lash_process_wake_acks (process_id, sequence)
2382             VALUES ($1, $2)
2383             ON CONFLICT DO NOTHING",
2384        )
2385        .bind(process_id)
2386        .bind(sequence as i64)
2387        .execute(&self.pool)
2388        .await
2389        .map_err(plugin_sqlx_error)?;
2390        Ok(())
2391    }
2392
2393    async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, PluginError> {
2394        let rows = sqlx::query(
2395            "SELECT record_json FROM lash_processes
2396             WHERE status = 'running'
2397             ORDER BY process_id ASC",
2398        )
2399        .fetch_all(&self.pool)
2400        .await
2401        .map_err(plugin_sqlx_error)?;
2402        let mut records = Vec::new();
2403        for row in rows {
2404            let json: String = row.get(0);
2405            records.push(serde_json::from_str(&json).map_err(process_decode_error)?);
2406        }
2407        Ok(records)
2408    }
2409
2410    async fn claim_process_lease(
2411        &self,
2412        process_id: &str,
2413        owner_id: &str,
2414        lease_ttl_ms: u64,
2415    ) -> Result<ProcessLease, PluginError> {
2416        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2417        if load_process_tx(&mut tx, process_id).await?.is_none() {
2418            return Err(PluginError::Session(format!(
2419                "unknown process `{process_id}`"
2420            )));
2421        }
2422        let now = current_epoch_ms();
2423        let current = load_process_lease_tx(&mut tx, process_id).await?;
2424        if let Some(current) = current.as_ref()
2425            && current.expires_at_epoch_ms > now
2426            && current.owner_id != owner_id
2427        {
2428            return Err(process_lease_conflict(process_id, current));
2429        }
2430        let existing_fence: Option<i64> = sqlx::query_scalar(
2431            "SELECT lease_fencing_token FROM lash_process_leases WHERE process_id = $1 FOR UPDATE",
2432        )
2433        .bind(process_id)
2434        .fetch_optional(&mut *tx)
2435        .await
2436        .map_err(plugin_sqlx_error)?;
2437        let fencing_token = existing_fence.unwrap_or(0) as u64 + 1;
2438        let lease = ProcessLease {
2439            schema_version: PROCESS_LEASE_SCHEMA_VERSION,
2440            process_id: process_id.to_string(),
2441            owner_id: owner_id.to_string(),
2442            lease_token: format!(
2443                "{:x}",
2444                Sha256::digest(format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes())
2445            ),
2446            fencing_token,
2447            claimed_at_epoch_ms: now,
2448            expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
2449        };
2450        sqlx::query(
2451            "INSERT INTO lash_process_leases (
2452                process_id, lease_owner_id, lease_token, lease_fencing_token,
2453                lease_claimed_at_ms, lease_expires_at_ms
2454             )
2455             VALUES ($1, $2, $3, $4, $5, $6)
2456             ON CONFLICT (process_id) DO UPDATE SET
2457                lease_owner_id = EXCLUDED.lease_owner_id,
2458                lease_token = EXCLUDED.lease_token,
2459                lease_fencing_token = EXCLUDED.lease_fencing_token,
2460                lease_claimed_at_ms = EXCLUDED.lease_claimed_at_ms,
2461                lease_expires_at_ms = EXCLUDED.lease_expires_at_ms",
2462        )
2463        .bind(&lease.process_id)
2464        .bind(&lease.owner_id)
2465        .bind(&lease.lease_token)
2466        .bind(lease.fencing_token as i64)
2467        .bind(lease.claimed_at_epoch_ms as i64)
2468        .bind(lease.expires_at_epoch_ms as i64)
2469        .execute(&mut *tx)
2470        .await
2471        .map_err(plugin_sqlx_error)?;
2472        tx.commit().await.map_err(plugin_sqlx_error)?;
2473        Ok(lease)
2474    }
2475
2476    async fn renew_process_lease(
2477        &self,
2478        lease: &ProcessLease,
2479        lease_ttl_ms: u64,
2480    ) -> Result<ProcessLease, PluginError> {
2481        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2482        let now = current_epoch_ms();
2483        let current = load_process_lease_tx(&mut tx, &lease.process_id).await?;
2484        if !guard_lease(current.as_ref(), &lease.lease_token, now) {
2485            return Err(process_lease_expired(&lease.process_id));
2486        }
2487        let renewed = ProcessLease {
2488            expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
2489            ..lease.clone()
2490        };
2491        sqlx::query(
2492            "UPDATE lash_process_leases
2493             SET lease_expires_at_ms = $2
2494             WHERE process_id = $1 AND lease_token = $3",
2495        )
2496        .bind(&renewed.process_id)
2497        .bind(renewed.expires_at_epoch_ms as i64)
2498        .bind(&renewed.lease_token)
2499        .execute(&mut *tx)
2500        .await
2501        .map_err(plugin_sqlx_error)?;
2502        tx.commit().await.map_err(plugin_sqlx_error)?;
2503        Ok(renewed)
2504    }
2505
2506    async fn complete_process_lease(
2507        &self,
2508        completion: &ProcessLeaseCompletion,
2509    ) -> Result<(), PluginError> {
2510        sqlx::query(
2511            "UPDATE lash_process_leases
2512             SET lease_owner_id = NULL,
2513                 lease_token = NULL,
2514                 lease_claimed_at_ms = 0,
2515                 lease_expires_at_ms = 0
2516             WHERE process_id = $1 AND lease_token = $2",
2517        )
2518        .bind(&completion.process_id)
2519        .bind(&completion.lease_token)
2520        .execute(&self.pool)
2521        .await
2522        .map_err(plugin_sqlx_error)?;
2523        Ok(())
2524    }
2525}
2526
2527#[async_trait::async_trait]
2528impl HostEventStore for PostgresHostEventStore {
2529    fn durability_tier(&self) -> DurabilityTier {
2530        DurabilityTier::Durable
2531    }
2532
2533    async fn register_subscription(
2534        &self,
2535        draft: TriggerSubscriptionDraft,
2536    ) -> Result<TriggerSubscriptionRecord, PluginError> {
2537        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2538        let seq: i64 = sqlx::query_scalar("SELECT nextval('lash_host_event_subscription_seq')")
2539            .fetch_one(&mut *tx)
2540            .await
2541            .map_err(plugin_sqlx_error)?;
2542        let handle = format!("trigger:{seq}");
2543        let subscription_id = format!("subscription:{seq}");
2544        let now = current_epoch_ms();
2545        let record = TriggerSubscriptionRecord {
2546            subscription_id: subscription_id.clone(),
2547            session_id: draft.session_id,
2548            handle,
2549            name: draft.name,
2550            source_type: draft.source_type,
2551            source_key: draft.source_key,
2552            source: draft.source,
2553            event_ty: draft.event_ty,
2554            module_ref: draft.module_ref,
2555            required_surface_ref: draft.required_surface_ref,
2556            process_ref: draft.process_ref,
2557            process_name: draft.process_name,
2558            input_template: draft.input_template,
2559            enabled: true,
2560            created_at_ms: now,
2561            updated_at_ms: now,
2562        };
2563        sqlx::query(
2564            "INSERT INTO lash_host_event_trigger_subscriptions (
2565                subscription_id, session_id, handle, source_type, source_key,
2566                enabled, created_at_ms, updated_at_ms, record_json
2567             )
2568             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
2569        )
2570        .bind(&record.subscription_id)
2571        .bind(&record.session_id)
2572        .bind(&record.handle)
2573        .bind(&record.source_type)
2574        .bind(&record.source_key)
2575        .bind(record.enabled)
2576        .bind(record.created_at_ms as i64)
2577        .bind(record.updated_at_ms as i64)
2578        .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2579        .execute(&mut *tx)
2580        .await
2581        .map_err(plugin_sqlx_error)?;
2582        tx.commit().await.map_err(plugin_sqlx_error)?;
2583        Ok(record)
2584    }
2585
2586    async fn list_subscriptions(
2587        &self,
2588        filter: TriggerSubscriptionFilter,
2589    ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError> {
2590        let rows = sqlx::query(
2591            "SELECT record_json FROM lash_host_event_trigger_subscriptions
2592             ORDER BY session_id ASC, handle ASC",
2593        )
2594        .fetch_all(&self.pool)
2595        .await
2596        .map_err(plugin_sqlx_error)?;
2597        let mut records = Vec::new();
2598        for row in rows {
2599            let json: String = row.get(0);
2600            let record: TriggerSubscriptionRecord =
2601                serde_json::from_str(&json).map_err(process_decode_error)?;
2602            if filter.matches(&record) {
2603                records.push(record);
2604            }
2605        }
2606        Ok(records)
2607    }
2608
2609    async fn cancel_subscription(
2610        &self,
2611        session_id: &str,
2612        handle: &str,
2613    ) -> Result<bool, PluginError> {
2614        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2615        let json: Option<String> = sqlx::query_scalar(
2616            "SELECT record_json FROM lash_host_event_trigger_subscriptions
2617             WHERE session_id = $1 AND handle = $2
2618             FOR UPDATE",
2619        )
2620        .bind(session_id)
2621        .bind(handle)
2622        .fetch_optional(&mut *tx)
2623        .await
2624        .map_err(plugin_sqlx_error)?;
2625        let Some(json) = json else {
2626            tx.commit().await.map_err(plugin_sqlx_error)?;
2627            return Ok(false);
2628        };
2629        let mut record: TriggerSubscriptionRecord =
2630            serde_json::from_str(&json).map_err(process_decode_error)?;
2631        let changed = record.enabled;
2632        record.enabled = false;
2633        record.updated_at_ms = current_epoch_ms();
2634        sqlx::query(
2635            "UPDATE lash_host_event_trigger_subscriptions
2636             SET enabled = $3, updated_at_ms = $4, record_json = $5
2637             WHERE session_id = $1 AND handle = $2",
2638        )
2639        .bind(session_id)
2640        .bind(handle)
2641        .bind(record.enabled)
2642        .bind(record.updated_at_ms as i64)
2643        .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2644        .execute(&mut *tx)
2645        .await
2646        .map_err(plugin_sqlx_error)?;
2647        tx.commit().await.map_err(plugin_sqlx_error)?;
2648        Ok(changed)
2649    }
2650
2651    async fn record_occurrence(
2652        &self,
2653        request: HostEventOccurrenceRequest,
2654    ) -> Result<HostEventOccurrenceRecord, PluginError> {
2655        lash_core::validate_host_event_occurrence_request(&request)?;
2656        let request_hash = lash_core::host_event_occurrence_request_hash(&request)?;
2657        let occurrence_id = lash_core::deterministic_occurrence_id(&request)?;
2658        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2659        let existing = sqlx::query(
2660            "SELECT request_hash, record_json
2661             FROM lash_host_event_occurrences
2662             WHERE idempotency_key = $1
2663             FOR UPDATE",
2664        )
2665        .bind(&request.idempotency_key)
2666        .fetch_optional(&mut *tx)
2667        .await
2668        .map_err(plugin_sqlx_error)?;
2669        if let Some(row) = existing {
2670            let existing_hash: String = row.get(0);
2671            let existing_json: String = row.get(1);
2672            if existing_hash != request_hash {
2673                return Err(PluginError::Session(format!(
2674                    "host event occurrence idempotency conflict for `{}`",
2675                    request.idempotency_key
2676                )));
2677            }
2678            let record = serde_json::from_str(&existing_json).map_err(process_decode_error)?;
2679            tx.commit().await.map_err(plugin_sqlx_error)?;
2680            return Ok(record);
2681        }
2682        let record = HostEventOccurrenceRecord {
2683            occurrence_id: occurrence_id.clone(),
2684            source_type: request.source_type,
2685            source_key: request.source_key,
2686            payload: request.payload,
2687            idempotency_key: request.idempotency_key.clone(),
2688            source: request.source,
2689            occurred_at_ms: current_epoch_ms(),
2690        };
2691        sqlx::query(
2692            "INSERT INTO lash_host_event_occurrences (
2693                occurrence_id, idempotency_key, request_hash, source_type, source_key,
2694                occurred_at_ms, record_json
2695             )
2696             VALUES ($1, $2, $3, $4, $5, $6, $7)",
2697        )
2698        .bind(&record.occurrence_id)
2699        .bind(&record.idempotency_key)
2700        .bind(&request_hash)
2701        .bind(&record.source_type)
2702        .bind(&record.source_key)
2703        .bind(record.occurred_at_ms as i64)
2704        .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2705        .execute(&mut *tx)
2706        .await
2707        .map_err(plugin_sqlx_error)?;
2708        tx.commit().await.map_err(plugin_sqlx_error)?;
2709        Ok(record)
2710    }
2711
2712    async fn reserve_matching_deliveries(
2713        &self,
2714        occurrence_id: &str,
2715    ) -> Result<Vec<TriggerDeliveryReservation>, PluginError> {
2716        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2717        let occurrence_json: Option<String> = sqlx::query_scalar(
2718            "SELECT record_json FROM lash_host_event_occurrences WHERE occurrence_id = $1",
2719        )
2720        .bind(occurrence_id)
2721        .fetch_optional(&mut *tx)
2722        .await
2723        .map_err(plugin_sqlx_error)?;
2724        let Some(occurrence_json) = occurrence_json else {
2725            return Err(PluginError::Session(format!(
2726                "unknown host event occurrence `{occurrence_id}`"
2727            )));
2728        };
2729        let occurrence: HostEventOccurrenceRecord =
2730            serde_json::from_str(&occurrence_json).map_err(process_decode_error)?;
2731        let rows = sqlx::query(
2732            "SELECT record_json FROM lash_host_event_trigger_subscriptions
2733             WHERE enabled = TRUE AND source_type = $1 AND source_key = $2
2734             ORDER BY session_id ASC, handle ASC",
2735        )
2736        .bind(&occurrence.source_type)
2737        .bind(&occurrence.source_key)
2738        .fetch_all(&mut *tx)
2739        .await
2740        .map_err(plugin_sqlx_error)?;
2741        let mut deliveries = Vec::new();
2742        for row in rows {
2743            let json: String = row.get(0);
2744            let subscription: TriggerSubscriptionRecord =
2745                serde_json::from_str(&json).map_err(process_decode_error)?;
2746            let process_id = lash_core::deterministic_delivery_process_id(
2747                &occurrence.occurrence_id,
2748                &subscription.subscription_id,
2749            )?;
2750            let inserted = sqlx::query(
2751                "INSERT INTO lash_host_event_deliveries (
2752                    occurrence_id, subscription_id, process_id, created_at_ms
2753                 )
2754                 VALUES ($1, $2, $3, $4)
2755                 ON CONFLICT DO NOTHING",
2756            )
2757            .bind(&occurrence.occurrence_id)
2758            .bind(&subscription.subscription_id)
2759            .bind(&process_id)
2760            .bind(current_epoch_ms() as i64)
2761            .execute(&mut *tx)
2762            .await
2763            .map_err(plugin_sqlx_error)?
2764            .rows_affected();
2765            if inserted == 0 {
2766                continue;
2767            }
2768            deliveries.push(TriggerDeliveryReservation {
2769                occurrence: occurrence.clone(),
2770                subscription,
2771                process_id,
2772            });
2773        }
2774        tx.commit().await.map_err(plugin_sqlx_error)?;
2775        Ok(deliveries)
2776    }
2777}
2778
2779#[async_trait::async_trait]
2780impl lashlang::LashlangArtifactStore for PostgresLashlangArtifactStore {
2781    fn durability_tier(&self) -> DurabilityTier {
2782        DurabilityTier::Durable
2783    }
2784
2785    async fn put_module_artifact(
2786        &self,
2787        artifact: &lashlang::ModuleArtifact,
2788    ) -> Result<(), lashlang::ArtifactStoreError> {
2789        let bytes = artifact
2790            .to_store_bytes()
2791            .map_err(lashlang::ArtifactStoreError::from)?;
2792        sqlx::query(
2793            "INSERT INTO lash_lashlang_artifacts (module_ref, artifact_bytes)
2794             VALUES ($1, $2)
2795             ON CONFLICT (module_ref) DO UPDATE SET artifact_bytes = EXCLUDED.artifact_bytes",
2796        )
2797        .bind(artifact.module_ref.as_str())
2798        .bind(bytes)
2799        .execute(&self.pool)
2800        .await
2801        .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
2802        Ok(())
2803    }
2804
2805    async fn get_module_artifact(
2806        &self,
2807        module_ref: &lashlang::ModuleRef,
2808    ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
2809        let bytes: Option<Vec<u8>> = sqlx::query_scalar(
2810            "SELECT artifact_bytes FROM lash_lashlang_artifacts WHERE module_ref = $1",
2811        )
2812        .bind(module_ref.as_str())
2813        .fetch_optional(&self.pool)
2814        .await
2815        .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
2816        bytes
2817            .map(|bytes| {
2818                lashlang::ModuleArtifact::from_store_bytes(&bytes)
2819                    .map(Arc::new)
2820                    .map_err(lashlang::ArtifactStoreError::from)
2821            })
2822            .transpose()
2823    }
2824}