Skip to main content

lash_postgres_store/
lib.rs

1//! PostgreSQL durable storage for Lash.
2//!
3//! One [`PostgresStorage`] owns a shared [`sqlx::PgPool`] and creates durable
4//! implementations for the runtime session store, process registry, host-event
5//! store, Lashlang artifact store, and attachment manifest.
6
7use std::collections::HashSet;
8use std::sync::{Arc, OnceLock};
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use lash_core::runtime::{
12    ProcessHandleGrantEntry, QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim,
13    QueuedWorkClaimBoundary, QueuedWorkCompletion, QueuedWorkItem,
14};
15use lash_core::store::{
16    GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
17    RuntimeCommitResult, SessionCheckpoint, SessionHeadMeta,
18};
19use lash_core::{
20    AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
21    DeliveryPolicy, DurabilityTier, GcReport, MergeKey, ProcessAwaitOutput, ProcessEvent,
22    ProcessEventAppendRequest, ProcessEventAppendResult, ProcessExternalRef,
23    ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease, ProcessLeaseCompletion,
24    ProcessRecord, ProcessRegistration, ProcessRegistry, ProcessScope, RuntimePersistence,
25    SessionMeta, SessionNodeRecord, SessionReadScope, SessionStoreCreateRequest,
26    SessionStoreFactory, SlotPolicy, StoreError, TokenLedgerEntry, VacuumReport,
27};
28use lash_core::{
29    HostEventOccurrenceRecord, HostEventOccurrenceRequest, HostEventStore, PluginError,
30    TriggerDeliveryReservation, TriggerSubscriptionDraft, TriggerSubscriptionFilter,
31    TriggerSubscriptionRecord,
32};
33use sha2::{Digest, Sha256};
34use sqlx::postgres::{PgPool, PgPoolOptions, PgRow};
35use sqlx::{Executor, Row};
36
/// Key under which this crate records its schema version in `lash_schema_versions`.
const SCHEMA_COMPONENT: &str = "lash-postgres-store";
/// Schema version this build expects; `ensure_schema` rejects any other stored value.
const SCHEMA_VERSION: i32 = 1;
/// Local alias of `lash_core::PROCESS_LEASE_SCHEMA_VERSION` (its use lies outside this view).
const PROCESS_LEASE_SCHEMA_VERSION: u32 = lash_core::PROCESS_LEASE_SCHEMA_VERSION;
40
/// Entry point for Postgres-backed durable storage.
///
/// Holds one [`PgPool`]; every store handle produced by its accessor methods
/// receives a clone of this pool.
#[derive(Clone)]
pub struct PostgresStorage {
    /// Connection pool shared with every store handle created from this value.
    pool: PgPool,
}
45
/// Factory that creates per-session [`PostgresSessionStore`] handles on demand.
#[derive(Clone)]
pub struct PostgresSessionStoreFactory {
    /// Pool cloned into every store this factory creates.
    pool: PgPool,
}
50
/// Session store handle backed by Postgres.
///
/// A handle is either explicitly bound to one session (`session_id` is `Some`)
/// or unbound (`session_id` is `None`).
#[derive(Clone)]
pub struct PostgresSessionStore {
    /// Shared connection pool.
    pool: PgPool,
    /// Explicit session binding, set by `PostgresStorage::session_store` and by the
    /// factory's `create_store`; `None` for handles built via `unbound_session_store`.
    session_id: Option<String>,
    /// In-memory bind-on-first-commit for an *unbound* handle. A session-store
    /// handle commits to exactly one session; an unbound handle latches the first
    /// session it commits and rejects others (Postgres is multi-session per
    /// database, so this can't be inferred from a singleton head row the way the
    /// single-file SQLite store does). Shared across clones via `Arc`.
    bound_session: Arc<OnceLock<String>>,
}
63
/// Postgres-backed process registry handle.
#[derive(Clone)]
pub struct PostgresProcessRegistry {
    /// Shared connection pool.
    pool: PgPool,
    /// In-process wakeup signal, shared by clones of this handle through the `Arc`.
    /// NOTE(review): `PostgresStorage::process_registry` creates a fresh `Notify`
    /// on every call, so separately created registries do not share it — confirm
    /// that is intended.
    notify: Arc<tokio::sync::Notify>,
}
69
/// Postgres-backed host-event store handle.
#[derive(Clone)]
pub struct PostgresHostEventStore {
    /// Shared connection pool.
    pool: PgPool,
}
74
/// Postgres-backed Lashlang artifact store handle.
#[derive(Clone)]
pub struct PostgresLashlangArtifactStore {
    /// Shared connection pool.
    pool: PgPool,
}
79
/// Pool sizing and per-connection timeout settings for [`PostgresStorage`].
///
/// Session commits update the head with **optimistic CAS** (`UPDATE … WHERE
/// head_revision = expected`) rather than holding a `SELECT … FOR UPDATE`, so a
/// writer waiting on a competitor never parks a pooled connection on a lock.
/// `lock_timeout` is an extra safety net: it bounds how long that single CAS
/// write may wait for the head row's lock before failing (reported as a
/// retryable conflict), so even a pathological burst cannot starve the pool.
#[derive(Clone, Debug)]
pub struct PostgresStoreConfig {
    /// Upper bound on pooled connections (default: 16).
    pub max_connections: u32,
    /// Idle connections the pool keeps open (default: 0).
    pub min_connections: u32,
    /// Longest `acquire` will wait for a free connection before failing (default: 30 s).
    pub acquire_timeout: Duration,
    /// Idle period after which a connection is closed (default: 10 min).
    pub idle_timeout: Option<Duration>,
    /// Age at which a connection is recycled (default: 30 min).
    pub max_lifetime: Option<Duration>,
    /// Postgres `lock_timeout` set on every connection (default: 10 s).
    pub lock_timeout: Option<Duration>,
    /// Postgres `statement_timeout` set on every connection (default: 30 s) — a
    /// backstop that keeps a stuck query from holding a connection forever.
    pub statement_timeout: Option<Duration>,
}

