Skip to main content

lash_postgres_store/
lib.rs

1//! PostgreSQL durable storage for Lash.
2//!
3//! One [`PostgresStorage`] owns a shared [`sqlx::PgPool`] and creates durable
4//! implementations for the runtime session store, process registry, host-event
5//! store, Lashlang artifact store, and attachment manifest.
6
7use std::collections::HashSet;
8use std::sync::{Arc, OnceLock};
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use lash_core::runtime::{
12    ProcessHandleGrantEntry, QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim,
13    QueuedWorkClaimBoundary, QueuedWorkCompletion, QueuedWorkItem,
14};
15use lash_core::store::queued_work::{
16    ClaimCandidate, QueuedWorkClaimLease, claim_scan_limit, derive_batch_id, renewed_claim,
17    select_claim_prefix,
18};
19use lash_core::store::{
20    GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
21    RuntimeCommitResult, SessionCheckpoint, SessionHeadMeta,
22};
23use lash_core::{
24    AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
25    DeliveryPolicy, DurabilityTier, GcReport, MergeKey, ProcessAwaitOutput, ProcessEvent,
26    ProcessEventAppendRequest, ProcessEventAppendResult, ProcessExternalRef,
27    ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease, ProcessLeaseCompletion,
28    ProcessRecord, ProcessRegistration, ProcessRegistry, RuntimePersistence, SessionMeta,
29    SessionNodeRecord, SessionReadScope, SessionScope, SessionStoreCreateRequest,
30    SessionStoreFactory, SlotPolicy, StoreError, TokenLedgerEntry, VacuumReport,
31};
32use lash_core::{
33    HostEventOccurrenceRecord, HostEventOccurrenceRequest, HostEventStore, PluginError,
34    TriggerDeliveryReservation, TriggerSubscriptionDraft, TriggerSubscriptionFilter,
35    TriggerSubscriptionRecord,
36};
37use sha2::{Digest, Sha256};
38use sqlx::postgres::{PgPool, PgPoolOptions, PgRow};
39use sqlx::{Executor, Row};
40
/// Key under which this crate records its schema version in
/// `lash_schema_versions`.
const SCHEMA_COMPONENT: &str = "lash-postgres-store";
/// Schema version inserted on first setup; `ensure_schema` returns an error if
/// a different version is already recorded for `SCHEMA_COMPONENT`.
const SCHEMA_VERSION: i32 = 1;
/// Local alias of `lash_core::PROCESS_LEASE_SCHEMA_VERSION`.
const PROCESS_LEASE_SCHEMA_VERSION: u32 = lash_core::PROCESS_LEASE_SCHEMA_VERSION;
44
/// Entry point for Postgres-backed Lash storage.
///
/// Owns one [`PgPool`]. Every store handle produced by its accessor methods
/// receives a clone of that pool, so all of them share the same connections.
#[derive(Clone)]
pub struct PostgresStorage {
    pool: PgPool,
}
49
/// Factory that creates [`PostgresSessionStore`] handles, each explicitly bound
/// to the session id named in its create request.
#[derive(Clone)]
pub struct PostgresSessionStoreFactory {
    pool: PgPool,
}
54
/// Runtime session store backed by Postgres.
///
/// A handle is either bound to an explicit session (`session_id`) or unbound,
/// in which case it latches a session at runtime (`bound_session`).
#[derive(Clone)]
pub struct PostgresSessionStore {
    pool: PgPool,
    /// Explicit session binding for handles created via the factory.
    session_id: Option<String>,
    /// In-memory bind-on-first-commit for an *unbound* handle. A session-store
    /// handle commits to exactly one session; an unbound handle latches the first
    /// session it commits and rejects others (Postgres is multi-session per
    /// database, so this can't be inferred from a singleton head row the way the
    /// single-file SQLite store does). Shared across clones via `Arc`.
    bound_session: Arc<OnceLock<String>>,
}
67
/// Process registry stored in the `lash_process*` tables of the shared pool.
#[derive(Clone)]
pub struct PostgresProcessRegistry {
    pool: PgPool,
    /// In-process wake-up signal. `PostgresStorage::process_registry` creates a
    /// fresh `Notify` on every call, so only clones of one registry value share
    /// it. NOTE(review): presumably used to wake local waiters after writes —
    /// confirm against the registry implementation.
    notify: Arc<tokio::sync::Notify>,
}
73
/// Host-event store (trigger subscriptions, occurrences, deliveries) on the
/// shared pool.
#[derive(Clone)]
pub struct PostgresHostEventStore {
    pool: PgPool,
}
78
/// Lashlang artifact store on the shared pool (`lash_lashlang_artifacts`).
#[derive(Clone)]
pub struct PostgresLashlangArtifactStore {
    pool: PgPool,
}
83
/// Pool sizing and per-connection timeout settings for [`PostgresStorage`].
///
/// Session commits rely on **optimistic CAS** against the head row
/// (`UPDATE … WHERE head_revision = expected`) rather than holding a
/// `SELECT … FOR UPDATE`, so a blocked writer does not pin a pooled connection.
/// `lock_timeout` is a second line of defense: it bounds how long that single
/// CAS write may wait for the head row's lock before failing (reported as a
/// retryable conflict), so a burst of writers cannot exhaust the pool.
///
/// Timeout durations are applied in whole milliseconds, rounded up to at least
/// 1 ms; `None` leaves the server's setting untouched.
#[derive(Clone, Debug)]
pub struct PostgresStoreConfig {
    /// Upper bound on pooled connections (default: 16).
    pub max_connections: u32,
    /// Connections kept open even when idle (default: 0).
    pub min_connections: u32,
    /// Longest wait for a free connection in `acquire` before erroring
    /// (default: 30 s).
    pub acquire_timeout: Duration,
    /// Idle period after which a connection is closed (default: 10 min).
    pub idle_timeout: Option<Duration>,
    /// Age after which a connection is recycled (default: 30 min).
    pub max_lifetime: Option<Duration>,
    /// Postgres `lock_timeout` set on each connection (default: 10 s).
    pub lock_timeout: Option<Duration>,
    /// Postgres `statement_timeout` set on each connection (default: 30 s);
    /// guarantees a stuck query cannot hold a connection forever.
    pub statement_timeout: Option<Duration>,
}

