Skip to main content

lash_postgres_store/
lib.rs

1//! PostgreSQL durable storage for Lash.
2//!
3//! One [`PostgresStorage`] owns a shared [`sqlx::PgPool`] and creates durable
4//! implementations for the runtime session store, process registry, host-event
5//! store, Lashlang artifact store, and attachment manifest.
6
7use std::collections::HashSet;
8use std::sync::{Arc, OnceLock};
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use lash_core::runtime::{
12    ProcessHandleGrantEntry, QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim,
13    QueuedWorkClaimBoundary, QueuedWorkCompletion, QueuedWorkItem,
14};
15use lash_core::store::{
16    GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
17    RuntimeCommitResult, SessionCheckpoint, SessionHeadMeta,
18};
19use lash_core::{
20    AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
21    DeliveryPolicy, DurabilityTier, GcReport, MergeKey, ProcessAwaitOutput, ProcessEvent,
22    ProcessEventAppendRequest, ProcessEventAppendResult, ProcessExternalRef,
23    ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease, ProcessLeaseCompletion,
24    ProcessRecord, ProcessRegistration, ProcessRegistry, ProcessScope, RuntimePersistence,
25    SessionMeta, SessionNodeRecord, SessionReadScope, SessionStoreCreateRequest,
26    SessionStoreFactory, SlotPolicy, StoreError, TokenLedgerEntry, VacuumReport,
27};
28use lash_core::{
29    HostEventOccurrenceRecord, HostEventOccurrenceRequest, HostEventStore, PluginError,
30    TriggerDeliveryReservation, TriggerSubscriptionDraft, TriggerSubscriptionFilter,
31    TriggerSubscriptionRecord,
32};
33use sha2::{Digest, Sha256};
34use sqlx::postgres::{PgPool, PgPoolOptions, PgRow};
35use sqlx::{Executor, Row};
36
37const SCHEMA_COMPONENT: &str = "lash-postgres-store";
38const SCHEMA_VERSION: i32 = 1;
39const PROCESS_LEASE_SCHEMA_VERSION: u32 = lash_core::PROCESS_LEASE_SCHEMA_VERSION;
40
41#[derive(Clone)]
42pub struct PostgresStorage {
43    pool: PgPool,
44}
45
46#[derive(Clone)]
47pub struct PostgresSessionStoreFactory {
48    pool: PgPool,
49}
50
51#[derive(Clone)]
52pub struct PostgresSessionStore {
53    pool: PgPool,
54    /// Explicit session binding for handles created via the factory.
55    session_id: Option<String>,
56    /// In-memory bind-on-first-commit for an *unbound* handle. A session-store
57    /// handle commits to exactly one session; an unbound handle latches the first
58    /// session it commits and rejects others (Postgres is multi-session per
59    /// database, so this can't be inferred from a singleton head row the way the
60    /// single-file SQLite store does). Shared across clones via `Arc`.
61    bound_session: Arc<OnceLock<String>>,
62}
63
64#[derive(Clone)]
65pub struct PostgresProcessRegistry {
66    pool: PgPool,
67    notify: Arc<tokio::sync::Notify>,
68}
69
70#[derive(Clone)]
71pub struct PostgresHostEventStore {
72    pool: PgPool,
73}
74
75#[derive(Clone)]
76pub struct PostgresLashlangArtifactStore {
77    pool: PgPool,
78}
79
/// Pool sizing and per-connection timeout settings used by [`PostgresStorage`].
///
/// Session commits are designed around an **optimistic CAS** on the head row
/// (`UPDATE … WHERE head_revision = expected`) rather than a held
/// `SELECT … FOR UPDATE`, so concurrent writers do not pin pooled connections while
/// waiting on a lock. `lock_timeout` is an extra safety net. It bounds how long that
/// single CAS write may wait on the head row's lock before erroring (reported as a
/// retryable conflict), so a pathological burst cannot exhaust the pool.
#[derive(Clone, Debug)]
pub struct PostgresStoreConfig {
    /// Upper bound on pooled connections (default: 16).
    pub max_connections: u32,
    /// Idle connections kept open even when unused (default: 0).
    pub min_connections: u32,
    /// Maximum wait in `acquire` for a free connection (default: 30 seconds).
    pub acquire_timeout: Duration,
    /// Idle period after which a connection is closed (default: 10 minutes).
    pub idle_timeout: Option<Duration>,
    /// Age after which a connection is recycled (default: 30 minutes).
    pub max_lifetime: Option<Duration>,
    /// Postgres `lock_timeout` set on each new connection (default: 10 seconds).
    pub lock_timeout: Option<Duration>,
    /// Postgres `statement_timeout` set on each new connection (default: unset).
    pub statement_timeout: Option<Duration>,
}