impl Default for PostgresStoreConfig {
    fn default() -> Self {
        let secs = Duration::from_secs;
        PostgresStoreConfig {
            max_connections: 16,
            min_connections: 0,
            acquire_timeout: secs(30),
            idle_timeout: Some(secs(10 * 60)),
            max_lifetime: Some(secs(30 * 60)),
            lock_timeout: Some(secs(10)),
            statement_timeout: Some(secs(30)),
        }
    }
}
120
121impl PostgresStorage {
122    /// Connect with [`PostgresStoreConfig::default`] pool/timeout settings.
123    pub async fn connect(database_url: &str) -> Result<Self, StoreError> {
124        Self::connect_with(database_url, PostgresStoreConfig::default()).await
125    }
126
127    /// Connect with explicit pool sizing and per-connection timeouts.
128    pub async fn connect_with(
129        database_url: &str,
130        config: PostgresStoreConfig,
131    ) -> Result<Self, StoreError> {
132        let lock_ms = config.lock_timeout.map(|d| d.as_millis().max(1) as u64);
133        let statement_ms = config
134            .statement_timeout
135            .map(|d| d.as_millis().max(1) as u64);
136        let mut options = PgPoolOptions::new()
137            .max_connections(config.max_connections)
138            .min_connections(config.min_connections)
139            .acquire_timeout(config.acquire_timeout);
140        if let Some(timeout) = config.idle_timeout {
141            options = options.idle_timeout(timeout);
142        }
143        if let Some(timeout) = config.max_lifetime {
144            options = options.max_lifetime(timeout);
145        }
146        let pool = options
147            .after_connect(move |conn, _meta| {
148                Box::pin(async move {
149                    if let Some(ms) = lock_ms {
150                        conn.execute(format!("SET lock_timeout = {ms}").as_str())
151                            .await?;
152                    }
153                    if let Some(ms) = statement_ms {
154                        conn.execute(format!("SET statement_timeout = {ms}").as_str())
155                            .await?;
156                    }
157                    Ok(())
158                })
159            })
160            .connect(database_url)
161            .await
162            .map_err(store_sqlx_error)?;
163        ensure_schema(&pool).await?;
164        Ok(Self { pool })
165    }
166
167    pub fn from_pool(pool: PgPool) -> Self {
168        Self { pool }
169    }
170
171    pub fn pool(&self) -> &PgPool {
172        &self.pool
173    }
174
175    pub fn session_store_factory(&self) -> PostgresSessionStoreFactory {
176        PostgresSessionStoreFactory {
177            pool: self.pool.clone(),
178        }
179    }
180
181    pub fn session_store(&self, session_id: impl Into<String>) -> PostgresSessionStore {
182        PostgresSessionStore {
183            pool: self.pool.clone(),
184            session_id: Some(session_id.into()),
185            bound_session: Arc::new(OnceLock::new()),
186        }
187    }
188
189    pub fn unbound_session_store(&self) -> PostgresSessionStore {
190        PostgresSessionStore {
191            pool: self.pool.clone(),
192            session_id: None,
193            bound_session: Arc::new(OnceLock::new()),
194        }
195    }
196
197    pub fn process_registry(&self) -> PostgresProcessRegistry {
198        PostgresProcessRegistry {
199            pool: self.pool.clone(),
200            notify: Arc::new(tokio::sync::Notify::new()),
201        }
202    }
203
204    pub fn host_event_store(&self) -> PostgresHostEventStore {
205        PostgresHostEventStore {
206            pool: self.pool.clone(),
207        }
208    }
209
210    pub fn lashlang_artifact_store(&self) -> PostgresLashlangArtifactStore {
211        PostgresLashlangArtifactStore {
212            pool: self.pool.clone(),
213        }
214    }
215}
216
217impl PostgresSessionStoreFactory {
218    pub fn new(storage: &PostgresStorage) -> Self {
219        storage.session_store_factory()
220    }
221}
222
223impl PostgresSessionStore {
224    pub fn unbound(storage: &PostgresStorage) -> Self {
225        storage.unbound_session_store()
226    }
227
228    async fn selected_session_id(&self) -> Result<Option<String>, StoreError> {
229        if let Some(session_id) = &self.session_id {
230            return Ok(Some(session_id.clone()));
231        }
232        sqlx::query_scalar("SELECT session_id FROM lash_sessions ORDER BY session_id ASC LIMIT 1")
233            .fetch_optional(&self.pool)
234            .await
235            .map_err(store_sqlx_error)
236    }
237}
238
/// Creates (idempotently) every table and index this crate uses, then checks or
/// records the schema version for [`SCHEMA_COMPONENT`].
///
/// Everything runs in one transaction. A transaction-scoped advisory lock
/// serializes concurrent bootstraps against the same database. All DDL uses
/// `IF NOT EXISTS`, so re-running it against an already-initialised database is
/// harmless.
///
/// # Errors
/// Returns [`StoreError::Backend`] if any statement fails, or if the database
/// already records a version for this component that differs from
/// [`SCHEMA_VERSION`]. There is no migration path here. On error the transaction
/// is not committed.
async fn ensure_schema(pool: &PgPool) -> Result<(), StoreError> {
    let mut tx = pool.begin().await.map_err(store_sqlx_error)?;
    // Transaction-scoped advisory lock: concurrent callers queue here until this
    // transaction ends, so only one bootstrap runs at a time.
    tx.execute("SELECT pg_advisory_xact_lock(715421, 907001)")
        .await
        .map_err(store_sqlx_error)?;
    // Multi-statement DDL batch; the query carries no bind parameters.
    tx.execute(
        r#"
        CREATE TABLE IF NOT EXISTS lash_schema_versions (
            component TEXT PRIMARY KEY,
            version INTEGER NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_blobs (
            hash TEXT PRIMARY KEY,
            content BYTEA NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_sessions (
            session_id TEXT PRIMARY KEY,
            head_revision BIGINT NOT NULL DEFAULT 0,
            head_json TEXT NOT NULL,
            checkpoint_ref TEXT
        );

        CREATE TABLE IF NOT EXISTS lash_graph_nodes (
            session_id TEXT NOT NULL,
            seq BIGSERIAL,
            node_id TEXT NOT NULL,
            node_json TEXT NOT NULL,
            tombstoned BOOLEAN NOT NULL DEFAULT FALSE,
            PRIMARY KEY (session_id, node_id)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_graph_nodes_seq
            ON lash_graph_nodes(session_id, seq);

        CREATE TABLE IF NOT EXISTS lash_usage_deltas (
            seq BIGSERIAL PRIMARY KEY,
            session_id TEXT NOT NULL,
            entry_json TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_session_meta (
            session_id TEXT PRIMARY KEY,
            meta_json TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_runtime_turn_commits (
            session_id TEXT NOT NULL,
            turn_id TEXT NOT NULL,
            turn_commit_hash TEXT NOT NULL,
            result_json TEXT NOT NULL,
            committed_at_ms BIGINT NOT NULL,
            PRIMARY KEY (session_id, turn_id)
        );

        CREATE TABLE IF NOT EXISTS lash_queued_work_batches (
            enqueue_seq BIGSERIAL PRIMARY KEY,
            batch_id TEXT NOT NULL UNIQUE,
            session_id TEXT NOT NULL,
            source_key TEXT,
            delivery_policy TEXT NOT NULL,
            slot_policy TEXT NOT NULL,
            merge_key_json TEXT NOT NULL,
            available_at_ms BIGINT NOT NULL,
            enqueued_at_ms BIGINT NOT NULL,
            claim_id TEXT,
            claim_owner_id TEXT,
            claim_token TEXT,
            claim_fencing_token BIGINT NOT NULL DEFAULT 0,
            claim_claimed_at_ms BIGINT NOT NULL DEFAULT 0,
            claim_expires_at_ms BIGINT NOT NULL DEFAULT 0,
            UNIQUE (session_id, source_key)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_queued_work_ready
            ON lash_queued_work_batches(session_id, available_at_ms, enqueue_seq);

        CREATE TABLE IF NOT EXISTS lash_queued_work_items (
            batch_id TEXT NOT NULL REFERENCES lash_queued_work_batches(batch_id) ON DELETE CASCADE,
            item_index INTEGER NOT NULL,
            item_id TEXT NOT NULL,
            payload_json TEXT NOT NULL,
            PRIMARY KEY (batch_id, item_index)
        );

        CREATE TABLE IF NOT EXISTS lash_attachment_manifest (
            attachment_id TEXT PRIMARY KEY,
            session_id TEXT NOT NULL,
            canonical_uri TEXT NOT NULL,
            intent_at_ms BIGINT NOT NULL,
            committed_at_ms BIGINT
        );
        CREATE INDEX IF NOT EXISTS idx_lash_attachment_manifest_uncommitted
            ON lash_attachment_manifest(committed_at_ms)
            WHERE committed_at_ms IS NULL;

        CREATE TABLE IF NOT EXISTS lash_processes (
            process_id TEXT PRIMARY KEY,
            registration_hash TEXT NOT NULL,
            owner_scope_id TEXT NOT NULL,
            host_profile_id TEXT NOT NULL,
            created_at_ms BIGINT NOT NULL,
            updated_at_ms BIGINT NOT NULL,
            status TEXT NOT NULL,
            record_json TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_lash_processes_status
            ON lash_processes(status);

        CREATE TABLE IF NOT EXISTS lash_process_events (
            process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            sequence BIGINT NOT NULL,
            event_type TEXT NOT NULL,
            payload_hash TEXT NOT NULL,
            idempotency_key TEXT,
            occurred_at_ms BIGINT NOT NULL,
            event_json TEXT NOT NULL,
            PRIMARY KEY (process_id, sequence)
        );
        CREATE UNIQUE INDEX IF NOT EXISTS idx_lash_process_events_key
            ON lash_process_events(process_id, idempotency_key)
            WHERE idempotency_key IS NOT NULL;

        CREATE TABLE IF NOT EXISTS lash_process_wake_acks (
            process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            sequence BIGINT NOT NULL,
            PRIMARY KEY (process_id, sequence)
        );

        CREATE TABLE IF NOT EXISTS lash_process_handle_grants (
            session_id TEXT NOT NULL,
            scope_id TEXT NOT NULL,
            process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            descriptor_json TEXT NOT NULL,
            PRIMARY KEY (scope_id, process_id)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_process_handle_grants_session
            ON lash_process_handle_grants(session_id);
        CREATE INDEX IF NOT EXISTS idx_lash_process_handle_grants_process
            ON lash_process_handle_grants(process_id);

        CREATE TABLE IF NOT EXISTS lash_process_leases (
            process_id TEXT PRIMARY KEY REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            lease_owner_id TEXT,
            lease_token TEXT,
            lease_fencing_token BIGINT NOT NULL DEFAULT 0,
            lease_claimed_at_ms BIGINT NOT NULL DEFAULT 0,
            lease_expires_at_ms BIGINT NOT NULL DEFAULT 0
        );

        CREATE SEQUENCE IF NOT EXISTS lash_host_event_subscription_seq;
        CREATE TABLE IF NOT EXISTS lash_host_event_trigger_subscriptions (
            subscription_id TEXT PRIMARY KEY,
            session_id TEXT NOT NULL,
            handle TEXT NOT NULL,
            source_type TEXT NOT NULL,
            source_key TEXT NOT NULL,
            enabled BOOLEAN NOT NULL,
            created_at_ms BIGINT NOT NULL,
            updated_at_ms BIGINT NOT NULL,
            record_json TEXT NOT NULL,
            UNIQUE(session_id, handle)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_host_event_subscriptions_source
            ON lash_host_event_trigger_subscriptions(source_type, source_key, enabled);

        CREATE TABLE IF NOT EXISTS lash_host_event_occurrences (
            occurrence_id TEXT PRIMARY KEY,
            idempotency_key TEXT NOT NULL UNIQUE,
            request_hash TEXT NOT NULL,
            source_type TEXT NOT NULL,
            source_key TEXT NOT NULL,
            occurred_at_ms BIGINT NOT NULL,
            record_json TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_host_event_deliveries (
            occurrence_id TEXT NOT NULL REFERENCES lash_host_event_occurrences(occurrence_id) ON DELETE CASCADE,
            subscription_id TEXT NOT NULL REFERENCES lash_host_event_trigger_subscriptions(subscription_id) ON DELETE CASCADE,
            process_id TEXT NOT NULL,
            created_at_ms BIGINT NOT NULL,
            PRIMARY KEY (occurrence_id, subscription_id)
        );

        CREATE TABLE IF NOT EXISTS lash_lashlang_artifacts (
            module_ref TEXT PRIMARY KEY,
            artifact_bytes BYTEA NOT NULL
        );
        "#,
    )
    .await
    .map_err(store_sqlx_error)?;

    // Version gate: a first run records SCHEMA_VERSION; a matching version is
    // accepted; any other stored version is rejected.
    let existing: Option<i32> =
        sqlx::query_scalar("SELECT version FROM lash_schema_versions WHERE component = $1")
            .bind(SCHEMA_COMPONENT)
            .fetch_optional(&mut *tx)
            .await
            .map_err(store_sqlx_error)?;
    match existing {
        Some(version) if version == SCHEMA_VERSION => {}
        Some(version) => {
            // Returning early drops `tx` without committing.
            return Err(StoreError::Backend(format!(
                "Postgres schema component `{SCHEMA_COMPONENT}` has version {version}, expected {SCHEMA_VERSION}"
            )));
        }
        None => {
            sqlx::query("INSERT INTO lash_schema_versions (component, version) VALUES ($1, $2)")
                .bind(SCHEMA_COMPONENT)
                .bind(SCHEMA_VERSION)
                .execute(&mut *tx)
                .await
                .map_err(store_sqlx_error)?;
        }
    }
    tx.commit().await.map_err(store_sqlx_error)
}
455
/// Wall-clock milliseconds since the Unix epoch. If the system clock reads
/// earlier than 1970, this returns `0` instead of failing.
fn current_epoch_ms() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::ZERO,
    };
    since_epoch.as_millis() as u64
}
462
/// Whole Unix seconds rendered as `unix:<secs>`. A clock set before the epoch
/// yields `unix:0`.
fn current_timestamp_string() -> String {
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0);
    format!("unix:{secs}")
}
469
470fn store_sqlx_error(err: sqlx::Error) -> StoreError {
471    StoreError::Backend(err.to_string())
472}
473
474/// Postgres SQLSTATEs that signal transient write contention rather than a hard
475/// failure: serialization failure, deadlock, and lock-acquisition timeout. On the
476/// session head these all mean "a concurrent committer got there first" — i.e. a
477/// revision conflict the caller should reload-and-retry, not a backend error.
478fn is_contention_error(err: &sqlx::Error) -> bool {
479    matches!(
480        err.as_database_error()
481            .and_then(|db| db.code())
482            .as_deref(),
483        Some("40001" | "40P01" | "55P03")
484    )
485}
486
487fn plugin_sqlx_error(err: sqlx::Error) -> PluginError {
488    PluginError::Session(err.to_string())
489}
490
491fn process_decode_error(err: serde_json::Error) -> PluginError {
492    PluginError::Session(format!("failed to decode process registry row: {err}"))
493}
494
495fn store_decode_json<T: serde::de::DeserializeOwned>(
496    json: &str,
497    what: &str,
498) -> Result<T, StoreError> {
499    serde_json::from_str(json)
500        .map_err(|err| StoreError::Backend(format!("failed to decode {what}: {err}")))
501}
502
503fn encode_json<T: serde::Serialize>(value: &T) -> String {
504    serde_json::to_string(value).expect("persisted state should serialize")
505}
506
507fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
508    let mut buf = Vec::with_capacity(1024);
509    rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
510    buf
511}
512
513fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
514    rmp_serde::from_slice(bytes).ok()
515}
516
517fn block_on_detached<T: Send + 'static>(
518    future: impl std::future::Future<Output = T> + Send + 'static,
519) -> T {
520    std::thread::spawn(move || {
521        tokio::runtime::Builder::new_current_thread()
522            .enable_all()
523            .build()
524            .expect("postgres manifest runtime")
525            .block_on(future)
526    })
527    .join()
528    .expect("postgres manifest thread")
529}
530
531fn merge_token_ledger_entries(entries: Vec<TokenLedgerEntry>) -> Vec<TokenLedgerEntry> {
532    let mut merged = Vec::<TokenLedgerEntry>::new();
533    for entry in entries {
534        if entry.usage.total() == 0 {
535            continue;
536        }
537        if let Some(existing) = merged
538            .iter_mut()
539            .find(|existing| existing.source == entry.source && existing.model == entry.model)
540        {
541            existing.usage.input_tokens += entry.usage.input_tokens;
542            existing.usage.output_tokens += entry.usage.output_tokens;
543            existing.usage.cached_input_tokens += entry.usage.cached_input_tokens;
544            existing.usage.reasoning_tokens += entry.usage.reasoning_tokens;
545        } else {
546            merged.push(entry);
547        }
548    }
549    merged
550}
551
/// On-disk form of a session checkpoint: the manifest plus the inline payloads
/// it references. It is stored MessagePack-encoded as one blob in `lash_blobs`
/// (see `put_checkpoint_tx` / `get_checkpoint`).
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct SessionCheckpointEnvelope {
    /// Lightweight checkpoint description (turn state, refs, plugin revision).
    manifest: SessionCheckpoint,
    /// Inline tool state, if the checkpoint carried one.
    tool_state: Option<lash_core::ToolState>,
    /// Inline plugin snapshot, if the checkpoint carried one.
    plugin_snapshot: Option<lash_core::PluginSessionSnapshot>,
    /// Opaque execution-state bytes, if the checkpoint carried them.
    execution_state: Option<Vec<u8>>,
}
559
560async fn put_blob_tx(
561    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
562    content: &[u8],
563) -> Result<BlobRef, StoreError> {
564    let hash = format!("{:x}", Sha256::digest(content));
565    sqlx::query(
566        "INSERT INTO lash_blobs (hash, content)
567         VALUES ($1, $2)
568         ON CONFLICT (hash) DO NOTHING",
569    )
570    .bind(&hash)
571    .bind(content)
572    .execute(&mut **tx)
573    .await
574    .map_err(store_sqlx_error)?;
575    Ok(BlobRef(hash))
576}
577
578async fn get_blob(pool: &PgPool, blob_ref: &BlobRef) -> Option<Vec<u8>> {
579    sqlx::query_scalar::<_, Vec<u8>>("SELECT content FROM lash_blobs WHERE hash = $1")
580        .bind(blob_ref.as_str())
581        .fetch_optional(pool)
582        .await
583        .ok()
584        .flatten()
585}
586
587async fn put_checkpoint_tx(
588    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
589    checkpoint: &HydratedSessionCheckpoint,
590) -> Result<(BlobRef, SessionCheckpoint), StoreError> {
591    let manifest = SessionCheckpoint {
592        turn_state: checkpoint.turn_state.clone(),
593        tool_state_ref: checkpoint.tool_state_ref.clone(),
594        plugin_snapshot_ref: checkpoint.plugin_snapshot_ref.clone(),
595        plugin_snapshot_revision: checkpoint.plugin_snapshot_revision,
596        execution_state_ref: checkpoint.execution_state_ref.clone(),
597    };
598    let envelope = SessionCheckpointEnvelope {
599        manifest: manifest.clone(),
600        tool_state: checkpoint.tool_state.clone(),
601        plugin_snapshot: checkpoint.plugin_snapshot.clone(),
602        execution_state: checkpoint.execution_state.clone(),
603    };
604    let bytes = encode_msgpack(&envelope);
605    let checkpoint_ref = put_blob_tx(tx, &bytes).await?;
606    Ok((checkpoint_ref, manifest))
607}
608
609async fn get_checkpoint(pool: &PgPool, blob_ref: &BlobRef) -> Option<HydratedSessionCheckpoint> {
610    let bytes = get_blob(pool, blob_ref).await?;
611    let envelope: SessionCheckpointEnvelope = decode_msgpack(&bytes)?;
612    Some(HydratedSessionCheckpoint {
613        turn_state: envelope.manifest.turn_state,
614        tool_state_ref: envelope.manifest.tool_state_ref,
615        tool_state: envelope.tool_state,
616        plugin_snapshot_ref: envelope.manifest.plugin_snapshot_ref,
617        plugin_snapshot: envelope.plugin_snapshot,
618        plugin_snapshot_revision: envelope.manifest.plugin_snapshot_revision,
619        execution_state_ref: envelope.manifest.execution_state_ref,
620        execution_state: envelope.execution_state,
621    })
622}
623
624async fn load_session_head_meta_tx(
625    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
626    session_id: &str,
627    for_update: bool,
628) -> Result<Option<SessionHeadMeta>, StoreError> {
629    let sql = if for_update {
630        "SELECT head_json, head_revision FROM lash_sessions WHERE session_id = $1 FOR UPDATE"
631    } else {
632        "SELECT head_json, head_revision FROM lash_sessions WHERE session_id = $1"
633    };
634    let row = sqlx::query(sql)
635        .bind(session_id)
636        .fetch_optional(&mut **tx)
637        .await
638        .map_err(store_sqlx_error)?;
639    let Some(row) = row else {
640        return Ok(None);
641    };
642    let head_json: String = row.get(0);
643    let head_revision: i64 = row.get(1);
644    let mut meta: SessionHeadMeta = store_decode_json(&head_json, "session head")?;
645    meta.head_revision = head_revision as u64;
646    Ok(Some(meta))
647}
648
649async fn load_usage_deltas(pool: &PgPool, session_id: &str) -> Vec<TokenLedgerEntry> {
650    let rows = sqlx::query(
651        "SELECT entry_json FROM lash_usage_deltas WHERE session_id = $1 ORDER BY seq ASC",
652    )
653    .bind(session_id)
654    .fetch_all(pool)
655    .await
656    .unwrap_or_default();
657    rows.into_iter()
658        .filter_map(|row| {
659            let json: String = row.get(0);
660            serde_json::from_str(&json).ok()
661        })
662        .collect()
663}
664
665async fn load_graph(
666    pool: &PgPool,
667    session_id: &str,
668    leaf_node_id: Option<String>,
669    active_path: bool,
670) -> Result<lash_core::SessionGraph, StoreError> {
671    let rows = sqlx::query(
672        "SELECT node_json FROM lash_graph_nodes
673         WHERE session_id = $1 AND tombstoned = FALSE
674         ORDER BY seq ASC",
675    )
676    .bind(session_id)
677    .fetch_all(pool)
678    .await
679    .map_err(store_sqlx_error)?;
680    let mut nodes = Vec::<SessionNodeRecord>::new();
681    for row in rows {
682        let json: String = row.get(0);
683        nodes.push(store_decode_json(&json, "session graph node")?);
684    }
685    if active_path {
686        if let Some(leaf) = leaf_node_id.clone() {
687            let wanted = active_path_node_ids(&nodes, &leaf);
688            nodes.retain(|node| wanted.contains(&node.node_id));
689        }
690    }
691    Ok(lash_core::SessionGraph::from_nodes(nodes, leaf_node_id))
692}
693
694fn active_path_node_ids(nodes: &[SessionNodeRecord], leaf_node_id: &str) -> HashSet<String> {
695    let mut parent_by_id = std::collections::BTreeMap::new();
696    for node in nodes {
697        parent_by_id.insert(node.node_id.clone(), node.parent_node_id.clone());
698    }
699    let mut wanted = HashSet::new();
700    let mut cursor = Some(leaf_node_id.to_string());
701    while let Some(node_id) = cursor {
702        if !wanted.insert(node_id.clone()) {
703            break;
704        }
705        cursor = parent_by_id.get(&node_id).cloned().flatten();
706    }
707    wanted
708}
709
710async fn commit_attachment_refs_tx(
711    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
712    session_id: &str,
713    attachment_ids: &[AttachmentId],
714) -> Result<(), StoreError> {
715    if attachment_ids.is_empty() {
716        return Ok(());
717    }
718    let now = current_epoch_ms() as i64;
719    for id in attachment_ids {
720        sqlx::query(
721            "UPDATE lash_attachment_manifest
722             SET committed_at_ms = COALESCE(committed_at_ms, $1)
723             WHERE attachment_id = $2 AND session_id = $3",
724        )
725        .bind(now)
726        .bind(id.as_str())
727        .bind(session_id)
728        .execute(&mut **tx)
729        .await
730        .map_err(store_sqlx_error)?;
731    }
732    Ok(())
733}
734
735impl AttachmentManifest for PostgresSessionStore {
736    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError> {
737        let pool = self.pool.clone();
738        block_on_detached(async move {
739            sqlx::query(
740                "INSERT INTO lash_attachment_manifest (
741                    attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
742                 )
743                 VALUES ($1, $2, $3, $4, NULL)
744                 ON CONFLICT (attachment_id) DO UPDATE SET
745                    session_id = EXCLUDED.session_id,
746                    canonical_uri = EXCLUDED.canonical_uri,
747                    intent_at_ms = EXCLUDED.intent_at_ms",
748            )
749            .bind(intent.attachment_id.as_str())
750            .bind(intent.session_id)
751            .bind(intent.canonical_uri)
752            .bind(intent.intent_at_epoch_ms as i64)
753            .execute(&pool)
754            .await
755            .map(|_| ())
756            .map_err(store_sqlx_error)
757        })
758    }
759
760    fn commit_refs(
761        &self,
762        session_id: &str,
763        attachment_ids: &[AttachmentId],
764    ) -> Result<(), StoreError> {
765        let pool = self.pool.clone();
766        let session_id = session_id.to_string();
767        let attachment_ids = attachment_ids.to_vec();
768        block_on_detached(async move {
769            let mut tx = pool.begin().await.map_err(store_sqlx_error)?;
770            commit_attachment_refs_tx(&mut tx, &session_id, &attachment_ids).await?;
771            tx.commit().await.map_err(store_sqlx_error)
772        })
773    }
774
775    fn list_uncommitted(
776        &self,
777        older_than_epoch_ms: u64,
778    ) -> Result<Vec<AttachmentManifestEntry>, StoreError> {
779        let pool = self.pool.clone();
780        block_on_detached(async move {
781            let rows = sqlx::query(
782                "SELECT attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
783                 FROM lash_attachment_manifest
784                 WHERE committed_at_ms IS NULL AND intent_at_ms <= $1
785                 ORDER BY attachment_id ASC",
786            )
787            .bind(older_than_epoch_ms as i64)
788            .fetch_all(&pool)
789            .await
790            .map_err(store_sqlx_error)?;
791            Ok(rows
792                .into_iter()
793                .map(|row| AttachmentManifestEntry {
794                    attachment_id: AttachmentId::new(row.get::<String, _>(0)),
795                    session_id: row.get(1),
796                    canonical_uri: row.get(2),
797                    intent_at_epoch_ms: row.get::<i64, _>(3) as u64,
798                    committed_at_epoch_ms: row.get::<Option<i64>, _>(4).map(|value| value as u64),
799                })
800                .collect())
801        })
802    }
803
804    fn forget(&self, attachment_id: &AttachmentId) -> Result<(), StoreError> {
805        let pool = self.pool.clone();
806        let attachment_id = attachment_id.to_string();
807        block_on_detached(async move {
808            sqlx::query("DELETE FROM lash_attachment_manifest WHERE attachment_id = $1")
809                .bind(attachment_id)
810                .execute(&pool)
811                .await
812                .map(|_| ())
813                .map_err(store_sqlx_error)
814        })
815    }
816}
817
818#[async_trait::async_trait]
819impl SessionStoreFactory for PostgresSessionStoreFactory {
820    fn durability_tier(&self) -> DurabilityTier {
821        DurabilityTier::Durable
822    }
823
824    async fn create_store(
825        &self,
826        request: &SessionStoreCreateRequest,
827    ) -> Result<Arc<dyn RuntimePersistence>, String> {
828        let store = PostgresSessionStore {
829            pool: self.pool.clone(),
830            session_id: Some(request.session_id.clone()),
831            bound_session: Arc::new(OnceLock::new()),
832        };
833        if store
834            .load_session_meta()
835            .await
836            .map_err(|err| err.to_string())?
837            .is_none()
838        {
839            store
840                .save_session_meta(SessionMeta {
841                    session_id: request.session_id.clone(),
842                    session_name: request.session_id.clone(),
843                    created_at: current_timestamp_string(),
844                    model: request.policy.model.id.clone(),
845                    cwd: std::env::current_dir()
846                        .ok()
847                        .and_then(|path| path.to_str().map(str::to_string)),
848                    relation: request.relation.clone(),
849                })
850                .await
851                .map_err(|err| err.to_string())?;
852        }
853        Ok(Arc::new(store))
854    }
855
856    async fn delete_session(&self, session_id: &str) -> Result<(), String> {
857        let mut tx = self.pool.begin().await.map_err(|err| err.to_string())?;
858        for sql in [
859            "DELETE FROM lash_queued_work_items WHERE batch_id IN (SELECT batch_id FROM lash_queued_work_batches WHERE session_id = $1)",
860            "DELETE FROM lash_queued_work_batches WHERE session_id = $1",
861            "DELETE FROM lash_usage_deltas WHERE session_id = $1",
862            "DELETE FROM lash_graph_nodes WHERE session_id = $1",
863            "DELETE FROM lash_runtime_turn_commits WHERE session_id = $1",
864            "DELETE FROM lash_session_meta WHERE session_id = $1",
865            "DELETE FROM lash_sessions WHERE session_id = $1",
866            "DELETE FROM lash_attachment_manifest WHERE session_id = $1",
867        ] {
868            sqlx::query(sql)
869                .bind(session_id)
870                .execute(&mut *tx)
871                .await
872                .map_err(|err| err.to_string())?;
873        }
874        tx.commit().await.map_err(|err| err.to_string())
875    }
876}
877
/// Decoded columns of one `lash_queued_work_batches` row, excluding its items.
///
/// Built by `queued_batch_row`. The batch's items are loaded separately when
/// the row is turned into a `QueuedWorkBatch`. Integer columns are read as
/// `i64` and cast to `u64`.
#[derive(Clone, Debug)]
struct QueuedBatchRow {
    /// FIFO position; claim scans order by this column.
    enqueue_seq: u64,
    batch_id: String,
    session_id: String,
    /// Optional idempotency key; enqueue returns the existing batch for a
    /// repeated `(session_id, source_key)`.
    source_key: Option<String>,
    delivery_policy: DeliveryPolicy,
    slot_policy: SlotPolicy,
    /// Decoded from the `merge_key_json` column.
    merge_key: MergeKey,
    available_at_ms: u64,
    enqueued_at_ms: u64,
    /// Fencing token as stored; each successful claim increments it in SQL.
    claim_fencing_token: u64,
}
891
892fn queued_batch_row(row: PgRow) -> Result<QueuedBatchRow, StoreError> {
893    let delivery_policy =
894        DeliveryPolicy::from_wire_str(row.get::<String, _>("delivery_policy").as_str())
895            .ok_or_else(|| {
896                StoreError::Backend("invalid queued work delivery policy".to_string())
897            })?;
898    let slot_policy = SlotPolicy::from_wire_str(row.get::<String, _>("slot_policy").as_str())
899        .ok_or_else(|| StoreError::Backend("invalid queued work slot policy".to_string()))?;
900    let merge_json: String = row.get("merge_key_json");
901    Ok(QueuedBatchRow {
902        enqueue_seq: row.get::<i64, _>("enqueue_seq") as u64,
903        batch_id: row.get("batch_id"),
904        session_id: row.get("session_id"),
905        source_key: row.get("source_key"),
906        delivery_policy,
907        slot_policy,
908        merge_key: store_decode_json(&merge_json, "queued work merge key")?,
909        available_at_ms: row.get::<i64, _>("available_at_ms") as u64,
910        enqueued_at_ms: row.get::<i64, _>("enqueued_at_ms") as u64,
911        claim_fencing_token: row.get::<i64, _>("claim_fencing_token") as u64,
912    })
913}
914
915async fn load_queued_batch(
916    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
917    batch_id: &str,
918) -> Result<Option<QueuedWorkBatch>, StoreError> {
919    let row = sqlx::query(
920        "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
921                slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
922                claim_fencing_token
923         FROM lash_queued_work_batches
924         WHERE batch_id = $1",
925    )
926    .bind(batch_id)
927    .fetch_optional(&mut **tx)
928    .await
929    .map_err(store_sqlx_error)?;
930    let Some(row) = row else {
931        return Ok(None);
932    };
933    let row = queued_batch_row(row)?;
934    queued_work_batch_from_row(tx, row).await.map(Some)
935}
936
937async fn queued_work_batch_from_row(
938    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
939    row: QueuedBatchRow,
940) -> Result<QueuedWorkBatch, StoreError> {
941    let item_rows = sqlx::query(
942        "SELECT item_id, payload_json
943         FROM lash_queued_work_items
944         WHERE batch_id = $1
945         ORDER BY item_index ASC",
946    )
947    .bind(&row.batch_id)
948    .fetch_all(&mut **tx)
949    .await
950    .map_err(store_sqlx_error)?;
951    let mut items = Vec::new();
952    for item in item_rows {
953        let payload_json: String = item.get(1);
954        items.push(QueuedWorkItem {
955            item_id: item.get(0),
956            payload: store_decode_json(&payload_json, "queued work payload")?,
957        });
958    }
959    Ok(QueuedWorkBatch {
960        batch_id: row.batch_id,
961        session_id: row.session_id,
962        enqueue_seq: row.enqueue_seq,
963        source_key: row.source_key,
964        delivery_policy: row.delivery_policy,
965        slot_policy: row.slot_policy,
966        merge_key: row.merge_key,
967        available_at_ms: row.available_at_ms,
968        enqueued_at_ms: row.enqueued_at_ms,
969        items,
970    })
971}
972
973async fn ensure_queued_work_completion_tx(
974    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
975    completed: &QueuedWorkCompletion,
976) -> Result<(), StoreError> {
977    for batch_id in &completed.batch_ids {
978        let exists: Option<i64> = sqlx::query_scalar(
979            "SELECT 1::BIGINT FROM lash_queued_work_batches
980             WHERE session_id = $1
981               AND batch_id = $2
982               AND claim_id = $3
983               AND claim_token = $4
984             LIMIT 1",
985        )
986        .bind(&completed.session_id)
987        .bind(batch_id)
988        .bind(&completed.claim_id)
989        .bind(&completed.lease_token)
990        .fetch_optional(&mut **tx)
991        .await
992        .map_err(store_sqlx_error)?;
993        if exists.is_none() {
994            return Err(StoreError::QueuedWorkClaimExpired {
995                session_id: completed.session_id.clone(),
996                claim_id: completed.claim_id.clone(),
997            });
998        }
999    }
1000    Ok(())
1001}
1002
1003#[async_trait::async_trait]
1004impl RuntimePersistence for PostgresSessionStore {
    /// Always reports this store as durable; it persists session state in
    /// PostgreSQL.
    fn durability_tier(&self) -> DurabilityTier {
        DurabilityTier::Durable
    }
1008
1009    async fn load_session(
1010        &self,
1011        scope: SessionReadScope,
1012    ) -> Result<Option<PersistedSessionRead>, StoreError> {
1013        let Some(session_id) = self.selected_session_id().await? else {
1014            return Ok(None);
1015        };
1016        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1017        let Some(meta) = load_session_head_meta_tx(&mut tx, &session_id, false).await? else {
1018            return Ok(None);
1019        };
1020        tx.commit().await.map_err(store_sqlx_error)?;
1021        let leaf_node_id = match &scope {
1022            SessionReadScope::FullGraph => meta.leaf_node_id.clone(),
1023            SessionReadScope::ActivePath { leaf_node_id } => {
1024                leaf_node_id.clone().or_else(|| meta.leaf_node_id.clone())
1025            }
1026        };
1027        let graph = load_graph(
1028            &self.pool,
1029            &session_id,
1030            leaf_node_id.clone(),
1031            matches!(scope, SessionReadScope::ActivePath { .. }),
1032        )
1033        .await?;
1034        let checkpoint = match meta.checkpoint_ref.as_ref() {
1035            Some(blob_ref) => get_checkpoint(&self.pool, blob_ref).await,
1036            None => None,
1037        };
1038        Ok(Some(PersistedSessionRead {
1039            session_id: meta.session_id,
1040            head_revision: meta.head_revision,
1041            config: meta.config,
1042            agent_frames: meta.agent_frames,
1043            current_agent_frame_id: meta.current_agent_frame_id,
1044            graph,
1045            checkpoint_ref: meta.checkpoint_ref,
1046            checkpoint,
1047            token_ledger: merge_token_ledger_entries(
1048                load_usage_deltas(&self.pool, &session_id).await,
1049            ),
1050        }))
1051    }
1052
1053    async fn load_node(&self, node_id: &str) -> Result<Option<SessionNodeRecord>, StoreError> {
1054        let json: Option<String> = if let Some(session_id) = &self.session_id {
1055            sqlx::query_scalar(
1056                "SELECT node_json FROM lash_graph_nodes
1057                 WHERE session_id = $1 AND node_id = $2 AND tombstoned = FALSE",
1058            )
1059            .bind(session_id)
1060            .bind(node_id)
1061            .fetch_optional(&self.pool)
1062            .await
1063            .map_err(store_sqlx_error)?
1064        } else {
1065            sqlx::query_scalar(
1066                "SELECT node_json FROM lash_graph_nodes
1067                 WHERE node_id = $1 AND tombstoned = FALSE
1068                 ORDER BY session_id ASC
1069                 LIMIT 1",
1070            )
1071            .bind(node_id)
1072            .fetch_optional(&self.pool)
1073            .await
1074            .map_err(store_sqlx_error)?
1075        };
1076        json.map(|json| store_decode_json(&json, "session graph node"))
1077            .transpose()
1078    }
1079
1080    async fn commit_runtime_state(
1081        &self,
1082        commit: RuntimeCommit,
1083    ) -> Result<RuntimeCommitResult, StoreError> {
1084        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1085        // Read the head WITHOUT a lock; the conditional CAS write below is the
1086        // serialization point (optimistic concurrency), so no pessimistic
1087        // `FOR UPDATE` lock is held across the rest of this transaction.
1088        let existing = load_session_head_meta_tx(&mut tx, &commit.session_id, false).await?;
1089        if let Some(bound_session_id) = existing.as_ref().map(|meta| meta.session_id.as_str())
1090            && bound_session_id != commit.session_id
1091        {
1092            return Err(StoreError::SessionBindingMismatch {
1093                bound_session_id: bound_session_id.to_string(),
1094                attempted_session_id: commit.session_id,
1095            });
1096        }
1097        // A session-store handle commits to exactly one session. An explicit
1098        // binding (`self.session_id`) is authoritative; otherwise the handle binds
1099        // to the first session it commits and rejects any other thereafter.
1100        let effective_binding = self
1101            .session_id
1102            .clone()
1103            .or_else(|| self.bound_session.get().cloned());
1104        if let Some(bound_session_id) = &effective_binding
1105            && commit.session_id != *bound_session_id
1106        {
1107            return Err(StoreError::SessionBindingMismatch {
1108                bound_session_id: bound_session_id.clone(),
1109                attempted_session_id: commit.session_id,
1110            });
1111        }
1112        if self.session_id.is_none() {
1113            let _ = self.bound_session.set(commit.session_id.clone());
1114        }
1115        if let Some(completed) = &commit.turn_commit {
1116            if completed.session_id != commit.session_id {
1117                return Err(StoreError::RuntimeTurnCommitConflict {
1118                    session_id: completed.session_id.clone(),
1119                    turn_id: completed.turn_id.clone(),
1120                });
1121            }
1122            let prior = sqlx::query(
1123                "SELECT turn_commit_hash, result_json
1124                 FROM lash_runtime_turn_commits
1125                 WHERE session_id = $1 AND turn_id = $2",
1126            )
1127            .bind(&completed.session_id)
1128            .bind(&completed.turn_id)
1129            .fetch_optional(&mut *tx)
1130            .await
1131            .map_err(store_sqlx_error)?;
1132            if let Some(row) = prior {
1133                let hash: String = row.get(0);
1134                let result_json: String = row.get(1);
1135                if hash == completed.turn_commit_hash {
1136                    return store_decode_json(&result_json, "runtime turn commit result");
1137                }
1138                return Err(StoreError::RuntimeTurnCommitConflict {
1139                    session_id: completed.session_id.clone(),
1140                    turn_id: completed.turn_id.clone(),
1141                });
1142            }
1143        }
1144        let actual_revision = existing.as_ref().map_or(0, |meta| meta.head_revision);
1145        if commit.expected_head_revision.is_some()
1146            && commit.expected_head_revision != Some(actual_revision)
1147        {
1148            return Err(StoreError::HeadRevisionConflict {
1149                expected: commit.expected_head_revision,
1150                actual: actual_revision,
1151            });
1152        }
1153        for completed in &commit.completed_queue_claims {
1154            if completed.session_id != commit.session_id {
1155                return Err(StoreError::QueuedWorkClaimExpired {
1156                    session_id: completed.session_id.clone(),
1157                    claim_id: completed.claim_id.clone(),
1158                });
1159            }
1160            ensure_queued_work_completion_tx(&mut tx, completed).await?;
1161        }
1162        let (checkpoint_ref, manifest) = put_checkpoint_tx(&mut tx, &commit.checkpoint).await?;
1163        for entry in &commit.usage_deltas {
1164            sqlx::query("INSERT INTO lash_usage_deltas (session_id, entry_json) VALUES ($1, $2)")
1165                .bind(&commit.session_id)
1166                .bind(encode_json(entry))
1167                .execute(&mut *tx)
1168                .await
1169                .map_err(store_sqlx_error)?;
1170        }
1171        let leaf_node_id = match &commit.graph {
1172            GraphCommitDelta::Unchanged { leaf_node_id } => leaf_node_id.clone(),
1173            GraphCommitDelta::Append {
1174                nodes,
1175                leaf_node_id,
1176            } => {
1177                for node in nodes {
1178                    sqlx::query(
1179                        "INSERT INTO lash_graph_nodes (session_id, node_id, node_json)
1180                         VALUES ($1, $2, $3)
1181                         ON CONFLICT (session_id, node_id) DO UPDATE SET
1182                            node_json = EXCLUDED.node_json,
1183                            tombstoned = FALSE",
1184                    )
1185                    .bind(&commit.session_id)
1186                    .bind(&node.node_id)
1187                    .bind(encode_json(node))
1188                    .execute(&mut *tx)
1189                    .await
1190                    .map_err(store_sqlx_error)?;
1191                }
1192                leaf_node_id.clone()
1193            }
1194            GraphCommitDelta::ReplaceFull(graph) => {
1195                sqlx::query("DELETE FROM lash_graph_nodes WHERE session_id = $1")
1196                    .bind(&commit.session_id)
1197                    .execute(&mut *tx)
1198                    .await
1199                    .map_err(store_sqlx_error)?;
1200                for node in &graph.nodes {
1201                    sqlx::query(
1202                        "INSERT INTO lash_graph_nodes (session_id, node_id, node_json)
1203                         VALUES ($1, $2, $3)",
1204                    )
1205                    .bind(&commit.session_id)
1206                    .bind(&node.node_id)
1207                    .bind(encode_json(node))
1208                    .execute(&mut *tx)
1209                    .await
1210                    .map_err(store_sqlx_error)?;
1211                }
1212                graph.leaf_node_id.clone()
1213            }
1214        };
1215        let graph_node_count: i64 = sqlx::query_scalar(
1216            "SELECT COUNT(*) FROM lash_graph_nodes WHERE session_id = $1 AND tombstoned = FALSE",
1217        )
1218        .bind(&commit.session_id)
1219        .fetch_one(&mut *tx)
1220        .await
1221        .map_err(store_sqlx_error)?;
1222        let next_revision = actual_revision + 1;
1223        let meta = SessionHeadMeta {
1224            session_id: commit.session_id.clone(),
1225            head_revision: next_revision,
1226            config: commit.config.clone(),
1227            agent_frames: commit.agent_frames.clone(),
1228            current_agent_frame_id: commit.current_agent_frame_id.clone(),
1229            checkpoint_ref: Some(checkpoint_ref.clone()),
1230            leaf_node_id,
1231            graph_node_count: graph_node_count as usize,
1232            token_ledger: Vec::new(),
1233        };
1234        // Optimistic CAS on the head revision. The `WHERE head_revision = $5`
1235        // guard makes the write succeed only if no concurrent committer moved the
1236        // head since our unlocked read above. A brand-new session inserts (no
1237        // conflict); an existing one updates only when the revision still matches.
1238        let head_write = sqlx::query(
1239            "INSERT INTO lash_sessions (session_id, head_revision, head_json, checkpoint_ref)
1240             VALUES ($1, $2, $3, $4)
1241             ON CONFLICT (session_id) DO UPDATE SET
1242                head_revision = EXCLUDED.head_revision,
1243                head_json = EXCLUDED.head_json,
1244                checkpoint_ref = EXCLUDED.checkpoint_ref
1245             WHERE lash_sessions.head_revision = $5",
1246        )
1247        .bind(&commit.session_id)
1248        .bind(next_revision as i64)
1249        .bind(encode_json(&meta))
1250        .bind(checkpoint_ref.as_str())
1251        .bind(actual_revision as i64)
1252        .execute(&mut *tx)
1253        .await;
1254        let head_write = match head_write {
1255            Ok(result) => result,
1256            Err(err) if is_contention_error(&err) => {
1257                // The head row is contended by a concurrent committer (lock
1258                // timeout / serialization failure / deadlock). That is a conflict,
1259                // not an opaque backend error: surface it so the caller reloads
1260                // and retries. The tx is now aborted; returning drops it.
1261                return Err(StoreError::HeadRevisionConflict {
1262                    expected: commit.expected_head_revision.or(Some(actual_revision)),
1263                    actual: actual_revision,
1264                });
1265            }
1266            Err(err) => return Err(store_sqlx_error(err)),
1267        };
1268        if head_write.rows_affected() == 0 {
1269            // A concurrent commit won the race: the head no longer matches the
1270            // revision we read. Re-read the now-current revision for an accurate
1271            // report, then drop `tx` (auto-rollback), discarding this attempt's
1272            // node/usage writes; the caller reloads and retries.
1273            let actual_now = sqlx::query_scalar::<_, i64>(
1274                "SELECT head_revision FROM lash_sessions WHERE session_id = $1",
1275            )
1276            .bind(&commit.session_id)
1277            .fetch_optional(&mut *tx)
1278            .await
1279            .map_err(store_sqlx_error)?
1280            .map_or(actual_revision, |revision| revision as u64);
1281            return Err(StoreError::HeadRevisionConflict {
1282                expected: commit.expected_head_revision.or(Some(actual_revision)),
1283                actual: actual_now,
1284            });
1285        }
1286        for completed in &commit.completed_queue_claims {
1287            for batch_id in &completed.batch_ids {
1288                sqlx::query(
1289                    "DELETE FROM lash_queued_work_batches
1290                     WHERE session_id = $1 AND batch_id = $2 AND claim_id = $3 AND claim_token = $4",
1291                )
1292                .bind(&completed.session_id)
1293                .bind(batch_id)
1294                .bind(&completed.claim_id)
1295                .bind(&completed.lease_token)
1296                .execute(&mut *tx)
1297                .await
1298                .map_err(store_sqlx_error)?;
1299            }
1300        }
1301        commit_attachment_refs_tx(
1302            &mut tx,
1303            &commit.session_id,
1304            &commit.committed_attachment_ids,
1305        )
1306        .await?;
1307        let result = RuntimeCommitResult {
1308            head_revision: next_revision,
1309            checkpoint_ref,
1310            manifest,
1311        };
1312        if let Some(completed) = &commit.turn_commit {
1313            sqlx::query(
1314                "INSERT INTO lash_runtime_turn_commits (
1315                    session_id, turn_id, turn_commit_hash, result_json, committed_at_ms
1316                 )
1317                 VALUES ($1, $2, $3, $4, $5)",
1318            )
1319            .bind(&completed.session_id)
1320            .bind(&completed.turn_id)
1321            .bind(&completed.turn_commit_hash)
1322            .bind(encode_json(&result))
1323            .bind(current_epoch_ms() as i64)
1324            .execute(&mut *tx)
1325            .await
1326            .map_err(store_sqlx_error)?;
1327        }
1328        tx.commit().await.map_err(store_sqlx_error)?;
1329        Ok(result)
1330    }
1331
1332    async fn enqueue_queued_work(
1333        &self,
1334        batch: QueuedWorkBatchDraft,
1335    ) -> Result<QueuedWorkBatch, StoreError> {
1336        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1337        if let Some(source_key) = batch.source_key.as_deref() {
1338            let existing_id: Option<String> = sqlx::query_scalar(
1339                "SELECT batch_id FROM lash_queued_work_batches
1340                 WHERE session_id = $1 AND source_key = $2",
1341            )
1342            .bind(&batch.session_id)
1343            .bind(source_key)
1344            .fetch_optional(&mut *tx)
1345            .await
1346            .map_err(store_sqlx_error)?;
1347            if let Some(batch_id) = existing_id {
1348                let existing = load_queued_batch(&mut tx, &batch_id)
1349                    .await?
1350                    .ok_or_else(|| {
1351                        StoreError::Backend("queued work source row disappeared".to_string())
1352                    })?;
1353                tx.commit().await.map_err(store_sqlx_error)?;
1354                return Ok(existing);
1355            }
1356        }
1357        let now = current_epoch_ms();
1358        let batch_id = format!(
1359            "qwb:{:x}",
1360            Sha256::digest(format!("{}:{:?}:{now}", batch.session_id, batch.source_key).as_bytes())
1361        );
1362        let row = sqlx::query_scalar::<_, i64>(
1363            "INSERT INTO lash_queued_work_batches (
1364                batch_id, session_id, source_key, delivery_policy, slot_policy,
1365                merge_key_json, available_at_ms, enqueued_at_ms
1366             )
1367             VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
1368             RETURNING enqueue_seq",
1369        )
1370        .bind(&batch_id)
1371        .bind(&batch.session_id)
1372        .bind(&batch.source_key)
1373        .bind(batch.delivery_policy.as_str())
1374        .bind(batch.slot_policy.as_str())
1375        .bind(encode_json(&batch.merge_key))
1376        .bind(batch.available_at_ms as i64)
1377        .bind(now as i64)
1378        .fetch_one(&mut *tx)
1379        .await
1380        .map_err(store_sqlx_error)?;
1381        for (index, payload) in batch.payloads.iter().enumerate() {
1382            let item_id = format!("{batch_id}:item:{index}");
1383            sqlx::query(
1384                "INSERT INTO lash_queued_work_items (batch_id, item_index, item_id, payload_json)
1385                 VALUES ($1, $2, $3, $4)",
1386            )
1387            .bind(&batch_id)
1388            .bind(index as i32)
1389            .bind(item_id)
1390            .bind(encode_json(payload))
1391            .execute(&mut *tx)
1392            .await
1393            .map_err(store_sqlx_error)?;
1394        }
1395        let queued = load_queued_batch(&mut tx, &batch_id)
1396            .await?
1397            .ok_or_else(|| StoreError::Backend("queued work insert disappeared".to_string()))?;
1398        debug_assert_eq!(queued.enqueue_seq, row as u64);
1399        tx.commit().await.map_err(store_sqlx_error)?;
1400        Ok(queued)
1401    }
1402
    /// Claims the next group of ready queued batches for `session_id` on behalf
    /// of `owner_id`.
    ///
    /// Candidates are batches that meet all of these conditions:
    /// - `available_at_ms` has passed;
    /// - they are unclaimed or their lease has expired;
    /// - they are not row-locked by another transaction (`SKIP LOCKED`).
    ///
    /// Candidates are scanned in `enqueue_seq` order, up to `max_batches + 32`
    /// rows. Grouping rules:
    /// - At `ActiveTurnCheckpoint`, nothing is claimed unless the first candidate
    ///   uses `EarliestSafeBoundary` delivery.
    /// - A first batch whose slot policy is not `Join` is claimed alone.
    /// - A `Join` first batch is followed by consecutive `Join` batches with the
    ///   same delivery policy and merge key. Grouping stops at the first
    ///   mismatch or at `max_batches`.
    ///
    /// Every selected row gets the same claim id, owner, lease token, claim time
    /// and expiry (`now + lease_ttl_ms`, saturating), and its stored fencing
    /// token is incremented. The returned `fencing_token` is the first row's
    /// previous token plus one.
    ///
    /// Returns `Ok(None)` in any of these cases:
    /// - `max_batches` is 0;
    /// - nothing is ready;
    /// - the boundary rule rejects the first candidate;
    /// - a selected row could not be updated.
    async fn claim_ready_queued_work(
        &self,
        session_id: &str,
        owner_id: &str,
        boundary: QueuedWorkClaimBoundary,
        lease_ttl_ms: u64,
        max_batches: usize,
    ) -> Result<Option<QueuedWorkClaim>, StoreError> {
        if max_batches == 0 {
            return Ok(None);
        }
        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
        let now = current_epoch_ms();
        // Lock ready rows in FIFO order. Rows already locked by another
        // transaction are skipped rather than waited on. The LIMIT over-fetches
        // 32 rows beyond `max_batches`; only a prefix is actually claimed below.
        let rows = sqlx::query(
            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
                    claim_fencing_token
             FROM lash_queued_work_batches
             WHERE session_id = $1
               AND available_at_ms <= $2
               AND (claim_token IS NULL OR claim_expires_at_ms <= $2)
             ORDER BY enqueue_seq ASC
             LIMIT $3
             FOR UPDATE SKIP LOCKED",
        )
        .bind(session_id)
        .bind(now as i64)
        .bind((max_batches as i64).saturating_add(32))
        .fetch_all(&mut *tx)
        .await
        .map_err(store_sqlx_error)?;
        let mut candidates = Vec::new();
        for row in rows {
            candidates.push(queued_batch_row(row)?);
        }
        let Some(first_row) = candidates.first() else {
            tx.commit().await.map_err(store_sqlx_error)?;
            return Ok(None);
        };
        // At an active-turn checkpoint, only work that may be delivered at the
        // earliest safe boundary is eligible. The head of the queue decides.
        if boundary == QueuedWorkClaimBoundary::ActiveTurnCheckpoint
            && first_row.delivery_policy != DeliveryPolicy::EarliestSafeBoundary
        {
            tx.commit().await.map_err(store_sqlx_error)?;
            return Ok(None);
        }
        let first_slot = first_row.slot_policy;
        let first_delivery = first_row.delivery_policy;
        let first_merge = first_row.merge_key.clone();
        // Take the head batch, then extend the group only with consecutive Join
        // batches that are compatible with it. Stop at the first incompatible row
        // so FIFO order is preserved.
        let mut selected = Vec::new();
        for row in candidates {
            if selected.len() >= max_batches {
                break;
            }
            if selected.is_empty() {
                selected.push(row);
                if first_slot == SlotPolicy::Exclusive {
                    break;
                }
                continue;
            }
            if first_slot != SlotPolicy::Join
                || row.slot_policy != SlotPolicy::Join
                || row.delivery_policy != first_delivery
                || row.merge_key != first_merge
            {
                break;
            }
            selected.push(row);
        }
        let Some(first) = selected.first() else {
            tx.commit().await.map_err(store_sqlx_error)?;
            return Ok(None);
        };
        // The claim identity is derived from the head batch's sequence number and
        // its next fencing token. The lease token hashes the claim's inputs
        // together with the claim time.
        let fencing_token = first.claim_fencing_token.saturating_add(1);
        let claim_id = format!("qwc:{}:{fencing_token}", first.enqueue_seq);
        let lease_token = format!(
            "{:x}",
            Sha256::digest(format!("{session_id}:{owner_id}:{claim_id}:{now}").as_bytes())
        );
        let expires_at = now.saturating_add(lease_ttl_ms);
        for row in &selected {
            // Re-check claimability in the UPDATE itself as a guard. The rows are
            // already locked by the scan above, so a miss is presumably
            // unreachable; if one happens, roll back rather than hand out a
            // partial claim.
            let changed = sqlx::query(
                "UPDATE lash_queued_work_batches
                 SET claim_id = $3,
                     claim_owner_id = $4,
                     claim_token = $5,
                     claim_fencing_token = claim_fencing_token + 1,
                     claim_claimed_at_ms = $6,
                     claim_expires_at_ms = $7
                 WHERE session_id = $1
                   AND batch_id = $2
                   AND (claim_token IS NULL OR claim_expires_at_ms <= $6)",
            )
            .bind(session_id)
            .bind(&row.batch_id)
            .bind(&claim_id)
            .bind(owner_id)
            .bind(&lease_token)
            .bind(now as i64)
            .bind(expires_at as i64)
            .execute(&mut *tx)
            .await
            .map_err(store_sqlx_error)?
            .rows_affected();
            if changed == 0 {
                tx.rollback().await.map_err(store_sqlx_error)?;
                return Ok(None);
            }
        }
        let mut batches = Vec::new();
        for row in selected {
            batches.push(queued_work_batch_from_row(&mut tx, row).await?);
        }
        tx.commit().await.map_err(store_sqlx_error)?;
        Ok(Some(QueuedWorkClaim {
            session_id: session_id.to_string(),
            claim_id,
            owner_id: owner_id.to_string(),
            lease_token,
            fencing_token,
            claimed_at_epoch_ms: now,
            expires_at_epoch_ms: expires_at,
            batches,
        }))
    }
1528
1529    async fn renew_queued_work_claim(
1530        &self,
1531        claim: &QueuedWorkClaim,
1532        lease_ttl_ms: u64,
1533    ) -> Result<QueuedWorkClaim, StoreError> {
1534        let expires_at = current_epoch_ms().saturating_add(lease_ttl_ms);
1535        let changed = sqlx::query(
1536            "UPDATE lash_queued_work_batches
1537             SET claim_expires_at_ms = $4
1538             WHERE session_id = $1 AND claim_id = $2 AND claim_token = $3",
1539        )
1540        .bind(&claim.session_id)
1541        .bind(&claim.claim_id)
1542        .bind(&claim.lease_token)
1543        .bind(expires_at as i64)
1544        .execute(&self.pool)
1545        .await
1546        .map_err(store_sqlx_error)?
1547        .rows_affected();
1548        if changed as usize != claim.batches.len() {
1549            return Err(StoreError::QueuedWorkClaimExpired {
1550                session_id: claim.session_id.clone(),
1551                claim_id: claim.claim_id.clone(),
1552            });
1553        }
1554        Ok(QueuedWorkClaim {
1555            expires_at_epoch_ms: expires_at,
1556            ..claim.clone()
1557        })
1558    }
1559
1560    async fn abandon_queued_work_claim(&self, claim: &QueuedWorkClaim) -> Result<(), StoreError> {
1561        sqlx::query(
1562            "UPDATE lash_queued_work_batches
1563             SET claim_id = NULL,
1564                 claim_owner_id = NULL,
1565                 claim_token = NULL,
1566                 claim_claimed_at_ms = 0,
1567                 claim_expires_at_ms = 0
1568             WHERE session_id = $1 AND claim_id = $2 AND claim_token = $3",
1569        )
1570        .bind(&claim.session_id)
1571        .bind(&claim.claim_id)
1572        .bind(&claim.lease_token)
1573        .execute(&self.pool)
1574        .await
1575        .map_err(store_sqlx_error)?;
1576        Ok(())
1577    }
1578
1579    async fn cancel_queued_work_batch(
1580        &self,
1581        session_id: &str,
1582        batch_id: &str,
1583    ) -> Result<Option<QueuedWorkBatch>, StoreError> {
1584        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1585        let now = current_epoch_ms();
1586        let row = sqlx::query(
1587            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1588                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1589                    claim_fencing_token
1590             FROM lash_queued_work_batches
1591             WHERE session_id = $1
1592               AND batch_id = $2
1593               AND (claim_token IS NULL OR claim_expires_at_ms <= $3)
1594             FOR UPDATE",
1595        )
1596        .bind(session_id)
1597        .bind(batch_id)
1598        .bind(now as i64)
1599        .fetch_optional(&mut *tx)
1600        .await
1601        .map_err(store_sqlx_error)?;
1602        let Some(row) = row else {
1603            tx.commit().await.map_err(store_sqlx_error)?;
1604            return Ok(None);
1605        };
1606        let batch = queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?;
1607        sqlx::query("DELETE FROM lash_queued_work_batches WHERE batch_id = $1")
1608            .bind(batch_id)
1609            .execute(&mut *tx)
1610            .await
1611            .map_err(store_sqlx_error)?;
1612        tx.commit().await.map_err(store_sqlx_error)?;
1613        Ok(Some(batch))
1614    }
1615
1616    async fn list_queued_work(&self, session_id: &str) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1617        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1618        let rows = sqlx::query(
1619            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1620                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1621                    claim_fencing_token
1622             FROM lash_queued_work_batches
1623             WHERE session_id = $1
1624             ORDER BY enqueue_seq ASC",
1625        )
1626        .bind(session_id)
1627        .fetch_all(&mut *tx)
1628        .await
1629        .map_err(store_sqlx_error)?;
1630        let mut batches = Vec::new();
1631        for row in rows {
1632            batches.push(queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?);
1633        }
1634        tx.commit().await.map_err(store_sqlx_error)?;
1635        Ok(batches)
1636    }
1637
1638    async fn list_pending_queued_work(
1639        &self,
1640        session_id: &str,
1641    ) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1642        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1643        let now = current_epoch_ms();
1644        let rows = sqlx::query(
1645            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1646                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1647                    claim_fencing_token
1648             FROM lash_queued_work_batches
1649             WHERE session_id = $1
1650               AND (claim_token IS NULL OR claim_expires_at_ms <= $2)
1651             ORDER BY enqueue_seq ASC",
1652        )
1653        .bind(session_id)
1654        .bind(now as i64)
1655        .fetch_all(&mut *tx)
1656        .await
1657        .map_err(store_sqlx_error)?;
1658        let mut batches = Vec::new();
1659        for row in rows {
1660            batches.push(queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?);
1661        }
1662        tx.commit().await.map_err(store_sqlx_error)?;
1663        Ok(batches)
1664    }
1665
1666    async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError> {
1667        sqlx::query(
1668            "INSERT INTO lash_session_meta (session_id, meta_json)
1669             VALUES ($1, $2)
1670             ON CONFLICT (session_id) DO UPDATE SET meta_json = EXCLUDED.meta_json",
1671        )
1672        .bind(&meta.session_id)
1673        .bind(encode_json(&meta))
1674        .execute(&self.pool)
1675        .await
1676        .map_err(store_sqlx_error)?;
1677        Ok(())
1678    }
1679
1680    async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError> {
1681        let json: Option<String> = if let Some(session_id) = &self.session_id {
1682            sqlx::query_scalar("SELECT meta_json FROM lash_session_meta WHERE session_id = $1")
1683                .bind(session_id)
1684                .fetch_optional(&self.pool)
1685                .await
1686                .map_err(store_sqlx_error)?
1687        } else {
1688            sqlx::query_scalar(
1689                "SELECT meta_json FROM lash_session_meta ORDER BY session_id ASC LIMIT 1",
1690            )
1691            .fetch_optional(&self.pool)
1692            .await
1693            .map_err(store_sqlx_error)?
1694        };
1695        json.map(|json| store_decode_json(&json, "session meta"))
1696            .transpose()
1697    }
1698
1699    async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError> {
1700        for id in ids {
1701            if let Some(session_id) = &self.session_id {
1702                sqlx::query(
1703                    "UPDATE lash_graph_nodes
1704                     SET tombstoned = TRUE
1705                     WHERE session_id = $1 AND node_id = $2",
1706                )
1707                .bind(session_id)
1708                .bind(id)
1709                .execute(&self.pool)
1710                .await
1711                .map_err(store_sqlx_error)?;
1712            } else {
1713                sqlx::query(
1714                    "UPDATE lash_graph_nodes
1715                     SET tombstoned = TRUE
1716                     WHERE node_id = $1",
1717                )
1718                .bind(id)
1719                .execute(&self.pool)
1720                .await
1721                .map_err(store_sqlx_error)?;
1722            }
1723        }
1724        Ok(())
1725    }
1726
1727    async fn vacuum(&self) -> Result<VacuumReport, StoreError> {
1728        let removed = if let Some(session_id) = &self.session_id {
1729            sqlx::query("DELETE FROM lash_graph_nodes WHERE session_id = $1 AND tombstoned = TRUE")
1730                .bind(session_id)
1731                .execute(&self.pool)
1732                .await
1733                .map_err(store_sqlx_error)?
1734                .rows_affected()
1735        } else {
1736            sqlx::query("DELETE FROM lash_graph_nodes WHERE tombstoned = TRUE")
1737                .execute(&self.pool)
1738                .await
1739                .map_err(store_sqlx_error)?
1740                .rows_affected()
1741        };
1742        Ok(VacuumReport {
1743            removed_node_count: removed as usize,
1744        })
1745    }
1746
1747    async fn gc_unreachable(&self) -> Result<GcReport, StoreError> {
1748        Ok(GcReport::default())
1749    }
1750}
1751
1752fn process_status_label(record: &ProcessRecord) -> &'static str {
1753    record.status.label()
1754}
1755
1756async fn load_process_tx(
1757    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1758    process_id: &str,
1759) -> Result<Option<ProcessRecord>, PluginError> {
1760    let json: Option<String> = sqlx::query_scalar(
1761        "SELECT record_json
1762             FROM lash_processes
1763             WHERE process_id = $1
1764             FOR UPDATE",
1765    )
1766    .bind(process_id)
1767    .fetch_optional(&mut **tx)
1768    .await
1769    .map_err(plugin_sqlx_error)?;
1770    json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
1771        .transpose()
1772}
1773
1774async fn load_process(
1775    pool: &PgPool,
1776    process_id: &str,
1777) -> Result<Option<ProcessRecord>, PluginError> {
1778    let json: Option<String> =
1779        sqlx::query_scalar("SELECT record_json FROM lash_processes WHERE process_id = $1")
1780            .bind(process_id)
1781            .fetch_optional(pool)
1782            .await
1783            .map_err(plugin_sqlx_error)?;
1784    json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
1785        .transpose()
1786}
1787
1788async fn save_process_tx(
1789    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1790    record: &ProcessRecord,
1791) -> Result<(), PluginError> {
1792    sqlx::query(
1793        "UPDATE lash_processes
1794         SET updated_at_ms = $2, status = $3, record_json = $4
1795         WHERE process_id = $1",
1796    )
1797    .bind(&record.id)
1798    .bind(record.updated_at_ms as i64)
1799    .bind(process_status_label(record))
1800    .bind(serde_json::to_string(record).map_err(process_decode_error)?)
1801    .execute(&mut **tx)
1802    .await
1803    .map_err(plugin_sqlx_error)?;
1804    Ok(())
1805}
1806
1807async fn load_event_by_key_tx(
1808    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1809    process_id: &str,
1810    replay_key: &str,
1811) -> Result<Option<(String, ProcessEvent)>, PluginError> {
1812    let row = sqlx::query(
1813        "SELECT payload_hash, event_json
1814         FROM lash_process_events
1815         WHERE process_id = $1 AND idempotency_key = $2",
1816    )
1817    .bind(process_id)
1818    .bind(replay_key)
1819    .fetch_optional(&mut **tx)
1820    .await
1821    .map_err(plugin_sqlx_error)?;
1822    row.map(|row| {
1823        let hash: String = row.get(0);
1824        let json: String = row.get(1);
1825        serde_json::from_str(&json)
1826            .map(|event| (hash, event))
1827            .map_err(process_decode_error)
1828    })
1829    .transpose()
1830}
1831
1832async fn load_process_lease_tx(
1833    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1834    process_id: &str,
1835) -> Result<Option<ProcessLease>, PluginError> {
1836    let row = sqlx::query(
1837        "SELECT lease_owner_id, lease_token, lease_fencing_token,
1838                lease_claimed_at_ms, lease_expires_at_ms
1839         FROM lash_process_leases
1840         WHERE process_id = $1",
1841    )
1842    .bind(process_id)
1843    .fetch_optional(&mut **tx)
1844    .await
1845    .map_err(plugin_sqlx_error)?;
1846    let Some(row) = row else {
1847        return Ok(None);
1848    };
1849    let owner_id: Option<String> = row.get(0);
1850    let lease_token: Option<String> = row.get(1);
1851    let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
1852        return Ok(None);
1853    };
1854    Ok(Some(ProcessLease {
1855        schema_version: PROCESS_LEASE_SCHEMA_VERSION,
1856        process_id: process_id.to_string(),
1857        owner_id,
1858        lease_token,
1859        fencing_token: row.get::<i64, _>(2) as u64,
1860        claimed_at_epoch_ms: row.get::<i64, _>(3) as u64,
1861        expires_at_epoch_ms: row.get::<i64, _>(4) as u64,
1862    }))
1863}
1864
1865fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> PluginError {
1866    PluginError::Session(format!(
1867        "process `{process_id}` is already leased by `{}` until {}",
1868        current.owner_id, current.expires_at_epoch_ms
1869    ))
1870}
1871
1872fn process_lease_expired(process_id: &str) -> PluginError {
1873    PluginError::Session(format!(
1874        "process lease for `{process_id}` is missing or expired"
1875    ))
1876}
1877
1878fn guard_lease(current: Option<&ProcessLease>, lease_token: &str, now: u64) -> bool {
1879    current
1880        .map(|current| current.lease_token == lease_token && current.expires_at_epoch_ms > now)
1881        .unwrap_or(false)
1882}
1883
1884async fn list_grants_for_scope(
1885    pool: &PgPool,
1886    owner_scope: &ProcessScope,
1887    live_only: bool,
1888) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
1889    let status_clause = if live_only {
1890        "AND p.status = 'running'"
1891    } else {
1892        ""
1893    };
1894    let sql = format!(
1895        "SELECT g.process_id, g.descriptor_json, p.record_json
1896         FROM lash_process_handle_grants g
1897         JOIN lash_processes p ON p.process_id = g.process_id
1898         WHERE g.scope_id = $1 {status_clause}
1899         ORDER BY g.process_id ASC"
1900    );
1901    let rows = sqlx::query(&sql)
1902        .bind(owner_scope.id().as_str())
1903        .fetch_all(pool)
1904        .await
1905        .map_err(plugin_sqlx_error)?;
1906    let mut entries = Vec::new();
1907    for row in rows {
1908        let process_id: String = row.get(0);
1909        let descriptor_json: String = row.get(1);
1910        let record_json: String = row.get(2);
1911        let descriptor: ProcessHandleDescriptor =
1912            serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
1913        let record: ProcessRecord =
1914            serde_json::from_str(&record_json).map_err(process_decode_error)?;
1915        entries.push((
1916            ProcessHandleGrant {
1917                session_id: owner_scope.session_id.clone(),
1918                process_id,
1919                descriptor,
1920            },
1921            record,
1922        ));
1923    }
1924    Ok(entries)
1925}
1926
1927#[async_trait::async_trait]
1928impl ProcessRegistry for PostgresProcessRegistry {
1929    fn durability_tier(&self) -> DurabilityTier {
1930        DurabilityTier::Durable
1931    }
1932
1933    async fn register_process(
1934        &self,
1935        registration: ProcessRegistration,
1936    ) -> Result<ProcessRecord, PluginError> {
1937        let (registration, registration_hash) =
1938            lash_core::runtime::prepare_process_registration(registration)?;
1939        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1940        if let Some(existing) = load_process_tx(&mut tx, &registration.id).await? {
1941            if existing.registration_hash == registration_hash {
1942                tx.commit().await.map_err(plugin_sqlx_error)?;
1943                return Ok(existing);
1944            }
1945            return Err(PluginError::Session(format!(
1946                "process `{}` registration hash conflict: existing {}, new {}",
1947                registration.id, existing.registration_hash, registration_hash
1948            )));
1949        }
1950        let now = current_epoch_ms();
1951        let record =
1952            ProcessRecord::from_prepared_registration(registration, registration_hash, now);
1953        let record_json = serde_json::to_string(&record).map_err(process_decode_error)?;
1954        sqlx::query(
1955            "INSERT INTO lash_processes (
1956                process_id, registration_hash, owner_scope_id, host_profile_id,
1957                created_at_ms, updated_at_ms, status, record_json
1958             )
1959             VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
1960        )
1961        .bind(&record.id)
1962        .bind(&record.registration_hash)
1963        .bind(record.owner_scope_id().as_str())
1964        .bind(record.host_profile_id())
1965        .bind(record.created_at_ms as i64)
1966        .bind(record.updated_at_ms as i64)
1967        .bind(process_status_label(&record))
1968        .bind(record_json)
1969        .execute(&mut *tx)
1970        .await
1971        .map_err(plugin_sqlx_error)?;
1972        tx.commit().await.map_err(plugin_sqlx_error)?;
1973        Ok(record)
1974    }
1975
1976    async fn set_external_ref(
1977        &self,
1978        process_id: &str,
1979        external_ref: ProcessExternalRef,
1980    ) -> Result<ProcessRecord, PluginError> {
1981        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1982        let mut record = load_process_tx(&mut tx, process_id)
1983            .await?
1984            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
1985        record.external_ref = Some(external_ref);
1986        record.updated_at_ms = current_epoch_ms();
1987        save_process_tx(&mut tx, &record).await?;
1988        tx.commit().await.map_err(plugin_sqlx_error)?;
1989        Ok(record)
1990    }
1991
1992    async fn grant_handle(
1993        &self,
1994        owner_scope: &ProcessScope,
1995        process_id: &str,
1996        descriptor: ProcessHandleDescriptor,
1997    ) -> Result<ProcessHandleGrant, PluginError> {
1998        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1999        if load_process_tx(&mut tx, process_id).await?.is_none() {
2000            return Err(PluginError::Session(format!(
2001                "unknown process `{process_id}`"
2002            )));
2003        }
2004        sqlx::query(
2005            "INSERT INTO lash_process_handle_grants (session_id, scope_id, process_id, descriptor_json)
2006             VALUES ($1, $2, $3, $4)
2007             ON CONFLICT (scope_id, process_id) DO UPDATE SET
2008                session_id = EXCLUDED.session_id,
2009                descriptor_json = EXCLUDED.descriptor_json",
2010        )
2011        .bind(&owner_scope.session_id)
2012        .bind(owner_scope.id().as_str())
2013        .bind(process_id)
2014        .bind(serde_json::to_string(&descriptor).map_err(process_decode_error)?)
2015        .execute(&mut *tx)
2016        .await
2017        .map_err(plugin_sqlx_error)?;
2018        tx.commit().await.map_err(plugin_sqlx_error)?;
2019        Ok(ProcessHandleGrant {
2020            session_id: owner_scope.session_id.clone(),
2021            process_id: process_id.to_string(),
2022            descriptor,
2023        })
2024    }
2025
2026    async fn revoke_handle(
2027        &self,
2028        owner_scope: &ProcessScope,
2029        process_id: &str,
2030    ) -> Result<(), PluginError> {
2031        sqlx::query(
2032            "DELETE FROM lash_process_handle_grants WHERE scope_id = $1 AND process_id = $2",
2033        )
2034        .bind(owner_scope.id().as_str())
2035        .bind(process_id)
2036        .execute(&self.pool)
2037        .await
2038        .map_err(plugin_sqlx_error)?;
2039        Ok(())
2040    }
2041
2042    async fn transfer_handle_grants(
2043        &self,
2044        from_scope: &ProcessScope,
2045        to_scope: &ProcessScope,
2046        process_ids: &[String],
2047    ) -> Result<(), PluginError> {
2048        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2049        for process_id in process_ids {
2050            let descriptor_json: Option<String> = sqlx::query_scalar(
2051                "SELECT descriptor_json FROM lash_process_handle_grants
2052                 WHERE scope_id = $1 AND process_id = $2",
2053            )
2054            .bind(from_scope.id().as_str())
2055            .bind(process_id)
2056            .fetch_optional(&mut *tx)
2057            .await
2058            .map_err(plugin_sqlx_error)?;
2059            let Some(descriptor_json) = descriptor_json else {
2060                return Err(PluginError::Session(format!(
2061                    "process handle `{process_id}` is not granted to session `{}`",
2062                    from_scope.session_id
2063                )));
2064            };
2065            sqlx::query(
2066                "DELETE FROM lash_process_handle_grants WHERE scope_id = $1 AND process_id = $2",
2067            )
2068            .bind(from_scope.id().as_str())
2069            .bind(process_id)
2070            .execute(&mut *tx)
2071            .await
2072            .map_err(plugin_sqlx_error)?;
2073            sqlx::query(
2074                "INSERT INTO lash_process_handle_grants (session_id, scope_id, process_id, descriptor_json)
2075                 VALUES ($1, $2, $3, $4)
2076                 ON CONFLICT (scope_id, process_id) DO UPDATE SET
2077                    session_id = EXCLUDED.session_id,
2078                    descriptor_json = EXCLUDED.descriptor_json",
2079            )
2080            .bind(&to_scope.session_id)
2081            .bind(to_scope.id().as_str())
2082            .bind(process_id)
2083            .bind(descriptor_json)
2084            .execute(&mut *tx)
2085            .await
2086            .map_err(plugin_sqlx_error)?;
2087        }
2088        tx.commit().await.map_err(plugin_sqlx_error)
2089    }
2090
2091    async fn list_handle_grants(
2092        &self,
2093        owner_scope: &ProcessScope,
2094    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
2095        list_grants_for_scope(&self.pool, owner_scope, false).await
2096    }
2097
2098    async fn list_live_handle_grants(
2099        &self,
2100        owner_scope: &ProcessScope,
2101    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
2102        list_grants_for_scope(&self.pool, owner_scope, true).await
2103    }
2104
2105    async fn has_handle_grant(
2106        &self,
2107        owner_scope: &ProcessScope,
2108        process_id: &str,
2109    ) -> Result<bool, PluginError> {
2110        let exists: Option<i64> = sqlx::query_scalar(
2111            "SELECT 1::BIGINT FROM lash_process_handle_grants
2112             WHERE scope_id = $1 AND process_id = $2
2113             LIMIT 1",
2114        )
2115        .bind(owner_scope.id().as_str())
2116        .bind(process_id)
2117        .fetch_optional(&self.pool)
2118        .await
2119        .map_err(plugin_sqlx_error)?;
2120        Ok(exists.is_some())
2121    }
2122
2123    async fn handle_grants_for_process(
2124        &self,
2125        process_id: &str,
2126    ) -> Result<Vec<ProcessHandleGrant>, PluginError> {
2127        if load_process(&self.pool, process_id).await?.is_none() {
2128            return Err(PluginError::Session(format!(
2129                "unknown process `{process_id}`"
2130            )));
2131        }
2132        let rows = sqlx::query(
2133            "SELECT session_id, descriptor_json
2134             FROM lash_process_handle_grants
2135             WHERE process_id = $1
2136             ORDER BY session_id ASC, scope_id ASC",
2137        )
2138        .bind(process_id)
2139        .fetch_all(&self.pool)
2140        .await
2141        .map_err(plugin_sqlx_error)?;
2142        let mut grants = Vec::new();
2143        for row in rows {
2144            let descriptor_json: String = row.get(1);
2145            grants.push(ProcessHandleGrant {
2146                session_id: row.get(0),
2147                process_id: process_id.to_string(),
2148                descriptor: serde_json::from_str(&descriptor_json).map_err(process_decode_error)?,
2149            });
2150        }
2151        Ok(grants)
2152    }
2153
2154    async fn delete_session_process_state(
2155        &self,
2156        session_id: &str,
2157    ) -> Result<lash_core::ProcessSessionDeleteReport, PluginError> {
2158        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2159        let rows = sqlx::query(
2160            "SELECT g.process_id, p.record_json
2161             FROM lash_process_handle_grants g
2162             JOIN lash_processes p ON p.process_id = g.process_id
2163             WHERE g.session_id = $1
2164             ORDER BY g.process_id ASC",
2165        )
2166        .bind(session_id)
2167        .fetch_all(&mut *tx)
2168        .await
2169        .map_err(plugin_sqlx_error)?;
2170        let mut removed = Vec::new();
2171        for row in rows {
2172            let process_id: String = row.get(0);
2173            let record_json: String = row.get(1);
2174            let record: ProcessRecord =
2175                serde_json::from_str(&record_json).map_err(process_decode_error)?;
2176            removed.push((process_id, record));
2177        }
2178        let revoked = sqlx::query("DELETE FROM lash_process_handle_grants WHERE session_id = $1")
2179            .bind(session_id)
2180            .execute(&mut *tx)
2181            .await
2182            .map_err(plugin_sqlx_error)?
2183            .rows_affected() as usize;
2184        let mut cancel_process_ids = Vec::new();
2185        let mut preserved_process_ids = Vec::new();
2186        for (process_id, record) in removed {
2187            if record.is_terminal() {
2188                continue;
2189            }
2190            let remaining: i64 = sqlx::query_scalar(
2191                "SELECT COUNT(*) FROM lash_process_handle_grants WHERE process_id = $1",
2192            )
2193            .bind(&process_id)
2194            .fetch_one(&mut *tx)
2195            .await
2196            .map_err(plugin_sqlx_error)?;
2197            if remaining == 0 {
2198                cancel_process_ids.push(process_id);
2199            } else {
2200                preserved_process_ids.push(process_id);
2201            }
2202        }
2203        tx.commit().await.map_err(plugin_sqlx_error)?;
2204        cancel_process_ids.sort();
2205        cancel_process_ids.dedup();
2206        preserved_process_ids.sort();
2207        preserved_process_ids.dedup();
2208        Ok(lash_core::ProcessSessionDeleteReport {
2209            session_id: session_id.to_string(),
2210            revoked_handle_count: revoked,
2211            deleted_wake_count: 0,
2212            cancel_process_ids,
2213            preserved_process_ids,
2214        })
2215    }
2216
    /// Appends an event to the log of `process_id` and applies any status change
    /// the event implies.
    ///
    /// The process row is loaded with `FOR UPDATE`, so appends made through this
    /// method for the same process run one at a time. When the request carries a
    /// replay key, any event previously stored under that key is looked up. It is
    /// handed to `lash_core::runtime::prepare_process_event_append`, which decides
    /// whether this call is a replay (`prepared.replayed`).
    ///
    /// - Replay: no event row is inserted. If the prepared result carries a status
    ///   update, the record is re-saved with it and waiters are notified after
    ///   commit.
    /// - Fresh append: the event row is inserted with the next sequence number.
    ///   The record's status (if changed) and `updated_at_ms` are saved, and
    ///   waiters are always notified after commit.
    ///
    /// # Errors
    ///
    /// Returns `PluginError::Session` for an unknown process. Errors from event
    /// preparation, the database and JSON serialization are propagated.
    async fn append_event(
        &self,
        process_id: &str,
        request: ProcessEventAppendRequest,
    ) -> Result<ProcessEventAppendResult, PluginError> {
        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
        // Row-locks the process for the rest of this transaction.
        let mut record = load_process_tx(&mut tx, process_id)
            .await?
            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
        // Only requests with a replay key can match a previously stored event.
        let replay_lookup =
            if let Some(replay_key) = request.replay.as_ref().map(|r| r.key.as_str()) {
                load_event_by_key_tx(&mut tx, process_id, replay_key).await?
            } else {
                None
            };
        // Next sequence number. MAX + 1 relies on the process row lock above to
        // stop concurrent appends through this method from picking the same value.
        let sequence: i64 = sqlx::query_scalar(
            "SELECT COALESCE(MAX(sequence), 0) + 1 FROM lash_process_events WHERE process_id = $1",
        )
        .bind(process_id)
        .fetch_one(&mut *tx)
        .await
        .map_err(plugin_sqlx_error)?;
        let occurred_at_ms = current_epoch_ms();
        let prepared = lash_core::runtime::prepare_process_event_append(
            &record,
            request,
            sequence as u64,
            replay_lookup,
            occurred_at_ms,
        )?;
        if prepared.replayed {
            // The event is already stored, so nothing is inserted. A status update
            // returned for a replay is still applied. NOTE(review): presumably this
            // repairs a record whose status was not saved with the original event;
            // confirm against `prepare_process_event_append`.
            let repaired = if let Some(status) = prepared.status_update.clone() {
                record.status = status;
                record.updated_at_ms = prepared.occurred_at_ms;
                save_process_tx(&mut tx, &record).await?;
                true
            } else {
                false
            };
            tx.commit().await.map_err(plugin_sqlx_error)?;
            // Wake waiters only after commit, and only if the record changed.
            if repaired {
                self.notify.notify_waiters();
            }
            return Ok(ProcessEventAppendResult {
                event: prepared.event,
                wake_delivery: prepared.wake_delivery,
            });
        }
        let event = prepared.event;
        sqlx::query(
            "INSERT INTO lash_process_events (
                process_id, sequence, event_type, payload_hash, idempotency_key,
                occurred_at_ms, event_json
             )
             VALUES ($1, $2, $3, $4, $5, $6, $7)",
        )
        .bind(process_id)
        .bind(sequence)
        .bind(event.event_type.as_str())
        .bind(&prepared.payload_hash)
        .bind(event.invocation.replay_key())
        .bind(prepared.occurred_at_ms as i64)
        .bind(serde_json::to_string(&event).map_err(process_decode_error)?)
        .execute(&mut *tx)
        .await
        .map_err(plugin_sqlx_error)?;
        if let Some(status) = prepared.status_update.clone() {
            record.status = status;
        }
        record.updated_at_ms = prepared.occurred_at_ms;
        save_process_tx(&mut tx, &record).await?;
        tx.commit().await.map_err(plugin_sqlx_error)?;
        // Wake in-process waiters (e.g. `wait_event_after`, `await_process`) only
        // once the new event and record are committed.
        self.notify.notify_waiters();
        Ok(ProcessEventAppendResult {
            event,
            wake_delivery: prepared.wake_delivery,
        })
    }