impl Default for PostgresStoreConfig {
    /// Conservative defaults suited to a small shared pool.
    fn default() -> Self {
        let second = Duration::from_secs(1);
        let minute = Duration::from_secs(60);
        Self {
            max_connections: 16,
            min_connections: 0,
            acquire_timeout: second * 30,
            idle_timeout: Some(minute * 10),
            max_lifetime: Some(minute * 30),
            lock_timeout: Some(second * 10),
            statement_timeout: Some(second * 30),
        }
    }
}
124
125impl PostgresStorage {
126    /// Connect with [`PostgresStoreConfig::default`] pool/timeout settings.
127    pub async fn connect(database_url: &str) -> Result<Self, StoreError> {
128        Self::connect_with(database_url, PostgresStoreConfig::default()).await
129    }
130
131    /// Connect with explicit pool sizing and per-connection timeouts.
132    pub async fn connect_with(
133        database_url: &str,
134        config: PostgresStoreConfig,
135    ) -> Result<Self, StoreError> {
136        let lock_ms = config.lock_timeout.map(|d| d.as_millis().max(1) as u64);
137        let statement_ms = config
138            .statement_timeout
139            .map(|d| d.as_millis().max(1) as u64);
140        let mut options = PgPoolOptions::new()
141            .max_connections(config.max_connections)
142            .min_connections(config.min_connections)
143            .acquire_timeout(config.acquire_timeout);
144        if let Some(timeout) = config.idle_timeout {
145            options = options.idle_timeout(timeout);
146        }
147        if let Some(timeout) = config.max_lifetime {
148            options = options.max_lifetime(timeout);
149        }
150        let pool = options
151            .after_connect(move |conn, _meta| {
152                Box::pin(async move {
153                    if let Some(ms) = lock_ms {
154                        conn.execute(format!("SET lock_timeout = {ms}").as_str())
155                            .await?;
156                    }
157                    if let Some(ms) = statement_ms {
158                        conn.execute(format!("SET statement_timeout = {ms}").as_str())
159                            .await?;
160                    }
161                    Ok(())
162                })
163            })
164            .connect(database_url)
165            .await
166            .map_err(store_sqlx_error)?;
167        ensure_schema(&pool).await?;
168        Ok(Self { pool })
169    }
170
171    pub fn from_pool(pool: PgPool) -> Self {
172        Self { pool }
173    }
174
175    pub fn pool(&self) -> &PgPool {
176        &self.pool
177    }
178
179    pub fn session_store_factory(&self) -> PostgresSessionStoreFactory {
180        PostgresSessionStoreFactory {
181            pool: self.pool.clone(),
182        }
183    }
184
185    pub fn session_store(&self, session_id: impl Into<String>) -> PostgresSessionStore {
186        PostgresSessionStore {
187            pool: self.pool.clone(),
188            session_id: Some(session_id.into()),
189            bound_session: Arc::new(OnceLock::new()),
190        }
191    }
192
193    pub fn unbound_session_store(&self) -> PostgresSessionStore {
194        PostgresSessionStore {
195            pool: self.pool.clone(),
196            session_id: None,
197            bound_session: Arc::new(OnceLock::new()),
198        }
199    }
200
201    pub fn process_registry(&self) -> PostgresProcessRegistry {
202        PostgresProcessRegistry {
203            pool: self.pool.clone(),
204            notify: Arc::new(tokio::sync::Notify::new()),
205        }
206    }
207
208    pub fn host_event_store(&self) -> PostgresHostEventStore {
209        PostgresHostEventStore {
210            pool: self.pool.clone(),
211        }
212    }
213
214    pub fn lashlang_artifact_store(&self) -> PostgresLashlangArtifactStore {
215        PostgresLashlangArtifactStore {
216            pool: self.pool.clone(),
217        }
218    }
219}
220
221impl PostgresSessionStoreFactory {
222    pub fn new(storage: &PostgresStorage) -> Self {
223        storage.session_store_factory()
224    }
225}
226
227impl PostgresSessionStore {
228    pub fn unbound(storage: &PostgresStorage) -> Self {
229        storage.unbound_session_store()
230    }
231
232    async fn selected_session_id(&self) -> Result<Option<String>, StoreError> {
233        if let Some(session_id) = &self.session_id {
234            return Ok(Some(session_id.clone()));
235        }
236        sqlx::query_scalar("SELECT session_id FROM lash_sessions ORDER BY session_id ASC LIMIT 1")
237            .fetch_optional(&self.pool)
238            .await
239            .map_err(store_sqlx_error)
240    }
241}
242
/// Create (idempotently) every table, index, and sequence this crate uses and
/// check the recorded schema version, all inside a single transaction.
///
/// The first statement takes a transaction-scoped Postgres advisory lock
/// (`pg_advisory_xact_lock`), which is held until the transaction ends, so
/// concurrent callers run this setup one at a time.
///
/// Version handling: if `lash_schema_versions` has no row for
/// `SCHEMA_COMPONENT`, one is inserted with `SCHEMA_VERSION`. If a different
/// version is recorded, an error is returned. No migration is attempted.
///
/// # Errors
/// Returns `StoreError::Backend` for any database error or a schema-version
/// mismatch. On an early return the transaction is dropped without `commit`.
async fn ensure_schema(pool: &PgPool) -> Result<(), StoreError> {
    let mut tx = pool.begin().await.map_err(store_sqlx_error)?;
    // Fixed (key1, key2) advisory-lock pair. NOTE(review): the values look
    // arbitrary; they only need to be unique to this crate's setup.
    tx.execute("SELECT pg_advisory_xact_lock(715421, 907001)")
        .await
        .map_err(store_sqlx_error)?;
    // The whole DDL script is passed as a plain `&str` (not `sqlx::query`) so
    // it can carry many `;`-separated statements in one call.
    tx.execute(
        r#"
        CREATE TABLE IF NOT EXISTS lash_schema_versions (
            component TEXT PRIMARY KEY,
            version INTEGER NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_blobs (
            hash TEXT PRIMARY KEY,
            content BYTEA NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_sessions (
            session_id TEXT PRIMARY KEY,
            head_revision BIGINT NOT NULL DEFAULT 0,
            head_json TEXT NOT NULL,
            checkpoint_ref TEXT
        );

        CREATE TABLE IF NOT EXISTS lash_graph_nodes (
            session_id TEXT NOT NULL,
            seq BIGSERIAL,
            node_id TEXT NOT NULL,
            node_json TEXT NOT NULL,
            tombstoned BOOLEAN NOT NULL DEFAULT FALSE,
            PRIMARY KEY (session_id, node_id)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_graph_nodes_seq
            ON lash_graph_nodes(session_id, seq);

        CREATE TABLE IF NOT EXISTS lash_usage_deltas (
            seq BIGSERIAL PRIMARY KEY,
            session_id TEXT NOT NULL,
            entry_json TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_session_meta (
            session_id TEXT PRIMARY KEY,
            meta_json TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_runtime_turn_commits (
            session_id TEXT NOT NULL,
            turn_id TEXT NOT NULL,
            turn_commit_hash TEXT NOT NULL,
            result_json TEXT NOT NULL,
            committed_at_ms BIGINT NOT NULL,
            PRIMARY KEY (session_id, turn_id)
        );

        CREATE TABLE IF NOT EXISTS lash_queued_work_batches (
            enqueue_seq BIGSERIAL PRIMARY KEY,
            batch_id TEXT NOT NULL UNIQUE,
            session_id TEXT NOT NULL,
            source_key TEXT,
            delivery_policy TEXT NOT NULL,
            slot_policy TEXT NOT NULL,
            merge_key_json TEXT NOT NULL,
            available_at_ms BIGINT NOT NULL,
            enqueued_at_ms BIGINT NOT NULL,
            claim_id TEXT,
            claim_owner_id TEXT,
            claim_token TEXT,
            claim_fencing_token BIGINT NOT NULL DEFAULT 0,
            claim_claimed_at_ms BIGINT NOT NULL DEFAULT 0,
            claim_expires_at_ms BIGINT NOT NULL DEFAULT 0,
            UNIQUE (session_id, source_key)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_queued_work_ready
            ON lash_queued_work_batches(session_id, available_at_ms, enqueue_seq);

        CREATE TABLE IF NOT EXISTS lash_queued_work_items (
            batch_id TEXT NOT NULL REFERENCES lash_queued_work_batches(batch_id) ON DELETE CASCADE,
            item_index INTEGER NOT NULL,
            item_id TEXT NOT NULL,
            payload_json TEXT NOT NULL,
            PRIMARY KEY (batch_id, item_index)
        );

        CREATE TABLE IF NOT EXISTS lash_attachment_manifest (
            attachment_id TEXT PRIMARY KEY,
            session_id TEXT NOT NULL,
            canonical_uri TEXT NOT NULL,
            intent_at_ms BIGINT NOT NULL,
            committed_at_ms BIGINT
        );
        CREATE INDEX IF NOT EXISTS idx_lash_attachment_manifest_uncommitted
            ON lash_attachment_manifest(committed_at_ms)
            WHERE committed_at_ms IS NULL;

        CREATE TABLE IF NOT EXISTS lash_processes (
            process_id TEXT PRIMARY KEY,
            registration_hash TEXT NOT NULL,
            owner_scope_id TEXT NOT NULL,
            host_profile_id TEXT NOT NULL,
            created_at_ms BIGINT NOT NULL,
            updated_at_ms BIGINT NOT NULL,
            status TEXT NOT NULL,
            record_json TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_lash_processes_status
            ON lash_processes(status);

        CREATE TABLE IF NOT EXISTS lash_process_events (
            process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            sequence BIGINT NOT NULL,
            event_type TEXT NOT NULL,
            payload_hash TEXT NOT NULL,
            idempotency_key TEXT,
            occurred_at_ms BIGINT NOT NULL,
            event_json TEXT NOT NULL,
            PRIMARY KEY (process_id, sequence)
        );
        CREATE UNIQUE INDEX IF NOT EXISTS idx_lash_process_events_key
            ON lash_process_events(process_id, idempotency_key)
            WHERE idempotency_key IS NOT NULL;

        CREATE TABLE IF NOT EXISTS lash_process_wake_acks (
            process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            sequence BIGINT NOT NULL,
            PRIMARY KEY (process_id, sequence)
        );

        CREATE TABLE IF NOT EXISTS lash_process_handle_grants (
            session_id TEXT NOT NULL,
            scope_id TEXT NOT NULL,
            process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            descriptor_json TEXT NOT NULL,
            PRIMARY KEY (scope_id, process_id)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_process_handle_grants_session
            ON lash_process_handle_grants(session_id);
        CREATE INDEX IF NOT EXISTS idx_lash_process_handle_grants_process
            ON lash_process_handle_grants(process_id);

        CREATE TABLE IF NOT EXISTS lash_process_leases (
            process_id TEXT PRIMARY KEY REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            lease_owner_id TEXT,
            lease_token TEXT,
            lease_fencing_token BIGINT NOT NULL DEFAULT 0,
            lease_claimed_at_ms BIGINT NOT NULL DEFAULT 0,
            lease_expires_at_ms BIGINT NOT NULL DEFAULT 0
        );

        CREATE SEQUENCE IF NOT EXISTS lash_host_event_subscription_seq;
        CREATE TABLE IF NOT EXISTS lash_host_event_trigger_subscriptions (
            subscription_id TEXT PRIMARY KEY,
            registrant_scope_id TEXT NOT NULL,
            handle TEXT NOT NULL,
            source_type TEXT NOT NULL,
            source_key TEXT NOT NULL,
            enabled BOOLEAN NOT NULL,
            created_at_ms BIGINT NOT NULL,
            updated_at_ms BIGINT NOT NULL,
            record_json TEXT NOT NULL,
            UNIQUE(registrant_scope_id, handle)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_host_event_subscriptions_registrant
            ON lash_host_event_trigger_subscriptions(registrant_scope_id, handle);
        CREATE INDEX IF NOT EXISTS idx_lash_host_event_subscriptions_source
            ON lash_host_event_trigger_subscriptions(source_type, source_key, enabled);

        CREATE TABLE IF NOT EXISTS lash_host_event_occurrences (
            occurrence_id TEXT PRIMARY KEY,
            idempotency_key TEXT NOT NULL UNIQUE,
            request_hash TEXT NOT NULL,
            source_type TEXT NOT NULL,
            source_key TEXT NOT NULL,
            occurred_at_ms BIGINT NOT NULL,
            record_json TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_host_event_deliveries (
            occurrence_id TEXT NOT NULL REFERENCES lash_host_event_occurrences(occurrence_id) ON DELETE CASCADE,
            subscription_id TEXT NOT NULL REFERENCES lash_host_event_trigger_subscriptions(subscription_id) ON DELETE CASCADE,
            process_id TEXT NOT NULL,
            created_at_ms BIGINT NOT NULL,
            PRIMARY KEY (occurrence_id, subscription_id)
        );

        CREATE TABLE IF NOT EXISTS lash_lashlang_artifacts (
            module_ref TEXT PRIMARY KEY,
            artifact_bytes BYTEA NOT NULL
        );
        "#,
    )
    .await
    .map_err(store_sqlx_error)?;

    // Read the recorded version (if any) under the same advisory lock, so the
    // check-then-insert below cannot race another setup run.
    let existing: Option<i32> =
        sqlx::query_scalar("SELECT version FROM lash_schema_versions WHERE component = $1")
            .bind(SCHEMA_COMPONENT)
            .fetch_optional(&mut *tx)
            .await
            .map_err(store_sqlx_error)?;
    match existing {
        Some(version) if version == SCHEMA_VERSION => {}
        // Any other recorded version is a hard error; nothing is migrated.
        Some(version) => {
            return Err(StoreError::Backend(format!(
                "Postgres schema component `{SCHEMA_COMPONENT}` has version {version}, expected {SCHEMA_VERSION}"
            )));
        }
        // First setup for this component: record the current version.
        None => {
            sqlx::query("INSERT INTO lash_schema_versions (component, version) VALUES ($1, $2)")
                .bind(SCHEMA_COMPONENT)
                .bind(SCHEMA_VERSION)
                .execute(&mut *tx)
                .await
                .map_err(store_sqlx_error)?;
        }
    }
    tx.commit().await.map_err(store_sqlx_error)
}
461
/// Wall-clock milliseconds since the Unix epoch.
///
/// Yields 0 if the system clock reads earlier than the epoch.
fn current_epoch_ms() -> u64 {
    let elapsed = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::ZERO,
    };
    elapsed.as_millis() as u64
}
468
/// Current time rendered as `unix:<whole seconds since the epoch>`.
///
/// The seconds part is 0 if the system clock reads earlier than the epoch.
fn current_timestamp_string() -> String {
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0);
    format!("unix:{secs}")
}
475
476fn store_sqlx_error(err: sqlx::Error) -> StoreError {
477    StoreError::Backend(err.to_string())
478}
479
480/// Postgres SQLSTATEs that signal transient write contention rather than a hard
481/// failure: serialization failure, deadlock, and lock-acquisition timeout. On the
482/// session head these all mean "a concurrent committer got there first" — i.e. a
483/// revision conflict the caller should reload-and-retry, not a backend error.
484fn is_contention_error(err: &sqlx::Error) -> bool {
485    matches!(
486        err.as_database_error().and_then(|db| db.code()).as_deref(),
487        Some("40001" | "40P01" | "55P03")
488    )
489}
490
491fn plugin_sqlx_error(err: sqlx::Error) -> PluginError {
492    PluginError::Session(err.to_string())
493}
494
495fn process_decode_error(err: serde_json::Error) -> PluginError {
496    PluginError::Session(format!("failed to decode process registry row: {err}"))
497}
498
499fn store_decode_json<T: serde::de::DeserializeOwned>(
500    json: &str,
501    what: &str,
502) -> Result<T, StoreError> {
503    serde_json::from_str(json)
504        .map_err(|err| StoreError::Backend(format!("failed to decode {what}: {err}")))
505}
506
507fn encode_json<T: serde::Serialize>(value: &T) -> String {
508    serde_json::to_string(value).expect("persisted state should serialize")
509}
510
511fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
512    let mut buf = Vec::with_capacity(1024);
513    rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
514    buf
515}
516
517fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
518    rmp_serde::from_slice(bytes).ok()
519}
520
521fn block_on_detached<T: Send + 'static>(
522    future: impl std::future::Future<Output = T> + Send + 'static,
523) -> T {
524    std::thread::spawn(move || {
525        tokio::runtime::Builder::new_current_thread()
526            .enable_all()
527            .build()
528            .expect("postgres manifest runtime")
529            .block_on(future)
530    })
531    .join()
532    .expect("postgres manifest thread")
533}
534
535fn merge_token_ledger_entries(entries: Vec<TokenLedgerEntry>) -> Vec<TokenLedgerEntry> {
536    let mut merged = Vec::<TokenLedgerEntry>::new();
537    for entry in entries {
538        if entry.usage.total() == 0 {
539            continue;
540        }
541        if let Some(existing) = merged
542            .iter_mut()
543            .find(|existing| existing.source == entry.source && existing.model == entry.model)
544        {
545            existing.usage.input_tokens += entry.usage.input_tokens;
546            existing.usage.output_tokens += entry.usage.output_tokens;
547            existing.usage.cached_input_tokens += entry.usage.cached_input_tokens;
548            existing.usage.reasoning_tokens += entry.usage.reasoning_tokens;
549        } else {
550            merged.push(entry);
551        }
552    }
553    merged
554}
555
/// Persisted form of a session checkpoint: the [`SessionCheckpoint`] manifest
/// (turn state plus `*_ref` fields) together with the hydrated payloads.
///
/// `put_checkpoint_tx` encodes this with `encode_msgpack` and stores the bytes
/// content-addressed (SHA-256) in `lash_blobs`. The field names and their
/// declaration order therefore determine both the stored format and the blob
/// hash, so do not rename or reorder them.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct SessionCheckpointEnvelope {
    /// Checkpoint manifest as returned to callers of `put_checkpoint_tx`.
    manifest: SessionCheckpoint,
    tool_state: Option<lash_core::ToolState>,
    plugin_snapshot: Option<lash_core::PluginSessionSnapshot>,
    execution_state: Option<Vec<u8>>,
}
563
564async fn put_blob_tx(
565    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
566    content: &[u8],
567) -> Result<BlobRef, StoreError> {
568    let hash = format!("{:x}", Sha256::digest(content));
569    sqlx::query(
570        "INSERT INTO lash_blobs (hash, content)
571         VALUES ($1, $2)
572         ON CONFLICT (hash) DO NOTHING",
573    )
574    .bind(&hash)
575    .bind(content)
576    .execute(&mut **tx)
577    .await
578    .map_err(store_sqlx_error)?;
579    Ok(BlobRef(hash))
580}
581
582async fn get_blob(pool: &PgPool, blob_ref: &BlobRef) -> Option<Vec<u8>> {
583    sqlx::query_scalar::<_, Vec<u8>>("SELECT content FROM lash_blobs WHERE hash = $1")
584        .bind(blob_ref.as_str())
585        .fetch_optional(pool)
586        .await
587        .ok()
588        .flatten()
589}
590
591async fn put_checkpoint_tx(
592    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
593    checkpoint: &HydratedSessionCheckpoint,
594) -> Result<(BlobRef, SessionCheckpoint), StoreError> {
595    let manifest = SessionCheckpoint {
596        turn_state: checkpoint.turn_state.clone(),
597        tool_state_ref: checkpoint.tool_state_ref.clone(),
598        plugin_snapshot_ref: checkpoint.plugin_snapshot_ref.clone(),
599        plugin_snapshot_revision: checkpoint.plugin_snapshot_revision,
600        execution_state_ref: checkpoint.execution_state_ref.clone(),
601    };
602    let envelope = SessionCheckpointEnvelope {
603        manifest: manifest.clone(),
604        tool_state: checkpoint.tool_state.clone(),
605        plugin_snapshot: checkpoint.plugin_snapshot.clone(),
606        execution_state: checkpoint.execution_state.clone(),
607    };
608    let bytes = encode_msgpack(&envelope);
609    let checkpoint_ref = put_blob_tx(tx, &bytes).await?;
610    Ok((checkpoint_ref, manifest))
611}
612
613async fn get_checkpoint(pool: &PgPool, blob_ref: &BlobRef) -> Option<HydratedSessionCheckpoint> {
614    let bytes = get_blob(pool, blob_ref).await?;
615    let envelope: SessionCheckpointEnvelope = decode_msgpack(&bytes)?;
616    Some(HydratedSessionCheckpoint {
617        turn_state: envelope.manifest.turn_state,
618        tool_state_ref: envelope.manifest.tool_state_ref,
619        tool_state: envelope.tool_state,
620        plugin_snapshot_ref: envelope.manifest.plugin_snapshot_ref,
621        plugin_snapshot: envelope.plugin_snapshot,
622        plugin_snapshot_revision: envelope.manifest.plugin_snapshot_revision,
623        execution_state_ref: envelope.manifest.execution_state_ref,
624        execution_state: envelope.execution_state,
625    })
626}
627
628async fn load_session_head_meta_tx(
629    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
630    session_id: &str,
631    for_update: bool,
632) -> Result<Option<SessionHeadMeta>, StoreError> {
633    let sql = if for_update {
634        "SELECT head_json, head_revision FROM lash_sessions WHERE session_id = $1 FOR UPDATE"
635    } else {
636        "SELECT head_json, head_revision FROM lash_sessions WHERE session_id = $1"
637    };
638    let row = sqlx::query(sql)
639        .bind(session_id)
640        .fetch_optional(&mut **tx)
641        .await
642        .map_err(store_sqlx_error)?;
643    let Some(row) = row else {
644        return Ok(None);
645    };
646    let head_json: String = row.get(0);
647    let head_revision: i64 = row.get(1);
648    let mut meta: SessionHeadMeta = store_decode_json(&head_json, "session head")?;
649    meta.head_revision = head_revision as u64;
650    Ok(Some(meta))
651}
652
653async fn load_usage_deltas(pool: &PgPool, session_id: &str) -> Vec<TokenLedgerEntry> {
654    let rows = sqlx::query(
655        "SELECT entry_json FROM lash_usage_deltas WHERE session_id = $1 ORDER BY seq ASC",
656    )
657    .bind(session_id)
658    .fetch_all(pool)
659    .await
660    .unwrap_or_default();
661    rows.into_iter()
662        .filter_map(|row| {
663            let json: String = row.get(0);
664            serde_json::from_str(&json).ok()
665        })
666        .collect()
667}
668
669async fn load_graph(
670    pool: &PgPool,
671    session_id: &str,
672    leaf_node_id: Option<String>,
673    active_path: bool,
674) -> Result<lash_core::SessionGraph, StoreError> {
675    let rows = sqlx::query(
676        "SELECT node_json FROM lash_graph_nodes
677         WHERE session_id = $1 AND tombstoned = FALSE
678         ORDER BY seq ASC",
679    )
680    .bind(session_id)
681    .fetch_all(pool)
682    .await
683    .map_err(store_sqlx_error)?;
684    let mut nodes = Vec::<SessionNodeRecord>::new();
685    for row in rows {
686        let json: String = row.get(0);
687        nodes.push(store_decode_json(&json, "session graph node")?);
688    }
689    if active_path && let Some(leaf) = leaf_node_id.clone() {
690        let wanted = active_path_node_ids(&nodes, &leaf);
691        nodes.retain(|node| wanted.contains(&node.node_id));
692    }
693    Ok(lash_core::SessionGraph::from_nodes(nodes, leaf_node_id))
694}
695
696fn active_path_node_ids(nodes: &[SessionNodeRecord], leaf_node_id: &str) -> HashSet<String> {
697    let mut parent_by_id = std::collections::BTreeMap::new();
698    for node in nodes {
699        parent_by_id.insert(node.node_id.clone(), node.parent_node_id.clone());
700    }
701    let mut wanted = HashSet::new();
702    let mut cursor = Some(leaf_node_id.to_string());
703    while let Some(node_id) = cursor {
704        if !wanted.insert(node_id.clone()) {
705            break;
706        }
707        cursor = parent_by_id.get(&node_id).cloned().flatten();
708    }
709    wanted
710}
711
712async fn commit_attachment_refs_tx(
713    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
714    session_id: &str,
715    attachment_ids: &[AttachmentId],
716) -> Result<(), StoreError> {
717    if attachment_ids.is_empty() {
718        return Ok(());
719    }
720    let now = current_epoch_ms() as i64;
721    for id in attachment_ids {
722        sqlx::query(
723            "UPDATE lash_attachment_manifest
724             SET committed_at_ms = COALESCE(committed_at_ms, $1)
725             WHERE attachment_id = $2 AND session_id = $3",
726        )
727        .bind(now)
728        .bind(id.as_str())
729        .bind(session_id)
730        .execute(&mut **tx)
731        .await
732        .map_err(store_sqlx_error)?;
733    }
734    Ok(())
735}
736
737impl AttachmentManifest for PostgresSessionStore {
738    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError> {
739        let pool = self.pool.clone();
740        block_on_detached(async move {
741            sqlx::query(
742                "INSERT INTO lash_attachment_manifest (
743                    attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
744                 )
745                 VALUES ($1, $2, $3, $4, NULL)
746                 ON CONFLICT (attachment_id) DO UPDATE SET
747                    session_id = EXCLUDED.session_id,
748                    canonical_uri = EXCLUDED.canonical_uri,
749                    intent_at_ms = EXCLUDED.intent_at_ms",
750            )
751            .bind(intent.attachment_id.as_str())
752            .bind(intent.session_id)
753            .bind(intent.canonical_uri)
754            .bind(intent.intent_at_epoch_ms as i64)
755            .execute(&pool)
756            .await
757            .map(|_| ())
758            .map_err(store_sqlx_error)
759        })
760    }
761
762    fn commit_refs(
763        &self,
764        session_id: &str,
765        attachment_ids: &[AttachmentId],
766    ) -> Result<(), StoreError> {
767        let pool = self.pool.clone();
768        let session_id = session_id.to_string();
769        let attachment_ids = attachment_ids.to_vec();
770        block_on_detached(async move {
771            let mut tx = pool.begin().await.map_err(store_sqlx_error)?;
772            commit_attachment_refs_tx(&mut tx, &session_id, &attachment_ids).await?;
773            tx.commit().await.map_err(store_sqlx_error)
774        })
775    }
776
777    fn list_uncommitted(
778        &self,
779        older_than_epoch_ms: u64,
780    ) -> Result<Vec<AttachmentManifestEntry>, StoreError> {
781        let pool = self.pool.clone();
782        block_on_detached(async move {
783            let rows = sqlx::query(
784                "SELECT attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
785                 FROM lash_attachment_manifest
786                 WHERE committed_at_ms IS NULL AND intent_at_ms <= $1
787                 ORDER BY attachment_id ASC",
788            )
789            .bind(older_than_epoch_ms as i64)
790            .fetch_all(&pool)
791            .await
792            .map_err(store_sqlx_error)?;
793            Ok(rows
794                .into_iter()
795                .map(|row| AttachmentManifestEntry {
796                    attachment_id: AttachmentId::new(row.get::<String, _>(0)),
797                    session_id: row.get(1),
798                    canonical_uri: row.get(2),
799                    intent_at_epoch_ms: row.get::<i64, _>(3) as u64,
800                    committed_at_epoch_ms: row.get::<Option<i64>, _>(4).map(|value| value as u64),
801                })
802                .collect())
803        })
804    }
805
806    fn forget(&self, attachment_id: &AttachmentId) -> Result<(), StoreError> {
807        let pool = self.pool.clone();
808        let attachment_id = attachment_id.to_string();
809        block_on_detached(async move {
810            sqlx::query("DELETE FROM lash_attachment_manifest WHERE attachment_id = $1")
811                .bind(attachment_id)
812                .execute(&pool)
813                .await
814                .map(|_| ())
815                .map_err(store_sqlx_error)
816        })
817    }
818}
819
820#[async_trait::async_trait]
821impl SessionStoreFactory for PostgresSessionStoreFactory {
822    fn durability_tier(&self) -> DurabilityTier {
823        DurabilityTier::Durable
824    }
825
826    async fn create_store(
827        &self,
828        request: &SessionStoreCreateRequest,
829    ) -> Result<Arc<dyn RuntimePersistence>, String> {
830        let store = PostgresSessionStore {
831            pool: self.pool.clone(),
832            session_id: Some(request.session_id.clone()),
833            bound_session: Arc::new(OnceLock::new()),
834        };
835        if store
836            .load_session_meta()
837            .await
838            .map_err(|err| err.to_string())?
839            .is_none()
840        {
841            store
842                .save_session_meta(SessionMeta {
843                    session_id: request.session_id.clone(),
844                    session_name: request.session_id.clone(),
845                    created_at: current_timestamp_string(),
846                    model: request.policy.model.id.clone(),
847                    cwd: std::env::current_dir()
848                        .ok()
849                        .and_then(|path| path.to_str().map(str::to_string)),
850                    relation: request.relation.clone(),
851                })
852                .await
853                .map_err(|err| err.to_string())?;
854        }
855        Ok(Arc::new(store))
856    }
857
858    async fn open_existing_store(
859        &self,
860        request: &SessionStoreCreateRequest,
861    ) -> Result<Option<Arc<dyn RuntimePersistence>>, String> {
862        let store = PostgresSessionStore {
863            pool: self.pool.clone(),
864            session_id: Some(request.session_id.clone()),
865            bound_session: Arc::new(OnceLock::new()),
866        };
867        if store
868            .load_session_meta()
869            .await
870            .map_err(|err| err.to_string())?
871            .is_some()
872        {
873            Ok(Some(Arc::new(store)))
874        } else {
875            Ok(None)
876        }
877    }
878
879    async fn delete_session(&self, session_id: &str) -> Result<(), String> {
880        let mut tx = self.pool.begin().await.map_err(|err| err.to_string())?;
881        for sql in [
882            "DELETE FROM lash_queued_work_items WHERE batch_id IN (SELECT batch_id FROM lash_queued_work_batches WHERE session_id = $1)",
883            "DELETE FROM lash_queued_work_batches WHERE session_id = $1",
884            "DELETE FROM lash_usage_deltas WHERE session_id = $1",
885            "DELETE FROM lash_graph_nodes WHERE session_id = $1",
886            "DELETE FROM lash_runtime_turn_commits WHERE session_id = $1",
887            "DELETE FROM lash_session_meta WHERE session_id = $1",
888            "DELETE FROM lash_sessions WHERE session_id = $1",
889            "DELETE FROM lash_attachment_manifest WHERE session_id = $1",
890        ] {
891            sqlx::query(sql)
892                .bind(session_id)
893                .execute(&mut *tx)
894                .await
895                .map_err(|err| err.to_string())?;
896        }
897        tx.commit().await.map_err(|err| err.to_string())
898    }
899}
900
/// Decoded scalar columns of one `lash_queued_work_batches` row.
///
/// Built by `queued_batch_row`. `queued_work_batch_from_row` later attaches the
/// batch's items to produce a `QueuedWorkBatch`. `claim_fencing_token` is only
/// used for claim selection and is not copied into that result.
#[derive(Clone, Debug)]
struct QueuedBatchRow {
    /// `enqueue_seq` column (read as a signed BIGINT, cast to `u64`).
    enqueue_seq: u64,
    batch_id: String,
    session_id: String,
    /// Optional dedupe key. `enqueue_queued_work` returns the existing batch
    /// for a repeated `(session_id, source_key)` instead of inserting a new one.
    source_key: Option<String>,
    delivery_policy: DeliveryPolicy,
    slot_policy: SlotPolicy,
    merge_key: MergeKey,
    /// Epoch milliseconds; the claim query only considers batches whose value
    /// is `<= now`.
    available_at_ms: u64,
    /// Epoch milliseconds at which the batch was inserted.
    enqueued_at_ms: u64,
    /// Incremented by every successful claim of this batch.
    claim_fencing_token: u64,
}
914
915fn queued_batch_row(row: PgRow) -> Result<QueuedBatchRow, StoreError> {
916    let delivery_policy =
917        DeliveryPolicy::from_wire_str(row.get::<String, _>("delivery_policy").as_str())
918            .ok_or_else(|| {
919                StoreError::Backend("invalid queued work delivery policy".to_string())
920            })?;
921    let slot_policy = SlotPolicy::from_wire_str(row.get::<String, _>("slot_policy").as_str())
922        .ok_or_else(|| StoreError::Backend("invalid queued work slot policy".to_string()))?;
923    let merge_json: String = row.get("merge_key_json");
924    Ok(QueuedBatchRow {
925        enqueue_seq: row.get::<i64, _>("enqueue_seq") as u64,
926        batch_id: row.get("batch_id"),
927        session_id: row.get("session_id"),
928        source_key: row.get("source_key"),
929        delivery_policy,
930        slot_policy,
931        merge_key: store_decode_json(&merge_json, "queued work merge key")?,
932        available_at_ms: row.get::<i64, _>("available_at_ms") as u64,
933        enqueued_at_ms: row.get::<i64, _>("enqueued_at_ms") as u64,
934        claim_fencing_token: row.get::<i64, _>("claim_fencing_token") as u64,
935    })
936}
937
938async fn load_queued_batch(
939    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
940    batch_id: &str,
941) -> Result<Option<QueuedWorkBatch>, StoreError> {
942    let row = sqlx::query(
943        "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
944                slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
945                claim_fencing_token
946         FROM lash_queued_work_batches
947         WHERE batch_id = $1",
948    )
949    .bind(batch_id)
950    .fetch_optional(&mut **tx)
951    .await
952    .map_err(store_sqlx_error)?;
953    let Some(row) = row else {
954        return Ok(None);
955    };
956    let row = queued_batch_row(row)?;
957    queued_work_batch_from_row(tx, row).await.map(Some)
958}
959
960async fn queued_work_batch_from_row(
961    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
962    row: QueuedBatchRow,
963) -> Result<QueuedWorkBatch, StoreError> {
964    let item_rows = sqlx::query(
965        "SELECT item_id, payload_json
966         FROM lash_queued_work_items
967         WHERE batch_id = $1
968         ORDER BY item_index ASC",
969    )
970    .bind(&row.batch_id)
971    .fetch_all(&mut **tx)
972    .await
973    .map_err(store_sqlx_error)?;
974    let mut items = Vec::new();
975    for item in item_rows {
976        let payload_json: String = item.get(1);
977        items.push(QueuedWorkItem {
978            item_id: item.get(0),
979            payload: store_decode_json(&payload_json, "queued work payload")?,
980        });
981    }
982    Ok(QueuedWorkBatch {
983        batch_id: row.batch_id,
984        session_id: row.session_id,
985        enqueue_seq: row.enqueue_seq,
986        source_key: row.source_key,
987        delivery_policy: row.delivery_policy,
988        slot_policy: row.slot_policy,
989        merge_key: row.merge_key,
990        available_at_ms: row.available_at_ms,
991        enqueued_at_ms: row.enqueued_at_ms,
992        items,
993    })
994}
995
996async fn ensure_queued_work_completion_tx(
997    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
998    completed: &QueuedWorkCompletion,
999) -> Result<(), StoreError> {
1000    for batch_id in &completed.batch_ids {
1001        let exists: Option<i64> = sqlx::query_scalar(
1002            "SELECT 1::BIGINT FROM lash_queued_work_batches
1003             WHERE session_id = $1
1004               AND batch_id = $2
1005               AND claim_id = $3
1006               AND claim_token = $4
1007             LIMIT 1",
1008        )
1009        .bind(&completed.session_id)
1010        .bind(batch_id)
1011        .bind(&completed.claim_id)
1012        .bind(&completed.lease_token)
1013        .fetch_optional(&mut **tx)
1014        .await
1015        .map_err(store_sqlx_error)?;
1016        if exists.is_none() {
1017            return Err(StoreError::QueuedWorkClaimExpired {
1018                session_id: completed.session_id.clone(),
1019                claim_id: completed.claim_id.clone(),
1020            });
1021        }
1022    }
1023    Ok(())
1024}
1025
1026#[async_trait::async_trait]
1027impl RuntimePersistence for PostgresSessionStore {
1028    fn durability_tier(&self) -> DurabilityTier {
1029        DurabilityTier::Durable
1030    }
1031
1032    async fn load_session(
1033        &self,
1034        scope: SessionReadScope,
1035    ) -> Result<Option<PersistedSessionRead>, StoreError> {
1036        let Some(session_id) = self.selected_session_id().await? else {
1037            return Ok(None);
1038        };
1039        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1040        let Some(meta) = load_session_head_meta_tx(&mut tx, &session_id, false).await? else {
1041            return Ok(None);
1042        };
1043        tx.commit().await.map_err(store_sqlx_error)?;
1044        let leaf_node_id = match &scope {
1045            SessionReadScope::FullGraph => meta.leaf_node_id.clone(),
1046            SessionReadScope::ActivePath { leaf_node_id } => {
1047                leaf_node_id.clone().or_else(|| meta.leaf_node_id.clone())
1048            }
1049        };
1050        let graph = load_graph(
1051            &self.pool,
1052            &session_id,
1053            leaf_node_id.clone(),
1054            matches!(scope, SessionReadScope::ActivePath { .. }),
1055        )
1056        .await?;
1057        let checkpoint = match meta.checkpoint_ref.as_ref() {
1058            Some(blob_ref) => get_checkpoint(&self.pool, blob_ref).await,
1059            None => None,
1060        };
1061        Ok(Some(PersistedSessionRead {
1062            session_id: meta.session_id,
1063            head_revision: meta.head_revision,
1064            config: meta.config,
1065            agent_frames: meta.agent_frames,
1066            current_agent_frame_id: meta.current_agent_frame_id,
1067            graph,
1068            checkpoint_ref: meta.checkpoint_ref,
1069            checkpoint,
1070            token_ledger: merge_token_ledger_entries(
1071                load_usage_deltas(&self.pool, &session_id).await,
1072            ),
1073        }))
1074    }
1075
1076    async fn load_node(&self, node_id: &str) -> Result<Option<SessionNodeRecord>, StoreError> {
1077        let json: Option<String> = if let Some(session_id) = &self.session_id {
1078            sqlx::query_scalar(
1079                "SELECT node_json FROM lash_graph_nodes
1080                 WHERE session_id = $1 AND node_id = $2 AND tombstoned = FALSE",
1081            )
1082            .bind(session_id)
1083            .bind(node_id)
1084            .fetch_optional(&self.pool)
1085            .await
1086            .map_err(store_sqlx_error)?
1087        } else {
1088            sqlx::query_scalar(
1089                "SELECT node_json FROM lash_graph_nodes
1090                 WHERE node_id = $1 AND tombstoned = FALSE
1091                 ORDER BY session_id ASC
1092                 LIMIT 1",
1093            )
1094            .bind(node_id)
1095            .fetch_optional(&self.pool)
1096            .await
1097            .map_err(store_sqlx_error)?
1098        };
1099        json.map(|json| store_decode_json(&json, "session graph node"))
1100            .transpose()
1101    }
1102
1103    async fn commit_runtime_state(
1104        &self,
1105        commit: RuntimeCommit,
1106    ) -> Result<RuntimeCommitResult, StoreError> {
1107        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1108        // Read the head WITHOUT a lock; the conditional CAS write below is the
1109        // serialization point (optimistic concurrency), so no pessimistic
1110        // `FOR UPDATE` lock is held across the rest of this transaction.
1111        let existing = load_session_head_meta_tx(&mut tx, &commit.session_id, false).await?;
1112        if let Some(bound_session_id) = existing.as_ref().map(|meta| meta.session_id.as_str())
1113            && bound_session_id != commit.session_id
1114        {
1115            return Err(StoreError::SessionBindingMismatch {
1116                bound_session_id: bound_session_id.to_string(),
1117                attempted_session_id: commit.session_id,
1118            });
1119        }
1120        // A session-store handle commits to exactly one session. An explicit
1121        // binding (`self.session_id`) is authoritative; otherwise the handle binds
1122        // to the first session it commits and rejects any other thereafter.
1123        let effective_binding = self
1124            .session_id
1125            .clone()
1126            .or_else(|| self.bound_session.get().cloned());
1127        if let Some(bound_session_id) = &effective_binding
1128            && commit.session_id != *bound_session_id
1129        {
1130            return Err(StoreError::SessionBindingMismatch {
1131                bound_session_id: bound_session_id.clone(),
1132                attempted_session_id: commit.session_id,
1133            });
1134        }
1135        if self.session_id.is_none() {
1136            let _ = self.bound_session.set(commit.session_id.clone());
1137        }
1138        if let Some(completed) = &commit.turn_commit {
1139            if completed.session_id != commit.session_id {
1140                return Err(StoreError::RuntimeTurnCommitConflict {
1141                    session_id: completed.session_id.clone(),
1142                    turn_id: completed.turn_id.clone(),
1143                });
1144            }
1145            let prior = sqlx::query(
1146                "SELECT turn_commit_hash, result_json
1147                 FROM lash_runtime_turn_commits
1148                 WHERE session_id = $1 AND turn_id = $2",
1149            )
1150            .bind(&completed.session_id)
1151            .bind(&completed.turn_id)
1152            .fetch_optional(&mut *tx)
1153            .await
1154            .map_err(store_sqlx_error)?;
1155            if let Some(row) = prior {
1156                let hash: String = row.get(0);
1157                let result_json: String = row.get(1);
1158                if hash == completed.turn_commit_hash {
1159                    return store_decode_json(&result_json, "runtime turn commit result");
1160                }
1161                return Err(StoreError::RuntimeTurnCommitConflict {
1162                    session_id: completed.session_id.clone(),
1163                    turn_id: completed.turn_id.clone(),
1164                });
1165            }
1166        }
1167        let actual_revision = existing.as_ref().map_or(0, |meta| meta.head_revision);
1168        if commit.expected_head_revision.is_some()
1169            && commit.expected_head_revision != Some(actual_revision)
1170        {
1171            return Err(StoreError::HeadRevisionConflict {
1172                expected: commit.expected_head_revision,
1173                actual: actual_revision,
1174            });
1175        }
1176        for completed in &commit.completed_queue_claims {
1177            if completed.session_id != commit.session_id {
1178                return Err(StoreError::QueuedWorkClaimExpired {
1179                    session_id: completed.session_id.clone(),
1180                    claim_id: completed.claim_id.clone(),
1181                });
1182            }
1183            ensure_queued_work_completion_tx(&mut tx, completed).await?;
1184        }
1185        let (checkpoint_ref, manifest) = put_checkpoint_tx(&mut tx, &commit.checkpoint).await?;
1186        for entry in &commit.usage_deltas {
1187            sqlx::query("INSERT INTO lash_usage_deltas (session_id, entry_json) VALUES ($1, $2)")
1188                .bind(&commit.session_id)
1189                .bind(encode_json(entry))
1190                .execute(&mut *tx)
1191                .await
1192                .map_err(store_sqlx_error)?;
1193        }
1194        let leaf_node_id = match &commit.graph {
1195            GraphCommitDelta::Unchanged { leaf_node_id } => leaf_node_id.clone(),
1196            GraphCommitDelta::Append {
1197                nodes,
1198                leaf_node_id,
1199            } => {
1200                for node in nodes {
1201                    sqlx::query(
1202                        "INSERT INTO lash_graph_nodes (session_id, node_id, node_json)
1203                         VALUES ($1, $2, $3)
1204                         ON CONFLICT (session_id, node_id) DO UPDATE SET
1205                            node_json = EXCLUDED.node_json,
1206                            tombstoned = FALSE",
1207                    )
1208                    .bind(&commit.session_id)
1209                    .bind(&node.node_id)
1210                    .bind(encode_json(node))
1211                    .execute(&mut *tx)
1212                    .await
1213                    .map_err(store_sqlx_error)?;
1214                }
1215                leaf_node_id.clone()
1216            }
1217            GraphCommitDelta::ReplaceFull(graph) => {
1218                sqlx::query("DELETE FROM lash_graph_nodes WHERE session_id = $1")
1219                    .bind(&commit.session_id)
1220                    .execute(&mut *tx)
1221                    .await
1222                    .map_err(store_sqlx_error)?;
1223                for node in &graph.nodes {
1224                    sqlx::query(
1225                        "INSERT INTO lash_graph_nodes (session_id, node_id, node_json)
1226                         VALUES ($1, $2, $3)",
1227                    )
1228                    .bind(&commit.session_id)
1229                    .bind(&node.node_id)
1230                    .bind(encode_json(node))
1231                    .execute(&mut *tx)
1232                    .await
1233                    .map_err(store_sqlx_error)?;
1234                }
1235                graph.leaf_node_id.clone()
1236            }
1237        };
1238        let graph_node_count: i64 = sqlx::query_scalar(
1239            "SELECT COUNT(*) FROM lash_graph_nodes WHERE session_id = $1 AND tombstoned = FALSE",
1240        )
1241        .bind(&commit.session_id)
1242        .fetch_one(&mut *tx)
1243        .await
1244        .map_err(store_sqlx_error)?;
1245        let next_revision = actual_revision + 1;
1246        let meta = SessionHeadMeta {
1247            session_id: commit.session_id.clone(),
1248            head_revision: next_revision,
1249            config: commit.config.clone(),
1250            agent_frames: commit.agent_frames.clone(),
1251            current_agent_frame_id: commit.current_agent_frame_id.clone(),
1252            checkpoint_ref: Some(checkpoint_ref.clone()),
1253            leaf_node_id,
1254            graph_node_count: graph_node_count as usize,
1255            token_ledger: Vec::new(),
1256        };
1257        // Optimistic CAS on the head revision. The `WHERE head_revision = $5`
1258        // guard makes the write succeed only if no concurrent committer moved the
1259        // head since our unlocked read above. A brand-new session inserts (no
1260        // conflict); an existing one updates only when the revision still matches.
1261        let head_write = sqlx::query(
1262            "INSERT INTO lash_sessions (session_id, head_revision, head_json, checkpoint_ref)
1263             VALUES ($1, $2, $3, $4)
1264             ON CONFLICT (session_id) DO UPDATE SET
1265                head_revision = EXCLUDED.head_revision,
1266                head_json = EXCLUDED.head_json,
1267                checkpoint_ref = EXCLUDED.checkpoint_ref
1268             WHERE lash_sessions.head_revision = $5",
1269        )
1270        .bind(&commit.session_id)
1271        .bind(next_revision as i64)
1272        .bind(encode_json(&meta))
1273        .bind(checkpoint_ref.as_str())
1274        .bind(actual_revision as i64)
1275        .execute(&mut *tx)
1276        .await;
1277        let head_write = match head_write {
1278            Ok(result) => result,
1279            Err(err) if is_contention_error(&err) => {
1280                // The head row is contended by a concurrent committer (lock
1281                // timeout / serialization failure / deadlock). That is a conflict,
1282                // not an opaque backend error: surface it so the caller reloads
1283                // and retries. The tx is now aborted; returning drops it.
1284                return Err(StoreError::HeadRevisionConflict {
1285                    expected: commit.expected_head_revision.or(Some(actual_revision)),
1286                    actual: actual_revision,
1287                });
1288            }
1289            Err(err) => return Err(store_sqlx_error(err)),
1290        };
1291        if head_write.rows_affected() == 0 {
1292            // A concurrent commit won the race: the head no longer matches the
1293            // revision we read. Re-read the now-current revision for an accurate
1294            // report, then drop `tx` (auto-rollback), discarding this attempt's
1295            // node/usage writes; the caller reloads and retries.
1296            let actual_now = sqlx::query_scalar::<_, i64>(
1297                "SELECT head_revision FROM lash_sessions WHERE session_id = $1",
1298            )
1299            .bind(&commit.session_id)
1300            .fetch_optional(&mut *tx)
1301            .await
1302            .map_err(store_sqlx_error)?
1303            .map_or(actual_revision, |revision| revision as u64);
1304            return Err(StoreError::HeadRevisionConflict {
1305                expected: commit.expected_head_revision.or(Some(actual_revision)),
1306                actual: actual_now,
1307            });
1308        }
1309        for completed in &commit.completed_queue_claims {
1310            for batch_id in &completed.batch_ids {
1311                sqlx::query(
1312                    "DELETE FROM lash_queued_work_batches
1313                     WHERE session_id = $1 AND batch_id = $2 AND claim_id = $3 AND claim_token = $4",
1314                )
1315                .bind(&completed.session_id)
1316                .bind(batch_id)
1317                .bind(&completed.claim_id)
1318                .bind(&completed.lease_token)
1319                .execute(&mut *tx)
1320                .await
1321                .map_err(store_sqlx_error)?;
1322            }
1323        }
1324        commit_attachment_refs_tx(
1325            &mut tx,
1326            &commit.session_id,
1327            &commit.committed_attachment_ids,
1328        )
1329        .await?;
1330        let result = RuntimeCommitResult {
1331            head_revision: next_revision,
1332            checkpoint_ref,
1333            manifest,
1334        };
1335        if let Some(completed) = &commit.turn_commit {
1336            sqlx::query(
1337                "INSERT INTO lash_runtime_turn_commits (
1338                    session_id, turn_id, turn_commit_hash, result_json, committed_at_ms
1339                 )
1340                 VALUES ($1, $2, $3, $4, $5)",
1341            )
1342            .bind(&completed.session_id)
1343            .bind(&completed.turn_id)
1344            .bind(&completed.turn_commit_hash)
1345            .bind(encode_json(&result))
1346            .bind(current_epoch_ms() as i64)
1347            .execute(&mut *tx)
1348            .await
1349            .map_err(store_sqlx_error)?;
1350        }
1351        tx.commit().await.map_err(store_sqlx_error)?;
1352        Ok(result)
1353    }
1354
1355    async fn enqueue_queued_work(
1356        &self,
1357        batch: QueuedWorkBatchDraft,
1358    ) -> Result<QueuedWorkBatch, StoreError> {
1359        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1360        if let Some(source_key) = batch.source_key.as_deref() {
1361            let existing_id: Option<String> = sqlx::query_scalar(
1362                "SELECT batch_id FROM lash_queued_work_batches
1363                 WHERE session_id = $1 AND source_key = $2",
1364            )
1365            .bind(&batch.session_id)
1366            .bind(source_key)
1367            .fetch_optional(&mut *tx)
1368            .await
1369            .map_err(store_sqlx_error)?;
1370            if let Some(batch_id) = existing_id {
1371                let existing = load_queued_batch(&mut tx, &batch_id)
1372                    .await?
1373                    .ok_or_else(|| {
1374                        StoreError::Backend("queued work source row disappeared".to_string())
1375                    })?;
1376                tx.commit().await.map_err(store_sqlx_error)?;
1377                return Ok(existing);
1378            }
1379        }
1380        let now = current_epoch_ms();
1381        let batch_id = derive_batch_id(&batch.session_id, batch.source_key.as_deref(), now, None);
1382        let row = sqlx::query_scalar::<_, i64>(
1383            "INSERT INTO lash_queued_work_batches (
1384                batch_id, session_id, source_key, delivery_policy, slot_policy,
1385                merge_key_json, available_at_ms, enqueued_at_ms
1386             )
1387             VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
1388             RETURNING enqueue_seq",
1389        )
1390        .bind(&batch_id)
1391        .bind(&batch.session_id)
1392        .bind(&batch.source_key)
1393        .bind(batch.delivery_policy.as_str())
1394        .bind(batch.slot_policy.as_str())
1395        .bind(encode_json(&batch.merge_key))
1396        .bind(batch.available_at_ms as i64)
1397        .bind(now as i64)
1398        .fetch_one(&mut *tx)
1399        .await
1400        .map_err(store_sqlx_error)?;
1401        for (index, payload) in batch.payloads.iter().enumerate() {
1402            let item_id = format!("{batch_id}:item:{index}");
1403            sqlx::query(
1404                "INSERT INTO lash_queued_work_items (batch_id, item_index, item_id, payload_json)
1405                 VALUES ($1, $2, $3, $4)",
1406            )
1407            .bind(&batch_id)
1408            .bind(index as i32)
1409            .bind(item_id)
1410            .bind(encode_json(payload))
1411            .execute(&mut *tx)
1412            .await
1413            .map_err(store_sqlx_error)?;
1414        }
1415        let queued = load_queued_batch(&mut tx, &batch_id)
1416            .await?
1417            .ok_or_else(|| StoreError::Backend("queued work insert disappeared".to_string()))?;
1418        debug_assert_eq!(queued.enqueue_seq, row as u64);
1419        tx.commit().await.map_err(store_sqlx_error)?;
1420        Ok(queued)
1421    }
1422
1423    async fn claim_ready_queued_work(
1424        &self,
1425        session_id: &str,
1426        owner_id: &str,
1427        boundary: QueuedWorkClaimBoundary,
1428        lease_ttl_ms: u64,
1429        max_batches: usize,
1430    ) -> Result<Option<QueuedWorkClaim>, StoreError> {
1431        if max_batches == 0 {
1432            return Ok(None);
1433        }
1434        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1435        let now = current_epoch_ms();
1436        let rows = sqlx::query(
1437            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1438                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1439                    claim_fencing_token
1440             FROM lash_queued_work_batches
1441             WHERE session_id = $1
1442               AND available_at_ms <= $2
1443               AND (claim_token IS NULL OR claim_expires_at_ms <= $2)
1444             ORDER BY enqueue_seq ASC
1445             LIMIT $3
1446             FOR UPDATE SKIP LOCKED",
1447        )
1448        .bind(session_id)
1449        .bind(now as i64)
1450        .bind(claim_scan_limit(max_batches))
1451        .fetch_all(&mut *tx)
1452        .await
1453        .map_err(store_sqlx_error)?;
1454        let mut selected = Vec::new();
1455        for row in rows {
1456            selected.push(queued_batch_row(row)?);
1457        }
1458        let candidates = selected
1459            .iter()
1460            .map(|row| ClaimCandidate {
1461                enqueue_seq: row.enqueue_seq,
1462                claim_fencing_token: row.claim_fencing_token,
1463                delivery_policy: row.delivery_policy,
1464                slot_policy: row.slot_policy,
1465                merge_key: row.merge_key.clone(),
1466            })
1467            .collect::<Vec<_>>();
1468        let selected_len = select_claim_prefix(&candidates, boundary, max_batches);
1469        if selected_len == 0 {
1470            tx.commit().await.map_err(store_sqlx_error)?;
1471            return Ok(None);
1472        }
1473        selected.truncate(selected_len);
1474        let lease =
1475            QueuedWorkClaimLease::derive(&candidates[0], session_id, owner_id, now, lease_ttl_ms);
1476        for row in &selected {
1477            let changed = sqlx::query(
1478                "UPDATE lash_queued_work_batches
1479                 SET claim_id = $3,
1480                     claim_owner_id = $4,
1481                     claim_token = $5,
1482                     claim_fencing_token = claim_fencing_token + 1,
1483                     claim_claimed_at_ms = $6,
1484                     claim_expires_at_ms = $7
1485                 WHERE session_id = $1
1486                   AND batch_id = $2
1487                   AND (claim_token IS NULL OR claim_expires_at_ms <= $6)",
1488            )
1489            .bind(session_id)
1490            .bind(&row.batch_id)
1491            .bind(&lease.claim_id)
1492            .bind(owner_id)
1493            .bind(&lease.lease_token)
1494            .bind(now as i64)
1495            .bind(lease.expires_at_epoch_ms as i64)
1496            .execute(&mut *tx)
1497            .await
1498            .map_err(store_sqlx_error)?
1499            .rows_affected();
1500            if changed == 0 {
1501                tx.rollback().await.map_err(store_sqlx_error)?;
1502                return Ok(None);
1503            }
1504        }
1505        let mut batches = Vec::new();
1506        for row in selected {
1507            batches.push(queued_work_batch_from_row(&mut tx, row).await?);
1508        }
1509        tx.commit().await.map_err(store_sqlx_error)?;
1510        Ok(Some(QueuedWorkClaim {
1511            session_id: session_id.to_string(),
1512            claim_id: lease.claim_id,
1513            owner_id: owner_id.to_string(),
1514            lease_token: lease.lease_token,
1515            fencing_token: lease.fencing_token,
1516            claimed_at_epoch_ms: lease.claimed_at_epoch_ms,
1517            expires_at_epoch_ms: lease.expires_at_epoch_ms,
1518            batches,
1519        }))
1520    }
1521
1522    async fn renew_queued_work_claim(
1523        &self,
1524        claim: &QueuedWorkClaim,
1525        lease_ttl_ms: u64,
1526    ) -> Result<QueuedWorkClaim, StoreError> {
1527        let expires_at = current_epoch_ms().saturating_add(lease_ttl_ms);
1528        let changed = sqlx::query(
1529            "UPDATE lash_queued_work_batches
1530             SET claim_expires_at_ms = $4
1531             WHERE session_id = $1 AND claim_id = $2 AND claim_token = $3",
1532        )
1533        .bind(&claim.session_id)
1534        .bind(&claim.claim_id)
1535        .bind(&claim.lease_token)
1536        .bind(expires_at as i64)
1537        .execute(&self.pool)
1538        .await
1539        .map_err(store_sqlx_error)?
1540        .rows_affected();
1541        renewed_claim(claim, changed as usize, expires_at)
1542    }
1543
1544    async fn abandon_queued_work_claim(&self, claim: &QueuedWorkClaim) -> Result<(), StoreError> {
1545        sqlx::query(
1546            "UPDATE lash_queued_work_batches
1547             SET claim_id = NULL,
1548                 claim_owner_id = NULL,
1549                 claim_token = NULL,
1550                 claim_claimed_at_ms = 0,
1551                 claim_expires_at_ms = 0
1552             WHERE session_id = $1 AND claim_id = $2 AND claim_token = $3",
1553        )
1554        .bind(&claim.session_id)
1555        .bind(&claim.claim_id)
1556        .bind(&claim.lease_token)
1557        .execute(&self.pool)
1558        .await
1559        .map_err(store_sqlx_error)?;
1560        Ok(())
1561    }
1562
1563    async fn cancel_queued_work_batch(
1564        &self,
1565        session_id: &str,
1566        batch_id: &str,
1567    ) -> Result<Option<QueuedWorkBatch>, StoreError> {
1568        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1569        let now = current_epoch_ms();
1570        let row = sqlx::query(
1571            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1572                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1573                    claim_fencing_token
1574             FROM lash_queued_work_batches
1575             WHERE session_id = $1
1576               AND batch_id = $2
1577               AND (claim_token IS NULL OR claim_expires_at_ms <= $3)
1578             FOR UPDATE",
1579        )
1580        .bind(session_id)
1581        .bind(batch_id)
1582        .bind(now as i64)
1583        .fetch_optional(&mut *tx)
1584        .await
1585        .map_err(store_sqlx_error)?;
1586        let Some(row) = row else {
1587            tx.commit().await.map_err(store_sqlx_error)?;
1588            return Ok(None);
1589        };
1590        let batch = queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?;
1591        sqlx::query("DELETE FROM lash_queued_work_batches WHERE batch_id = $1")
1592            .bind(batch_id)
1593            .execute(&mut *tx)
1594            .await
1595            .map_err(store_sqlx_error)?;
1596        tx.commit().await.map_err(store_sqlx_error)?;
1597        Ok(Some(batch))
1598    }
1599
1600    async fn list_queued_work(&self, session_id: &str) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1601        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1602        let rows = sqlx::query(
1603            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1604                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1605                    claim_fencing_token
1606             FROM lash_queued_work_batches
1607             WHERE session_id = $1
1608             ORDER BY enqueue_seq ASC",
1609        )
1610        .bind(session_id)
1611        .fetch_all(&mut *tx)
1612        .await
1613        .map_err(store_sqlx_error)?;
1614        let mut batches = Vec::new();
1615        for row in rows {
1616            batches.push(queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?);
1617        }
1618        tx.commit().await.map_err(store_sqlx_error)?;
1619        Ok(batches)
1620    }
1621
1622    async fn list_pending_queued_work(
1623        &self,
1624        session_id: &str,
1625    ) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1626        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1627        let now = current_epoch_ms();
1628        let rows = sqlx::query(
1629            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1630                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1631                    claim_fencing_token
1632             FROM lash_queued_work_batches
1633             WHERE session_id = $1
1634               AND (claim_token IS NULL OR claim_expires_at_ms <= $2)
1635             ORDER BY enqueue_seq ASC",
1636        )
1637        .bind(session_id)
1638        .bind(now as i64)
1639        .fetch_all(&mut *tx)
1640        .await
1641        .map_err(store_sqlx_error)?;
1642        let mut batches = Vec::new();
1643        for row in rows {
1644            batches.push(queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?);
1645        }
1646        tx.commit().await.map_err(store_sqlx_error)?;
1647        Ok(batches)
1648    }
1649
1650    async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError> {
1651        sqlx::query(
1652            "INSERT INTO lash_session_meta (session_id, meta_json)
1653             VALUES ($1, $2)
1654             ON CONFLICT (session_id) DO UPDATE SET meta_json = EXCLUDED.meta_json",
1655        )
1656        .bind(&meta.session_id)
1657        .bind(encode_json(&meta))
1658        .execute(&self.pool)
1659        .await
1660        .map_err(store_sqlx_error)?;
1661        Ok(())
1662    }
1663
1664    async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError> {
1665        let json: Option<String> = if let Some(session_id) = &self.session_id {
1666            sqlx::query_scalar("SELECT meta_json FROM lash_session_meta WHERE session_id = $1")
1667                .bind(session_id)
1668                .fetch_optional(&self.pool)
1669                .await
1670                .map_err(store_sqlx_error)?
1671        } else {
1672            sqlx::query_scalar(
1673                "SELECT meta_json FROM lash_session_meta ORDER BY session_id ASC LIMIT 1",
1674            )
1675            .fetch_optional(&self.pool)
1676            .await
1677            .map_err(store_sqlx_error)?
1678        };
1679        json.map(|json| store_decode_json(&json, "session meta"))
1680            .transpose()
1681    }
1682
1683    async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError> {
1684        for id in ids {
1685            if let Some(session_id) = &self.session_id {
1686                sqlx::query(
1687                    "UPDATE lash_graph_nodes
1688                     SET tombstoned = TRUE
1689                     WHERE session_id = $1 AND node_id = $2",
1690                )
1691                .bind(session_id)
1692                .bind(id)
1693                .execute(&self.pool)
1694                .await
1695                .map_err(store_sqlx_error)?;
1696            } else {
1697                sqlx::query(
1698                    "UPDATE lash_graph_nodes
1699                     SET tombstoned = TRUE
1700                     WHERE node_id = $1",
1701                )
1702                .bind(id)
1703                .execute(&self.pool)
1704                .await
1705                .map_err(store_sqlx_error)?;
1706            }
1707        }
1708        Ok(())
1709    }
1710
1711    async fn vacuum(&self) -> Result<VacuumReport, StoreError> {
1712        let removed = if let Some(session_id) = &self.session_id {
1713            sqlx::query("DELETE FROM lash_graph_nodes WHERE session_id = $1 AND tombstoned = TRUE")
1714                .bind(session_id)
1715                .execute(&self.pool)
1716                .await
1717                .map_err(store_sqlx_error)?
1718                .rows_affected()
1719        } else {
1720            sqlx::query("DELETE FROM lash_graph_nodes WHERE tombstoned = TRUE")
1721                .execute(&self.pool)
1722                .await
1723                .map_err(store_sqlx_error)?
1724                .rows_affected()
1725        };
1726        Ok(VacuumReport {
1727            removed_node_count: removed as usize,
1728        })
1729    }
1730
1731    async fn gc_unreachable(&self) -> Result<GcReport, StoreError> {
1732        Ok(GcReport::default())
1733    }
1734}
1735
1736fn process_status_label(record: &ProcessRecord) -> &'static str {
1737    record.status.label()
1738}
1739
1740async fn load_process_tx(
1741    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1742    process_id: &str,
1743) -> Result<Option<ProcessRecord>, PluginError> {
1744    let json: Option<String> = sqlx::query_scalar(
1745        "SELECT record_json
1746             FROM lash_processes
1747             WHERE process_id = $1
1748             FOR UPDATE",
1749    )
1750    .bind(process_id)
1751    .fetch_optional(&mut **tx)
1752    .await
1753    .map_err(plugin_sqlx_error)?;
1754    json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
1755        .transpose()
1756}
1757
1758async fn load_process(
1759    pool: &PgPool,
1760    process_id: &str,
1761) -> Result<Option<ProcessRecord>, PluginError> {
1762    let json: Option<String> =
1763        sqlx::query_scalar("SELECT record_json FROM lash_processes WHERE process_id = $1")
1764            .bind(process_id)
1765            .fetch_optional(pool)
1766            .await
1767            .map_err(plugin_sqlx_error)?;
1768    json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
1769        .transpose()
1770}
1771
1772async fn save_process_tx(
1773    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1774    record: &ProcessRecord,
1775) -> Result<(), PluginError> {
1776    sqlx::query(
1777        "UPDATE lash_processes
1778         SET updated_at_ms = $2, status = $3, record_json = $4
1779         WHERE process_id = $1",
1780    )
1781    .bind(&record.id)
1782    .bind(record.updated_at_ms as i64)
1783    .bind(process_status_label(record))
1784    .bind(serde_json::to_string(record).map_err(process_decode_error)?)
1785    .execute(&mut **tx)
1786    .await
1787    .map_err(plugin_sqlx_error)?;
1788    Ok(())
1789}
1790
1791async fn load_event_by_key_tx(
1792    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1793    process_id: &str,
1794    replay_key: &str,
1795) -> Result<Option<(String, ProcessEvent)>, PluginError> {
1796    let row = sqlx::query(
1797        "SELECT payload_hash, event_json
1798         FROM lash_process_events
1799         WHERE process_id = $1 AND idempotency_key = $2",
1800    )
1801    .bind(process_id)
1802    .bind(replay_key)
1803    .fetch_optional(&mut **tx)
1804    .await
1805    .map_err(plugin_sqlx_error)?;
1806    row.map(|row| {
1807        let hash: String = row.get(0);
1808        let json: String = row.get(1);
1809        serde_json::from_str(&json)
1810            .map(|event| (hash, event))
1811            .map_err(process_decode_error)
1812    })
1813    .transpose()
1814}
1815
1816async fn load_process_lease_tx(
1817    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1818    process_id: &str,
1819) -> Result<Option<ProcessLease>, PluginError> {
1820    let row = sqlx::query(
1821        "SELECT lease_owner_id, lease_token, lease_fencing_token,
1822                lease_claimed_at_ms, lease_expires_at_ms
1823         FROM lash_process_leases
1824         WHERE process_id = $1",
1825    )
1826    .bind(process_id)
1827    .fetch_optional(&mut **tx)
1828    .await
1829    .map_err(plugin_sqlx_error)?;
1830    let Some(row) = row else {
1831        return Ok(None);
1832    };
1833    let owner_id: Option<String> = row.get(0);
1834    let lease_token: Option<String> = row.get(1);
1835    let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
1836        return Ok(None);
1837    };
1838    Ok(Some(ProcessLease {
1839        schema_version: PROCESS_LEASE_SCHEMA_VERSION,
1840        process_id: process_id.to_string(),
1841        owner_id,
1842        lease_token,
1843        fencing_token: row.get::<i64, _>(2) as u64,
1844        claimed_at_epoch_ms: row.get::<i64, _>(3) as u64,
1845        expires_at_epoch_ms: row.get::<i64, _>(4) as u64,
1846    }))
1847}
1848
1849fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> PluginError {
1850    PluginError::Session(format!(
1851        "process `{process_id}` is already leased by `{}` until {}",
1852        current.owner_id, current.expires_at_epoch_ms
1853    ))
1854}
1855
1856fn process_lease_expired(process_id: &str) -> PluginError {
1857    PluginError::Session(format!(
1858        "process lease for `{process_id}` is missing or expired"
1859    ))
1860}
1861
1862fn guard_lease(current: Option<&ProcessLease>, lease_token: &str, now: u64) -> bool {
1863    current
1864        .map(|current| current.lease_token == lease_token && current.expires_at_epoch_ms > now)
1865        .unwrap_or(false)
1866}
1867
1868async fn list_grants_for_scope(
1869    pool: &PgPool,
1870    session_scope: &SessionScope,
1871    live_only: bool,
1872) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
1873    let status_clause = if live_only {
1874        "AND p.status = 'running'"
1875    } else {
1876        ""
1877    };
1878    let sql = format!(
1879        "SELECT g.process_id, g.descriptor_json, p.record_json
1880         FROM lash_process_handle_grants g
1881         JOIN lash_processes p ON p.process_id = g.process_id
1882         WHERE g.scope_id = $1 {status_clause}
1883         ORDER BY g.process_id ASC"
1884    );
1885    let rows = sqlx::query(&sql)
1886        .bind(session_scope.id().as_str())
1887        .fetch_all(pool)
1888        .await
1889        .map_err(plugin_sqlx_error)?;
1890    let mut entries = Vec::new();
1891    for row in rows {
1892        let process_id: String = row.get(0);
1893        let descriptor_json: String = row.get(1);
1894        let record_json: String = row.get(2);
1895        let descriptor: ProcessHandleDescriptor =
1896            serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
1897        let record: ProcessRecord =
1898            serde_json::from_str(&record_json).map_err(process_decode_error)?;
1899        entries.push((
1900            ProcessHandleGrant {
1901                session_id: session_scope.session_id.clone(),
1902                process_id,
1903                descriptor,
1904            },
1905            record,
1906        ));
1907    }
1908    Ok(entries)
1909}
1910
1911#[async_trait::async_trait]
1912impl ProcessRegistry for PostgresProcessRegistry {
1913    fn durability_tier(&self) -> DurabilityTier {
1914        DurabilityTier::Durable
1915    }
1916
1917    async fn register_process(
1918        &self,
1919        registration: ProcessRegistration,
1920    ) -> Result<ProcessRecord, PluginError> {
1921        let (registration, registration_hash) =
1922            lash_core::runtime::prepare_process_registration(registration)?;
1923        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1924        if let Some(existing) = load_process_tx(&mut tx, &registration.id).await? {
1925            if existing.registration_hash == registration_hash {
1926                tx.commit().await.map_err(plugin_sqlx_error)?;
1927                return Ok(existing);
1928            }
1929            return Err(PluginError::Session(format!(
1930                "process `{}` registration hash conflict: existing {}, new {}",
1931                registration.id, existing.registration_hash, registration_hash
1932            )));
1933        }
1934        let now = current_epoch_ms();
1935        let record =
1936            ProcessRecord::from_prepared_registration(registration, registration_hash, now);
1937        let record_json = serde_json::to_string(&record).map_err(process_decode_error)?;
1938        sqlx::query(
1939            "INSERT INTO lash_processes (
1940                process_id, registration_hash, owner_scope_id, host_profile_id,
1941                created_at_ms, updated_at_ms, status, record_json
1942             )
1943             VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
1944        )
1945        .bind(&record.id)
1946        .bind(&record.registration_hash)
1947        .bind(record.originator_scope_id().as_str())
1948        .bind(record.host_profile_id())
1949        .bind(record.created_at_ms as i64)
1950        .bind(record.updated_at_ms as i64)
1951        .bind(process_status_label(&record))
1952        .bind(record_json)
1953        .execute(&mut *tx)
1954        .await
1955        .map_err(plugin_sqlx_error)?;
1956        tx.commit().await.map_err(plugin_sqlx_error)?;
1957        Ok(record)
1958    }
1959
1960    async fn set_external_ref(
1961        &self,
1962        process_id: &str,
1963        external_ref: ProcessExternalRef,
1964    ) -> Result<ProcessRecord, PluginError> {
1965        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1966        let mut record = load_process_tx(&mut tx, process_id)
1967            .await?
1968            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
1969        record.external_ref = Some(external_ref);
1970        record.updated_at_ms = current_epoch_ms();
1971        save_process_tx(&mut tx, &record).await?;
1972        tx.commit().await.map_err(plugin_sqlx_error)?;
1973        Ok(record)
1974    }
1975
1976    async fn grant_handle(
1977        &self,
1978        session_scope: &SessionScope,
1979        process_id: &str,
1980        descriptor: ProcessHandleDescriptor,
1981    ) -> Result<ProcessHandleGrant, PluginError> {
1982        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1983        if load_process_tx(&mut tx, process_id).await?.is_none() {
1984            return Err(PluginError::Session(format!(
1985                "unknown process `{process_id}`"
1986            )));
1987        }
1988        sqlx::query(
1989            "INSERT INTO lash_process_handle_grants (session_id, scope_id, process_id, descriptor_json)
1990             VALUES ($1, $2, $3, $4)
1991             ON CONFLICT (scope_id, process_id) DO UPDATE SET
1992                session_id = EXCLUDED.session_id,
1993                descriptor_json = EXCLUDED.descriptor_json",
1994        )
1995        .bind(&session_scope.session_id)
1996        .bind(session_scope.id().as_str())
1997        .bind(process_id)
1998        .bind(serde_json::to_string(&descriptor).map_err(process_decode_error)?)
1999        .execute(&mut *tx)
2000        .await
2001        .map_err(plugin_sqlx_error)?;
2002        tx.commit().await.map_err(plugin_sqlx_error)?;
2003        Ok(ProcessHandleGrant {
2004            session_id: session_scope.session_id.clone(),
2005            process_id: process_id.to_string(),
2006            descriptor,
2007        })
2008    }
2009
2010    async fn revoke_handle(
2011        &self,
2012        session_scope: &SessionScope,
2013        process_id: &str,
2014    ) -> Result<(), PluginError> {
2015        sqlx::query(
2016            "DELETE FROM lash_process_handle_grants WHERE scope_id = $1 AND process_id = $2",
2017        )
2018        .bind(session_scope.id().as_str())
2019        .bind(process_id)
2020        .execute(&self.pool)
2021        .await
2022        .map_err(plugin_sqlx_error)?;
2023        Ok(())
2024    }
2025
2026    async fn transfer_handle_grants(
2027        &self,
2028        from_scope: &SessionScope,
2029        to_scope: &SessionScope,
2030        process_ids: &[String],
2031    ) -> Result<(), PluginError> {
2032        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2033        for process_id in process_ids {
2034            let descriptor_json: Option<String> = sqlx::query_scalar(
2035                "SELECT descriptor_json FROM lash_process_handle_grants
2036                 WHERE scope_id = $1 AND process_id = $2",
2037            )
2038            .bind(from_scope.id().as_str())
2039            .bind(process_id)
2040            .fetch_optional(&mut *tx)
2041            .await
2042            .map_err(plugin_sqlx_error)?;
2043            let Some(descriptor_json) = descriptor_json else {
2044                return Err(PluginError::Session(format!(
2045                    "process handle `{process_id}` is not granted to session `{}`",
2046                    from_scope.session_id
2047                )));
2048            };
2049            sqlx::query(
2050                "DELETE FROM lash_process_handle_grants WHERE scope_id = $1 AND process_id = $2",
2051            )
2052            .bind(from_scope.id().as_str())
2053            .bind(process_id)
2054            .execute(&mut *tx)
2055            .await
2056            .map_err(plugin_sqlx_error)?;
2057            sqlx::query(
2058                "INSERT INTO lash_process_handle_grants (session_id, scope_id, process_id, descriptor_json)
2059                 VALUES ($1, $2, $3, $4)
2060                 ON CONFLICT (scope_id, process_id) DO UPDATE SET
2061                    session_id = EXCLUDED.session_id,
2062                    descriptor_json = EXCLUDED.descriptor_json",
2063            )
2064            .bind(&to_scope.session_id)
2065            .bind(to_scope.id().as_str())
2066            .bind(process_id)
2067            .bind(descriptor_json)
2068            .execute(&mut *tx)
2069            .await
2070            .map_err(plugin_sqlx_error)?;
2071        }
2072        tx.commit().await.map_err(plugin_sqlx_error)
2073    }
2074
2075    async fn list_handle_grants(
2076        &self,
2077        session_scope: &SessionScope,
2078    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
2079        list_grants_for_scope(&self.pool, session_scope, false).await
2080    }
2081
2082    async fn list_live_handle_grants(
2083        &self,
2084        session_scope: &SessionScope,
2085    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
2086        list_grants_for_scope(&self.pool, session_scope, true).await
2087    }
2088
2089    async fn has_handle_grant(
2090        &self,
2091        session_scope: &SessionScope,
2092        process_id: &str,
2093    ) -> Result<bool, PluginError> {
2094        let exists: Option<i64> = sqlx::query_scalar(
2095            "SELECT 1::BIGINT FROM lash_process_handle_grants
2096             WHERE scope_id = $1 AND process_id = $2
2097             LIMIT 1",
2098        )
2099        .bind(session_scope.id().as_str())
2100        .bind(process_id)
2101        .fetch_optional(&self.pool)
2102        .await
2103        .map_err(plugin_sqlx_error)?;
2104        Ok(exists.is_some())
2105    }
2106
2107    async fn handle_grants_for_process(
2108        &self,
2109        process_id: &str,
2110    ) -> Result<Vec<ProcessHandleGrant>, PluginError> {
2111        if load_process(&self.pool, process_id).await?.is_none() {
2112            return Err(PluginError::Session(format!(
2113                "unknown process `{process_id}`"
2114            )));
2115        }
2116        let rows = sqlx::query(
2117            "SELECT session_id, descriptor_json
2118             FROM lash_process_handle_grants
2119             WHERE process_id = $1
2120             ORDER BY session_id ASC, scope_id ASC",
2121        )
2122        .bind(process_id)
2123        .fetch_all(&self.pool)
2124        .await
2125        .map_err(plugin_sqlx_error)?;
2126        let mut grants = Vec::new();
2127        for row in rows {
2128            let descriptor_json: String = row.get(1);
2129            grants.push(ProcessHandleGrant {
2130                session_id: row.get(0),
2131                process_id: process_id.to_string(),
2132                descriptor: serde_json::from_str(&descriptor_json).map_err(process_decode_error)?,
2133            });
2134        }
2135        Ok(grants)
2136    }
2137
2138    async fn delete_session_process_state(
2139        &self,
2140        session_id: &str,
2141    ) -> Result<lash_core::ProcessSessionDeleteReport, PluginError> {
2142        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2143        let rows = sqlx::query(
2144            "SELECT g.process_id, p.record_json
2145             FROM lash_process_handle_grants g
2146             JOIN lash_processes p ON p.process_id = g.process_id
2147             WHERE g.session_id = $1
2148             ORDER BY g.process_id ASC",
2149        )
2150        .bind(session_id)
2151        .fetch_all(&mut *tx)
2152        .await
2153        .map_err(plugin_sqlx_error)?;
2154        let mut removed = Vec::new();
2155        for row in rows {
2156            let process_id: String = row.get(0);
2157            let record_json: String = row.get(1);
2158            let record: ProcessRecord =
2159                serde_json::from_str(&record_json).map_err(process_decode_error)?;
2160            removed.push((process_id, record));
2161        }
2162        // Wake acknowledgements are process-scoped consumed-event markers. Session
2163        // deletion removes materialized session-addressed deliveries through the
2164        // session store; clearing these rows would re-expose already-consumed wakes
2165        // to surviving grants or future host readers.
2166        let deleted_wake_count = 0;
2167        let revoked = sqlx::query("DELETE FROM lash_process_handle_grants WHERE session_id = $1")
2168            .bind(session_id)
2169            .execute(&mut *tx)
2170            .await
2171            .map_err(plugin_sqlx_error)?
2172            .rows_affected() as usize;
2173        let mut orphaned_process_ids = Vec::new();
2174        let mut preserved_process_ids = Vec::new();
2175        for (process_id, record) in removed {
2176            if record.is_terminal() {
2177                continue;
2178            }
2179            let remaining: i64 = sqlx::query_scalar(
2180                "SELECT COUNT(*) FROM lash_process_handle_grants WHERE process_id = $1",
2181            )
2182            .bind(&process_id)
2183            .fetch_one(&mut *tx)
2184            .await
2185            .map_err(plugin_sqlx_error)?;
2186            if remaining == 0 {
2187                orphaned_process_ids.push(process_id);
2188            } else {
2189                preserved_process_ids.push(process_id);
2190            }
2191        }
2192        tx.commit().await.map_err(plugin_sqlx_error)?;
2193        orphaned_process_ids.sort();
2194        orphaned_process_ids.dedup();
2195        preserved_process_ids.sort();
2196        preserved_process_ids.dedup();
2197        Ok(lash_core::ProcessSessionDeleteReport {
2198            session_id: session_id.to_string(),
2199            revoked_handle_count: revoked,
2200            deleted_wake_count,
2201            orphaned_process_ids,
2202            preserved_process_ids,
2203        })
2204    }
2205
    /// Appends an event to a process's log and applies any status change it carries.
    ///
    /// Everything runs in one transaction that first row-locks the process
    /// (`load_process_tx` uses `FOR UPDATE`). Two appends for the same process
    /// made through this method therefore serialize, and the
    /// `MAX(sequence) + 1` computed below cannot be handed out twice. When the
    /// request carries a replay key, any event already stored under that key is
    /// passed to `prepare_process_event_append`. If that helper marks the append
    /// as replayed, no new event row is written; only the status update it
    /// reports (if any) is re-applied to the record.
    ///
    /// Waiters on `self.notify` are woken after every fresh append. After a
    /// replay they are woken only when the record was rewritten.
    ///
    /// # Errors
    ///
    /// Returns `PluginError::Session` for an unknown process. Also propagates
    /// errors from `prepare_process_event_append`, the database, and JSON
    /// (de)serialization.
    async fn append_event(
        &self,
        process_id: &str,
        request: ProcessEventAppendRequest,
    ) -> Result<ProcessEventAppendResult, PluginError> {
        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
        // Row lock held until commit/rollback; serializes concurrent appends.
        let mut record = load_process_tx(&mut tx, process_id)
            .await?
            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
        // Previously stored event (and its payload hash) under the same replay key, if any.
        let replay_lookup =
            if let Some(replay_key) = request.replay.as_ref().map(|r| r.key.as_str()) {
                load_event_by_key_tx(&mut tx, process_id, replay_key).await?
            } else {
                None
            };
        // Next sequence number; sequences start at 1 for a process with no events.
        let sequence: i64 = sqlx::query_scalar(
            "SELECT COALESCE(MAX(sequence), 0) + 1 FROM lash_process_events WHERE process_id = $1",
        )
        .bind(process_id)
        .fetch_one(&mut *tx)
        .await
        .map_err(plugin_sqlx_error)?;
        let occurred_at_ms = current_epoch_ms();
        let prepared = lash_core::runtime::prepare_process_event_append(
            &record,
            request,
            sequence as u64,
            replay_lookup,
            occurred_at_ms,
        )?;
        if prepared.replayed {
            // Replay path: no event row is inserted. Only re-apply the status
            // update the helper reports; `repaired` records whether the stored
            // record was rewritten.
            let repaired = if let Some(status) = prepared.status_update.clone() {
                record.status = status;
                // Clear any wait state once the process reaches a terminal status.
                if record.status.is_terminal() {
                    record.wait = None;
                }
                record.updated_at_ms = prepared.occurred_at_ms;
                save_process_tx(&mut tx, &record).await?;
                true
            } else {
                false
            };
            tx.commit().await.map_err(plugin_sqlx_error)?;
            if repaired {
                self.notify.notify_waiters();
            }
            return Ok(ProcessEventAppendResult {
                event: prepared.event,
                wake_delivery: prepared.wake_delivery,
            });
        }
        let event = prepared.event;
        sqlx::query(
            "INSERT INTO lash_process_events (
                process_id, sequence, event_type, payload_hash, idempotency_key,
                occurred_at_ms, event_json
             )
             VALUES ($1, $2, $3, $4, $5, $6, $7)",
        )
        .bind(process_id)
        .bind(sequence)
        .bind(event.event_type.as_str())
        .bind(&prepared.payload_hash)
        .bind(event.invocation.replay_key())
        .bind(prepared.occurred_at_ms as i64)
        .bind(serde_json::to_string(&event).map_err(process_decode_error)?)
        .execute(&mut *tx)
        .await
        .map_err(plugin_sqlx_error)?;
        if let Some(status) = prepared.status_update.clone() {
            record.status = status;
            // Clear any wait state once the process reaches a terminal status.
            if record.status.is_terminal() {
                record.wait = None;
            }
        }
        // Fresh appends always bump `updated_at_ms`, even without a status change.
        record.updated_at_ms = prepared.occurred_at_ms;
        save_process_tx(&mut tx, &record).await?;
        tx.commit().await.map_err(plugin_sqlx_error)?;
        // Notify only after commit so woken waiters can read the new event.
        self.notify.notify_waiters();
        Ok(ProcessEventAppendResult {
            event,
            wake_delivery: prepared.wake_delivery,
        })
    }