impl Default for PostgresStoreConfig {
    fn default() -> Self {
        let seconds = Duration::from_secs;
        let minutes = |count: u64| Duration::from_secs(count * 60);
        PostgresStoreConfig {
            max_connections: 16,
            min_connections: 0,
            acquire_timeout: seconds(30),
            idle_timeout: Some(minutes(10)),
            max_lifetime: Some(minutes(30)),
            lock_timeout: Some(seconds(10)),
            statement_timeout: None,
        }
    }
}
119
120impl PostgresStorage {
121    /// Connect with [`PostgresStoreConfig::default`] pool/timeout settings.
122    pub async fn connect(database_url: &str) -> Result<Self, StoreError> {
123        Self::connect_with(database_url, PostgresStoreConfig::default()).await
124    }
125
126    /// Connect with explicit pool sizing and per-connection timeouts.
127    pub async fn connect_with(
128        database_url: &str,
129        config: PostgresStoreConfig,
130    ) -> Result<Self, StoreError> {
131        let lock_ms = config.lock_timeout.map(|d| d.as_millis().max(1) as u64);
132        let statement_ms = config
133            .statement_timeout
134            .map(|d| d.as_millis().max(1) as u64);
135        let mut options = PgPoolOptions::new()
136            .max_connections(config.max_connections)
137            .min_connections(config.min_connections)
138            .acquire_timeout(config.acquire_timeout);
139        if let Some(timeout) = config.idle_timeout {
140            options = options.idle_timeout(timeout);
141        }
142        if let Some(timeout) = config.max_lifetime {
143            options = options.max_lifetime(timeout);
144        }
145        let pool = options
146            .after_connect(move |conn, _meta| {
147                Box::pin(async move {
148                    if let Some(ms) = lock_ms {
149                        conn.execute(format!("SET lock_timeout = {ms}").as_str())
150                            .await?;
151                    }
152                    if let Some(ms) = statement_ms {
153                        conn.execute(format!("SET statement_timeout = {ms}").as_str())
154                            .await?;
155                    }
156                    Ok(())
157                })
158            })
159            .connect(database_url)
160            .await
161            .map_err(store_sqlx_error)?;
162        ensure_schema(&pool).await?;
163        Ok(Self { pool })
164    }
165
166    pub fn from_pool(pool: PgPool) -> Self {
167        Self { pool }
168    }
169
170    pub fn pool(&self) -> &PgPool {
171        &self.pool
172    }
173
174    pub fn session_store_factory(&self) -> PostgresSessionStoreFactory {
175        PostgresSessionStoreFactory {
176            pool: self.pool.clone(),
177        }
178    }
179
180    pub fn session_store(&self, session_id: impl Into<String>) -> PostgresSessionStore {
181        PostgresSessionStore {
182            pool: self.pool.clone(),
183            session_id: Some(session_id.into()),
184            bound_session: Arc::new(OnceLock::new()),
185        }
186    }
187
188    pub fn unbound_session_store(&self) -> PostgresSessionStore {
189        PostgresSessionStore {
190            pool: self.pool.clone(),
191            session_id: None,
192            bound_session: Arc::new(OnceLock::new()),
193        }
194    }
195
196    pub fn process_registry(&self) -> PostgresProcessRegistry {
197        PostgresProcessRegistry {
198            pool: self.pool.clone(),
199            notify: Arc::new(tokio::sync::Notify::new()),
200        }
201    }
202
203    pub fn host_event_store(&self) -> PostgresHostEventStore {
204        PostgresHostEventStore {
205            pool: self.pool.clone(),
206        }
207    }
208
209    pub fn lashlang_artifact_store(&self) -> PostgresLashlangArtifactStore {
210        PostgresLashlangArtifactStore {
211            pool: self.pool.clone(),
212        }
213    }
214}
215
216impl PostgresSessionStoreFactory {
217    pub fn new(storage: &PostgresStorage) -> Self {
218        storage.session_store_factory()
219    }
220}
221
222impl PostgresSessionStore {
223    pub fn unbound(storage: &PostgresStorage) -> Self {
224        storage.unbound_session_store()
225    }
226
227    async fn selected_session_id(&self) -> Result<Option<String>, StoreError> {
228        if let Some(session_id) = &self.session_id {
229            return Ok(Some(session_id.clone()));
230        }
231        sqlx::query_scalar("SELECT session_id FROM lash_sessions ORDER BY session_id ASC LIMIT 1")
232            .fetch_optional(&self.pool)
233            .await
234            .map_err(store_sqlx_error)
235    }
236}
237
238async fn ensure_schema(pool: &PgPool) -> Result<(), StoreError> {
239    let mut tx = pool.begin().await.map_err(store_sqlx_error)?;
240    tx.execute("SELECT pg_advisory_xact_lock(715421, 907001)")
241        .await
242        .map_err(store_sqlx_error)?;
243    tx.execute(
244        r#"
245        CREATE TABLE IF NOT EXISTS lash_schema_versions (
246            component TEXT PRIMARY KEY,
247            version INTEGER NOT NULL
248        );
249
250        CREATE TABLE IF NOT EXISTS lash_blobs (
251            hash TEXT PRIMARY KEY,
252            content BYTEA NOT NULL
253        );
254
255        CREATE TABLE IF NOT EXISTS lash_sessions (
256            session_id TEXT PRIMARY KEY,
257            head_revision BIGINT NOT NULL DEFAULT 0,
258            head_json TEXT NOT NULL,
259            checkpoint_ref TEXT
260        );
261
262        CREATE TABLE IF NOT EXISTS lash_graph_nodes (
263            session_id TEXT NOT NULL,
264            seq BIGSERIAL,
265            node_id TEXT NOT NULL,
266            node_json TEXT NOT NULL,
267            tombstoned BOOLEAN NOT NULL DEFAULT FALSE,
268            PRIMARY KEY (session_id, node_id)
269        );
270        CREATE INDEX IF NOT EXISTS idx_lash_graph_nodes_seq
271            ON lash_graph_nodes(session_id, seq);
272
273        CREATE TABLE IF NOT EXISTS lash_usage_deltas (
274            seq BIGSERIAL PRIMARY KEY,
275            session_id TEXT NOT NULL,
276            entry_json TEXT NOT NULL
277        );
278
279        CREATE TABLE IF NOT EXISTS lash_session_meta (
280            session_id TEXT PRIMARY KEY,
281            meta_json TEXT NOT NULL
282        );
283
284        CREATE TABLE IF NOT EXISTS lash_runtime_turn_commits (
285            session_id TEXT NOT NULL,
286            turn_id TEXT NOT NULL,
287            turn_commit_hash TEXT NOT NULL,
288            result_json TEXT NOT NULL,
289            committed_at_ms BIGINT NOT NULL,
290            PRIMARY KEY (session_id, turn_id)
291        );
292
293        CREATE TABLE IF NOT EXISTS lash_queued_work_batches (
294            enqueue_seq BIGSERIAL PRIMARY KEY,
295            batch_id TEXT NOT NULL UNIQUE,
296            session_id TEXT NOT NULL,
297            source_key TEXT,
298            delivery_policy TEXT NOT NULL,
299            slot_policy TEXT NOT NULL,
300            merge_key_json TEXT NOT NULL,
301            available_at_ms BIGINT NOT NULL,
302            enqueued_at_ms BIGINT NOT NULL,
303            claim_id TEXT,
304            claim_owner_id TEXT,
305            claim_token TEXT,
306            claim_fencing_token BIGINT NOT NULL DEFAULT 0,
307            claim_claimed_at_ms BIGINT NOT NULL DEFAULT 0,
308            claim_expires_at_ms BIGINT NOT NULL DEFAULT 0,
309            UNIQUE (session_id, source_key)
310        );
311        CREATE INDEX IF NOT EXISTS idx_lash_queued_work_ready
312            ON lash_queued_work_batches(session_id, available_at_ms, enqueue_seq);
313
314        CREATE TABLE IF NOT EXISTS lash_queued_work_items (
315            batch_id TEXT NOT NULL REFERENCES lash_queued_work_batches(batch_id) ON DELETE CASCADE,
316            item_index INTEGER NOT NULL,
317            item_id TEXT NOT NULL,
318            payload_json TEXT NOT NULL,
319            PRIMARY KEY (batch_id, item_index)
320        );
321
322        CREATE TABLE IF NOT EXISTS lash_attachment_manifest (
323            attachment_id TEXT PRIMARY KEY,
324            session_id TEXT NOT NULL,
325            canonical_uri TEXT NOT NULL,
326            intent_at_ms BIGINT NOT NULL,
327            committed_at_ms BIGINT
328        );
329        CREATE INDEX IF NOT EXISTS idx_lash_attachment_manifest_uncommitted
330            ON lash_attachment_manifest(committed_at_ms)
331            WHERE committed_at_ms IS NULL;
332
333        CREATE TABLE IF NOT EXISTS lash_processes (
334            process_id TEXT PRIMARY KEY,
335            registration_hash TEXT NOT NULL,
336            owner_scope_id TEXT NOT NULL,
337            host_profile_id TEXT NOT NULL,
338            created_at_ms BIGINT NOT NULL,
339            updated_at_ms BIGINT NOT NULL,
340            status TEXT NOT NULL,
341            record_json TEXT NOT NULL
342        );
343        CREATE INDEX IF NOT EXISTS idx_lash_processes_status
344            ON lash_processes(status);
345
346        CREATE TABLE IF NOT EXISTS lash_process_events (
347            process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
348            sequence BIGINT NOT NULL,
349            event_type TEXT NOT NULL,
350            payload_hash TEXT NOT NULL,
351            idempotency_key TEXT,
352            occurred_at_ms BIGINT NOT NULL,
353            event_json TEXT NOT NULL,
354            PRIMARY KEY (process_id, sequence)
355        );
356        CREATE UNIQUE INDEX IF NOT EXISTS idx_lash_process_events_key
357            ON lash_process_events(process_id, idempotency_key)
358            WHERE idempotency_key IS NOT NULL;
359
360        CREATE TABLE IF NOT EXISTS lash_process_wake_acks (
361            process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
362            sequence BIGINT NOT NULL,
363            PRIMARY KEY (process_id, sequence)
364        );
365
366        CREATE TABLE IF NOT EXISTS lash_process_handle_grants (
367            session_id TEXT NOT NULL,
368            scope_id TEXT NOT NULL,
369            process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
370            descriptor_json TEXT NOT NULL,
371            PRIMARY KEY (scope_id, process_id)
372        );
373        CREATE INDEX IF NOT EXISTS idx_lash_process_handle_grants_session
374            ON lash_process_handle_grants(session_id);
375        CREATE INDEX IF NOT EXISTS idx_lash_process_handle_grants_process
376            ON lash_process_handle_grants(process_id);
377
378        CREATE TABLE IF NOT EXISTS lash_process_leases (
379            process_id TEXT PRIMARY KEY REFERENCES lash_processes(process_id) ON DELETE CASCADE,
380            lease_owner_id TEXT,
381            lease_token TEXT,
382            lease_fencing_token BIGINT NOT NULL DEFAULT 0,
383            lease_claimed_at_ms BIGINT NOT NULL DEFAULT 0,
384            lease_expires_at_ms BIGINT NOT NULL DEFAULT 0
385        );
386
387        CREATE SEQUENCE IF NOT EXISTS lash_host_event_subscription_seq;
388        CREATE TABLE IF NOT EXISTS lash_host_event_trigger_subscriptions (
389            subscription_id TEXT PRIMARY KEY,
390            session_id TEXT NOT NULL,
391            handle TEXT NOT NULL,
392            source_type TEXT NOT NULL,
393            source_key TEXT NOT NULL,
394            enabled BOOLEAN NOT NULL,
395            created_at_ms BIGINT NOT NULL,
396            updated_at_ms BIGINT NOT NULL,
397            record_json TEXT NOT NULL,
398            UNIQUE(session_id, handle)
399        );
400        CREATE INDEX IF NOT EXISTS idx_lash_host_event_subscriptions_source
401            ON lash_host_event_trigger_subscriptions(source_type, source_key, enabled);
402
403        CREATE TABLE IF NOT EXISTS lash_host_event_occurrences (
404            occurrence_id TEXT PRIMARY KEY,
405            idempotency_key TEXT NOT NULL UNIQUE,
406            request_hash TEXT NOT NULL,
407            source_type TEXT NOT NULL,
408            source_key TEXT NOT NULL,
409            occurred_at_ms BIGINT NOT NULL,
410            record_json TEXT NOT NULL
411        );
412
413        CREATE TABLE IF NOT EXISTS lash_host_event_deliveries (
414            occurrence_id TEXT NOT NULL REFERENCES lash_host_event_occurrences(occurrence_id) ON DELETE CASCADE,
415            subscription_id TEXT NOT NULL REFERENCES lash_host_event_trigger_subscriptions(subscription_id) ON DELETE CASCADE,
416            process_id TEXT NOT NULL,
417            created_at_ms BIGINT NOT NULL,
418            PRIMARY KEY (occurrence_id, subscription_id)
419        );
420
421        CREATE TABLE IF NOT EXISTS lash_lashlang_artifacts (
422            module_ref TEXT PRIMARY KEY,
423            artifact_bytes BYTEA NOT NULL
424        );
425        "#,
426    )
427    .await
428    .map_err(store_sqlx_error)?;
429
430    let existing: Option<i32> =
431        sqlx::query_scalar("SELECT version FROM lash_schema_versions WHERE component = $1")
432            .bind(SCHEMA_COMPONENT)
433            .fetch_optional(&mut *tx)
434            .await
435            .map_err(store_sqlx_error)?;
436    match existing {
437        Some(version) if version == SCHEMA_VERSION => {}
438        Some(version) => {
439            return Err(StoreError::Backend(format!(
440                "Postgres schema component `{SCHEMA_COMPONENT}` has version {version}, expected {SCHEMA_VERSION}"
441            )));
442        }
443        None => {
444            sqlx::query("INSERT INTO lash_schema_versions (component, version) VALUES ($1, $2)")
445                .bind(SCHEMA_COMPONENT)
446                .bind(SCHEMA_VERSION)
447                .execute(&mut *tx)
448                .await
449                .map_err(store_sqlx_error)?;
450        }
451    }
452    tx.commit().await.map_err(store_sqlx_error)
453}
454
/// Milliseconds since the Unix epoch according to the system clock.
///
/// A clock set before 1970 yields `0` instead of an error.
fn current_epoch_ms() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::ZERO,
    };
    since_epoch.as_millis() as u64
}
461
/// Wall-clock time rendered as `unix:<whole seconds since the epoch>`.
///
/// A clock set before 1970 renders as `unix:0`.
fn current_timestamp_string() -> String {
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0);
    format!("unix:{secs}")
}
468
469fn store_sqlx_error(err: sqlx::Error) -> StoreError {
470    StoreError::Backend(err.to_string())
471}
472
473fn plugin_sqlx_error(err: sqlx::Error) -> PluginError {
474    PluginError::Session(err.to_string())
475}
476
477fn process_decode_error(err: serde_json::Error) -> PluginError {
478    PluginError::Session(format!("failed to decode process registry row: {err}"))
479}
480
481fn store_decode_json<T: serde::de::DeserializeOwned>(
482    json: &str,
483    what: &str,
484) -> Result<T, StoreError> {
485    serde_json::from_str(json)
486        .map_err(|err| StoreError::Backend(format!("failed to decode {what}: {err}")))
487}
488
489fn encode_json<T: serde::Serialize>(value: &T) -> String {
490    serde_json::to_string(value).expect("persisted state should serialize")
491}
492
493fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
494    let mut buf = Vec::with_capacity(1024);
495    rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
496    buf
497}
498
499fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
500    rmp_serde::from_slice(bytes).ok()
501}
502
503fn block_on_detached<T: Send + 'static>(
504    future: impl std::future::Future<Output = T> + Send + 'static,
505) -> T {
506    std::thread::spawn(move || {
507        tokio::runtime::Builder::new_current_thread()
508            .enable_all()
509            .build()
510            .expect("postgres manifest runtime")
511            .block_on(future)
512    })
513    .join()
514    .expect("postgres manifest thread")
515}
516
517fn merge_token_ledger_entries(entries: Vec<TokenLedgerEntry>) -> Vec<TokenLedgerEntry> {
518    let mut merged = Vec::<TokenLedgerEntry>::new();
519    for entry in entries {
520        if entry.usage.total() == 0 {
521            continue;
522        }
523        if let Some(existing) = merged
524            .iter_mut()
525            .find(|existing| existing.source == entry.source && existing.model == entry.model)
526        {
527            existing.usage.input_tokens += entry.usage.input_tokens;
528            existing.usage.output_tokens += entry.usage.output_tokens;
529            existing.usage.cached_input_tokens += entry.usage.cached_input_tokens;
530            existing.usage.reasoning_tokens += entry.usage.reasoning_tokens;
531        } else {
532            merged.push(entry);
533        }
534    }
535    merged
536}
537
538#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
539struct SessionCheckpointEnvelope {
540    manifest: SessionCheckpoint,
541    tool_state: Option<lash_core::ToolState>,
542    plugin_snapshot: Option<lash_core::PluginSessionSnapshot>,
543    execution_state: Option<Vec<u8>>,
544}
545
546async fn put_blob_tx(
547    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
548    content: &[u8],
549) -> Result<BlobRef, StoreError> {
550    let hash = format!("{:x}", Sha256::digest(content));
551    sqlx::query(
552        "INSERT INTO lash_blobs (hash, content)
553         VALUES ($1, $2)
554         ON CONFLICT (hash) DO NOTHING",
555    )
556    .bind(&hash)
557    .bind(content)
558    .execute(&mut **tx)
559    .await
560    .map_err(store_sqlx_error)?;
561    Ok(BlobRef(hash))
562}
563
564async fn get_blob(pool: &PgPool, blob_ref: &BlobRef) -> Option<Vec<u8>> {
565    sqlx::query_scalar::<_, Vec<u8>>("SELECT content FROM lash_blobs WHERE hash = $1")
566        .bind(blob_ref.as_str())
567        .fetch_optional(pool)
568        .await
569        .ok()
570        .flatten()
571}
572
573async fn put_checkpoint_tx(
574    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
575    checkpoint: &HydratedSessionCheckpoint,
576) -> Result<(BlobRef, SessionCheckpoint), StoreError> {
577    let manifest = SessionCheckpoint {
578        turn_state: checkpoint.turn_state.clone(),
579        tool_state_ref: checkpoint.tool_state_ref.clone(),
580        plugin_snapshot_ref: checkpoint.plugin_snapshot_ref.clone(),
581        plugin_snapshot_revision: checkpoint.plugin_snapshot_revision,
582        execution_state_ref: checkpoint.execution_state_ref.clone(),
583    };
584    let envelope = SessionCheckpointEnvelope {
585        manifest: manifest.clone(),
586        tool_state: checkpoint.tool_state.clone(),
587        plugin_snapshot: checkpoint.plugin_snapshot.clone(),
588        execution_state: checkpoint.execution_state.clone(),
589    };
590    let bytes = encode_msgpack(&envelope);
591    let checkpoint_ref = put_blob_tx(tx, &bytes).await?;
592    Ok((checkpoint_ref, manifest))
593}
594
595async fn get_checkpoint(pool: &PgPool, blob_ref: &BlobRef) -> Option<HydratedSessionCheckpoint> {
596    let bytes = get_blob(pool, blob_ref).await?;
597    let envelope: SessionCheckpointEnvelope = decode_msgpack(&bytes)?;
598    Some(HydratedSessionCheckpoint {
599        turn_state: envelope.manifest.turn_state,
600        tool_state_ref: envelope.manifest.tool_state_ref,
601        tool_state: envelope.tool_state,
602        plugin_snapshot_ref: envelope.manifest.plugin_snapshot_ref,
603        plugin_snapshot: envelope.plugin_snapshot,
604        plugin_snapshot_revision: envelope.manifest.plugin_snapshot_revision,
605        execution_state_ref: envelope.manifest.execution_state_ref,
606        execution_state: envelope.execution_state,
607    })
608}
609
610async fn load_session_head_meta_tx(
611    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
612    session_id: &str,
613    for_update: bool,
614) -> Result<Option<SessionHeadMeta>, StoreError> {
615    let sql = if for_update {
616        "SELECT head_json, head_revision FROM lash_sessions WHERE session_id = $1 FOR UPDATE"
617    } else {
618        "SELECT head_json, head_revision FROM lash_sessions WHERE session_id = $1"
619    };
620    let row = sqlx::query(sql)
621        .bind(session_id)
622        .fetch_optional(&mut **tx)
623        .await
624        .map_err(store_sqlx_error)?;
625    let Some(row) = row else {
626        return Ok(None);
627    };
628    let head_json: String = row.get(0);
629    let head_revision: i64 = row.get(1);
630    let mut meta: SessionHeadMeta = store_decode_json(&head_json, "session head")?;
631    meta.head_revision = head_revision as u64;
632    Ok(Some(meta))
633}
634
635async fn load_usage_deltas(pool: &PgPool, session_id: &str) -> Vec<TokenLedgerEntry> {
636    let rows = sqlx::query(
637        "SELECT entry_json FROM lash_usage_deltas WHERE session_id = $1 ORDER BY seq ASC",
638    )
639    .bind(session_id)
640    .fetch_all(pool)
641    .await
642    .unwrap_or_default();
643    rows.into_iter()
644        .filter_map(|row| {
645            let json: String = row.get(0);
646            serde_json::from_str(&json).ok()
647        })
648        .collect()
649}
650
651async fn load_graph(
652    pool: &PgPool,
653    session_id: &str,
654    leaf_node_id: Option<String>,
655    active_path: bool,
656) -> Result<lash_core::SessionGraph, StoreError> {
657    let rows = sqlx::query(
658        "SELECT node_json FROM lash_graph_nodes
659         WHERE session_id = $1 AND tombstoned = FALSE
660         ORDER BY seq ASC",
661    )
662    .bind(session_id)
663    .fetch_all(pool)
664    .await
665    .map_err(store_sqlx_error)?;
666    let mut nodes = Vec::<SessionNodeRecord>::new();
667    for row in rows {
668        let json: String = row.get(0);
669        nodes.push(store_decode_json(&json, "session graph node")?);
670    }
671    if active_path {
672        if let Some(leaf) = leaf_node_id.clone() {
673            let wanted = active_path_node_ids(&nodes, &leaf);
674            nodes.retain(|node| wanted.contains(&node.node_id));
675        }
676    }
677    Ok(lash_core::SessionGraph::from_nodes(nodes, leaf_node_id))
678}
679
680fn active_path_node_ids(nodes: &[SessionNodeRecord], leaf_node_id: &str) -> HashSet<String> {
681    let mut parent_by_id = std::collections::BTreeMap::new();
682    for node in nodes {
683        parent_by_id.insert(node.node_id.clone(), node.parent_node_id.clone());
684    }
685    let mut wanted = HashSet::new();
686    let mut cursor = Some(leaf_node_id.to_string());
687    while let Some(node_id) = cursor {
688        if !wanted.insert(node_id.clone()) {
689            break;
690        }
691        cursor = parent_by_id.get(&node_id).cloned().flatten();
692    }
693    wanted
694}
695
696async fn commit_attachment_refs_tx(
697    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
698    session_id: &str,
699    attachment_ids: &[AttachmentId],
700) -> Result<(), StoreError> {
701    if attachment_ids.is_empty() {
702        return Ok(());
703    }
704    let now = current_epoch_ms() as i64;
705    for id in attachment_ids {
706        sqlx::query(
707            "UPDATE lash_attachment_manifest
708             SET committed_at_ms = COALESCE(committed_at_ms, $1)
709             WHERE attachment_id = $2 AND session_id = $3",
710        )
711        .bind(now)
712        .bind(id.as_str())
713        .bind(session_id)
714        .execute(&mut **tx)
715        .await
716        .map_err(store_sqlx_error)?;
717    }
718    Ok(())
719}
720
721impl AttachmentManifest for PostgresSessionStore {
722    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError> {
723        let pool = self.pool.clone();
724        block_on_detached(async move {
725            sqlx::query(
726                "INSERT INTO lash_attachment_manifest (
727                    attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
728                 )
729                 VALUES ($1, $2, $3, $4, NULL)
730                 ON CONFLICT (attachment_id) DO UPDATE SET
731                    session_id = EXCLUDED.session_id,
732                    canonical_uri = EXCLUDED.canonical_uri,
733                    intent_at_ms = EXCLUDED.intent_at_ms",
734            )
735            .bind(intent.attachment_id.as_str())
736            .bind(intent.session_id)
737            .bind(intent.canonical_uri)
738            .bind(intent.intent_at_epoch_ms as i64)
739            .execute(&pool)
740            .await
741            .map(|_| ())
742            .map_err(store_sqlx_error)
743        })
744    }
745
746    fn commit_refs(
747        &self,
748        session_id: &str,
749        attachment_ids: &[AttachmentId],
750    ) -> Result<(), StoreError> {
751        let pool = self.pool.clone();
752        let session_id = session_id.to_string();
753        let attachment_ids = attachment_ids.to_vec();
754        block_on_detached(async move {
755            let mut tx = pool.begin().await.map_err(store_sqlx_error)?;
756            commit_attachment_refs_tx(&mut tx, &session_id, &attachment_ids).await?;
757            tx.commit().await.map_err(store_sqlx_error)
758        })
759    }
760
761    fn list_uncommitted(
762        &self,
763        older_than_epoch_ms: u64,
764    ) -> Result<Vec<AttachmentManifestEntry>, StoreError> {
765        let pool = self.pool.clone();
766        block_on_detached(async move {
767            let rows = sqlx::query(
768                "SELECT attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
769                 FROM lash_attachment_manifest
770                 WHERE committed_at_ms IS NULL AND intent_at_ms <= $1
771                 ORDER BY attachment_id ASC",
772            )
773            .bind(older_than_epoch_ms as i64)
774            .fetch_all(&pool)
775            .await
776            .map_err(store_sqlx_error)?;
777            Ok(rows
778                .into_iter()
779                .map(|row| AttachmentManifestEntry {
780                    attachment_id: AttachmentId::new(row.get::<String, _>(0)),
781                    session_id: row.get(1),
782                    canonical_uri: row.get(2),
783                    intent_at_epoch_ms: row.get::<i64, _>(3) as u64,
784                    committed_at_epoch_ms: row.get::<Option<i64>, _>(4).map(|value| value as u64),
785                })
786                .collect())
787        })
788    }
789
790    fn forget(&self, attachment_id: &AttachmentId) -> Result<(), StoreError> {
791        let pool = self.pool.clone();
792        let attachment_id = attachment_id.to_string();
793        block_on_detached(async move {
794            sqlx::query("DELETE FROM lash_attachment_manifest WHERE attachment_id = $1")
795                .bind(attachment_id)
796                .execute(&pool)
797                .await
798                .map(|_| ())
799                .map_err(store_sqlx_error)
800        })
801    }
802}
803
804#[async_trait::async_trait]
805impl SessionStoreFactory for PostgresSessionStoreFactory {
806    fn durability_tier(&self) -> DurabilityTier {
807        DurabilityTier::Durable
808    }
809
810    async fn create_store(
811        &self,
812        request: &SessionStoreCreateRequest,
813    ) -> Result<Arc<dyn RuntimePersistence>, String> {
814        let store = PostgresSessionStore {
815            pool: self.pool.clone(),
816            session_id: Some(request.session_id.clone()),
817            bound_session: Arc::new(OnceLock::new()),
818        };
819        if store
820            .load_session_meta()
821            .await
822            .map_err(|err| err.to_string())?
823            .is_none()
824        {
825            store
826                .save_session_meta(SessionMeta {
827                    session_id: request.session_id.clone(),
828                    session_name: request.session_id.clone(),
829                    created_at: current_timestamp_string(),
830                    model: request.policy.model.id.clone(),
831                    cwd: std::env::current_dir()
832                        .ok()
833                        .and_then(|path| path.to_str().map(str::to_string)),
834                    relation: request.relation.clone(),
835                })
836                .await
837                .map_err(|err| err.to_string())?;
838        }
839        Ok(Arc::new(store))
840    }
841
842    async fn delete_session(&self, session_id: &str) -> Result<(), String> {
843        let mut tx = self.pool.begin().await.map_err(|err| err.to_string())?;
844        for sql in [
845            "DELETE FROM lash_queued_work_items WHERE batch_id IN (SELECT batch_id FROM lash_queued_work_batches WHERE session_id = $1)",
846            "DELETE FROM lash_queued_work_batches WHERE session_id = $1",
847            "DELETE FROM lash_usage_deltas WHERE session_id = $1",
848            "DELETE FROM lash_graph_nodes WHERE session_id = $1",
849            "DELETE FROM lash_runtime_turn_commits WHERE session_id = $1",
850            "DELETE FROM lash_session_meta WHERE session_id = $1",
851            "DELETE FROM lash_sessions WHERE session_id = $1",
852            "DELETE FROM lash_attachment_manifest WHERE session_id = $1",
853        ] {
854            sqlx::query(sql)
855                .bind(session_id)
856                .execute(&mut *tx)
857                .await
858                .map_err(|err| err.to_string())?;
859        }
860        tx.commit().await.map_err(|err| err.to_string())
861    }
862}
863
/// Decoded header of one `lash_queued_work_batches` row, before its item
/// rows are attached (see `queued_work_batch_from_row`).
#[derive(Clone, Debug)]
struct QueuedBatchRow {
    /// Database-assigned sequence (`RETURNING enqueue_seq` on insert); the
    /// queued-work queries order batches by it, oldest first.
    enqueue_seq: u64,
    batch_id: String,
    session_id: String,
    /// Optional dedupe key: enqueueing again with the same key for the same
    /// session returns the already-stored batch.
    source_key: Option<String>,
    delivery_policy: DeliveryPolicy,
    slot_policy: SlotPolicy,
    merge_key: MergeKey,
    /// Epoch-ms before which the batch is not considered ready to claim.
    available_at_ms: u64,
    /// Epoch-ms at which the batch was enqueued.
    enqueued_at_ms: u64,
    /// Incremented on every claim; the head batch's next value becomes the
    /// claim's fencing token.
    claim_fencing_token: u64,
}
877
878fn queued_batch_row(row: PgRow) -> Result<QueuedBatchRow, StoreError> {
879    let delivery_policy =
880        DeliveryPolicy::from_wire_str(row.get::<String, _>("delivery_policy").as_str())
881            .ok_or_else(|| {
882                StoreError::Backend("invalid queued work delivery policy".to_string())
883            })?;
884    let slot_policy = SlotPolicy::from_wire_str(row.get::<String, _>("slot_policy").as_str())
885        .ok_or_else(|| StoreError::Backend("invalid queued work slot policy".to_string()))?;
886    let merge_json: String = row.get("merge_key_json");
887    Ok(QueuedBatchRow {
888        enqueue_seq: row.get::<i64, _>("enqueue_seq") as u64,
889        batch_id: row.get("batch_id"),
890        session_id: row.get("session_id"),
891        source_key: row.get("source_key"),
892        delivery_policy,
893        slot_policy,
894        merge_key: store_decode_json(&merge_json, "queued work merge key")?,
895        available_at_ms: row.get::<i64, _>("available_at_ms") as u64,
896        enqueued_at_ms: row.get::<i64, _>("enqueued_at_ms") as u64,
897        claim_fencing_token: row.get::<i64, _>("claim_fencing_token") as u64,
898    })
899}
900
901async fn load_queued_batch(
902    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
903    batch_id: &str,
904) -> Result<Option<QueuedWorkBatch>, StoreError> {
905    let row = sqlx::query(
906        "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
907                slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
908                claim_fencing_token
909         FROM lash_queued_work_batches
910         WHERE batch_id = $1",
911    )
912    .bind(batch_id)
913    .fetch_optional(&mut **tx)
914    .await
915    .map_err(store_sqlx_error)?;
916    let Some(row) = row else {
917        return Ok(None);
918    };
919    let row = queued_batch_row(row)?;
920    queued_work_batch_from_row(tx, row).await.map(Some)
921}
922
923async fn queued_work_batch_from_row(
924    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
925    row: QueuedBatchRow,
926) -> Result<QueuedWorkBatch, StoreError> {
927    let item_rows = sqlx::query(
928        "SELECT item_id, payload_json
929         FROM lash_queued_work_items
930         WHERE batch_id = $1
931         ORDER BY item_index ASC",
932    )
933    .bind(&row.batch_id)
934    .fetch_all(&mut **tx)
935    .await
936    .map_err(store_sqlx_error)?;
937    let mut items = Vec::new();
938    for item in item_rows {
939        let payload_json: String = item.get(1);
940        items.push(QueuedWorkItem {
941            item_id: item.get(0),
942            payload: store_decode_json(&payload_json, "queued work payload")?,
943        });
944    }
945    Ok(QueuedWorkBatch {
946        batch_id: row.batch_id,
947        session_id: row.session_id,
948        enqueue_seq: row.enqueue_seq,
949        source_key: row.source_key,
950        delivery_policy: row.delivery_policy,
951        slot_policy: row.slot_policy,
952        merge_key: row.merge_key,
953        available_at_ms: row.available_at_ms,
954        enqueued_at_ms: row.enqueued_at_ms,
955        items,
956    })
957}
958
959async fn ensure_queued_work_completion_tx(
960    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
961    completed: &QueuedWorkCompletion,
962) -> Result<(), StoreError> {
963    for batch_id in &completed.batch_ids {
964        let exists: Option<i64> = sqlx::query_scalar(
965            "SELECT 1::BIGINT FROM lash_queued_work_batches
966             WHERE session_id = $1
967               AND batch_id = $2
968               AND claim_id = $3
969               AND claim_token = $4
970             LIMIT 1",
971        )
972        .bind(&completed.session_id)
973        .bind(batch_id)
974        .bind(&completed.claim_id)
975        .bind(&completed.lease_token)
976        .fetch_optional(&mut **tx)
977        .await
978        .map_err(store_sqlx_error)?;
979        if exists.is_none() {
980            return Err(StoreError::QueuedWorkClaimExpired {
981                session_id: completed.session_id.clone(),
982                claim_id: completed.claim_id.clone(),
983            });
984        }
985    }
986    Ok(())
987}
988
989#[async_trait::async_trait]
990impl RuntimePersistence for PostgresSessionStore {
    /// Session stores in this crate persist to PostgreSQL, so they always
    /// report [`DurabilityTier::Durable`].
    fn durability_tier(&self) -> DurabilityTier {
        DurabilityTier::Durable
    }
994
995    async fn load_session(
996        &self,
997        scope: SessionReadScope,
998    ) -> Result<Option<PersistedSessionRead>, StoreError> {
999        let Some(session_id) = self.selected_session_id().await? else {
1000            return Ok(None);
1001        };
1002        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1003        let Some(meta) = load_session_head_meta_tx(&mut tx, &session_id, false).await? else {
1004            return Ok(None);
1005        };
1006        tx.commit().await.map_err(store_sqlx_error)?;
1007        let leaf_node_id = match &scope {
1008            SessionReadScope::FullGraph => meta.leaf_node_id.clone(),
1009            SessionReadScope::ActivePath { leaf_node_id } => {
1010                leaf_node_id.clone().or_else(|| meta.leaf_node_id.clone())
1011            }
1012        };
1013        let graph = load_graph(
1014            &self.pool,
1015            &session_id,
1016            leaf_node_id.clone(),
1017            matches!(scope, SessionReadScope::ActivePath { .. }),
1018        )
1019        .await?;
1020        let checkpoint = match meta.checkpoint_ref.as_ref() {
1021            Some(blob_ref) => get_checkpoint(&self.pool, blob_ref).await,
1022            None => None,
1023        };
1024        Ok(Some(PersistedSessionRead {
1025            session_id: meta.session_id,
1026            head_revision: meta.head_revision,
1027            config: meta.config,
1028            agent_frames: meta.agent_frames,
1029            current_agent_frame_id: meta.current_agent_frame_id,
1030            graph,
1031            checkpoint_ref: meta.checkpoint_ref,
1032            checkpoint,
1033            token_ledger: merge_token_ledger_entries(
1034                load_usage_deltas(&self.pool, &session_id).await,
1035            ),
1036        }))
1037    }
1038
1039    async fn load_node(&self, node_id: &str) -> Result<Option<SessionNodeRecord>, StoreError> {
1040        let json: Option<String> = if let Some(session_id) = &self.session_id {
1041            sqlx::query_scalar(
1042                "SELECT node_json FROM lash_graph_nodes
1043                 WHERE session_id = $1 AND node_id = $2 AND tombstoned = FALSE",
1044            )
1045            .bind(session_id)
1046            .bind(node_id)
1047            .fetch_optional(&self.pool)
1048            .await
1049            .map_err(store_sqlx_error)?
1050        } else {
1051            sqlx::query_scalar(
1052                "SELECT node_json FROM lash_graph_nodes
1053                 WHERE node_id = $1 AND tombstoned = FALSE
1054                 ORDER BY session_id ASC
1055                 LIMIT 1",
1056            )
1057            .bind(node_id)
1058            .fetch_optional(&self.pool)
1059            .await
1060            .map_err(store_sqlx_error)?
1061        };
1062        json.map(|json| store_decode_json(&json, "session graph node"))
1063            .transpose()
1064    }
1065
1066    async fn commit_runtime_state(
1067        &self,
1068        commit: RuntimeCommit,
1069    ) -> Result<RuntimeCommitResult, StoreError> {
1070        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1071        // Read the head WITHOUT a lock; the conditional CAS write below is the
1072        // serialization point (optimistic concurrency), so no pessimistic
1073        // `FOR UPDATE` lock is held across the rest of this transaction.
1074        let existing = load_session_head_meta_tx(&mut tx, &commit.session_id, false).await?;
1075        if let Some(bound_session_id) = existing.as_ref().map(|meta| meta.session_id.as_str())
1076            && bound_session_id != commit.session_id
1077        {
1078            return Err(StoreError::SessionBindingMismatch {
1079                bound_session_id: bound_session_id.to_string(),
1080                attempted_session_id: commit.session_id,
1081            });
1082        }
1083        // A session-store handle commits to exactly one session. An explicit
1084        // binding (`self.session_id`) is authoritative; otherwise the handle binds
1085        // to the first session it commits and rejects any other thereafter.
1086        let effective_binding = self
1087            .session_id
1088            .clone()
1089            .or_else(|| self.bound_session.get().cloned());
1090        if let Some(bound_session_id) = &effective_binding
1091            && commit.session_id != *bound_session_id
1092        {
1093            return Err(StoreError::SessionBindingMismatch {
1094                bound_session_id: bound_session_id.clone(),
1095                attempted_session_id: commit.session_id,
1096            });
1097        }
1098        if self.session_id.is_none() {
1099            let _ = self.bound_session.set(commit.session_id.clone());
1100        }
1101        if let Some(completed) = &commit.turn_commit {
1102            if completed.session_id != commit.session_id {
1103                return Err(StoreError::RuntimeTurnCommitConflict {
1104                    session_id: completed.session_id.clone(),
1105                    turn_id: completed.turn_id.clone(),
1106                });
1107            }
1108            let prior = sqlx::query(
1109                "SELECT turn_commit_hash, result_json
1110                 FROM lash_runtime_turn_commits
1111                 WHERE session_id = $1 AND turn_id = $2",
1112            )
1113            .bind(&completed.session_id)
1114            .bind(&completed.turn_id)
1115            .fetch_optional(&mut *tx)
1116            .await
1117            .map_err(store_sqlx_error)?;
1118            if let Some(row) = prior {
1119                let hash: String = row.get(0);
1120                let result_json: String = row.get(1);
1121                if hash == completed.turn_commit_hash {
1122                    return store_decode_json(&result_json, "runtime turn commit result");
1123                }
1124                return Err(StoreError::RuntimeTurnCommitConflict {
1125                    session_id: completed.session_id.clone(),
1126                    turn_id: completed.turn_id.clone(),
1127                });
1128            }
1129        }
1130        let actual_revision = existing.as_ref().map_or(0, |meta| meta.head_revision);
1131        if commit.expected_head_revision.is_some()
1132            && commit.expected_head_revision != Some(actual_revision)
1133        {
1134            return Err(StoreError::HeadRevisionConflict {
1135                expected: commit.expected_head_revision,
1136                actual: actual_revision,
1137            });
1138        }
1139        for completed in &commit.completed_queue_claims {
1140            if completed.session_id != commit.session_id {
1141                return Err(StoreError::QueuedWorkClaimExpired {
1142                    session_id: completed.session_id.clone(),
1143                    claim_id: completed.claim_id.clone(),
1144                });
1145            }
1146            ensure_queued_work_completion_tx(&mut tx, completed).await?;
1147        }
1148        let (checkpoint_ref, manifest) = put_checkpoint_tx(&mut tx, &commit.checkpoint).await?;
1149        for entry in &commit.usage_deltas {
1150            sqlx::query("INSERT INTO lash_usage_deltas (session_id, entry_json) VALUES ($1, $2)")
1151                .bind(&commit.session_id)
1152                .bind(encode_json(entry))
1153                .execute(&mut *tx)
1154                .await
1155                .map_err(store_sqlx_error)?;
1156        }
1157        let leaf_node_id = match &commit.graph {
1158            GraphCommitDelta::Unchanged { leaf_node_id } => leaf_node_id.clone(),
1159            GraphCommitDelta::Append {
1160                nodes,
1161                leaf_node_id,
1162            } => {
1163                for node in nodes {
1164                    sqlx::query(
1165                        "INSERT INTO lash_graph_nodes (session_id, node_id, node_json)
1166                         VALUES ($1, $2, $3)
1167                         ON CONFLICT (session_id, node_id) DO UPDATE SET
1168                            node_json = EXCLUDED.node_json,
1169                            tombstoned = FALSE",
1170                    )
1171                    .bind(&commit.session_id)
1172                    .bind(&node.node_id)
1173                    .bind(encode_json(node))
1174                    .execute(&mut *tx)
1175                    .await
1176                    .map_err(store_sqlx_error)?;
1177                }
1178                leaf_node_id.clone()
1179            }
1180            GraphCommitDelta::ReplaceFull(graph) => {
1181                sqlx::query("DELETE FROM lash_graph_nodes WHERE session_id = $1")
1182                    .bind(&commit.session_id)
1183                    .execute(&mut *tx)
1184                    .await
1185                    .map_err(store_sqlx_error)?;
1186                for node in &graph.nodes {
1187                    sqlx::query(
1188                        "INSERT INTO lash_graph_nodes (session_id, node_id, node_json)
1189                         VALUES ($1, $2, $3)",
1190                    )
1191                    .bind(&commit.session_id)
1192                    .bind(&node.node_id)
1193                    .bind(encode_json(node))
1194                    .execute(&mut *tx)
1195                    .await
1196                    .map_err(store_sqlx_error)?;
1197                }
1198                graph.leaf_node_id.clone()
1199            }
1200        };
1201        let graph_node_count: i64 = sqlx::query_scalar(
1202            "SELECT COUNT(*) FROM lash_graph_nodes WHERE session_id = $1 AND tombstoned = FALSE",
1203        )
1204        .bind(&commit.session_id)
1205        .fetch_one(&mut *tx)
1206        .await
1207        .map_err(store_sqlx_error)?;
1208        let next_revision = actual_revision + 1;
1209        let meta = SessionHeadMeta {
1210            session_id: commit.session_id.clone(),
1211            head_revision: next_revision,
1212            config: commit.config.clone(),
1213            agent_frames: commit.agent_frames.clone(),
1214            current_agent_frame_id: commit.current_agent_frame_id.clone(),
1215            checkpoint_ref: Some(checkpoint_ref.clone()),
1216            leaf_node_id,
1217            graph_node_count: graph_node_count as usize,
1218            token_ledger: Vec::new(),
1219        };
1220        // Optimistic CAS on the head revision. The `WHERE head_revision = $5`
1221        // guard makes the write succeed only if no concurrent committer moved the
1222        // head since our unlocked read above. A brand-new session inserts (no
1223        // conflict); an existing one updates only when the revision still matches.
1224        let head_write = sqlx::query(
1225            "INSERT INTO lash_sessions (session_id, head_revision, head_json, checkpoint_ref)
1226             VALUES ($1, $2, $3, $4)
1227             ON CONFLICT (session_id) DO UPDATE SET
1228                head_revision = EXCLUDED.head_revision,
1229                head_json = EXCLUDED.head_json,
1230                checkpoint_ref = EXCLUDED.checkpoint_ref
1231             WHERE lash_sessions.head_revision = $5",
1232        )
1233        .bind(&commit.session_id)
1234        .bind(next_revision as i64)
1235        .bind(encode_json(&meta))
1236        .bind(checkpoint_ref.as_str())
1237        .bind(actual_revision as i64)
1238        .execute(&mut *tx)
1239        .await
1240        .map_err(store_sqlx_error)?;
1241        if head_write.rows_affected() == 0 {
1242            // A concurrent commit won the race: the head no longer matches the
1243            // revision we read. Returning drops `tx` (auto-rollback), discarding
1244            // this attempt's node/usage writes; the caller reloads and retries.
1245            return Err(StoreError::HeadRevisionConflict {
1246                expected: commit.expected_head_revision.or(Some(actual_revision)),
1247                actual: actual_revision,
1248            });
1249        }
1250        for completed in &commit.completed_queue_claims {
1251            for batch_id in &completed.batch_ids {
1252                sqlx::query(
1253                    "DELETE FROM lash_queued_work_batches
1254                     WHERE session_id = $1 AND batch_id = $2 AND claim_id = $3 AND claim_token = $4",
1255                )
1256                .bind(&completed.session_id)
1257                .bind(batch_id)
1258                .bind(&completed.claim_id)
1259                .bind(&completed.lease_token)
1260                .execute(&mut *tx)
1261                .await
1262                .map_err(store_sqlx_error)?;
1263            }
1264        }
1265        commit_attachment_refs_tx(
1266            &mut tx,
1267            &commit.session_id,
1268            &commit.committed_attachment_ids,
1269        )
1270        .await?;
1271        let result = RuntimeCommitResult {
1272            head_revision: next_revision,
1273            checkpoint_ref,
1274            manifest,
1275        };
1276        if let Some(completed) = &commit.turn_commit {
1277            sqlx::query(
1278                "INSERT INTO lash_runtime_turn_commits (
1279                    session_id, turn_id, turn_commit_hash, result_json, committed_at_ms
1280                 )
1281                 VALUES ($1, $2, $3, $4, $5)",
1282            )
1283            .bind(&completed.session_id)
1284            .bind(&completed.turn_id)
1285            .bind(&completed.turn_commit_hash)
1286            .bind(encode_json(&result))
1287            .bind(current_epoch_ms() as i64)
1288            .execute(&mut *tx)
1289            .await
1290            .map_err(store_sqlx_error)?;
1291        }
1292        tx.commit().await.map_err(store_sqlx_error)?;
1293        Ok(result)
1294    }
1295
1296    async fn enqueue_queued_work(
1297        &self,
1298        batch: QueuedWorkBatchDraft,
1299    ) -> Result<QueuedWorkBatch, StoreError> {
1300        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1301        if let Some(source_key) = batch.source_key.as_deref() {
1302            let existing_id: Option<String> = sqlx::query_scalar(
1303                "SELECT batch_id FROM lash_queued_work_batches
1304                 WHERE session_id = $1 AND source_key = $2",
1305            )
1306            .bind(&batch.session_id)
1307            .bind(source_key)
1308            .fetch_optional(&mut *tx)
1309            .await
1310            .map_err(store_sqlx_error)?;
1311            if let Some(batch_id) = existing_id {
1312                let existing = load_queued_batch(&mut tx, &batch_id)
1313                    .await?
1314                    .ok_or_else(|| {
1315                        StoreError::Backend("queued work source row disappeared".to_string())
1316                    })?;
1317                tx.commit().await.map_err(store_sqlx_error)?;
1318                return Ok(existing);
1319            }
1320        }
1321        let now = current_epoch_ms();
1322        let batch_id = format!(
1323            "qwb:{:x}",
1324            Sha256::digest(format!("{}:{:?}:{now}", batch.session_id, batch.source_key).as_bytes())
1325        );
1326        let row = sqlx::query_scalar::<_, i64>(
1327            "INSERT INTO lash_queued_work_batches (
1328                batch_id, session_id, source_key, delivery_policy, slot_policy,
1329                merge_key_json, available_at_ms, enqueued_at_ms
1330             )
1331             VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
1332             RETURNING enqueue_seq",
1333        )
1334        .bind(&batch_id)
1335        .bind(&batch.session_id)
1336        .bind(&batch.source_key)
1337        .bind(batch.delivery_policy.as_str())
1338        .bind(batch.slot_policy.as_str())
1339        .bind(encode_json(&batch.merge_key))
1340        .bind(batch.available_at_ms as i64)
1341        .bind(now as i64)
1342        .fetch_one(&mut *tx)
1343        .await
1344        .map_err(store_sqlx_error)?;
1345        for (index, payload) in batch.payloads.iter().enumerate() {
1346            let item_id = format!("{batch_id}:item:{index}");
1347            sqlx::query(
1348                "INSERT INTO lash_queued_work_items (batch_id, item_index, item_id, payload_json)
1349                 VALUES ($1, $2, $3, $4)",
1350            )
1351            .bind(&batch_id)
1352            .bind(index as i32)
1353            .bind(item_id)
1354            .bind(encode_json(payload))
1355            .execute(&mut *tx)
1356            .await
1357            .map_err(store_sqlx_error)?;
1358        }
1359        let queued = load_queued_batch(&mut tx, &batch_id)
1360            .await?
1361            .ok_or_else(|| StoreError::Backend("queued work insert disappeared".to_string()))?;
1362        debug_assert_eq!(queued.enqueue_seq, row as u64);
1363        tx.commit().await.map_err(store_sqlx_error)?;
1364        Ok(queued)
1365    }
1366
1367    async fn claim_ready_queued_work(
1368        &self,
1369        session_id: &str,
1370        owner_id: &str,
1371        boundary: QueuedWorkClaimBoundary,
1372        lease_ttl_ms: u64,
1373        max_batches: usize,
1374    ) -> Result<Option<QueuedWorkClaim>, StoreError> {
1375        if max_batches == 0 {
1376            return Ok(None);
1377        }
1378        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1379        let now = current_epoch_ms();
1380        let rows = sqlx::query(
1381            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1382                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1383                    claim_fencing_token
1384             FROM lash_queued_work_batches
1385             WHERE session_id = $1
1386               AND available_at_ms <= $2
1387               AND (claim_token IS NULL OR claim_expires_at_ms <= $2)
1388             ORDER BY enqueue_seq ASC
1389             LIMIT $3
1390             FOR UPDATE SKIP LOCKED",
1391        )
1392        .bind(session_id)
1393        .bind(now as i64)
1394        .bind((max_batches as i64).saturating_add(32))
1395        .fetch_all(&mut *tx)
1396        .await
1397        .map_err(store_sqlx_error)?;
1398        let mut candidates = Vec::new();
1399        for row in rows {
1400            candidates.push(queued_batch_row(row)?);
1401        }
1402        let Some(first_row) = candidates.first() else {
1403            tx.commit().await.map_err(store_sqlx_error)?;
1404            return Ok(None);
1405        };
1406        if boundary == QueuedWorkClaimBoundary::ActiveTurnCheckpoint
1407            && first_row.delivery_policy != DeliveryPolicy::EarliestSafeBoundary
1408        {
1409            tx.commit().await.map_err(store_sqlx_error)?;
1410            return Ok(None);
1411        }
1412        let first_slot = first_row.slot_policy;
1413        let first_delivery = first_row.delivery_policy;
1414        let first_merge = first_row.merge_key.clone();
1415        let mut selected = Vec::new();
1416        for row in candidates {
1417            if selected.len() >= max_batches {
1418                break;
1419            }
1420            if selected.is_empty() {
1421                selected.push(row);
1422                if first_slot == SlotPolicy::Exclusive {
1423                    break;
1424                }
1425                continue;
1426            }
1427            if first_slot != SlotPolicy::Join
1428                || row.slot_policy != SlotPolicy::Join
1429                || row.delivery_policy != first_delivery
1430                || row.merge_key != first_merge
1431            {
1432                break;
1433            }
1434            selected.push(row);
1435        }
1436        let Some(first) = selected.first() else {
1437            tx.commit().await.map_err(store_sqlx_error)?;
1438            return Ok(None);
1439        };
1440        let fencing_token = first.claim_fencing_token.saturating_add(1);
1441        let claim_id = format!("qwc:{}:{fencing_token}", first.enqueue_seq);
1442        let lease_token = format!(
1443            "{:x}",
1444            Sha256::digest(format!("{session_id}:{owner_id}:{claim_id}:{now}").as_bytes())
1445        );
1446        let expires_at = now.saturating_add(lease_ttl_ms);
1447        for row in &selected {
1448            let changed = sqlx::query(
1449                "UPDATE lash_queued_work_batches
1450                 SET claim_id = $3,
1451                     claim_owner_id = $4,
1452                     claim_token = $5,
1453                     claim_fencing_token = claim_fencing_token + 1,
1454                     claim_claimed_at_ms = $6,
1455                     claim_expires_at_ms = $7
1456                 WHERE session_id = $1
1457                   AND batch_id = $2
1458                   AND (claim_token IS NULL OR claim_expires_at_ms <= $6)",
1459            )
1460            .bind(session_id)
1461            .bind(&row.batch_id)
1462            .bind(&claim_id)
1463            .bind(owner_id)
1464            .bind(&lease_token)
1465            .bind(now as i64)
1466            .bind(expires_at as i64)
1467            .execute(&mut *tx)
1468            .await
1469            .map_err(store_sqlx_error)?
1470            .rows_affected();
1471            if changed == 0 {
1472                tx.rollback().await.map_err(store_sqlx_error)?;
1473                return Ok(None);
1474            }
1475        }
1476        let mut batches = Vec::new();
1477        for row in selected {
1478            batches.push(queued_work_batch_from_row(&mut tx, row).await?);
1479        }
1480        tx.commit().await.map_err(store_sqlx_error)?;
1481        Ok(Some(QueuedWorkClaim {
1482            session_id: session_id.to_string(),
1483            claim_id,
1484            owner_id: owner_id.to_string(),
1485            lease_token,
1486            fencing_token,
1487            claimed_at_epoch_ms: now,
1488            expires_at_epoch_ms: expires_at,
1489            batches,
1490        }))
1491    }
1492
1493    async fn renew_queued_work_claim(
1494        &self,
1495        claim: &QueuedWorkClaim,
1496        lease_ttl_ms: u64,
1497    ) -> Result<QueuedWorkClaim, StoreError> {
1498        let expires_at = current_epoch_ms().saturating_add(lease_ttl_ms);
1499        let changed = sqlx::query(
1500            "UPDATE lash_queued_work_batches
1501             SET claim_expires_at_ms = $4
1502             WHERE session_id = $1 AND claim_id = $2 AND claim_token = $3",
1503        )
1504        .bind(&claim.session_id)
1505        .bind(&claim.claim_id)
1506        .bind(&claim.lease_token)
1507        .bind(expires_at as i64)
1508        .execute(&self.pool)
1509        .await
1510        .map_err(store_sqlx_error)?
1511        .rows_affected();
1512        if changed as usize != claim.batches.len() {
1513            return Err(StoreError::QueuedWorkClaimExpired {
1514                session_id: claim.session_id.clone(),
1515                claim_id: claim.claim_id.clone(),
1516            });
1517        }
1518        Ok(QueuedWorkClaim {
1519            expires_at_epoch_ms: expires_at,
1520            ..claim.clone()
1521        })
1522    }
1523
1524    async fn abandon_queued_work_claim(&self, claim: &QueuedWorkClaim) -> Result<(), StoreError> {
1525        sqlx::query(
1526            "UPDATE lash_queued_work_batches
1527             SET claim_id = NULL,
1528                 claim_owner_id = NULL,
1529                 claim_token = NULL,
1530                 claim_claimed_at_ms = 0,
1531                 claim_expires_at_ms = 0
1532             WHERE session_id = $1 AND claim_id = $2 AND claim_token = $3",
1533        )
1534        .bind(&claim.session_id)
1535        .bind(&claim.claim_id)
1536        .bind(&claim.lease_token)
1537        .execute(&self.pool)
1538        .await
1539        .map_err(store_sqlx_error)?;
1540        Ok(())
1541    }
1542
1543    async fn cancel_queued_work_batch(
1544        &self,
1545        session_id: &str,
1546        batch_id: &str,
1547    ) -> Result<Option<QueuedWorkBatch>, StoreError> {
1548        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1549        let now = current_epoch_ms();
1550        let row = sqlx::query(
1551            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1552                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1553                    claim_fencing_token
1554             FROM lash_queued_work_batches
1555             WHERE session_id = $1
1556               AND batch_id = $2
1557               AND (claim_token IS NULL OR claim_expires_at_ms <= $3)
1558             FOR UPDATE",
1559        )
1560        .bind(session_id)
1561        .bind(batch_id)
1562        .bind(now as i64)
1563        .fetch_optional(&mut *tx)
1564        .await
1565        .map_err(store_sqlx_error)?;
1566        let Some(row) = row else {
1567            tx.commit().await.map_err(store_sqlx_error)?;
1568            return Ok(None);
1569        };
1570        let batch = queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?;
1571        sqlx::query("DELETE FROM lash_queued_work_batches WHERE batch_id = $1")
1572            .bind(batch_id)
1573            .execute(&mut *tx)
1574            .await
1575            .map_err(store_sqlx_error)?;
1576        tx.commit().await.map_err(store_sqlx_error)?;
1577        Ok(Some(batch))
1578    }
1579
1580    async fn list_queued_work(&self, session_id: &str) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1581        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1582        let rows = sqlx::query(
1583            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1584                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1585                    claim_fencing_token
1586             FROM lash_queued_work_batches
1587             WHERE session_id = $1
1588             ORDER BY enqueue_seq ASC",
1589        )
1590        .bind(session_id)
1591        .fetch_all(&mut *tx)
1592        .await
1593        .map_err(store_sqlx_error)?;
1594        let mut batches = Vec::new();
1595        for row in rows {
1596            batches.push(queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?);
1597        }
1598        tx.commit().await.map_err(store_sqlx_error)?;
1599        Ok(batches)
1600    }
1601
1602    async fn list_pending_queued_work(
1603        &self,
1604        session_id: &str,
1605    ) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1606        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1607        let now = current_epoch_ms();
1608        let rows = sqlx::query(
1609            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1610                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1611                    claim_fencing_token
1612             FROM lash_queued_work_batches
1613             WHERE session_id = $1
1614               AND (claim_token IS NULL OR claim_expires_at_ms <= $2)
1615             ORDER BY enqueue_seq ASC",
1616        )
1617        .bind(session_id)
1618        .bind(now as i64)
1619        .fetch_all(&mut *tx)
1620        .await
1621        .map_err(store_sqlx_error)?;
1622        let mut batches = Vec::new();
1623        for row in rows {
1624            batches.push(queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?);
1625        }
1626        tx.commit().await.map_err(store_sqlx_error)?;
1627        Ok(batches)
1628    }
1629
1630    async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError> {
1631        sqlx::query(
1632            "INSERT INTO lash_session_meta (session_id, meta_json)
1633             VALUES ($1, $2)
1634             ON CONFLICT (session_id) DO UPDATE SET meta_json = EXCLUDED.meta_json",
1635        )
1636        .bind(&meta.session_id)
1637        .bind(encode_json(&meta))
1638        .execute(&self.pool)
1639        .await
1640        .map_err(store_sqlx_error)?;
1641        Ok(())
1642    }
1643
1644    async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError> {
1645        let json: Option<String> = if let Some(session_id) = &self.session_id {
1646            sqlx::query_scalar("SELECT meta_json FROM lash_session_meta WHERE session_id = $1")
1647                .bind(session_id)
1648                .fetch_optional(&self.pool)
1649                .await
1650                .map_err(store_sqlx_error)?
1651        } else {
1652            sqlx::query_scalar(
1653                "SELECT meta_json FROM lash_session_meta ORDER BY session_id ASC LIMIT 1",
1654            )
1655            .fetch_optional(&self.pool)
1656            .await
1657            .map_err(store_sqlx_error)?
1658        };
1659        json.map(|json| store_decode_json(&json, "session meta"))
1660            .transpose()
1661    }
1662
1663    async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError> {
1664        for id in ids {
1665            if let Some(session_id) = &self.session_id {
1666                sqlx::query(
1667                    "UPDATE lash_graph_nodes
1668                     SET tombstoned = TRUE
1669                     WHERE session_id = $1 AND node_id = $2",
1670                )
1671                .bind(session_id)
1672                .bind(id)
1673                .execute(&self.pool)
1674                .await
1675                .map_err(store_sqlx_error)?;
1676            } else {
1677                sqlx::query(
1678                    "UPDATE lash_graph_nodes
1679                     SET tombstoned = TRUE
1680                     WHERE node_id = $1",
1681                )
1682                .bind(id)
1683                .execute(&self.pool)
1684                .await
1685                .map_err(store_sqlx_error)?;
1686            }
1687        }
1688        Ok(())
1689    }
1690
1691    async fn vacuum(&self) -> Result<VacuumReport, StoreError> {
1692        let removed = if let Some(session_id) = &self.session_id {
1693            sqlx::query("DELETE FROM lash_graph_nodes WHERE session_id = $1 AND tombstoned = TRUE")
1694                .bind(session_id)
1695                .execute(&self.pool)
1696                .await
1697                .map_err(store_sqlx_error)?
1698                .rows_affected()
1699        } else {
1700            sqlx::query("DELETE FROM lash_graph_nodes WHERE tombstoned = TRUE")
1701                .execute(&self.pool)
1702                .await
1703                .map_err(store_sqlx_error)?
1704                .rows_affected()
1705        };
1706        Ok(VacuumReport {
1707            removed_node_count: removed as usize,
1708        })
1709    }
1710
1711    async fn gc_unreachable(&self) -> Result<GcReport, StoreError> {
1712        Ok(GcReport::default())
1713    }
1714}
1715
1716fn process_status_label(record: &ProcessRecord) -> &'static str {
1717    record.status.label()
1718}
1719
1720async fn load_process_tx(
1721    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1722    process_id: &str,
1723) -> Result<Option<ProcessRecord>, PluginError> {
1724    let json: Option<String> = sqlx::query_scalar(
1725        "SELECT record_json
1726             FROM lash_processes
1727             WHERE process_id = $1
1728             FOR UPDATE",
1729    )
1730    .bind(process_id)
1731    .fetch_optional(&mut **tx)
1732    .await
1733    .map_err(plugin_sqlx_error)?;
1734    json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
1735        .transpose()
1736}
1737
1738async fn load_process(
1739    pool: &PgPool,
1740    process_id: &str,
1741) -> Result<Option<ProcessRecord>, PluginError> {
1742    let json: Option<String> =
1743        sqlx::query_scalar("SELECT record_json FROM lash_processes WHERE process_id = $1")
1744            .bind(process_id)
1745            .fetch_optional(pool)
1746            .await
1747            .map_err(plugin_sqlx_error)?;
1748    json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
1749        .transpose()
1750}
1751
1752async fn save_process_tx(
1753    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1754    record: &ProcessRecord,
1755) -> Result<(), PluginError> {
1756    sqlx::query(
1757        "UPDATE lash_processes
1758         SET updated_at_ms = $2, status = $3, record_json = $4
1759         WHERE process_id = $1",
1760    )
1761    .bind(&record.id)
1762    .bind(record.updated_at_ms as i64)
1763    .bind(process_status_label(record))
1764    .bind(serde_json::to_string(record).map_err(process_decode_error)?)
1765    .execute(&mut **tx)
1766    .await
1767    .map_err(plugin_sqlx_error)?;
1768    Ok(())
1769}
1770
1771async fn load_event_by_key_tx(
1772    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1773    process_id: &str,
1774    replay_key: &str,
1775) -> Result<Option<(String, ProcessEvent)>, PluginError> {
1776    let row = sqlx::query(
1777        "SELECT payload_hash, event_json
1778         FROM lash_process_events
1779         WHERE process_id = $1 AND idempotency_key = $2",
1780    )
1781    .bind(process_id)
1782    .bind(replay_key)
1783    .fetch_optional(&mut **tx)
1784    .await
1785    .map_err(plugin_sqlx_error)?;
1786    row.map(|row| {
1787        let hash: String = row.get(0);
1788        let json: String = row.get(1);
1789        serde_json::from_str(&json)
1790            .map(|event| (hash, event))
1791            .map_err(process_decode_error)
1792    })
1793    .transpose()
1794}
1795
1796async fn load_process_lease_tx(
1797    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1798    process_id: &str,
1799) -> Result<Option<ProcessLease>, PluginError> {
1800    let row = sqlx::query(
1801        "SELECT lease_owner_id, lease_token, lease_fencing_token,
1802                lease_claimed_at_ms, lease_expires_at_ms
1803         FROM lash_process_leases
1804         WHERE process_id = $1",
1805    )
1806    .bind(process_id)
1807    .fetch_optional(&mut **tx)
1808    .await
1809    .map_err(plugin_sqlx_error)?;
1810    let Some(row) = row else {
1811        return Ok(None);
1812    };
1813    let owner_id: Option<String> = row.get(0);
1814    let lease_token: Option<String> = row.get(1);
1815    let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
1816        return Ok(None);
1817    };
1818    Ok(Some(ProcessLease {
1819        schema_version: PROCESS_LEASE_SCHEMA_VERSION,
1820        process_id: process_id.to_string(),
1821        owner_id,
1822        lease_token,
1823        fencing_token: row.get::<i64, _>(2) as u64,
1824        claimed_at_epoch_ms: row.get::<i64, _>(3) as u64,
1825        expires_at_epoch_ms: row.get::<i64, _>(4) as u64,
1826    }))
1827}
1828
1829fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> PluginError {
1830    PluginError::Session(format!(
1831        "process `{process_id}` is already leased by `{}` until {}",
1832        current.owner_id, current.expires_at_epoch_ms
1833    ))
1834}
1835
1836fn process_lease_expired(process_id: &str) -> PluginError {
1837    PluginError::Session(format!(
1838        "process lease for `{process_id}` is missing or expired"
1839    ))
1840}
1841
1842fn guard_lease(current: Option<&ProcessLease>, lease_token: &str, now: u64) -> bool {
1843    current
1844        .map(|current| current.lease_token == lease_token && current.expires_at_epoch_ms > now)
1845        .unwrap_or(false)
1846}
1847
1848async fn list_grants_for_scope(
1849    pool: &PgPool,
1850    owner_scope: &ProcessScope,
1851    live_only: bool,
1852) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
1853    let status_clause = if live_only {
1854        "AND p.status = 'running'"
1855    } else {
1856        ""
1857    };
1858    let sql = format!(
1859        "SELECT g.process_id, g.descriptor_json, p.record_json
1860         FROM lash_process_handle_grants g
1861         JOIN lash_processes p ON p.process_id = g.process_id
1862         WHERE g.scope_id = $1 {status_clause}
1863         ORDER BY g.process_id ASC"
1864    );
1865    let rows = sqlx::query(&sql)
1866        .bind(owner_scope.id().as_str())
1867        .fetch_all(pool)
1868        .await
1869        .map_err(plugin_sqlx_error)?;
1870    let mut entries = Vec::new();
1871    for row in rows {
1872        let process_id: String = row.get(0);
1873        let descriptor_json: String = row.get(1);
1874        let record_json: String = row.get(2);
1875        let descriptor: ProcessHandleDescriptor =
1876            serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
1877        let record: ProcessRecord =
1878            serde_json::from_str(&record_json).map_err(process_decode_error)?;
1879        entries.push((
1880            ProcessHandleGrant {
1881                session_id: owner_scope.session_id.clone(),
1882                process_id,
1883                descriptor,
1884            },
1885            record,
1886        ));
1887    }
1888    Ok(entries)
1889}
1890
1891#[async_trait::async_trait]
1892impl ProcessRegistry for PostgresProcessRegistry {
1893    fn durability_tier(&self) -> DurabilityTier {
1894        DurabilityTier::Durable
1895    }
1896
1897    async fn register_process(
1898        &self,
1899        registration: ProcessRegistration,
1900    ) -> Result<ProcessRecord, PluginError> {
1901        let (registration, registration_hash) =
1902            lash_core::runtime::prepare_process_registration(registration)?;
1903        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1904        if let Some(existing) = load_process_tx(&mut tx, &registration.id).await? {
1905            if existing.registration_hash == registration_hash {
1906                tx.commit().await.map_err(plugin_sqlx_error)?;
1907                return Ok(existing);
1908            }
1909            return Err(PluginError::Session(format!(
1910                "process `{}` registration hash conflict: existing {}, new {}",
1911                registration.id, existing.registration_hash, registration_hash
1912            )));
1913        }
1914        let now = current_epoch_ms();
1915        let record =
1916            ProcessRecord::from_prepared_registration(registration, registration_hash, now);
1917        let record_json = serde_json::to_string(&record).map_err(process_decode_error)?;
1918        sqlx::query(
1919            "INSERT INTO lash_processes (
1920                process_id, registration_hash, owner_scope_id, host_profile_id,
1921                created_at_ms, updated_at_ms, status, record_json
1922             )
1923             VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
1924        )
1925        .bind(&record.id)
1926        .bind(&record.registration_hash)
1927        .bind(record.owner_scope_id().as_str())
1928        .bind(record.host_profile_id())
1929        .bind(record.created_at_ms as i64)
1930        .bind(record.updated_at_ms as i64)
1931        .bind(process_status_label(&record))
1932        .bind(record_json)
1933        .execute(&mut *tx)
1934        .await
1935        .map_err(plugin_sqlx_error)?;
1936        tx.commit().await.map_err(plugin_sqlx_error)?;
1937        Ok(record)
1938    }
1939
1940    async fn set_external_ref(
1941        &self,
1942        process_id: &str,
1943        external_ref: ProcessExternalRef,
1944    ) -> Result<ProcessRecord, PluginError> {
1945        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1946        let mut record = load_process_tx(&mut tx, process_id)
1947            .await?
1948            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
1949        record.external_ref = Some(external_ref);
1950        record.updated_at_ms = current_epoch_ms();
1951        save_process_tx(&mut tx, &record).await?;
1952        tx.commit().await.map_err(plugin_sqlx_error)?;
1953        Ok(record)
1954    }
1955
1956    async fn grant_handle(
1957        &self,
1958        owner_scope: &ProcessScope,
1959        process_id: &str,
1960        descriptor: ProcessHandleDescriptor,
1961    ) -> Result<ProcessHandleGrant, PluginError> {
1962        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1963        if load_process_tx(&mut tx, process_id).await?.is_none() {
1964            return Err(PluginError::Session(format!(
1965                "unknown process `{process_id}`"
1966            )));
1967        }
1968        sqlx::query(
1969            "INSERT INTO lash_process_handle_grants (session_id, scope_id, process_id, descriptor_json)
1970             VALUES ($1, $2, $3, $4)
1971             ON CONFLICT (scope_id, process_id) DO UPDATE SET
1972                session_id = EXCLUDED.session_id,
1973                descriptor_json = EXCLUDED.descriptor_json",
1974        )
1975        .bind(&owner_scope.session_id)
1976        .bind(owner_scope.id().as_str())
1977        .bind(process_id)
1978        .bind(serde_json::to_string(&descriptor).map_err(process_decode_error)?)
1979        .execute(&mut *tx)
1980        .await
1981        .map_err(plugin_sqlx_error)?;
1982        tx.commit().await.map_err(plugin_sqlx_error)?;
1983        Ok(ProcessHandleGrant {
1984            session_id: owner_scope.session_id.clone(),
1985            process_id: process_id.to_string(),
1986            descriptor,
1987        })
1988    }
1989
1990    async fn revoke_handle(
1991        &self,
1992        owner_scope: &ProcessScope,
1993        process_id: &str,
1994    ) -> Result<(), PluginError> {
1995        sqlx::query(
1996            "DELETE FROM lash_process_handle_grants WHERE scope_id = $1 AND process_id = $2",
1997        )
1998        .bind(owner_scope.id().as_str())
1999        .bind(process_id)
2000        .execute(&self.pool)
2001        .await
2002        .map_err(plugin_sqlx_error)?;
2003        Ok(())
2004    }
2005
2006    async fn transfer_handle_grants(
2007        &self,
2008        from_scope: &ProcessScope,
2009        to_scope: &ProcessScope,
2010        process_ids: &[String],
2011    ) -> Result<(), PluginError> {
2012        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2013        for process_id in process_ids {
2014            let descriptor_json: Option<String> = sqlx::query_scalar(
2015                "SELECT descriptor_json FROM lash_process_handle_grants
2016                 WHERE scope_id = $1 AND process_id = $2",
2017            )
2018            .bind(from_scope.id().as_str())
2019            .bind(process_id)
2020            .fetch_optional(&mut *tx)
2021            .await
2022            .map_err(plugin_sqlx_error)?;
2023            let Some(descriptor_json) = descriptor_json else {
2024                return Err(PluginError::Session(format!(
2025                    "process handle `{process_id}` is not granted to session `{}`",
2026                    from_scope.session_id
2027                )));
2028            };
2029            sqlx::query(
2030                "DELETE FROM lash_process_handle_grants WHERE scope_id = $1 AND process_id = $2",
2031            )
2032            .bind(from_scope.id().as_str())
2033            .bind(process_id)
2034            .execute(&mut *tx)
2035            .await
2036            .map_err(plugin_sqlx_error)?;
2037            sqlx::query(
2038                "INSERT INTO lash_process_handle_grants (session_id, scope_id, process_id, descriptor_json)
2039                 VALUES ($1, $2, $3, $4)
2040                 ON CONFLICT (scope_id, process_id) DO UPDATE SET
2041                    session_id = EXCLUDED.session_id,
2042                    descriptor_json = EXCLUDED.descriptor_json",
2043            )
2044            .bind(&to_scope.session_id)
2045            .bind(to_scope.id().as_str())
2046            .bind(process_id)
2047            .bind(descriptor_json)
2048            .execute(&mut *tx)
2049            .await
2050            .map_err(plugin_sqlx_error)?;
2051        }
2052        tx.commit().await.map_err(plugin_sqlx_error)
2053    }
2054
2055    async fn list_handle_grants(
2056        &self,
2057        owner_scope: &ProcessScope,
2058    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
2059        list_grants_for_scope(&self.pool, owner_scope, false).await
2060    }
2061
2062    async fn list_live_handle_grants(
2063        &self,
2064        owner_scope: &ProcessScope,
2065    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
2066        list_grants_for_scope(&self.pool, owner_scope, true).await
2067    }
2068
2069    async fn has_handle_grant(
2070        &self,
2071        owner_scope: &ProcessScope,
2072        process_id: &str,
2073    ) -> Result<bool, PluginError> {
2074        let exists: Option<i64> = sqlx::query_scalar(
2075            "SELECT 1::BIGINT FROM lash_process_handle_grants
2076             WHERE scope_id = $1 AND process_id = $2
2077             LIMIT 1",
2078        )
2079        .bind(owner_scope.id().as_str())
2080        .bind(process_id)
2081        .fetch_optional(&self.pool)
2082        .await
2083        .map_err(plugin_sqlx_error)?;
2084        Ok(exists.is_some())
2085    }
2086
2087    async fn handle_grants_for_process(
2088        &self,
2089        process_id: &str,
2090    ) -> Result<Vec<ProcessHandleGrant>, PluginError> {
2091        if load_process(&self.pool, process_id).await?.is_none() {
2092            return Err(PluginError::Session(format!(
2093                "unknown process `{process_id}`"
2094            )));
2095        }
2096        let rows = sqlx::query(
2097            "SELECT session_id, descriptor_json
2098             FROM lash_process_handle_grants
2099             WHERE process_id = $1
2100             ORDER BY session_id ASC, scope_id ASC",
2101        )
2102        .bind(process_id)
2103        .fetch_all(&self.pool)
2104        .await
2105        .map_err(plugin_sqlx_error)?;
2106        let mut grants = Vec::new();
2107        for row in rows {
2108            let descriptor_json: String = row.get(1);
2109            grants.push(ProcessHandleGrant {
2110                session_id: row.get(0),
2111                process_id: process_id.to_string(),
2112                descriptor: serde_json::from_str(&descriptor_json).map_err(process_decode_error)?,
2113            });
2114        }
2115        Ok(grants)
2116    }
2117
2118    async fn delete_session_process_state(
2119        &self,
2120        session_id: &str,
2121    ) -> Result<lash_core::ProcessSessionDeleteReport, PluginError> {
2122        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2123        let rows = sqlx::query(
2124            "SELECT g.process_id, p.record_json
2125             FROM lash_process_handle_grants g
2126             JOIN lash_processes p ON p.process_id = g.process_id
2127             WHERE g.session_id = $1
2128             ORDER BY g.process_id ASC",
2129        )
2130        .bind(session_id)
2131        .fetch_all(&mut *tx)
2132        .await
2133        .map_err(plugin_sqlx_error)?;
2134        let mut removed = Vec::new();
2135        for row in rows {
2136            let process_id: String = row.get(0);
2137            let record_json: String = row.get(1);
2138            let record: ProcessRecord =
2139                serde_json::from_str(&record_json).map_err(process_decode_error)?;
2140            removed.push((process_id, record));
2141        }
2142        let revoked = sqlx::query("DELETE FROM lash_process_handle_grants WHERE session_id = $1")
2143            .bind(session_id)
2144            .execute(&mut *tx)
2145            .await
2146            .map_err(plugin_sqlx_error)?
2147            .rows_affected() as usize;
2148        let mut cancel_process_ids = Vec::new();
2149        let mut preserved_process_ids = Vec::new();
2150        for (process_id, record) in removed {
2151            if record.is_terminal() {
2152                continue;
2153            }
2154            let remaining: i64 = sqlx::query_scalar(
2155                "SELECT COUNT(*) FROM lash_process_handle_grants WHERE process_id = $1",
2156            )
2157            .bind(&process_id)
2158            .fetch_one(&mut *tx)
2159            .await
2160            .map_err(plugin_sqlx_error)?;
2161            if remaining == 0 {
2162                cancel_process_ids.push(process_id);
2163            } else {
2164                preserved_process_ids.push(process_id);
2165            }
2166        }
2167        tx.commit().await.map_err(plugin_sqlx_error)?;
2168        cancel_process_ids.sort();
2169        cancel_process_ids.dedup();
2170        preserved_process_ids.sort();
2171        preserved_process_ids.dedup();
2172        Ok(lash_core::ProcessSessionDeleteReport {
2173            session_id: session_id.to_string(),
2174            revoked_handle_count: revoked,
2175            deleted_wake_count: 0,
2176            cancel_process_ids,
2177            preserved_process_ids,
2178        })
2179    }
2180
    /// Appends an event to `process_id`'s log and applies any status change it
    /// implies, all inside one transaction.
    ///
    /// `lash_core::runtime::prepare_process_event_append` validates the
    /// request and decides replay handling, the payload hash, the wake
    /// delivery, and any status update. When it reports a replay of an event
    /// already stored under the request's replay key, no event row is
    /// written. Only a returned status update is applied, and waiters are
    /// notified only in that case. Otherwise the event is inserted at the next
    /// per-process sequence number, the process record is saved with the new
    /// status and `updated_at_ms`, and waiters on `self.notify` are woken
    /// after commit.
    ///
    /// # Errors
    /// `PluginError::Session` when the process is unknown; errors from
    /// `prepare_process_event_append`; database and serialization failures.
    async fn append_event(
        &self,
        process_id: &str,
        request: ProcessEventAppendRequest,
    ) -> Result<ProcessEventAppendResult, PluginError> {
        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
        // `load_process_tx` locks the process row (`FOR UPDATE`) until commit.
        // This serializes appends made through this method for the same
        // process, which keeps the MAX(sequence) + 1 computation below free of
        // races.
        let mut record = load_process_tx(&mut tx, process_id)
            .await?
            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
        // If the request carries a replay key, fetch any event already stored
        // under it (hash + event) so the core can detect a retried append.
        let replay_lookup =
            if let Some(replay_key) = request.replay.as_ref().map(|r| r.key.as_str()) {
                load_event_by_key_tx(&mut tx, process_id, replay_key).await?
            } else {
                None
            };
        // Next sequence number for this process; the first event gets 1.
        let sequence: i64 = sqlx::query_scalar(
            "SELECT COALESCE(MAX(sequence), 0) + 1 FROM lash_process_events WHERE process_id = $1",
        )
        .bind(process_id)
        .fetch_one(&mut *tx)
        .await
        .map_err(plugin_sqlx_error)?;
        let occurred_at_ms = current_epoch_ms();
        let prepared = lash_core::runtime::prepare_process_event_append(
            &record,
            request,
            sequence as u64,
            replay_lookup,
            occurred_at_ms,
        )?;
        if prepared.replayed {
            // Replay: no new event row is written. If the core still returns a
            // status update, apply it again (tracked as `repaired`) and only
            // then wake waiters after commit.
            let repaired = if let Some(status) = prepared.status_update.clone() {
                record.status = status;
                record.updated_at_ms = prepared.occurred_at_ms;
                save_process_tx(&mut tx, &record).await?;
                true
            } else {
                false
            };
            tx.commit().await.map_err(plugin_sqlx_error)?;
            if repaired {
                self.notify.notify_waiters();
            }
            return Ok(ProcessEventAppendResult {
                event: prepared.event,
                wake_delivery: prepared.wake_delivery,
            });
        }
        let event = prepared.event;
        // `idempotency_key` stores the event's replay key so a later retry
        // can find this row through `load_event_by_key_tx`.
        sqlx::query(
            "INSERT INTO lash_process_events (
                process_id, sequence, event_type, payload_hash, idempotency_key,
                occurred_at_ms, event_json
             )
             VALUES ($1, $2, $3, $4, $5, $6, $7)",
        )
        .bind(process_id)
        .bind(sequence)
        .bind(event.event_type.as_str())
        .bind(&prepared.payload_hash)
        .bind(event.invocation.replay_key())
        .bind(prepared.occurred_at_ms as i64)
        .bind(serde_json::to_string(&event).map_err(process_decode_error)?)
        .execute(&mut *tx)
        .await
        .map_err(plugin_sqlx_error)?;
        if let Some(status) = prepared.status_update.clone() {
            record.status = status;
        }
        // Every non-replayed append touches the record, even without a
        // status change.
        record.updated_at_ms = prepared.occurred_at_ms;
        save_process_tx(&mut tx, &record).await?;
        tx.commit().await.map_err(plugin_sqlx_error)?;
        // Notify only after commit so woken waiters can see the new row.
        self.notify.notify_waiters();
        Ok(ProcessEventAppendResult {
            event,
            wake_delivery: prepared.wake_delivery,
        })
    }