2295
2296    async fn events_after(
2297        &self,
2298        process_id: &str,
2299        after_sequence: u64,
2300    ) -> Result<Vec<ProcessEvent>, PluginError> {
2301        if load_process(&self.pool, process_id).await?.is_none() {
2302            return Err(PluginError::Session(format!(
2303                "unknown process `{process_id}`"
2304            )));
2305        }
2306        let rows = sqlx::query(
2307            "SELECT event_json FROM lash_process_events
2308             WHERE process_id = $1 AND sequence > $2
2309             ORDER BY sequence ASC",
2310        )
2311        .bind(process_id)
2312        .bind(after_sequence as i64)
2313        .fetch_all(&self.pool)
2314        .await
2315        .map_err(plugin_sqlx_error)?;
2316        let mut events = Vec::new();
2317        for row in rows {
2318            let json: String = row.get(0);
2319            events.push(serde_json::from_str(&json).map_err(process_decode_error)?);
2320        }
2321        Ok(events)
2322    }
2323
2324    async fn wake_events_after(
2325        &self,
2326        process_id: &str,
2327        after_sequence: u64,
2328    ) -> Result<Vec<ProcessEvent>, PluginError> {
2329        let rows = sqlx::query("SELECT sequence FROM lash_process_wake_acks WHERE process_id = $1")
2330            .bind(process_id)
2331            .fetch_all(&self.pool)
2332            .await
2333            .map_err(plugin_sqlx_error)?;
2334        let acked = rows
2335            .into_iter()
2336            .map(|row| row.get::<i64, _>(0) as u64)
2337            .collect::<HashSet<_>>();
2338        Ok(self
2339            .events_after(process_id, after_sequence)
2340            .await?
2341            .into_iter()
2342            .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
2343            .collect())
2344    }
2345
2346    async fn wait_event_after(
2347        &self,
2348        process_id: &str,
2349        event_type: &str,
2350        after_sequence: u64,
2351    ) -> Result<ProcessEvent, PluginError> {
2352        loop {
2353            if let Some(event) = self
2354                .events_after(process_id, after_sequence)
2355                .await?
2356                .into_iter()
2357                .find(|event| event.event_type == event_type)
2358            {
2359                return Ok(event);
2360            }
2361            tokio::select! {
2362                _ = self.notify.notified() => {}
2363                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
2364            }
2365        }
2366    }
2367
2368    async fn await_process(&self, process_id: &str) -> Result<ProcessAwaitOutput, PluginError> {
2369        loop {
2370            let record = load_process(&self.pool, process_id)
2371                .await?
2372                .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
2373            if let Some(await_output) = record.status.await_output() {
2374                return Ok(await_output.clone());
2375            }
2376            tokio::select! {
2377                _ = self.notify.notified() => {}
2378                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
2379            }
2380        }
2381    }
2382
2383    async fn complete_process(
2384        &self,
2385        process_id: &str,
2386        await_output: ProcessAwaitOutput,
2387    ) -> Result<ProcessRecord, PluginError> {
2388        let event_type = match await_output.terminal_state() {
2389            lash_core::ProcessTerminalState::Completed => "process.completed",
2390            lash_core::ProcessTerminalState::Failed => "process.failed",
2391            lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
2392        };
2393        self.append_event(
2394            process_id,
2395            ProcessEventAppendRequest::new(
2396                event_type,
2397                serde_json::json!({ "await_output": await_output }),
2398            )
2399            .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
2400        )
2401        .await?;
2402        load_process(&self.pool, process_id).await?.ok_or_else(|| {
2403            PluginError::Session(format!(
2404                "unknown process `{process_id}` after terminal event"
2405            ))
2406        })
2407    }
2408
2409    async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
2410        load_process(&self.pool, process_id).await.ok().flatten()
2411    }
2412
2413    async fn ack_wake(&self, process_id: &str, sequence: u64) -> Result<(), PluginError> {
2414        if load_process(&self.pool, process_id).await?.is_none() {
2415            return Err(PluginError::Session(format!(
2416                "unknown process `{process_id}`"
2417            )));
2418        }
2419        sqlx::query(
2420            "INSERT INTO lash_process_wake_acks (process_id, sequence)
2421             VALUES ($1, $2)
2422             ON CONFLICT DO NOTHING",
2423        )
2424        .bind(process_id)
2425        .bind(sequence as i64)
2426        .execute(&self.pool)
2427        .await
2428        .map_err(plugin_sqlx_error)?;
2429        Ok(())
2430    }
2431
2432    async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, PluginError> {
2433        let rows = sqlx::query(
2434            "SELECT record_json FROM lash_processes
2435             WHERE status = 'running'
2436             ORDER BY process_id ASC",
2437        )
2438        .fetch_all(&self.pool)
2439        .await
2440        .map_err(plugin_sqlx_error)?;
2441        let mut records = Vec::new();
2442        for row in rows {
2443            let json: String = row.get(0);
2444            records.push(serde_json::from_str(&json).map_err(process_decode_error)?);
2445        }
2446        Ok(records)
2447    }
2448
2449    async fn claim_process_lease(
2450        &self,
2451        process_id: &str,
2452        owner_id: &str,
2453        lease_ttl_ms: u64,
2454    ) -> Result<ProcessLease, PluginError> {
2455        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2456        if load_process_tx(&mut tx, process_id).await?.is_none() {
2457            return Err(PluginError::Session(format!(
2458                "unknown process `{process_id}`"
2459            )));
2460        }
2461        let now = current_epoch_ms();
2462        let current = load_process_lease_tx(&mut tx, process_id).await?;
2463        if let Some(current) = current.as_ref()
2464            && current.expires_at_epoch_ms > now
2465            && current.owner_id != owner_id
2466        {
2467            return Err(process_lease_conflict(process_id, current));
2468        }
2469        let existing_fence: Option<i64> = sqlx::query_scalar(
2470            "SELECT lease_fencing_token FROM lash_process_leases WHERE process_id = $1 FOR UPDATE",
2471        )
2472        .bind(process_id)
2473        .fetch_optional(&mut *tx)
2474        .await
2475        .map_err(plugin_sqlx_error)?;
2476        let fencing_token = existing_fence.unwrap_or(0) as u64 + 1;
2477        let lease = ProcessLease {
2478            schema_version: PROCESS_LEASE_SCHEMA_VERSION,
2479            process_id: process_id.to_string(),
2480            owner_id: owner_id.to_string(),
2481            lease_token: format!(
2482                "{:x}",
2483                Sha256::digest(format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes())
2484            ),
2485            fencing_token,
2486            claimed_at_epoch_ms: now,
2487            expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
2488        };
2489        sqlx::query(
2490            "INSERT INTO lash_process_leases (
2491                process_id, lease_owner_id, lease_token, lease_fencing_token,
2492                lease_claimed_at_ms, lease_expires_at_ms
2493             )
2494             VALUES ($1, $2, $3, $4, $5, $6)
2495             ON CONFLICT (process_id) DO UPDATE SET
2496                lease_owner_id = EXCLUDED.lease_owner_id,
2497                lease_token = EXCLUDED.lease_token,
2498                lease_fencing_token = EXCLUDED.lease_fencing_token,
2499                lease_claimed_at_ms = EXCLUDED.lease_claimed_at_ms,
2500                lease_expires_at_ms = EXCLUDED.lease_expires_at_ms",
2501        )
2502        .bind(&lease.process_id)
2503        .bind(&lease.owner_id)
2504        .bind(&lease.lease_token)
2505        .bind(lease.fencing_token as i64)
2506        .bind(lease.claimed_at_epoch_ms as i64)
2507        .bind(lease.expires_at_epoch_ms as i64)
2508        .execute(&mut *tx)
2509        .await
2510        .map_err(plugin_sqlx_error)?;
2511        tx.commit().await.map_err(plugin_sqlx_error)?;
2512        Ok(lease)
2513    }
2514
2515    async fn renew_process_lease(
2516        &self,
2517        lease: &ProcessLease,
2518        lease_ttl_ms: u64,
2519    ) -> Result<ProcessLease, PluginError> {
2520        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2521        let now = current_epoch_ms();
2522        let current = load_process_lease_tx(&mut tx, &lease.process_id).await?;
2523        if !guard_lease(current.as_ref(), &lease.lease_token, now) {
2524            return Err(process_lease_expired(&lease.process_id));
2525        }
2526        let renewed = ProcessLease {
2527            expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
2528            ..lease.clone()
2529        };
2530        sqlx::query(
2531            "UPDATE lash_process_leases
2532             SET lease_expires_at_ms = $2
2533             WHERE process_id = $1 AND lease_token = $3",
2534        )
2535        .bind(&renewed.process_id)
2536        .bind(renewed.expires_at_epoch_ms as i64)
2537        .bind(&renewed.lease_token)
2538        .execute(&mut *tx)
2539        .await
2540        .map_err(plugin_sqlx_error)?;
2541        tx.commit().await.map_err(plugin_sqlx_error)?;
2542        Ok(renewed)
2543    }
2544
2545    async fn complete_process_lease(
2546        &self,
2547        completion: &ProcessLeaseCompletion,
2548    ) -> Result<(), PluginError> {
2549        sqlx::query(
2550            "UPDATE lash_process_leases
2551             SET lease_owner_id = NULL,
2552                 lease_token = NULL,
2553                 lease_claimed_at_ms = 0,
2554                 lease_expires_at_ms = 0
2555             WHERE process_id = $1 AND lease_token = $2",
2556        )
2557        .bind(&completion.process_id)
2558        .bind(&completion.lease_token)
2559        .execute(&self.pool)
2560        .await
2561        .map_err(plugin_sqlx_error)?;
2562        Ok(())
2563    }
2564}
2565
2566#[async_trait::async_trait]
2567impl HostEventStore for PostgresHostEventStore {
2568    fn durability_tier(&self) -> DurabilityTier {
2569        DurabilityTier::Durable
2570    }
2571
2572    async fn register_subscription(
2573        &self,
2574        draft: TriggerSubscriptionDraft,
2575    ) -> Result<TriggerSubscriptionRecord, PluginError> {
2576        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2577        let seq: i64 = sqlx::query_scalar("SELECT nextval('lash_host_event_subscription_seq')")
2578            .fetch_one(&mut *tx)
2579            .await
2580            .map_err(plugin_sqlx_error)?;
2581        let handle = format!("trigger:{seq}");
2582        let subscription_id = format!("subscription:{seq}");
2583        let now = current_epoch_ms();
2584        let record = TriggerSubscriptionRecord {
2585            subscription_id: subscription_id.clone(),
2586            session_id: draft.session_id,
2587            handle,
2588            name: draft.name,
2589            source_type: draft.source_type,
2590            source_key: draft.source_key,
2591            source: draft.source,
2592            event_ty: draft.event_ty,
2593            module_ref: draft.module_ref,
2594            required_surface_ref: draft.required_surface_ref,
2595            process_ref: draft.process_ref,
2596            process_name: draft.process_name,
2597            input_template: draft.input_template,
2598            enabled: true,
2599            created_at_ms: now,
2600            updated_at_ms: now,
2601        };
2602        sqlx::query(
2603            "INSERT INTO lash_host_event_trigger_subscriptions (
2604                subscription_id, session_id, handle, source_type, source_key,
2605                enabled, created_at_ms, updated_at_ms, record_json
2606             )
2607             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
2608        )
2609        .bind(&record.subscription_id)
2610        .bind(&record.session_id)
2611        .bind(&record.handle)
2612        .bind(&record.source_type)
2613        .bind(&record.source_key)
2614        .bind(record.enabled)
2615        .bind(record.created_at_ms as i64)
2616        .bind(record.updated_at_ms as i64)
2617        .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2618        .execute(&mut *tx)
2619        .await
2620        .map_err(plugin_sqlx_error)?;
2621        tx.commit().await.map_err(plugin_sqlx_error)?;
2622        Ok(record)
2623    }
2624
2625    async fn list_subscriptions(
2626        &self,
2627        filter: TriggerSubscriptionFilter,
2628    ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError> {
2629        let rows = sqlx::query(
2630            "SELECT record_json FROM lash_host_event_trigger_subscriptions
2631             ORDER BY session_id ASC, handle ASC",
2632        )
2633        .fetch_all(&self.pool)
2634        .await
2635        .map_err(plugin_sqlx_error)?;
2636        let mut records = Vec::new();
2637        for row in rows {
2638            let json: String = row.get(0);
2639            let record: TriggerSubscriptionRecord =
2640                serde_json::from_str(&json).map_err(process_decode_error)?;
2641            if filter.matches(&record) {
2642                records.push(record);
2643            }
2644        }
2645        Ok(records)
2646    }
2647
2648    async fn cancel_subscription(
2649        &self,
2650        session_id: &str,
2651        handle: &str,
2652    ) -> Result<bool, PluginError> {
2653        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2654        let json: Option<String> = sqlx::query_scalar(
2655            "SELECT record_json FROM lash_host_event_trigger_subscriptions
2656             WHERE session_id = $1 AND handle = $2
2657             FOR UPDATE",
2658        )
2659        .bind(session_id)
2660        .bind(handle)
2661        .fetch_optional(&mut *tx)
2662        .await
2663        .map_err(plugin_sqlx_error)?;
2664        let Some(json) = json else {
2665            tx.commit().await.map_err(plugin_sqlx_error)?;
2666            return Ok(false);
2667        };
2668        let mut record: TriggerSubscriptionRecord =
2669            serde_json::from_str(&json).map_err(process_decode_error)?;
2670        let changed = record.enabled;
2671        record.enabled = false;
2672        record.updated_at_ms = current_epoch_ms();
2673        sqlx::query(
2674            "UPDATE lash_host_event_trigger_subscriptions
2675             SET enabled = $3, updated_at_ms = $4, record_json = $5
2676             WHERE session_id = $1 AND handle = $2",
2677        )
2678        .bind(session_id)
2679        .bind(handle)
2680        .bind(record.enabled)
2681        .bind(record.updated_at_ms as i64)
2682        .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2683        .execute(&mut *tx)
2684        .await
2685        .map_err(plugin_sqlx_error)?;
2686        tx.commit().await.map_err(plugin_sqlx_error)?;
2687        Ok(changed)
2688    }
2689
2690    async fn record_occurrence(
2691        &self,
2692        request: HostEventOccurrenceRequest,
2693    ) -> Result<HostEventOccurrenceRecord, PluginError> {
2694        lash_core::validate_host_event_occurrence_request(&request)?;
2695        let request_hash = lash_core::host_event_occurrence_request_hash(&request)?;
2696        let occurrence_id = lash_core::deterministic_occurrence_id(&request)?;
2697        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2698        let existing = sqlx::query(
2699            "SELECT request_hash, record_json
2700             FROM lash_host_event_occurrences
2701             WHERE idempotency_key = $1
2702             FOR UPDATE",
2703        )
2704        .bind(&request.idempotency_key)
2705        .fetch_optional(&mut *tx)
2706        .await
2707        .map_err(plugin_sqlx_error)?;
2708        if let Some(row) = existing {
2709            let existing_hash: String = row.get(0);
2710            let existing_json: String = row.get(1);
2711            if existing_hash != request_hash {
2712                return Err(PluginError::Session(format!(
2713                    "host event occurrence idempotency conflict for `{}`",
2714                    request.idempotency_key
2715                )));
2716            }
2717            let record = serde_json::from_str(&existing_json).map_err(process_decode_error)?;
2718            tx.commit().await.map_err(plugin_sqlx_error)?;
2719            return Ok(record);
2720        }
2721        let record = HostEventOccurrenceRecord {
2722            occurrence_id: occurrence_id.clone(),
2723            source_type: request.source_type,
2724            source_key: request.source_key,
2725            payload: request.payload,
2726            idempotency_key: request.idempotency_key.clone(),
2727            source: request.source,
2728            occurred_at_ms: current_epoch_ms(),
2729        };
2730        sqlx::query(
2731            "INSERT INTO lash_host_event_occurrences (
2732                occurrence_id, idempotency_key, request_hash, source_type, source_key,
2733                occurred_at_ms, record_json
2734             )
2735             VALUES ($1, $2, $3, $4, $5, $6, $7)",
2736        )
2737        .bind(&record.occurrence_id)
2738        .bind(&record.idempotency_key)
2739        .bind(&request_hash)
2740        .bind(&record.source_type)
2741        .bind(&record.source_key)
2742        .bind(record.occurred_at_ms as i64)
2743        .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2744        .execute(&mut *tx)
2745        .await
2746        .map_err(plugin_sqlx_error)?;
2747        tx.commit().await.map_err(plugin_sqlx_error)?;
2748        Ok(record)
2749    }
2750
2751    async fn reserve_matching_deliveries(
2752        &self,
2753        occurrence_id: &str,
2754    ) -> Result<Vec<TriggerDeliveryReservation>, PluginError> {
2755        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2756        let occurrence_json: Option<String> = sqlx::query_scalar(
2757            "SELECT record_json FROM lash_host_event_occurrences WHERE occurrence_id = $1",
2758        )
2759        .bind(occurrence_id)
2760        .fetch_optional(&mut *tx)
2761        .await
2762        .map_err(plugin_sqlx_error)?;
2763        let Some(occurrence_json) = occurrence_json else {
2764            return Err(PluginError::Session(format!(
2765                "unknown host event occurrence `{occurrence_id}`"
2766            )));
2767        };
2768        let occurrence: HostEventOccurrenceRecord =
2769            serde_json::from_str(&occurrence_json).map_err(process_decode_error)?;
2770        let rows = sqlx::query(
2771            "SELECT record_json FROM lash_host_event_trigger_subscriptions
2772             WHERE enabled = TRUE AND source_type = $1 AND source_key = $2
2773             ORDER BY session_id ASC, handle ASC",
2774        )
2775        .bind(&occurrence.source_type)
2776        .bind(&occurrence.source_key)
2777        .fetch_all(&mut *tx)
2778        .await
2779        .map_err(plugin_sqlx_error)?;
2780        let mut deliveries = Vec::new();
2781        for row in rows {
2782            let json: String = row.get(0);
2783            let subscription: TriggerSubscriptionRecord =
2784                serde_json::from_str(&json).map_err(process_decode_error)?;
2785            let process_id = lash_core::deterministic_delivery_process_id(
2786                &occurrence.occurrence_id,
2787                &subscription.subscription_id,
2788            )?;
2789            let inserted = sqlx::query(
2790                "INSERT INTO lash_host_event_deliveries (
2791                    occurrence_id, subscription_id, process_id, created_at_ms
2792                 )
2793                 VALUES ($1, $2, $3, $4)
2794                 ON CONFLICT DO NOTHING",
2795            )
2796            .bind(&occurrence.occurrence_id)
2797            .bind(&subscription.subscription_id)
2798            .bind(&process_id)
2799            .bind(current_epoch_ms() as i64)
2800            .execute(&mut *tx)
2801            .await
2802            .map_err(plugin_sqlx_error)?
2803            .rows_affected();
2804            if inserted == 0 {
2805                continue;
2806            }
2807            deliveries.push(TriggerDeliveryReservation {
2808                occurrence: occurrence.clone(),
2809                subscription,
2810                process_id,
2811            });
2812        }
2813        tx.commit().await.map_err(plugin_sqlx_error)?;
2814        Ok(deliveries)
2815    }
2816}
2817
2818#[async_trait::async_trait]
2819impl lashlang::LashlangArtifactStore for PostgresLashlangArtifactStore {
2820    fn durability_tier(&self) -> DurabilityTier {
2821        DurabilityTier::Durable
2822    }
2823
2824    async fn put_module_artifact(
2825        &self,
2826        artifact: &lashlang::ModuleArtifact,
2827    ) -> Result<(), lashlang::ArtifactStoreError> {
2828        let bytes = artifact
2829            .to_store_bytes()
2830            .map_err(lashlang::ArtifactStoreError::from)?;
2831        sqlx::query(
2832            "INSERT INTO lash_lashlang_artifacts (module_ref, artifact_bytes)
2833             VALUES ($1, $2)
2834             ON CONFLICT (module_ref) DO UPDATE SET artifact_bytes = EXCLUDED.artifact_bytes",
2835        )
2836        .bind(artifact.module_ref.as_str())
2837        .bind(bytes)
2838        .execute(&self.pool)
2839        .await
2840        .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
2841        Ok(())
2842    }
2843
2844    async fn get_module_artifact(
2845        &self,
2846        module_ref: &lashlang::ModuleRef,
2847    ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
2848        let bytes: Option<Vec<u8>> = sqlx::query_scalar(
2849            "SELECT artifact_bytes FROM lash_lashlang_artifacts WHERE module_ref = $1",
2850        )
2851        .bind(module_ref.as_str())
2852        .fetch_optional(&self.pool)
2853        .await
2854        .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
2855        bytes
2856            .map(|bytes| {
2857                lashlang::ModuleArtifact::from_store_bytes(&bytes)
2858                    .map(Arc::new)
2859                    .map_err(lashlang::ArtifactStoreError::from)
2860            })
2861            .transpose()
2862    }
2863}