2290
2291    async fn events_after(
2292        &self,
2293        process_id: &str,
2294        after_sequence: u64,
2295    ) -> Result<Vec<ProcessEvent>, PluginError> {
2296        if load_process(&self.pool, process_id).await?.is_none() {
2297            return Err(PluginError::Session(format!(
2298                "unknown process `{process_id}`"
2299            )));
2300        }
2301        let rows = sqlx::query(
2302            "SELECT event_json FROM lash_process_events
2303             WHERE process_id = $1 AND sequence > $2
2304             ORDER BY sequence ASC",
2305        )
2306        .bind(process_id)
2307        .bind(after_sequence as i64)
2308        .fetch_all(&self.pool)
2309        .await
2310        .map_err(plugin_sqlx_error)?;
2311        let mut events = Vec::new();
2312        for row in rows {
2313            let json: String = row.get(0);
2314            events.push(serde_json::from_str(&json).map_err(process_decode_error)?);
2315        }
2316        Ok(events)
2317    }
2318
2319    async fn count_events_through(
2320        &self,
2321        process_id: &str,
2322        event_type: &str,
2323        up_to_sequence: u64,
2324    ) -> Result<u64, PluginError> {
2325        if load_process(&self.pool, process_id).await?.is_none() {
2326            return Err(PluginError::Session(format!(
2327                "unknown process `{process_id}`"
2328            )));
2329        }
2330        let row = sqlx::query(
2331            "SELECT COUNT(*) FROM lash_process_events
2332             WHERE process_id = $1 AND event_type = $2 AND sequence <= $3",
2333        )
2334        .bind(process_id)
2335        .bind(event_type)
2336        .bind(up_to_sequence as i64)
2337        .fetch_one(&self.pool)
2338        .await
2339        .map_err(plugin_sqlx_error)?;
2340        let count: i64 = row.get(0);
2341        Ok(count as u64)
2342    }
2343
2344    async fn recent_events(
2345        &self,
2346        process_id: &str,
2347        limit: usize,
2348    ) -> Result<Vec<ProcessEvent>, PluginError> {
2349        if load_process(&self.pool, process_id).await?.is_none() {
2350            return Err(PluginError::Session(format!(
2351                "unknown process `{process_id}`"
2352            )));
2353        }
2354        let rows = sqlx::query(
2355            "SELECT event_json FROM lash_process_events
2356             WHERE process_id = $1
2357             ORDER BY sequence DESC
2358             LIMIT $2",
2359        )
2360        .bind(process_id)
2361        .bind(limit as i64)
2362        .fetch_all(&self.pool)
2363        .await
2364        .map_err(plugin_sqlx_error)?;
2365        let mut events = Vec::new();
2366        for row in rows {
2367            let json: String = row.get(0);
2368            events.push(serde_json::from_str(&json).map_err(process_decode_error)?);
2369        }
2370        events.reverse();
2371        Ok(events)
2372    }
2373
2374    async fn wake_events_after(
2375        &self,
2376        process_id: &str,
2377        after_sequence: u64,
2378    ) -> Result<Vec<ProcessEvent>, PluginError> {
2379        let rows = sqlx::query("SELECT sequence FROM lash_process_wake_acks WHERE process_id = $1")
2380            .bind(process_id)
2381            .fetch_all(&self.pool)
2382            .await
2383            .map_err(plugin_sqlx_error)?;
2384        let acked = rows
2385            .into_iter()
2386            .map(|row| row.get::<i64, _>(0) as u64)
2387            .collect::<HashSet<_>>();
2388        Ok(self
2389            .events_after(process_id, after_sequence)
2390            .await?
2391            .into_iter()
2392            .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
2393            .collect())
2394    }
2395
2396    async fn wait_event_after(
2397        &self,
2398        process_id: &str,
2399        event_type: &str,
2400        after_sequence: u64,
2401    ) -> Result<ProcessEvent, PluginError> {
2402        loop {
2403            if let Some(event) = self
2404                .events_after(process_id, after_sequence)
2405                .await?
2406                .into_iter()
2407                .find(|event| event.event_type == event_type)
2408            {
2409                return Ok(event);
2410            }
2411            tokio::select! {
2412                _ = self.notify.notified() => {}
2413                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
2414            }
2415        }
2416    }
2417
2418    async fn await_process(&self, process_id: &str) -> Result<ProcessAwaitOutput, PluginError> {
2419        loop {
2420            let record = load_process(&self.pool, process_id)
2421                .await?
2422                .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
2423            if let Some(await_output) = record.status.await_output() {
2424                return Ok(await_output.clone());
2425            }
2426            tokio::select! {
2427                _ = self.notify.notified() => {}
2428                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
2429            }
2430        }
2431    }
2432
2433    async fn complete_process(
2434        &self,
2435        process_id: &str,
2436        await_output: ProcessAwaitOutput,
2437    ) -> Result<ProcessRecord, PluginError> {
2438        let event_type = match await_output.terminal_state() {
2439            lash_core::ProcessTerminalState::Completed => "process.completed",
2440            lash_core::ProcessTerminalState::Failed => "process.failed",
2441            lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
2442        };
2443        self.append_event(
2444            process_id,
2445            ProcessEventAppendRequest::new(
2446                event_type,
2447                serde_json::json!({ "await_output": await_output }),
2448            )
2449            .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
2450        )
2451        .await?;
2452        load_process(&self.pool, process_id).await?.ok_or_else(|| {
2453            PluginError::Session(format!(
2454                "unknown process `{process_id}` after terminal event"
2455            ))
2456        })
2457    }
2458
2459    async fn set_process_wait(
2460        &self,
2461        process_id: &str,
2462        wait: lash_core::WaitState,
2463    ) -> Result<ProcessRecord, PluginError> {
2464        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2465        let mut record = load_process_tx(&mut tx, process_id)
2466            .await?
2467            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
2468        if record.is_terminal() {
2469            return Err(PluginError::Session(format!(
2470                "terminal process `{process_id}` cannot enter a wait state"
2471            )));
2472        }
2473        record.wait = Some(wait);
2474        record.updated_at_ms = current_epoch_ms();
2475        save_process_tx(&mut tx, &record).await?;
2476        tx.commit().await.map_err(plugin_sqlx_error)?;
2477        self.notify.notify_waiters();
2478        Ok(record)
2479    }
2480
2481    async fn clear_process_wait(&self, process_id: &str) -> Result<ProcessRecord, PluginError> {
2482        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2483        let mut record = load_process_tx(&mut tx, process_id)
2484            .await?
2485            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
2486        record.wait = None;
2487        record.updated_at_ms = current_epoch_ms();
2488        save_process_tx(&mut tx, &record).await?;
2489        tx.commit().await.map_err(plugin_sqlx_error)?;
2490        self.notify.notify_waiters();
2491        Ok(record)
2492    }
2493
2494    async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
2495        load_process(&self.pool, process_id).await.ok().flatten()
2496    }
2497
2498    async fn list_processes(
2499        &self,
2500        filter: &lash_core::ProcessListFilter,
2501    ) -> Result<Vec<ProcessRecord>, PluginError> {
2502        let rows = sqlx::query(
2503            "SELECT record_json FROM lash_processes
2504             ORDER BY process_id ASC",
2505        )
2506        .fetch_all(&self.pool)
2507        .await
2508        .map_err(plugin_sqlx_error)?;
2509        let mut records = Vec::new();
2510        for row in rows {
2511            let json: String = row.get(0);
2512            let record: ProcessRecord =
2513                serde_json::from_str(&json).map_err(process_decode_error)?;
2514            if filter.matches_record(&record) {
2515                records.push(record);
2516            }
2517        }
2518        Ok(records)
2519    }
2520
2521    async fn ack_wake(&self, process_id: &str, sequence: u64) -> Result<(), PluginError> {
2522        if load_process(&self.pool, process_id).await?.is_none() {
2523            return Err(PluginError::Session(format!(
2524                "unknown process `{process_id}`"
2525            )));
2526        }
2527        sqlx::query(
2528            "INSERT INTO lash_process_wake_acks (process_id, sequence)
2529             VALUES ($1, $2)
2530             ON CONFLICT DO NOTHING",
2531        )
2532        .bind(process_id)
2533        .bind(sequence as i64)
2534        .execute(&self.pool)
2535        .await
2536        .map_err(plugin_sqlx_error)?;
2537        Ok(())
2538    }
2539
2540    async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, PluginError> {
2541        let rows = sqlx::query(
2542            "SELECT record_json FROM lash_processes
2543             WHERE status = 'running'
2544             ORDER BY process_id ASC",
2545        )
2546        .fetch_all(&self.pool)
2547        .await
2548        .map_err(plugin_sqlx_error)?;
2549        let mut records = Vec::new();
2550        for row in rows {
2551            let json: String = row.get(0);
2552            records.push(serde_json::from_str(&json).map_err(process_decode_error)?);
2553        }
2554        Ok(records)
2555    }
2556
2557    async fn claim_process_lease(
2558        &self,
2559        process_id: &str,
2560        owner_id: &str,
2561        lease_ttl_ms: u64,
2562    ) -> Result<ProcessLease, PluginError> {
2563        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2564        if load_process_tx(&mut tx, process_id).await?.is_none() {
2565            return Err(PluginError::Session(format!(
2566                "unknown process `{process_id}`"
2567            )));
2568        }
2569        let now = current_epoch_ms();
2570        let current = load_process_lease_tx(&mut tx, process_id).await?;
2571        if let Some(current) = current.as_ref()
2572            && current.expires_at_epoch_ms > now
2573            && current.owner_id != owner_id
2574        {
2575            return Err(process_lease_conflict(process_id, current));
2576        }
2577        let existing_fence: Option<i64> = sqlx::query_scalar(
2578            "SELECT lease_fencing_token FROM lash_process_leases WHERE process_id = $1 FOR UPDATE",
2579        )
2580        .bind(process_id)
2581        .fetch_optional(&mut *tx)
2582        .await
2583        .map_err(plugin_sqlx_error)?;
2584        let fencing_token = existing_fence.unwrap_or(0) as u64 + 1;
2585        let lease = ProcessLease {
2586            schema_version: PROCESS_LEASE_SCHEMA_VERSION,
2587            process_id: process_id.to_string(),
2588            owner_id: owner_id.to_string(),
2589            lease_token: format!(
2590                "{:x}",
2591                Sha256::digest(format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes())
2592            ),
2593            fencing_token,
2594            claimed_at_epoch_ms: now,
2595            expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
2596        };
2597        sqlx::query(
2598            "INSERT INTO lash_process_leases (
2599                process_id, lease_owner_id, lease_token, lease_fencing_token,
2600                lease_claimed_at_ms, lease_expires_at_ms
2601             )
2602             VALUES ($1, $2, $3, $4, $5, $6)
2603             ON CONFLICT (process_id) DO UPDATE SET
2604                lease_owner_id = EXCLUDED.lease_owner_id,
2605                lease_token = EXCLUDED.lease_token,
2606                lease_fencing_token = EXCLUDED.lease_fencing_token,
2607                lease_claimed_at_ms = EXCLUDED.lease_claimed_at_ms,
2608                lease_expires_at_ms = EXCLUDED.lease_expires_at_ms",
2609        )
2610        .bind(&lease.process_id)
2611        .bind(&lease.owner_id)
2612        .bind(&lease.lease_token)
2613        .bind(lease.fencing_token as i64)
2614        .bind(lease.claimed_at_epoch_ms as i64)
2615        .bind(lease.expires_at_epoch_ms as i64)
2616        .execute(&mut *tx)
2617        .await
2618        .map_err(plugin_sqlx_error)?;
2619        tx.commit().await.map_err(plugin_sqlx_error)?;
2620        Ok(lease)
2621    }
2622
2623    async fn renew_process_lease(
2624        &self,
2625        lease: &ProcessLease,
2626        lease_ttl_ms: u64,
2627    ) -> Result<ProcessLease, PluginError> {
2628        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2629        let now = current_epoch_ms();
2630        let current = load_process_lease_tx(&mut tx, &lease.process_id).await?;
2631        if !guard_lease(current.as_ref(), &lease.lease_token, now) {
2632            return Err(process_lease_expired(&lease.process_id));
2633        }
2634        let renewed = ProcessLease {
2635            expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
2636            ..lease.clone()
2637        };
2638        sqlx::query(
2639            "UPDATE lash_process_leases
2640             SET lease_expires_at_ms = $2
2641             WHERE process_id = $1 AND lease_token = $3",
2642        )
2643        .bind(&renewed.process_id)
2644        .bind(renewed.expires_at_epoch_ms as i64)
2645        .bind(&renewed.lease_token)
2646        .execute(&mut *tx)
2647        .await
2648        .map_err(plugin_sqlx_error)?;
2649        tx.commit().await.map_err(plugin_sqlx_error)?;
2650        Ok(renewed)
2651    }
2652
2653    async fn complete_process_lease(
2654        &self,
2655        completion: &ProcessLeaseCompletion,
2656    ) -> Result<(), PluginError> {
2657        sqlx::query(
2658            "UPDATE lash_process_leases
2659             SET lease_owner_id = NULL,
2660                 lease_token = NULL,
2661                 lease_claimed_at_ms = 0,
2662                 lease_expires_at_ms = 0
2663             WHERE process_id = $1 AND lease_token = $2",
2664        )
2665        .bind(&completion.process_id)
2666        .bind(&completion.lease_token)
2667        .execute(&self.pool)
2668        .await
2669        .map_err(plugin_sqlx_error)?;
2670        Ok(())
2671    }
2672}
2673
2674#[async_trait::async_trait]
2675impl HostEventStore for PostgresHostEventStore {
2676    fn durability_tier(&self) -> DurabilityTier {
2677        DurabilityTier::Durable
2678    }
2679
2680    async fn register_subscription(
2681        &self,
2682        draft: TriggerSubscriptionDraft,
2683    ) -> Result<TriggerSubscriptionRecord, PluginError> {
2684        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2685        let seq: i64 = sqlx::query_scalar("SELECT nextval('lash_host_event_subscription_seq')")
2686            .fetch_one(&mut *tx)
2687            .await
2688            .map_err(plugin_sqlx_error)?;
2689        let handle = format!("trigger:{seq}");
2690        let subscription_id = format!("subscription:{seq}");
2691        let now = current_epoch_ms();
2692        let record = TriggerSubscriptionRecord {
2693            subscription_id: subscription_id.clone(),
2694            registrant: draft.registrant,
2695            env_ref: draft.env_ref,
2696            wake_target: draft.wake_target,
2697            handle,
2698            name: draft.name,
2699            source_type: draft.source_type,
2700            source_key: draft.source_key,
2701            source: draft.source,
2702            event_ty: draft.event_ty,
2703            module_ref: draft.module_ref,
2704            required_surface_ref: draft.required_surface_ref,
2705            process_ref: draft.process_ref,
2706            process_name: draft.process_name,
2707            input_template: draft.input_template,
2708            enabled: true,
2709            created_at_ms: now,
2710            updated_at_ms: now,
2711        };
2712        sqlx::query(
2713            "INSERT INTO lash_host_event_trigger_subscriptions (
2714                subscription_id, registrant_scope_id, handle, source_type, source_key,
2715                enabled, created_at_ms, updated_at_ms, record_json
2716             )
2717             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
2718        )
2719        .bind(&record.subscription_id)
2720        .bind(record.registrant_scope_id())
2721        .bind(&record.handle)
2722        .bind(&record.source_type)
2723        .bind(&record.source_key)
2724        .bind(record.enabled)
2725        .bind(record.created_at_ms as i64)
2726        .bind(record.updated_at_ms as i64)
2727        .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2728        .execute(&mut *tx)
2729        .await
2730        .map_err(plugin_sqlx_error)?;
2731        tx.commit().await.map_err(plugin_sqlx_error)?;
2732        Ok(record)
2733    }
2734
2735    async fn list_subscriptions(
2736        &self,
2737        filter: TriggerSubscriptionFilter,
2738    ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError> {
2739        let rows = sqlx::query(
2740            "SELECT record_json FROM lash_host_event_trigger_subscriptions
2741             ORDER BY registrant_scope_id ASC, handle ASC",
2742        )
2743        .fetch_all(&self.pool)
2744        .await
2745        .map_err(plugin_sqlx_error)?;
2746        let mut records = Vec::new();
2747        for row in rows {
2748            let json: String = row.get(0);
2749            let record: TriggerSubscriptionRecord =
2750                serde_json::from_str(&json).map_err(process_decode_error)?;
2751            if filter.matches(&record) {
2752                records.push(record);
2753            }
2754        }
2755        Ok(records)
2756    }
2757
2758    async fn cancel_subscription(
2759        &self,
2760        session_id: &str,
2761        handle: &str,
2762    ) -> Result<bool, PluginError> {
2763        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2764        let rows = sqlx::query(
2765            "SELECT record_json FROM lash_host_event_trigger_subscriptions
2766             WHERE handle = $1
2767             ORDER BY registrant_scope_id ASC
2768             FOR UPDATE",
2769        )
2770        .bind(handle)
2771        .fetch_all(&mut *tx)
2772        .await
2773        .map_err(plugin_sqlx_error)?;
2774        let mut json = None;
2775        for row in rows {
2776            let candidate: String = row.get(0);
2777            let record: TriggerSubscriptionRecord =
2778                serde_json::from_str(&candidate).map_err(process_decode_error)?;
2779            if record.registrant_session_id() == Some(session_id) {
2780                json = Some(candidate);
2781                break;
2782            }
2783        }
2784        let Some(json) = json else {
2785            tx.commit().await.map_err(plugin_sqlx_error)?;
2786            return Ok(false);
2787        };
2788        let mut record: TriggerSubscriptionRecord =
2789            serde_json::from_str(&json).map_err(process_decode_error)?;
2790        let changed = record.enabled;
2791        record.enabled = false;
2792        record.updated_at_ms = current_epoch_ms();
2793        sqlx::query(
2794            "UPDATE lash_host_event_trigger_subscriptions
2795             SET enabled = $3, updated_at_ms = $4, record_json = $5
2796             WHERE registrant_scope_id = $1 AND handle = $2",
2797        )
2798        .bind(record.registrant_scope_id())
2799        .bind(handle)
2800        .bind(record.enabled)
2801        .bind(record.updated_at_ms as i64)
2802        .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2803        .execute(&mut *tx)
2804        .await
2805        .map_err(plugin_sqlx_error)?;
2806        tx.commit().await.map_err(plugin_sqlx_error)?;
2807        Ok(changed)
2808    }
2809
2810    async fn delete_session_subscriptions(&self, session_id: &str) -> Result<usize, PluginError> {
2811        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2812        let rows = sqlx::query(
2813            "SELECT subscription_id, record_json
2814             FROM lash_host_event_trigger_subscriptions
2815             ORDER BY registrant_scope_id ASC, handle ASC
2816             FOR UPDATE",
2817        )
2818        .fetch_all(&mut *tx)
2819        .await
2820        .map_err(plugin_sqlx_error)?;
2821        let mut deleted = 0usize;
2822        for row in rows {
2823            let subscription_id: String = row.get(0);
2824            let json: String = row.get(1);
2825            let record: TriggerSubscriptionRecord =
2826                serde_json::from_str(&json).map_err(process_decode_error)?;
2827            if record.registrant_session_id() != Some(session_id) {
2828                continue;
2829            }
2830            sqlx::query(
2831                "DELETE FROM lash_host_event_trigger_subscriptions WHERE subscription_id = $1",
2832            )
2833            .bind(&subscription_id)
2834            .execute(&mut *tx)
2835            .await
2836            .map_err(plugin_sqlx_error)?;
2837            deleted = deleted.saturating_add(1);
2838        }
2839        tx.commit().await.map_err(plugin_sqlx_error)?;
2840        Ok(deleted)
2841    }
2842
2843    async fn record_occurrence(
2844        &self,
2845        request: HostEventOccurrenceRequest,
2846    ) -> Result<HostEventOccurrenceRecord, PluginError> {
2847        lash_core::validate_host_event_occurrence_request(&request)?;
2848        let request_hash = lash_core::host_event_occurrence_request_hash(&request)?;
2849        let occurrence_id = lash_core::deterministic_occurrence_id(&request)?;
2850        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2851        let existing = sqlx::query(
2852            "SELECT request_hash, record_json
2853             FROM lash_host_event_occurrences
2854             WHERE idempotency_key = $1
2855             FOR UPDATE",
2856        )
2857        .bind(&request.idempotency_key)
2858        .fetch_optional(&mut *tx)
2859        .await
2860        .map_err(plugin_sqlx_error)?;
2861        if let Some(row) = existing {
2862            let existing_hash: String = row.get(0);
2863            let existing_json: String = row.get(1);
2864            if existing_hash != request_hash {
2865                return Err(PluginError::Session(format!(
2866                    "host event occurrence idempotency conflict for `{}`",
2867                    request.idempotency_key
2868                )));
2869            }
2870            let record = serde_json::from_str(&existing_json).map_err(process_decode_error)?;
2871            tx.commit().await.map_err(plugin_sqlx_error)?;
2872            return Ok(record);
2873        }
2874        let record = HostEventOccurrenceRecord {
2875            occurrence_id: occurrence_id.clone(),
2876            source_type: request.source_type,
2877            source_key: request.source_key,
2878            payload: request.payload,
2879            idempotency_key: request.idempotency_key.clone(),
2880            source: request.source,
2881            occurred_at_ms: current_epoch_ms(),
2882        };
2883        sqlx::query(
2884            "INSERT INTO lash_host_event_occurrences (
2885                occurrence_id, idempotency_key, request_hash, source_type, source_key,
2886                occurred_at_ms, record_json
2887             )
2888             VALUES ($1, $2, $3, $4, $5, $6, $7)",
2889        )
2890        .bind(&record.occurrence_id)
2891        .bind(&record.idempotency_key)
2892        .bind(&request_hash)
2893        .bind(&record.source_type)
2894        .bind(&record.source_key)
2895        .bind(record.occurred_at_ms as i64)
2896        .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2897        .execute(&mut *tx)
2898        .await
2899        .map_err(plugin_sqlx_error)?;
2900        tx.commit().await.map_err(plugin_sqlx_error)?;
2901        Ok(record)
2902    }
2903
2904    async fn reserve_matching_deliveries(
2905        &self,
2906        occurrence_id: &str,
2907    ) -> Result<Vec<TriggerDeliveryReservation>, PluginError> {
2908        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2909        let occurrence_json: Option<String> = sqlx::query_scalar(
2910            "SELECT record_json FROM lash_host_event_occurrences WHERE occurrence_id = $1",
2911        )
2912        .bind(occurrence_id)
2913        .fetch_optional(&mut *tx)
2914        .await
2915        .map_err(plugin_sqlx_error)?;
2916        let Some(occurrence_json) = occurrence_json else {
2917            return Err(PluginError::Session(format!(
2918                "unknown host event occurrence `{occurrence_id}`"
2919            )));
2920        };
2921        let occurrence: HostEventOccurrenceRecord =
2922            serde_json::from_str(&occurrence_json).map_err(process_decode_error)?;
2923        let rows = sqlx::query(
2924            "SELECT record_json FROM lash_host_event_trigger_subscriptions
2925             WHERE enabled = TRUE AND source_type = $1 AND source_key = $2
2926             ORDER BY registrant_scope_id ASC, handle ASC",
2927        )
2928        .bind(&occurrence.source_type)
2929        .bind(&occurrence.source_key)
2930        .fetch_all(&mut *tx)
2931        .await
2932        .map_err(plugin_sqlx_error)?;
2933        let mut deliveries = Vec::new();
2934        for row in rows {
2935            let json: String = row.get(0);
2936            let subscription: TriggerSubscriptionRecord =
2937                serde_json::from_str(&json).map_err(process_decode_error)?;
2938            let process_id = lash_core::deterministic_delivery_process_id(
2939                &occurrence.occurrence_id,
2940                &subscription.subscription_id,
2941            )?;
2942            let inserted = sqlx::query(
2943                "INSERT INTO lash_host_event_deliveries (
2944                    occurrence_id, subscription_id, process_id, created_at_ms
2945                 )
2946                 VALUES ($1, $2, $3, $4)
2947                 ON CONFLICT DO NOTHING",
2948            )
2949            .bind(&occurrence.occurrence_id)
2950            .bind(&subscription.subscription_id)
2951            .bind(&process_id)
2952            .bind(current_epoch_ms() as i64)
2953            .execute(&mut *tx)
2954            .await
2955            .map_err(plugin_sqlx_error)?
2956            .rows_affected();
2957            if inserted == 0 {
2958                continue;
2959            }
2960            deliveries.push(TriggerDeliveryReservation {
2961                occurrence: occurrence.clone(),
2962                subscription,
2963                process_id,
2964            });
2965        }
2966        tx.commit().await.map_err(plugin_sqlx_error)?;
2967        Ok(deliveries)
2968    }
2969}
2970
2971#[async_trait::async_trait]
2972impl lashlang::LashlangArtifactStore for PostgresLashlangArtifactStore {
2973    fn durability_tier(&self) -> DurabilityTier {
2974        DurabilityTier::Durable
2975    }
2976
2977    async fn put_module_artifact(
2978        &self,
2979        artifact: &lashlang::ModuleArtifact,
2980    ) -> Result<(), lashlang::ArtifactStoreError> {
2981        let bytes = artifact
2982            .to_store_bytes()
2983            .map_err(lashlang::ArtifactStoreError::from)?;
2984        sqlx::query(
2985            "INSERT INTO lash_lashlang_artifacts (module_ref, artifact_bytes)
2986             VALUES ($1, $2)
2987             ON CONFLICT (module_ref) DO UPDATE SET artifact_bytes = EXCLUDED.artifact_bytes",
2988        )
2989        .bind(artifact.module_ref.as_str())
2990        .bind(bytes)
2991        .execute(&self.pool)
2992        .await
2993        .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
2994        Ok(())
2995    }
2996
2997    async fn get_module_artifact(
2998        &self,
2999        module_ref: &lashlang::ModuleRef,
3000    ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
3001        let bytes: Option<Vec<u8>> = sqlx::query_scalar(
3002            "SELECT artifact_bytes FROM lash_lashlang_artifacts WHERE module_ref = $1",
3003        )
3004        .bind(module_ref.as_str())
3005        .fetch_optional(&self.pool)
3006        .await
3007        .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
3008        bytes
3009            .map(|bytes| {
3010                lashlang::ModuleArtifact::from_store_bytes(&bytes)
3011                    .map(Arc::new)
3012                    .map_err(lashlang::ArtifactStoreError::from)
3013            })
3014            .transpose()
3015    }
3016
3017    async fn put_artifact_bytes(
3018        &self,
3019        artifact_ref: &str,
3020        _descriptor: &str,
3021        bytes: &[u8],
3022    ) -> Result<(), lashlang::ArtifactStoreError> {
3023        sqlx::query(
3024            "INSERT INTO lash_lashlang_artifacts (module_ref, artifact_bytes)
3025             VALUES ($1, $2)
3026             ON CONFLICT (module_ref) DO UPDATE SET artifact_bytes = EXCLUDED.artifact_bytes",
3027        )
3028        .bind(artifact_ref)
3029        .bind(bytes)
3030        .execute(&self.pool)
3031        .await
3032        .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
3033        Ok(())
3034    }
3035
3036    async fn get_artifact_bytes(
3037        &self,
3038        artifact_ref: &str,
3039    ) -> Result<Option<Vec<u8>>, lashlang::ArtifactStoreError> {
3040        sqlx::query_scalar(
3041            "SELECT artifact_bytes FROM lash_lashlang_artifacts WHERE module_ref = $1",
3042        )
3043        .bind(artifact_ref)
3044        .fetch_optional(&self.pool)
3045        .await
3046        .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))
3047    }
3048}