2259
2260    async fn events_after(
2261        &self,
2262        process_id: &str,
2263        after_sequence: u64,
2264    ) -> Result<Vec<ProcessEvent>, PluginError> {
2265        if load_process(&self.pool, process_id).await?.is_none() {
2266            return Err(PluginError::Session(format!(
2267                "unknown process `{process_id}`"
2268            )));
2269        }
2270        let rows = sqlx::query(
2271            "SELECT event_json FROM lash_process_events
2272             WHERE process_id = $1 AND sequence > $2
2273             ORDER BY sequence ASC",
2274        )
2275        .bind(process_id)
2276        .bind(after_sequence as i64)
2277        .fetch_all(&self.pool)
2278        .await
2279        .map_err(plugin_sqlx_error)?;
2280        let mut events = Vec::new();
2281        for row in rows {
2282            let json: String = row.get(0);
2283            events.push(serde_json::from_str(&json).map_err(process_decode_error)?);
2284        }
2285        Ok(events)
2286    }
2287
2288    async fn wake_events_after(
2289        &self,
2290        process_id: &str,
2291        after_sequence: u64,
2292    ) -> Result<Vec<ProcessEvent>, PluginError> {
2293        let rows = sqlx::query("SELECT sequence FROM lash_process_wake_acks WHERE process_id = $1")
2294            .bind(process_id)
2295            .fetch_all(&self.pool)
2296            .await
2297            .map_err(plugin_sqlx_error)?;
2298        let acked = rows
2299            .into_iter()
2300            .map(|row| row.get::<i64, _>(0) as u64)
2301            .collect::<HashSet<_>>();
2302        Ok(self
2303            .events_after(process_id, after_sequence)
2304            .await?
2305            .into_iter()
2306            .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
2307            .collect())
2308    }
2309
2310    async fn wait_event_after(
2311        &self,
2312        process_id: &str,
2313        event_type: &str,
2314        after_sequence: u64,
2315    ) -> Result<ProcessEvent, PluginError> {
2316        loop {
2317            if let Some(event) = self
2318                .events_after(process_id, after_sequence)
2319                .await?
2320                .into_iter()
2321                .find(|event| event.event_type == event_type)
2322            {
2323                return Ok(event);
2324            }
2325            tokio::select! {
2326                _ = self.notify.notified() => {}
2327                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
2328            }
2329        }
2330    }
2331
2332    async fn await_process(&self, process_id: &str) -> Result<ProcessAwaitOutput, PluginError> {
2333        loop {
2334            let record = load_process(&self.pool, process_id)
2335                .await?
2336                .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
2337            if let Some(await_output) = record.status.await_output() {
2338                return Ok(await_output.clone());
2339            }
2340            tokio::select! {
2341                _ = self.notify.notified() => {}
2342                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
2343            }
2344        }
2345    }
2346
2347    async fn complete_process(
2348        &self,
2349        process_id: &str,
2350        await_output: ProcessAwaitOutput,
2351    ) -> Result<ProcessRecord, PluginError> {
2352        let event_type = match await_output.terminal_state() {
2353            lash_core::ProcessTerminalState::Completed => "process.completed",
2354            lash_core::ProcessTerminalState::Failed => "process.failed",
2355            lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
2356        };
2357        self.append_event(
2358            process_id,
2359            ProcessEventAppendRequest::new(
2360                event_type,
2361                serde_json::json!({ "await_output": await_output }),
2362            )
2363            .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
2364        )
2365        .await?;
2366        load_process(&self.pool, process_id).await?.ok_or_else(|| {
2367            PluginError::Session(format!(
2368                "unknown process `{process_id}` after terminal event"
2369            ))
2370        })
2371    }
2372
2373    async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
2374        load_process(&self.pool, process_id).await.ok().flatten()
2375    }
2376
2377    async fn ack_wake(&self, process_id: &str, sequence: u64) -> Result<(), PluginError> {
2378        if load_process(&self.pool, process_id).await?.is_none() {
2379            return Err(PluginError::Session(format!(
2380                "unknown process `{process_id}`"
2381            )));
2382        }
2383        sqlx::query(
2384            "INSERT INTO lash_process_wake_acks (process_id, sequence)
2385             VALUES ($1, $2)
2386             ON CONFLICT DO NOTHING",
2387        )
2388        .bind(process_id)
2389        .bind(sequence as i64)
2390        .execute(&self.pool)
2391        .await
2392        .map_err(plugin_sqlx_error)?;
2393        Ok(())
2394    }
2395
2396    async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, PluginError> {
2397        let rows = sqlx::query(
2398            "SELECT record_json FROM lash_processes
2399             WHERE status = 'running'
2400             ORDER BY process_id ASC",
2401        )
2402        .fetch_all(&self.pool)
2403        .await
2404        .map_err(plugin_sqlx_error)?;
2405        let mut records = Vec::new();
2406        for row in rows {
2407            let json: String = row.get(0);
2408            records.push(serde_json::from_str(&json).map_err(process_decode_error)?);
2409        }
2410        Ok(records)
2411    }
2412
2413    async fn claim_process_lease(
2414        &self,
2415        process_id: &str,
2416        owner_id: &str,
2417        lease_ttl_ms: u64,
2418    ) -> Result<ProcessLease, PluginError> {
2419        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2420        if load_process_tx(&mut tx, process_id).await?.is_none() {
2421            return Err(PluginError::Session(format!(
2422                "unknown process `{process_id}`"
2423            )));
2424        }
2425        let now = current_epoch_ms();
2426        let current = load_process_lease_tx(&mut tx, process_id).await?;
2427        if let Some(current) = current.as_ref()
2428            && current.expires_at_epoch_ms > now
2429            && current.owner_id != owner_id
2430        {
2431            return Err(process_lease_conflict(process_id, current));
2432        }
2433        let existing_fence: Option<i64> = sqlx::query_scalar(
2434            "SELECT lease_fencing_token FROM lash_process_leases WHERE process_id = $1 FOR UPDATE",
2435        )
2436        .bind(process_id)
2437        .fetch_optional(&mut *tx)
2438        .await
2439        .map_err(plugin_sqlx_error)?;
2440        let fencing_token = existing_fence.unwrap_or(0) as u64 + 1;
2441        let lease = ProcessLease {
2442            schema_version: PROCESS_LEASE_SCHEMA_VERSION,
2443            process_id: process_id.to_string(),
2444            owner_id: owner_id.to_string(),
2445            lease_token: format!(
2446                "{:x}",
2447                Sha256::digest(format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes())
2448            ),
2449            fencing_token,
2450            claimed_at_epoch_ms: now,
2451            expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
2452        };
2453        sqlx::query(
2454            "INSERT INTO lash_process_leases (
2455                process_id, lease_owner_id, lease_token, lease_fencing_token,
2456                lease_claimed_at_ms, lease_expires_at_ms
2457             )
2458             VALUES ($1, $2, $3, $4, $5, $6)
2459             ON CONFLICT (process_id) DO UPDATE SET
2460                lease_owner_id = EXCLUDED.lease_owner_id,
2461                lease_token = EXCLUDED.lease_token,
2462                lease_fencing_token = EXCLUDED.lease_fencing_token,
2463                lease_claimed_at_ms = EXCLUDED.lease_claimed_at_ms,
2464                lease_expires_at_ms = EXCLUDED.lease_expires_at_ms",
2465        )
2466        .bind(&lease.process_id)
2467        .bind(&lease.owner_id)
2468        .bind(&lease.lease_token)
2469        .bind(lease.fencing_token as i64)
2470        .bind(lease.claimed_at_epoch_ms as i64)
2471        .bind(lease.expires_at_epoch_ms as i64)
2472        .execute(&mut *tx)
2473        .await
2474        .map_err(plugin_sqlx_error)?;
2475        tx.commit().await.map_err(plugin_sqlx_error)?;
2476        Ok(lease)
2477    }
2478
2479    async fn renew_process_lease(
2480        &self,
2481        lease: &ProcessLease,
2482        lease_ttl_ms: u64,
2483    ) -> Result<ProcessLease, PluginError> {
2484        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2485        let now = current_epoch_ms();
2486        let current = load_process_lease_tx(&mut tx, &lease.process_id).await?;
2487        if !guard_lease(current.as_ref(), &lease.lease_token, now) {
2488            return Err(process_lease_expired(&lease.process_id));
2489        }
2490        let renewed = ProcessLease {
2491            expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
2492            ..lease.clone()
2493        };
2494        sqlx::query(
2495            "UPDATE lash_process_leases
2496             SET lease_expires_at_ms = $2
2497             WHERE process_id = $1 AND lease_token = $3",
2498        )
2499        .bind(&renewed.process_id)
2500        .bind(renewed.expires_at_epoch_ms as i64)
2501        .bind(&renewed.lease_token)
2502        .execute(&mut *tx)
2503        .await
2504        .map_err(plugin_sqlx_error)?;
2505        tx.commit().await.map_err(plugin_sqlx_error)?;
2506        Ok(renewed)
2507    }
2508
2509    async fn complete_process_lease(
2510        &self,
2511        completion: &ProcessLeaseCompletion,
2512    ) -> Result<(), PluginError> {
2513        sqlx::query(
2514            "UPDATE lash_process_leases
2515             SET lease_owner_id = NULL,
2516                 lease_token = NULL,
2517                 lease_claimed_at_ms = 0,
2518                 lease_expires_at_ms = 0
2519             WHERE process_id = $1 AND lease_token = $2",
2520        )
2521        .bind(&completion.process_id)
2522        .bind(&completion.lease_token)
2523        .execute(&self.pool)
2524        .await
2525        .map_err(plugin_sqlx_error)?;
2526        Ok(())
2527    }
2528}
2529
2530#[async_trait::async_trait]
2531impl HostEventStore for PostgresHostEventStore {
2532    fn durability_tier(&self) -> DurabilityTier {
2533        DurabilityTier::Durable
2534    }
2535
2536    async fn register_subscription(
2537        &self,
2538        draft: TriggerSubscriptionDraft,
2539    ) -> Result<TriggerSubscriptionRecord, PluginError> {
2540        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2541        let seq: i64 = sqlx::query_scalar("SELECT nextval('lash_host_event_subscription_seq')")
2542            .fetch_one(&mut *tx)
2543            .await
2544            .map_err(plugin_sqlx_error)?;
2545        let handle = format!("trigger:{seq}");
2546        let subscription_id = format!("subscription:{seq}");
2547        let now = current_epoch_ms();
2548        let record = TriggerSubscriptionRecord {
2549            subscription_id: subscription_id.clone(),
2550            session_id: draft.session_id,
2551            handle,
2552            name: draft.name,
2553            source_type: draft.source_type,
2554            source_key: draft.source_key,
2555            source: draft.source,
2556            event_ty: draft.event_ty,
2557            module_ref: draft.module_ref,
2558            required_surface_ref: draft.required_surface_ref,
2559            process_ref: draft.process_ref,
2560            process_name: draft.process_name,
2561            input_template: draft.input_template,
2562            enabled: true,
2563            created_at_ms: now,
2564            updated_at_ms: now,
2565        };
2566        sqlx::query(
2567            "INSERT INTO lash_host_event_trigger_subscriptions (
2568                subscription_id, session_id, handle, source_type, source_key,
2569                enabled, created_at_ms, updated_at_ms, record_json
2570             )
2571             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
2572        )
2573        .bind(&record.subscription_id)
2574        .bind(&record.session_id)
2575        .bind(&record.handle)
2576        .bind(&record.source_type)
2577        .bind(&record.source_key)
2578        .bind(record.enabled)
2579        .bind(record.created_at_ms as i64)
2580        .bind(record.updated_at_ms as i64)
2581        .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2582        .execute(&mut *tx)
2583        .await
2584        .map_err(plugin_sqlx_error)?;
2585        tx.commit().await.map_err(plugin_sqlx_error)?;
2586        Ok(record)
2587    }
2588
2589    async fn list_subscriptions(
2590        &self,
2591        filter: TriggerSubscriptionFilter,
2592    ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError> {
2593        let rows = sqlx::query(
2594            "SELECT record_json FROM lash_host_event_trigger_subscriptions
2595             ORDER BY session_id ASC, handle ASC",
2596        )
2597        .fetch_all(&self.pool)
2598        .await
2599        .map_err(plugin_sqlx_error)?;
2600        let mut records = Vec::new();
2601        for row in rows {
2602            let json: String = row.get(0);
2603            let record: TriggerSubscriptionRecord =
2604                serde_json::from_str(&json).map_err(process_decode_error)?;
2605            if filter.matches(&record) {
2606                records.push(record);
2607            }
2608        }
2609        Ok(records)
2610    }
2611
2612    async fn cancel_subscription(
2613        &self,
2614        session_id: &str,
2615        handle: &str,
2616    ) -> Result<bool, PluginError> {
2617        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2618        let json: Option<String> = sqlx::query_scalar(
2619            "SELECT record_json FROM lash_host_event_trigger_subscriptions
2620             WHERE session_id = $1 AND handle = $2
2621             FOR UPDATE",
2622        )
2623        .bind(session_id)
2624        .bind(handle)
2625        .fetch_optional(&mut *tx)
2626        .await
2627        .map_err(plugin_sqlx_error)?;
2628        let Some(json) = json else {
2629            tx.commit().await.map_err(plugin_sqlx_error)?;
2630            return Ok(false);
2631        };
2632        let mut record: TriggerSubscriptionRecord =
2633            serde_json::from_str(&json).map_err(process_decode_error)?;
2634        let changed = record.enabled;
2635        record.enabled = false;
2636        record.updated_at_ms = current_epoch_ms();
2637        sqlx::query(
2638            "UPDATE lash_host_event_trigger_subscriptions
2639             SET enabled = $3, updated_at_ms = $4, record_json = $5
2640             WHERE session_id = $1 AND handle = $2",
2641        )
2642        .bind(session_id)
2643        .bind(handle)
2644        .bind(record.enabled)
2645        .bind(record.updated_at_ms as i64)
2646        .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2647        .execute(&mut *tx)
2648        .await
2649        .map_err(plugin_sqlx_error)?;
2650        tx.commit().await.map_err(plugin_sqlx_error)?;
2651        Ok(changed)
2652    }
2653
2654    async fn record_occurrence(
2655        &self,
2656        request: HostEventOccurrenceRequest,
2657    ) -> Result<HostEventOccurrenceRecord, PluginError> {
2658        lash_core::validate_host_event_occurrence_request(&request)?;
2659        let request_hash = lash_core::host_event_occurrence_request_hash(&request)?;
2660        let occurrence_id = lash_core::deterministic_occurrence_id(&request)?;
2661        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2662        let existing = sqlx::query(
2663            "SELECT request_hash, record_json
2664             FROM lash_host_event_occurrences
2665             WHERE idempotency_key = $1
2666             FOR UPDATE",
2667        )
2668        .bind(&request.idempotency_key)
2669        .fetch_optional(&mut *tx)
2670        .await
2671        .map_err(plugin_sqlx_error)?;
2672        if let Some(row) = existing {
2673            let existing_hash: String = row.get(0);
2674            let existing_json: String = row.get(1);
2675            if existing_hash != request_hash {
2676                return Err(PluginError::Session(format!(
2677                    "host event occurrence idempotency conflict for `{}`",
2678                    request.idempotency_key
2679                )));
2680            }
2681            let record = serde_json::from_str(&existing_json).map_err(process_decode_error)?;
2682            tx.commit().await.map_err(plugin_sqlx_error)?;
2683            return Ok(record);
2684        }
2685        let record = HostEventOccurrenceRecord {
2686            occurrence_id: occurrence_id.clone(),
2687            source_type: request.source_type,
2688            source_key: request.source_key,
2689            payload: request.payload,
2690            idempotency_key: request.idempotency_key.clone(),
2691            source: request.source,
2692            occurred_at_ms: current_epoch_ms(),
2693        };
2694        sqlx::query(
2695            "INSERT INTO lash_host_event_occurrences (
2696                occurrence_id, idempotency_key, request_hash, source_type, source_key,
2697                occurred_at_ms, record_json
2698             )
2699             VALUES ($1, $2, $3, $4, $5, $6, $7)",
2700        )
2701        .bind(&record.occurrence_id)
2702        .bind(&record.idempotency_key)
2703        .bind(&request_hash)
2704        .bind(&record.source_type)
2705        .bind(&record.source_key)
2706        .bind(record.occurred_at_ms as i64)
2707        .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2708        .execute(&mut *tx)
2709        .await
2710        .map_err(plugin_sqlx_error)?;
2711        tx.commit().await.map_err(plugin_sqlx_error)?;
2712        Ok(record)
2713    }
2714
2715    async fn reserve_matching_deliveries(
2716        &self,
2717        occurrence_id: &str,
2718    ) -> Result<Vec<TriggerDeliveryReservation>, PluginError> {
2719        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2720        let occurrence_json: Option<String> = sqlx::query_scalar(
2721            "SELECT record_json FROM lash_host_event_occurrences WHERE occurrence_id = $1",
2722        )
2723        .bind(occurrence_id)
2724        .fetch_optional(&mut *tx)
2725        .await
2726        .map_err(plugin_sqlx_error)?;
2727        let Some(occurrence_json) = occurrence_json else {
2728            return Err(PluginError::Session(format!(
2729                "unknown host event occurrence `{occurrence_id}`"
2730            )));
2731        };
2732        let occurrence: HostEventOccurrenceRecord =
2733            serde_json::from_str(&occurrence_json).map_err(process_decode_error)?;
2734        let rows = sqlx::query(
2735            "SELECT record_json FROM lash_host_event_trigger_subscriptions
2736             WHERE enabled = TRUE AND source_type = $1 AND source_key = $2
2737             ORDER BY session_id ASC, handle ASC",
2738        )
2739        .bind(&occurrence.source_type)
2740        .bind(&occurrence.source_key)
2741        .fetch_all(&mut *tx)
2742        .await
2743        .map_err(plugin_sqlx_error)?;
2744        let mut deliveries = Vec::new();
2745        for row in rows {
2746            let json: String = row.get(0);
2747            let subscription: TriggerSubscriptionRecord =
2748                serde_json::from_str(&json).map_err(process_decode_error)?;
2749            let process_id = lash_core::deterministic_delivery_process_id(
2750                &occurrence.occurrence_id,
2751                &subscription.subscription_id,
2752            )?;
2753            let inserted = sqlx::query(
2754                "INSERT INTO lash_host_event_deliveries (
2755                    occurrence_id, subscription_id, process_id, created_at_ms
2756                 )
2757                 VALUES ($1, $2, $3, $4)
2758                 ON CONFLICT DO NOTHING",
2759            )
2760            .bind(&occurrence.occurrence_id)
2761            .bind(&subscription.subscription_id)
2762            .bind(&process_id)
2763            .bind(current_epoch_ms() as i64)
2764            .execute(&mut *tx)
2765            .await
2766            .map_err(plugin_sqlx_error)?
2767            .rows_affected();
2768            if inserted == 0 {
2769                continue;
2770            }
2771            deliveries.push(TriggerDeliveryReservation {
2772                occurrence: occurrence.clone(),
2773                subscription,
2774                process_id,
2775            });
2776        }
2777        tx.commit().await.map_err(plugin_sqlx_error)?;
2778        Ok(deliveries)
2779    }
2780}
2781
2782#[async_trait::async_trait]
2783impl lashlang::LashlangArtifactStore for PostgresLashlangArtifactStore {
2784    fn durability_tier(&self) -> DurabilityTier {
2785        DurabilityTier::Durable
2786    }
2787
2788    async fn put_module_artifact(
2789        &self,
2790        artifact: &lashlang::ModuleArtifact,
2791    ) -> Result<(), lashlang::ArtifactStoreError> {
2792        let bytes = artifact
2793            .to_store_bytes()
2794            .map_err(lashlang::ArtifactStoreError::from)?;
2795        sqlx::query(
2796            "INSERT INTO lash_lashlang_artifacts (module_ref, artifact_bytes)
2797             VALUES ($1, $2)
2798             ON CONFLICT (module_ref) DO UPDATE SET artifact_bytes = EXCLUDED.artifact_bytes",
2799        )
2800        .bind(artifact.module_ref.as_str())
2801        .bind(bytes)
2802        .execute(&self.pool)
2803        .await
2804        .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
2805        Ok(())
2806    }
2807
2808    async fn get_module_artifact(
2809        &self,
2810        module_ref: &lashlang::ModuleRef,
2811    ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
2812        let bytes: Option<Vec<u8>> = sqlx::query_scalar(
2813            "SELECT artifact_bytes FROM lash_lashlang_artifacts WHERE module_ref = $1",
2814        )
2815        .bind(module_ref.as_str())
2816        .fetch_optional(&self.pool)
2817        .await
2818        .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
2819        bytes
2820            .map(|bytes| {
2821                lashlang::ModuleArtifact::from_store_bytes(&bytes)
2822                    .map(Arc::new)
2823                    .map_err(lashlang::ArtifactStoreError::from)
2824            })
2825            .transpose()
2826    }
2827}