Skip to main content

lash_postgres_store/
lib.rs

1//! PostgreSQL durable storage for Lash.
2//!
3//! One [`PostgresStorage`] owns a shared [`sqlx::PgPool`] and creates durable
4//! implementations for the runtime session store, process registry, host-event
5//! store, Lashlang artifact store, and attachment manifest.
6
7use std::collections::HashSet;
8use std::sync::{Arc, OnceLock};
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use lash_core::runtime::{
12    ProcessHandleGrantEntry, QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim,
13    QueuedWorkClaimBoundary, QueuedWorkCompletion, QueuedWorkItem,
14};
15use lash_core::store::{
16    GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
17    RuntimeCommitResult, SessionCheckpoint, SessionHeadMeta,
18};
19use lash_core::{
20    AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
21    DeliveryPolicy, DurabilityTier, GcReport, MergeKey, ProcessAwaitOutput, ProcessEvent,
22    ProcessEventAppendRequest, ProcessEventAppendResult, ProcessExternalRef,
23    ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease, ProcessLeaseCompletion,
24    ProcessRecord, ProcessRegistration, ProcessRegistry, ProcessScope, RuntimePersistence,
25    SessionMeta, SessionNodeRecord, SessionReadScope, SessionStoreCreateRequest,
26    SessionStoreFactory, SlotPolicy, StoreError, TokenLedgerEntry, VacuumReport,
27};
28use lash_core::{
29    HostEventOccurrenceRecord, HostEventOccurrenceRequest, HostEventStore, PluginError,
30    TriggerDeliveryReservation, TriggerSubscriptionDraft, TriggerSubscriptionFilter,
31    TriggerSubscriptionRecord,
32};
33use sha2::{Digest, Sha256};
34use sqlx::postgres::{PgPool, PgPoolOptions, PgRow};
35use sqlx::{Executor, Row};
36
/// Key under which this crate records its schema version in `lash_schema_versions`.
const SCHEMA_COMPONENT: &str = "lash-postgres-store";
/// Schema version this build expects; `ensure_schema` refuses any other stored value.
const SCHEMA_VERSION: i32 = 1;
/// Local alias for the process-lease schema version defined by `lash_core`.
const PROCESS_LEASE_SCHEMA_VERSION: u32 = lash_core::PROCESS_LEASE_SCHEMA_VERSION;
40
/// Entry point for the PostgreSQL backend.
///
/// Wraps one shared [`PgPool`] and hands out the individual store handles; each
/// handle receives its own clone of the pool.
#[derive(Clone)]
pub struct PostgresStorage {
    /// Connection pool cloned into every store created from this value.
    pool: PgPool,
}
45
/// Factory that builds [`PostgresSessionStore`] handles, each explicitly bound
/// to the session id named in the create request.
#[derive(Clone)]
pub struct PostgresSessionStoreFactory {
    /// Connection pool cloned into every store this factory creates.
    pool: PgPool,
}
50
/// Session-level store handle backed by the shared pool.
///
/// A handle is either bound to a session id up front (`session_id` is `Some`)
/// or unbound (`session_id` is `None`). This type also implements
/// [`AttachmentManifest`].
#[derive(Clone)]
pub struct PostgresSessionStore {
    /// Connection pool used for every query issued by this handle.
    pool: PgPool,
    /// Explicit session binding for handles created via the factory.
    session_id: Option<String>,
    /// In-memory bind-on-first-commit for an *unbound* handle. A session-store
    /// handle commits to exactly one session; an unbound handle latches the first
    /// session it commits and rejects others (Postgres is multi-session per
    /// database, so this can't be inferred from a singleton head row the way the
    /// single-file SQLite store does). Shared across clones via `Arc`.
    bound_session: Arc<OnceLock<String>>,
}
63
/// Process-registry handle backed by the shared pool.
#[derive(Clone)]
pub struct PostgresProcessRegistry {
    /// Connection pool used for registry queries.
    pool: PgPool,
    /// Notifier shared by all clones of this registry (it lives behind an `Arc`).
    /// NOTE(review): presumably used to wake in-process waiters when process
    /// state changes; the usage is outside this part of the file, so confirm.
    notify: Arc<tokio::sync::Notify>,
}
69
/// Host-event store handle backed by the shared pool.
#[derive(Clone)]
pub struct PostgresHostEventStore {
    /// Connection pool used for host-event queries.
    pool: PgPool,
}
74
/// Lashlang artifact store handle backed by the shared pool.
#[derive(Clone)]
pub struct PostgresLashlangArtifactStore {
    /// Connection pool used for artifact queries.
    pool: PgPool,
}
79
/// Connection-pool and per-connection timeout knobs for [`PostgresStorage`].
///
/// Session commits use **optimistic CAS** on the head (`UPDATE … WHERE
/// head_revision = expected`), not a held `SELECT … FOR UPDATE`, so concurrent
/// writers never pin a pool connection while blocked on a lock. `lock_timeout` is
/// defense in depth: it caps how long the single CAS write may wait on the head
/// row's lock before erroring (surfaced as a retryable conflict), so a pathological
/// burst can never starve the pool.
///
/// For the optional fields, `None` means "do not configure this knob": the pool
/// option is left untouched, or no `SET` statement is issued on new connections.
#[derive(Clone, Debug)]
pub struct PostgresStoreConfig {
    /// Maximum pooled connections. Default 16.
    pub max_connections: u32,
    /// Minimum idle connections kept warm. Default 0.
    pub min_connections: u32,
    /// How long `acquire` waits for a free connection before erroring. Default 30s.
    pub acquire_timeout: Duration,
    /// Close a connection after this idle period. Default 10m. `None` leaves the
    /// pool builder's own idle-timeout setting in place.
    pub idle_timeout: Option<Duration>,
    /// Recycle a connection after this lifetime. Default 30m. `None` leaves the
    /// pool builder's own max-lifetime setting in place.
    pub max_lifetime: Option<Duration>,
    /// Postgres `lock_timeout` applied to every connection. Default 10s. `None`
    /// skips the `SET`, so the server-side setting applies.
    pub lock_timeout: Option<Duration>,
    /// Postgres `statement_timeout` applied to every connection. Default 30s — a
    /// backstop so a wedged query can never hold a connection indefinitely. `None`
    /// skips the `SET`, so the server-side setting applies.
    pub statement_timeout: Option<Duration>,
}
106
107impl Default for PostgresStoreConfig {
108    fn default() -> Self {
109        Self {
110            max_connections: 16,
111            min_connections: 0,
112            acquire_timeout: Duration::from_secs(30),
113            idle_timeout: Some(Duration::from_secs(600)),
114            max_lifetime: Some(Duration::from_secs(1800)),
115            lock_timeout: Some(Duration::from_secs(10)),
116            statement_timeout: Some(Duration::from_secs(30)),
117        }
118    }
119}
120
121impl PostgresStorage {
122    /// Connect with [`PostgresStoreConfig::default`] pool/timeout settings.
123    pub async fn connect(database_url: &str) -> Result<Self, StoreError> {
124        Self::connect_with(database_url, PostgresStoreConfig::default()).await
125    }
126
127    /// Connect with explicit pool sizing and per-connection timeouts.
128    pub async fn connect_with(
129        database_url: &str,
130        config: PostgresStoreConfig,
131    ) -> Result<Self, StoreError> {
132        let lock_ms = config.lock_timeout.map(|d| d.as_millis().max(1) as u64);
133        let statement_ms = config
134            .statement_timeout
135            .map(|d| d.as_millis().max(1) as u64);
136        let mut options = PgPoolOptions::new()
137            .max_connections(config.max_connections)
138            .min_connections(config.min_connections)
139            .acquire_timeout(config.acquire_timeout);
140        if let Some(timeout) = config.idle_timeout {
141            options = options.idle_timeout(timeout);
142        }
143        if let Some(timeout) = config.max_lifetime {
144            options = options.max_lifetime(timeout);
145        }
146        let pool = options
147            .after_connect(move |conn, _meta| {
148                Box::pin(async move {
149                    if let Some(ms) = lock_ms {
150                        conn.execute(format!("SET lock_timeout = {ms}").as_str())
151                            .await?;
152                    }
153                    if let Some(ms) = statement_ms {
154                        conn.execute(format!("SET statement_timeout = {ms}").as_str())
155                            .await?;
156                    }
157                    Ok(())
158                })
159            })
160            .connect(database_url)
161            .await
162            .map_err(store_sqlx_error)?;
163        ensure_schema(&pool).await?;
164        Ok(Self { pool })
165    }
166
167    pub fn from_pool(pool: PgPool) -> Self {
168        Self { pool }
169    }
170
171    pub fn pool(&self) -> &PgPool {
172        &self.pool
173    }
174
175    pub fn session_store_factory(&self) -> PostgresSessionStoreFactory {
176        PostgresSessionStoreFactory {
177            pool: self.pool.clone(),
178        }
179    }
180
181    pub fn session_store(&self, session_id: impl Into<String>) -> PostgresSessionStore {
182        PostgresSessionStore {
183            pool: self.pool.clone(),
184            session_id: Some(session_id.into()),
185            bound_session: Arc::new(OnceLock::new()),
186        }
187    }
188
189    pub fn unbound_session_store(&self) -> PostgresSessionStore {
190        PostgresSessionStore {
191            pool: self.pool.clone(),
192            session_id: None,
193            bound_session: Arc::new(OnceLock::new()),
194        }
195    }
196
197    pub fn process_registry(&self) -> PostgresProcessRegistry {
198        PostgresProcessRegistry {
199            pool: self.pool.clone(),
200            notify: Arc::new(tokio::sync::Notify::new()),
201        }
202    }
203
204    pub fn host_event_store(&self) -> PostgresHostEventStore {
205        PostgresHostEventStore {
206            pool: self.pool.clone(),
207        }
208    }
209
210    pub fn lashlang_artifact_store(&self) -> PostgresLashlangArtifactStore {
211        PostgresLashlangArtifactStore {
212            pool: self.pool.clone(),
213        }
214    }
215}
216
217impl PostgresSessionStoreFactory {
218    pub fn new(storage: &PostgresStorage) -> Self {
219        storage.session_store_factory()
220    }
221}
222
223impl PostgresSessionStore {
224    pub fn unbound(storage: &PostgresStorage) -> Self {
225        storage.unbound_session_store()
226    }
227
228    async fn selected_session_id(&self) -> Result<Option<String>, StoreError> {
229        if let Some(session_id) = &self.session_id {
230            return Ok(Some(session_id.clone()));
231        }
232        sqlx::query_scalar("SELECT session_id FROM lash_sessions ORDER BY session_id ASC LIMIT 1")
233            .fetch_optional(&self.pool)
234            .await
235            .map_err(store_sqlx_error)
236    }
237}
238
/// Create the store's tables, indexes, and sequence if they are missing, then
/// record or verify the schema version for [`SCHEMA_COMPONENT`].
///
/// Everything runs in a single transaction. The first statement takes a
/// transaction-level advisory lock; per PostgreSQL's advisory-lock semantics this
/// makes concurrent callers run the setup one at a time, and the lock is released
/// when the transaction ends. Every DDL statement uses `IF NOT EXISTS`, so
/// re-running against an existing schema creates nothing new.
///
/// # Errors
/// Returns [`StoreError::Backend`] when any statement fails, or when
/// `lash_schema_versions` already holds a version for this component that differs
/// from [`SCHEMA_VERSION`]. Returning early drops the transaction without
/// committing it.
async fn ensure_schema(pool: &PgPool) -> Result<(), StoreError> {
    let mut tx = pool.begin().await.map_err(store_sqlx_error)?;
    // NOTE(review): the two lock keys look like arbitrary identifiers private to
    // this crate; confirm nothing else in the deployment uses the same pair.
    tx.execute("SELECT pg_advisory_xact_lock(715421, 907001)")
        .await
        .map_err(store_sqlx_error)?;
    // All DDL is sent as one multi-statement string.
    tx.execute(
        r#"
        CREATE TABLE IF NOT EXISTS lash_schema_versions (
            component TEXT PRIMARY KEY,
            version INTEGER NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_blobs (
            hash TEXT PRIMARY KEY,
            content BYTEA NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_sessions (
            session_id TEXT PRIMARY KEY,
            head_revision BIGINT NOT NULL DEFAULT 0,
            head_json TEXT NOT NULL,
            checkpoint_ref TEXT
        );

        CREATE TABLE IF NOT EXISTS lash_graph_nodes (
            session_id TEXT NOT NULL,
            seq BIGSERIAL,
            node_id TEXT NOT NULL,
            node_json TEXT NOT NULL,
            tombstoned BOOLEAN NOT NULL DEFAULT FALSE,
            PRIMARY KEY (session_id, node_id)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_graph_nodes_seq
            ON lash_graph_nodes(session_id, seq);

        CREATE TABLE IF NOT EXISTS lash_usage_deltas (
            seq BIGSERIAL PRIMARY KEY,
            session_id TEXT NOT NULL,
            entry_json TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_session_meta (
            session_id TEXT PRIMARY KEY,
            meta_json TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_runtime_turn_commits (
            session_id TEXT NOT NULL,
            turn_id TEXT NOT NULL,
            turn_commit_hash TEXT NOT NULL,
            result_json TEXT NOT NULL,
            committed_at_ms BIGINT NOT NULL,
            PRIMARY KEY (session_id, turn_id)
        );

        CREATE TABLE IF NOT EXISTS lash_queued_work_batches (
            enqueue_seq BIGSERIAL PRIMARY KEY,
            batch_id TEXT NOT NULL UNIQUE,
            session_id TEXT NOT NULL,
            source_key TEXT,
            delivery_policy TEXT NOT NULL,
            slot_policy TEXT NOT NULL,
            merge_key_json TEXT NOT NULL,
            available_at_ms BIGINT NOT NULL,
            enqueued_at_ms BIGINT NOT NULL,
            claim_id TEXT,
            claim_owner_id TEXT,
            claim_token TEXT,
            claim_fencing_token BIGINT NOT NULL DEFAULT 0,
            claim_claimed_at_ms BIGINT NOT NULL DEFAULT 0,
            claim_expires_at_ms BIGINT NOT NULL DEFAULT 0,
            UNIQUE (session_id, source_key)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_queued_work_ready
            ON lash_queued_work_batches(session_id, available_at_ms, enqueue_seq);

        CREATE TABLE IF NOT EXISTS lash_queued_work_items (
            batch_id TEXT NOT NULL REFERENCES lash_queued_work_batches(batch_id) ON DELETE CASCADE,
            item_index INTEGER NOT NULL,
            item_id TEXT NOT NULL,
            payload_json TEXT NOT NULL,
            PRIMARY KEY (batch_id, item_index)
        );

        CREATE TABLE IF NOT EXISTS lash_attachment_manifest (
            attachment_id TEXT PRIMARY KEY,
            session_id TEXT NOT NULL,
            canonical_uri TEXT NOT NULL,
            intent_at_ms BIGINT NOT NULL,
            committed_at_ms BIGINT
        );
        CREATE INDEX IF NOT EXISTS idx_lash_attachment_manifest_uncommitted
            ON lash_attachment_manifest(committed_at_ms)
            WHERE committed_at_ms IS NULL;

        CREATE TABLE IF NOT EXISTS lash_processes (
            process_id TEXT PRIMARY KEY,
            registration_hash TEXT NOT NULL,
            owner_scope_id TEXT NOT NULL,
            host_profile_id TEXT NOT NULL,
            created_at_ms BIGINT NOT NULL,
            updated_at_ms BIGINT NOT NULL,
            status TEXT NOT NULL,
            record_json TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_lash_processes_status
            ON lash_processes(status);

        CREATE TABLE IF NOT EXISTS lash_process_events (
            process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            sequence BIGINT NOT NULL,
            event_type TEXT NOT NULL,
            payload_hash TEXT NOT NULL,
            idempotency_key TEXT,
            occurred_at_ms BIGINT NOT NULL,
            event_json TEXT NOT NULL,
            PRIMARY KEY (process_id, sequence)
        );
        CREATE UNIQUE INDEX IF NOT EXISTS idx_lash_process_events_key
            ON lash_process_events(process_id, idempotency_key)
            WHERE idempotency_key IS NOT NULL;

        CREATE TABLE IF NOT EXISTS lash_process_wake_acks (
            process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            sequence BIGINT NOT NULL,
            PRIMARY KEY (process_id, sequence)
        );

        CREATE TABLE IF NOT EXISTS lash_process_handle_grants (
            session_id TEXT NOT NULL,
            scope_id TEXT NOT NULL,
            process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            descriptor_json TEXT NOT NULL,
            PRIMARY KEY (scope_id, process_id)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_process_handle_grants_session
            ON lash_process_handle_grants(session_id);
        CREATE INDEX IF NOT EXISTS idx_lash_process_handle_grants_process
            ON lash_process_handle_grants(process_id);

        CREATE TABLE IF NOT EXISTS lash_process_leases (
            process_id TEXT PRIMARY KEY REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            lease_owner_id TEXT,
            lease_token TEXT,
            lease_fencing_token BIGINT NOT NULL DEFAULT 0,
            lease_claimed_at_ms BIGINT NOT NULL DEFAULT 0,
            lease_expires_at_ms BIGINT NOT NULL DEFAULT 0
        );

        CREATE SEQUENCE IF NOT EXISTS lash_host_event_subscription_seq;
        CREATE TABLE IF NOT EXISTS lash_host_event_trigger_subscriptions (
            subscription_id TEXT PRIMARY KEY,
            session_id TEXT NOT NULL,
            handle TEXT NOT NULL,
            source_type TEXT NOT NULL,
            source_key TEXT NOT NULL,
            enabled BOOLEAN NOT NULL,
            created_at_ms BIGINT NOT NULL,
            updated_at_ms BIGINT NOT NULL,
            record_json TEXT NOT NULL,
            UNIQUE(session_id, handle)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_host_event_subscriptions_source
            ON lash_host_event_trigger_subscriptions(source_type, source_key, enabled);

        CREATE TABLE IF NOT EXISTS lash_host_event_occurrences (
            occurrence_id TEXT PRIMARY KEY,
            idempotency_key TEXT NOT NULL UNIQUE,
            request_hash TEXT NOT NULL,
            source_type TEXT NOT NULL,
            source_key TEXT NOT NULL,
            occurred_at_ms BIGINT NOT NULL,
            record_json TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_host_event_deliveries (
            occurrence_id TEXT NOT NULL REFERENCES lash_host_event_occurrences(occurrence_id) ON DELETE CASCADE,
            subscription_id TEXT NOT NULL REFERENCES lash_host_event_trigger_subscriptions(subscription_id) ON DELETE CASCADE,
            process_id TEXT NOT NULL,
            created_at_ms BIGINT NOT NULL,
            PRIMARY KEY (occurrence_id, subscription_id)
        );

        CREATE TABLE IF NOT EXISTS lash_lashlang_artifacts (
            module_ref TEXT PRIMARY KEY,
            artifact_bytes BYTEA NOT NULL
        );
        "#,
    )
    .await
    .map_err(store_sqlx_error)?;

    // Version bookkeeping: read the stored version for this component, if any.
    let existing: Option<i32> =
        sqlx::query_scalar("SELECT version FROM lash_schema_versions WHERE component = $1")
            .bind(SCHEMA_COMPONENT)
            .fetch_optional(&mut *tx)
            .await
            .map_err(store_sqlx_error)?;
    match existing {
        // Already at the expected version: nothing to record.
        Some(version) if version == SCHEMA_VERSION => {}
        // Any other stored version is refused; no migration is attempted here.
        Some(version) => {
            return Err(StoreError::Backend(format!(
                "Postgres schema component `{SCHEMA_COMPONENT}` has version {version}, expected {SCHEMA_VERSION}"
            )));
        }
        // First initialisation: record the version this build created.
        None => {
            sqlx::query("INSERT INTO lash_schema_versions (component, version) VALUES ($1, $2)")
                .bind(SCHEMA_COMPONENT)
                .bind(SCHEMA_VERSION)
                .execute(&mut *tx)
                .await
                .map_err(store_sqlx_error)?;
        }
    }
    tx.commit().await.map_err(store_sqlx_error)
}
455
/// Milliseconds elapsed since the Unix epoch according to the system clock.
///
/// Yields 0 when the clock reports a time before the epoch.
fn current_epoch_ms() -> u64 {
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_millis());
    millis as u64
}
462
/// Current wall-clock time rendered as `unix:<whole seconds since the epoch>`.
///
/// Yields `unix:0` when the clock reports a time before the epoch.
fn current_timestamp_string() -> String {
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs());
    format!("unix:{secs}")
}
469
470fn store_sqlx_error(err: sqlx::Error) -> StoreError {
471    StoreError::Backend(err.to_string())
472}
473
474/// Postgres SQLSTATEs that signal transient write contention rather than a hard
475/// failure: serialization failure, deadlock, and lock-acquisition timeout. On the
476/// session head these all mean "a concurrent committer got there first" — i.e. a
477/// revision conflict the caller should reload-and-retry, not a backend error.
478fn is_contention_error(err: &sqlx::Error) -> bool {
479    matches!(
480        err.as_database_error().and_then(|db| db.code()).as_deref(),
481        Some("40001" | "40P01" | "55P03")
482    )
483}
484
485fn plugin_sqlx_error(err: sqlx::Error) -> PluginError {
486    PluginError::Session(err.to_string())
487}
488
489fn process_decode_error(err: serde_json::Error) -> PluginError {
490    PluginError::Session(format!("failed to decode process registry row: {err}"))
491}
492
493fn store_decode_json<T: serde::de::DeserializeOwned>(
494    json: &str,
495    what: &str,
496) -> Result<T, StoreError> {
497    serde_json::from_str(json)
498        .map_err(|err| StoreError::Backend(format!("failed to decode {what}: {err}")))
499}
500
501fn encode_json<T: serde::Serialize>(value: &T) -> String {
502    serde_json::to_string(value).expect("persisted state should serialize")
503}
504
505fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
506    let mut buf = Vec::with_capacity(1024);
507    rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
508    buf
509}
510
511fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
512    rmp_serde::from_slice(bytes).ok()
513}
514
515fn block_on_detached<T: Send + 'static>(
516    future: impl std::future::Future<Output = T> + Send + 'static,
517) -> T {
518    std::thread::spawn(move || {
519        tokio::runtime::Builder::new_current_thread()
520            .enable_all()
521            .build()
522            .expect("postgres manifest runtime")
523            .block_on(future)
524    })
525    .join()
526    .expect("postgres manifest thread")
527}
528
529fn merge_token_ledger_entries(entries: Vec<TokenLedgerEntry>) -> Vec<TokenLedgerEntry> {
530    let mut merged = Vec::<TokenLedgerEntry>::new();
531    for entry in entries {
532        if entry.usage.total() == 0 {
533            continue;
534        }
535        if let Some(existing) = merged
536            .iter_mut()
537            .find(|existing| existing.source == entry.source && existing.model == entry.model)
538        {
539            existing.usage.input_tokens += entry.usage.input_tokens;
540            existing.usage.output_tokens += entry.usage.output_tokens;
541            existing.usage.cached_input_tokens += entry.usage.cached_input_tokens;
542            existing.usage.reasoning_tokens += entry.usage.reasoning_tokens;
543        } else {
544            merged.push(entry);
545        }
546    }
547    merged
548}
549
/// Serialized form of a session checkpoint: the checkpoint manifest together with
/// the inline payloads it refers to. The whole envelope is MessagePack-encoded
/// and stored as a single blob.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct SessionCheckpointEnvelope {
    /// Checkpoint manifest (turn state, payload refs, plugin snapshot revision).
    manifest: SessionCheckpoint,
    /// Inline tool state, if the checkpoint carried one.
    tool_state: Option<lash_core::ToolState>,
    /// Inline plugin snapshot, if the checkpoint carried one.
    plugin_snapshot: Option<lash_core::PluginSessionSnapshot>,
    /// Inline execution-state bytes, if the checkpoint carried them.
    execution_state: Option<Vec<u8>>,
}
557
558async fn put_blob_tx(
559    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
560    content: &[u8],
561) -> Result<BlobRef, StoreError> {
562    let hash = format!("{:x}", Sha256::digest(content));
563    sqlx::query(
564        "INSERT INTO lash_blobs (hash, content)
565         VALUES ($1, $2)
566         ON CONFLICT (hash) DO NOTHING",
567    )
568    .bind(&hash)
569    .bind(content)
570    .execute(&mut **tx)
571    .await
572    .map_err(store_sqlx_error)?;
573    Ok(BlobRef(hash))
574}
575
576async fn get_blob(pool: &PgPool, blob_ref: &BlobRef) -> Option<Vec<u8>> {
577    sqlx::query_scalar::<_, Vec<u8>>("SELECT content FROM lash_blobs WHERE hash = $1")
578        .bind(blob_ref.as_str())
579        .fetch_optional(pool)
580        .await
581        .ok()
582        .flatten()
583}
584
585async fn put_checkpoint_tx(
586    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
587    checkpoint: &HydratedSessionCheckpoint,
588) -> Result<(BlobRef, SessionCheckpoint), StoreError> {
589    let manifest = SessionCheckpoint {
590        turn_state: checkpoint.turn_state.clone(),
591        tool_state_ref: checkpoint.tool_state_ref.clone(),
592        plugin_snapshot_ref: checkpoint.plugin_snapshot_ref.clone(),
593        plugin_snapshot_revision: checkpoint.plugin_snapshot_revision,
594        execution_state_ref: checkpoint.execution_state_ref.clone(),
595    };
596    let envelope = SessionCheckpointEnvelope {
597        manifest: manifest.clone(),
598        tool_state: checkpoint.tool_state.clone(),
599        plugin_snapshot: checkpoint.plugin_snapshot.clone(),
600        execution_state: checkpoint.execution_state.clone(),
601    };
602    let bytes = encode_msgpack(&envelope);
603    let checkpoint_ref = put_blob_tx(tx, &bytes).await?;
604    Ok((checkpoint_ref, manifest))
605}
606
607async fn get_checkpoint(pool: &PgPool, blob_ref: &BlobRef) -> Option<HydratedSessionCheckpoint> {
608    let bytes = get_blob(pool, blob_ref).await?;
609    let envelope: SessionCheckpointEnvelope = decode_msgpack(&bytes)?;
610    Some(HydratedSessionCheckpoint {
611        turn_state: envelope.manifest.turn_state,
612        tool_state_ref: envelope.manifest.tool_state_ref,
613        tool_state: envelope.tool_state,
614        plugin_snapshot_ref: envelope.manifest.plugin_snapshot_ref,
615        plugin_snapshot: envelope.plugin_snapshot,
616        plugin_snapshot_revision: envelope.manifest.plugin_snapshot_revision,
617        execution_state_ref: envelope.manifest.execution_state_ref,
618        execution_state: envelope.execution_state,
619    })
620}
621
622async fn load_session_head_meta_tx(
623    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
624    session_id: &str,
625    for_update: bool,
626) -> Result<Option<SessionHeadMeta>, StoreError> {
627    let sql = if for_update {
628        "SELECT head_json, head_revision FROM lash_sessions WHERE session_id = $1 FOR UPDATE"
629    } else {
630        "SELECT head_json, head_revision FROM lash_sessions WHERE session_id = $1"
631    };
632    let row = sqlx::query(sql)
633        .bind(session_id)
634        .fetch_optional(&mut **tx)
635        .await
636        .map_err(store_sqlx_error)?;
637    let Some(row) = row else {
638        return Ok(None);
639    };
640    let head_json: String = row.get(0);
641    let head_revision: i64 = row.get(1);
642    let mut meta: SessionHeadMeta = store_decode_json(&head_json, "session head")?;
643    meta.head_revision = head_revision as u64;
644    Ok(Some(meta))
645}
646
647async fn load_usage_deltas(pool: &PgPool, session_id: &str) -> Vec<TokenLedgerEntry> {
648    let rows = sqlx::query(
649        "SELECT entry_json FROM lash_usage_deltas WHERE session_id = $1 ORDER BY seq ASC",
650    )
651    .bind(session_id)
652    .fetch_all(pool)
653    .await
654    .unwrap_or_default();
655    rows.into_iter()
656        .filter_map(|row| {
657            let json: String = row.get(0);
658            serde_json::from_str(&json).ok()
659        })
660        .collect()
661}
662
663async fn load_graph(
664    pool: &PgPool,
665    session_id: &str,
666    leaf_node_id: Option<String>,
667    active_path: bool,
668) -> Result<lash_core::SessionGraph, StoreError> {
669    let rows = sqlx::query(
670        "SELECT node_json FROM lash_graph_nodes
671         WHERE session_id = $1 AND tombstoned = FALSE
672         ORDER BY seq ASC",
673    )
674    .bind(session_id)
675    .fetch_all(pool)
676    .await
677    .map_err(store_sqlx_error)?;
678    let mut nodes = Vec::<SessionNodeRecord>::new();
679    for row in rows {
680        let json: String = row.get(0);
681        nodes.push(store_decode_json(&json, "session graph node")?);
682    }
683    if active_path {
684        if let Some(leaf) = leaf_node_id.clone() {
685            let wanted = active_path_node_ids(&nodes, &leaf);
686            nodes.retain(|node| wanted.contains(&node.node_id));
687        }
688    }
689    Ok(lash_core::SessionGraph::from_nodes(nodes, leaf_node_id))
690}
691
692fn active_path_node_ids(nodes: &[SessionNodeRecord], leaf_node_id: &str) -> HashSet<String> {
693    let mut parent_by_id = std::collections::BTreeMap::new();
694    for node in nodes {
695        parent_by_id.insert(node.node_id.clone(), node.parent_node_id.clone());
696    }
697    let mut wanted = HashSet::new();
698    let mut cursor = Some(leaf_node_id.to_string());
699    while let Some(node_id) = cursor {
700        if !wanted.insert(node_id.clone()) {
701            break;
702        }
703        cursor = parent_by_id.get(&node_id).cloned().flatten();
704    }
705    wanted
706}
707
708async fn commit_attachment_refs_tx(
709    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
710    session_id: &str,
711    attachment_ids: &[AttachmentId],
712) -> Result<(), StoreError> {
713    if attachment_ids.is_empty() {
714        return Ok(());
715    }
716    let now = current_epoch_ms() as i64;
717    for id in attachment_ids {
718        sqlx::query(
719            "UPDATE lash_attachment_manifest
720             SET committed_at_ms = COALESCE(committed_at_ms, $1)
721             WHERE attachment_id = $2 AND session_id = $3",
722        )
723        .bind(now)
724        .bind(id.as_str())
725        .bind(session_id)
726        .execute(&mut **tx)
727        .await
728        .map_err(store_sqlx_error)?;
729    }
730    Ok(())
731}
732
733impl AttachmentManifest for PostgresSessionStore {
734    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError> {
735        let pool = self.pool.clone();
736        block_on_detached(async move {
737            sqlx::query(
738                "INSERT INTO lash_attachment_manifest (
739                    attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
740                 )
741                 VALUES ($1, $2, $3, $4, NULL)
742                 ON CONFLICT (attachment_id) DO UPDATE SET
743                    session_id = EXCLUDED.session_id,
744                    canonical_uri = EXCLUDED.canonical_uri,
745                    intent_at_ms = EXCLUDED.intent_at_ms",
746            )
747            .bind(intent.attachment_id.as_str())
748            .bind(intent.session_id)
749            .bind(intent.canonical_uri)
750            .bind(intent.intent_at_epoch_ms as i64)
751            .execute(&pool)
752            .await
753            .map(|_| ())
754            .map_err(store_sqlx_error)
755        })
756    }
757
758    fn commit_refs(
759        &self,
760        session_id: &str,
761        attachment_ids: &[AttachmentId],
762    ) -> Result<(), StoreError> {
763        let pool = self.pool.clone();
764        let session_id = session_id.to_string();
765        let attachment_ids = attachment_ids.to_vec();
766        block_on_detached(async move {
767            let mut tx = pool.begin().await.map_err(store_sqlx_error)?;
768            commit_attachment_refs_tx(&mut tx, &session_id, &attachment_ids).await?;
769            tx.commit().await.map_err(store_sqlx_error)
770        })
771    }
772
773    fn list_uncommitted(
774        &self,
775        older_than_epoch_ms: u64,
776    ) -> Result<Vec<AttachmentManifestEntry>, StoreError> {
777        let pool = self.pool.clone();
778        block_on_detached(async move {
779            let rows = sqlx::query(
780                "SELECT attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
781                 FROM lash_attachment_manifest
782                 WHERE committed_at_ms IS NULL AND intent_at_ms <= $1
783                 ORDER BY attachment_id ASC",
784            )
785            .bind(older_than_epoch_ms as i64)
786            .fetch_all(&pool)
787            .await
788            .map_err(store_sqlx_error)?;
789            Ok(rows
790                .into_iter()
791                .map(|row| AttachmentManifestEntry {
792                    attachment_id: AttachmentId::new(row.get::<String, _>(0)),
793                    session_id: row.get(1),
794                    canonical_uri: row.get(2),
795                    intent_at_epoch_ms: row.get::<i64, _>(3) as u64,
796                    committed_at_epoch_ms: row.get::<Option<i64>, _>(4).map(|value| value as u64),
797                })
798                .collect())
799        })
800    }
801
802    fn forget(&self, attachment_id: &AttachmentId) -> Result<(), StoreError> {
803        let pool = self.pool.clone();
804        let attachment_id = attachment_id.to_string();
805        block_on_detached(async move {
806            sqlx::query("DELETE FROM lash_attachment_manifest WHERE attachment_id = $1")
807                .bind(attachment_id)
808                .execute(&pool)
809                .await
810                .map(|_| ())
811                .map_err(store_sqlx_error)
812        })
813    }
814}
815
816#[async_trait::async_trait]
817impl SessionStoreFactory for PostgresSessionStoreFactory {
    /// Reports the durability tier of stores created by this factory:
    /// always [`DurabilityTier::Durable`].
    fn durability_tier(&self) -> DurabilityTier {
        DurabilityTier::Durable
    }
821
822    async fn create_store(
823        &self,
824        request: &SessionStoreCreateRequest,
825    ) -> Result<Arc<dyn RuntimePersistence>, String> {
826        let store = PostgresSessionStore {
827            pool: self.pool.clone(),
828            session_id: Some(request.session_id.clone()),
829            bound_session: Arc::new(OnceLock::new()),
830        };
831        if store
832            .load_session_meta()
833            .await
834            .map_err(|err| err.to_string())?
835            .is_none()
836        {
837            store
838                .save_session_meta(SessionMeta {
839                    session_id: request.session_id.clone(),
840                    session_name: request.session_id.clone(),
841                    created_at: current_timestamp_string(),
842                    model: request.policy.model.id.clone(),
843                    cwd: std::env::current_dir()
844                        .ok()
845                        .and_then(|path| path.to_str().map(str::to_string)),
846                    relation: request.relation.clone(),
847                })
848                .await
849                .map_err(|err| err.to_string())?;
850        }
851        Ok(Arc::new(store))
852    }
853
854    async fn delete_session(&self, session_id: &str) -> Result<(), String> {
855        let mut tx = self.pool.begin().await.map_err(|err| err.to_string())?;
856        for sql in [
857            "DELETE FROM lash_queued_work_items WHERE batch_id IN (SELECT batch_id FROM lash_queued_work_batches WHERE session_id = $1)",
858            "DELETE FROM lash_queued_work_batches WHERE session_id = $1",
859            "DELETE FROM lash_usage_deltas WHERE session_id = $1",
860            "DELETE FROM lash_graph_nodes WHERE session_id = $1",
861            "DELETE FROM lash_runtime_turn_commits WHERE session_id = $1",
862            "DELETE FROM lash_session_meta WHERE session_id = $1",
863            "DELETE FROM lash_sessions WHERE session_id = $1",
864            "DELETE FROM lash_attachment_manifest WHERE session_id = $1",
865        ] {
866            sqlx::query(sql)
867                .bind(session_id)
868                .execute(&mut *tx)
869                .await
870                .map_err(|err| err.to_string())?;
871        }
872        tx.commit().await.map_err(|err| err.to_string())
873    }
874}
875
/// Decoded batch-level columns of one `lash_queued_work_batches` row, as
/// produced by `queued_batch_row`. Payload items are not part of this struct;
/// `queued_work_batch_from_row` loads them separately.
#[derive(Clone, Debug)]
struct QueuedBatchRow {
    /// `enqueue_seq` column (read as `i64`, cast to `u64`).
    enqueue_seq: u64,
    /// `batch_id` column.
    batch_id: String,
    /// `session_id` column.
    session_id: String,
    /// `source_key` column; `None` when the column is NULL.
    source_key: Option<String>,
    /// `delivery_policy` column parsed from its wire string.
    delivery_policy: DeliveryPolicy,
    /// `slot_policy` column parsed from its wire string.
    slot_policy: SlotPolicy,
    /// `merge_key_json` column decoded from JSON.
    merge_key: MergeKey,
    /// `available_at_ms` column (read as `i64`, cast to `u64`).
    available_at_ms: u64,
    /// `enqueued_at_ms` column (read as `i64`, cast to `u64`).
    enqueued_at_ms: u64,
    /// `claim_fencing_token` column (read as `i64`, cast to `u64`).
    claim_fencing_token: u64,
}
889
890fn queued_batch_row(row: PgRow) -> Result<QueuedBatchRow, StoreError> {
891    let delivery_policy =
892        DeliveryPolicy::from_wire_str(row.get::<String, _>("delivery_policy").as_str())
893            .ok_or_else(|| {
894                StoreError::Backend("invalid queued work delivery policy".to_string())
895            })?;
896    let slot_policy = SlotPolicy::from_wire_str(row.get::<String, _>("slot_policy").as_str())
897        .ok_or_else(|| StoreError::Backend("invalid queued work slot policy".to_string()))?;
898    let merge_json: String = row.get("merge_key_json");
899    Ok(QueuedBatchRow {
900        enqueue_seq: row.get::<i64, _>("enqueue_seq") as u64,
901        batch_id: row.get("batch_id"),
902        session_id: row.get("session_id"),
903        source_key: row.get("source_key"),
904        delivery_policy,
905        slot_policy,
906        merge_key: store_decode_json(&merge_json, "queued work merge key")?,
907        available_at_ms: row.get::<i64, _>("available_at_ms") as u64,
908        enqueued_at_ms: row.get::<i64, _>("enqueued_at_ms") as u64,
909        claim_fencing_token: row.get::<i64, _>("claim_fencing_token") as u64,
910    })
911}
912
913async fn load_queued_batch(
914    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
915    batch_id: &str,
916) -> Result<Option<QueuedWorkBatch>, StoreError> {
917    let row = sqlx::query(
918        "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
919                slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
920                claim_fencing_token
921         FROM lash_queued_work_batches
922         WHERE batch_id = $1",
923    )
924    .bind(batch_id)
925    .fetch_optional(&mut **tx)
926    .await
927    .map_err(store_sqlx_error)?;
928    let Some(row) = row else {
929        return Ok(None);
930    };
931    let row = queued_batch_row(row)?;
932    queued_work_batch_from_row(tx, row).await.map(Some)
933}
934
935async fn queued_work_batch_from_row(
936    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
937    row: QueuedBatchRow,
938) -> Result<QueuedWorkBatch, StoreError> {
939    let item_rows = sqlx::query(
940        "SELECT item_id, payload_json
941         FROM lash_queued_work_items
942         WHERE batch_id = $1
943         ORDER BY item_index ASC",
944    )
945    .bind(&row.batch_id)
946    .fetch_all(&mut **tx)
947    .await
948    .map_err(store_sqlx_error)?;
949    let mut items = Vec::new();
950    for item in item_rows {
951        let payload_json: String = item.get(1);
952        items.push(QueuedWorkItem {
953            item_id: item.get(0),
954            payload: store_decode_json(&payload_json, "queued work payload")?,
955        });
956    }
957    Ok(QueuedWorkBatch {
958        batch_id: row.batch_id,
959        session_id: row.session_id,
960        enqueue_seq: row.enqueue_seq,
961        source_key: row.source_key,
962        delivery_policy: row.delivery_policy,
963        slot_policy: row.slot_policy,
964        merge_key: row.merge_key,
965        available_at_ms: row.available_at_ms,
966        enqueued_at_ms: row.enqueued_at_ms,
967        items,
968    })
969}
970
971async fn ensure_queued_work_completion_tx(
972    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
973    completed: &QueuedWorkCompletion,
974) -> Result<(), StoreError> {
975    for batch_id in &completed.batch_ids {
976        let exists: Option<i64> = sqlx::query_scalar(
977            "SELECT 1::BIGINT FROM lash_queued_work_batches
978             WHERE session_id = $1
979               AND batch_id = $2
980               AND claim_id = $3
981               AND claim_token = $4
982             LIMIT 1",
983        )
984        .bind(&completed.session_id)
985        .bind(batch_id)
986        .bind(&completed.claim_id)
987        .bind(&completed.lease_token)
988        .fetch_optional(&mut **tx)
989        .await
990        .map_err(store_sqlx_error)?;
991        if exists.is_none() {
992            return Err(StoreError::QueuedWorkClaimExpired {
993                session_id: completed.session_id.clone(),
994                claim_id: completed.claim_id.clone(),
995            });
996        }
997    }
998    Ok(())
999}
1000
1001#[async_trait::async_trait]
1002impl RuntimePersistence for PostgresSessionStore {
    /// Reports the durable tier for this PostgreSQL-backed session store.
    fn durability_tier(&self) -> DurabilityTier {
        DurabilityTier::Durable
    }
1006
1007    async fn load_session(
1008        &self,
1009        scope: SessionReadScope,
1010    ) -> Result<Option<PersistedSessionRead>, StoreError> {
1011        let Some(session_id) = self.selected_session_id().await? else {
1012            return Ok(None);
1013        };
1014        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1015        let Some(meta) = load_session_head_meta_tx(&mut tx, &session_id, false).await? else {
1016            return Ok(None);
1017        };
1018        tx.commit().await.map_err(store_sqlx_error)?;
1019        let leaf_node_id = match &scope {
1020            SessionReadScope::FullGraph => meta.leaf_node_id.clone(),
1021            SessionReadScope::ActivePath { leaf_node_id } => {
1022                leaf_node_id.clone().or_else(|| meta.leaf_node_id.clone())
1023            }
1024        };
1025        let graph = load_graph(
1026            &self.pool,
1027            &session_id,
1028            leaf_node_id.clone(),
1029            matches!(scope, SessionReadScope::ActivePath { .. }),
1030        )
1031        .await?;
1032        let checkpoint = match meta.checkpoint_ref.as_ref() {
1033            Some(blob_ref) => get_checkpoint(&self.pool, blob_ref).await,
1034            None => None,
1035        };
1036        Ok(Some(PersistedSessionRead {
1037            session_id: meta.session_id,
1038            head_revision: meta.head_revision,
1039            config: meta.config,
1040            agent_frames: meta.agent_frames,
1041            current_agent_frame_id: meta.current_agent_frame_id,
1042            graph,
1043            checkpoint_ref: meta.checkpoint_ref,
1044            checkpoint,
1045            token_ledger: merge_token_ledger_entries(
1046                load_usage_deltas(&self.pool, &session_id).await,
1047            ),
1048        }))
1049    }
1050
1051    async fn load_node(&self, node_id: &str) -> Result<Option<SessionNodeRecord>, StoreError> {
1052        let json: Option<String> = if let Some(session_id) = &self.session_id {
1053            sqlx::query_scalar(
1054                "SELECT node_json FROM lash_graph_nodes
1055                 WHERE session_id = $1 AND node_id = $2 AND tombstoned = FALSE",
1056            )
1057            .bind(session_id)
1058            .bind(node_id)
1059            .fetch_optional(&self.pool)
1060            .await
1061            .map_err(store_sqlx_error)?
1062        } else {
1063            sqlx::query_scalar(
1064                "SELECT node_json FROM lash_graph_nodes
1065                 WHERE node_id = $1 AND tombstoned = FALSE
1066                 ORDER BY session_id ASC
1067                 LIMIT 1",
1068            )
1069            .bind(node_id)
1070            .fetch_optional(&self.pool)
1071            .await
1072            .map_err(store_sqlx_error)?
1073        };
1074        json.map(|json| store_decode_json(&json, "session graph node"))
1075            .transpose()
1076    }
1077
    /// Applies one [`RuntimeCommit`] to its session inside a single transaction.
    ///
    /// Steps, in order: check the session binding, short-circuit a replayed
    /// turn commit, check the expected head revision, validate the completed
    /// queue claims, store the checkpoint, append usage deltas, apply the graph
    /// delta, advance the head with a compare-and-set write, delete the
    /// completed queue batches, call `commit_attachment_refs_tx` for the
    /// committed attachment ids, and record the turn commit result.
    ///
    /// Any early `return` drops the transaction without committing it.
    ///
    /// # Errors
    ///
    /// - [`StoreError::SessionBindingMismatch`] when the commit targets a session
    ///   other than the one this handle is bound to.
    /// - [`StoreError::RuntimeTurnCommitConflict`] when the turn commit names a
    ///   different session, or the same turn id is already stored with a
    ///   different hash.
    /// - [`StoreError::HeadRevisionConflict`] when the expected revision does not
    ///   match the stored head, or the head write loses to a concurrent commit.
    /// - [`StoreError::QueuedWorkClaimExpired`] when a completed claim names
    ///   another session or no longer matches a stored batch.
    /// - Other database failures are mapped through `store_sqlx_error`.
    async fn commit_runtime_state(
        &self,
        commit: RuntimeCommit,
    ) -> Result<RuntimeCommitResult, StoreError> {
        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
        // Read the head WITHOUT a lock; the conditional CAS write below is the
        // serialization point (optimistic concurrency), so no pessimistic
        // `FOR UPDATE` lock is held across the rest of this transaction.
        let existing = load_session_head_meta_tx(&mut tx, &commit.session_id, false).await?;
        // Sanity check: the stored head must name the session it was loaded for.
        if let Some(bound_session_id) = existing.as_ref().map(|meta| meta.session_id.as_str())
            && bound_session_id != commit.session_id
        {
            return Err(StoreError::SessionBindingMismatch {
                bound_session_id: bound_session_id.to_string(),
                attempted_session_id: commit.session_id,
            });
        }
        // A session-store handle commits to exactly one session. An explicit
        // binding (`self.session_id`) is authoritative; otherwise the handle binds
        // to the first session it commits and rejects any other thereafter.
        let effective_binding = self
            .session_id
            .clone()
            .or_else(|| self.bound_session.get().cloned());
        if let Some(bound_session_id) = &effective_binding
            && commit.session_id != *bound_session_id
        {
            return Err(StoreError::SessionBindingMismatch {
                bound_session_id: bound_session_id.clone(),
                attempted_session_id: commit.session_id,
            });
        }
        // The implicit binding is recorded here, before the commit is known to
        // succeed; a `set` on an already-bound cell is deliberately ignored.
        if self.session_id.is_none() {
            let _ = self.bound_session.set(commit.session_id.clone());
        }
        if let Some(completed) = &commit.turn_commit {
            if completed.session_id != commit.session_id {
                return Err(StoreError::RuntimeTurnCommitConflict {
                    session_id: completed.session_id.clone(),
                    turn_id: completed.turn_id.clone(),
                });
            }
            let prior = sqlx::query(
                "SELECT turn_commit_hash, result_json
                 FROM lash_runtime_turn_commits
                 WHERE session_id = $1 AND turn_id = $2",
            )
            .bind(&completed.session_id)
            .bind(&completed.turn_id)
            .fetch_optional(&mut *tx)
            .await
            .map_err(store_sqlx_error)?;
            if let Some(row) = prior {
                let hash: String = row.get(0);
                let result_json: String = row.get(1);
                // Same turn id and same hash: this is a replay, so hand back the
                // result recorded by the first commit instead of writing again.
                if hash == completed.turn_commit_hash {
                    return store_decode_json(&result_json, "runtime turn commit result");
                }
                return Err(StoreError::RuntimeTurnCommitConflict {
                    session_id: completed.session_id.clone(),
                    turn_id: completed.turn_id.clone(),
                });
            }
        }
        // A session with no stored head counts as revision 0.
        let actual_revision = existing.as_ref().map_or(0, |meta| meta.head_revision);
        if commit.expected_head_revision.is_some()
            && commit.expected_head_revision != Some(actual_revision)
        {
            return Err(StoreError::HeadRevisionConflict {
                expected: commit.expected_head_revision,
                actual: actual_revision,
            });
        }
        // Validate every completed claim up front; the matching batch rows are
        // deleted only after the head write below has succeeded.
        for completed in &commit.completed_queue_claims {
            if completed.session_id != commit.session_id {
                return Err(StoreError::QueuedWorkClaimExpired {
                    session_id: completed.session_id.clone(),
                    claim_id: completed.claim_id.clone(),
                });
            }
            ensure_queued_work_completion_tx(&mut tx, completed).await?;
        }
        let (checkpoint_ref, manifest) = put_checkpoint_tx(&mut tx, &commit.checkpoint).await?;
        for entry in &commit.usage_deltas {
            sqlx::query("INSERT INTO lash_usage_deltas (session_id, entry_json) VALUES ($1, $2)")
                .bind(&commit.session_id)
                .bind(encode_json(entry))
                .execute(&mut *tx)
                .await
                .map_err(store_sqlx_error)?;
        }
        let leaf_node_id = match &commit.graph {
            GraphCommitDelta::Unchanged { leaf_node_id } => leaf_node_id.clone(),
            GraphCommitDelta::Append {
                nodes,
                leaf_node_id,
            } => {
                // Upsert: a node id that already exists is overwritten and has
                // its tombstone flag cleared.
                for node in nodes {
                    sqlx::query(
                        "INSERT INTO lash_graph_nodes (session_id, node_id, node_json)
                         VALUES ($1, $2, $3)
                         ON CONFLICT (session_id, node_id) DO UPDATE SET
                            node_json = EXCLUDED.node_json,
                            tombstoned = FALSE",
                    )
                    .bind(&commit.session_id)
                    .bind(&node.node_id)
                    .bind(encode_json(node))
                    .execute(&mut *tx)
                    .await
                    .map_err(store_sqlx_error)?;
                }
                leaf_node_id.clone()
            }
            GraphCommitDelta::ReplaceFull(graph) => {
                // Replace: drop every stored node for the session, then insert
                // the supplied graph from scratch.
                sqlx::query("DELETE FROM lash_graph_nodes WHERE session_id = $1")
                    .bind(&commit.session_id)
                    .execute(&mut *tx)
                    .await
                    .map_err(store_sqlx_error)?;
                for node in &graph.nodes {
                    sqlx::query(
                        "INSERT INTO lash_graph_nodes (session_id, node_id, node_json)
                         VALUES ($1, $2, $3)",
                    )
                    .bind(&commit.session_id)
                    .bind(&node.node_id)
                    .bind(encode_json(node))
                    .execute(&mut *tx)
                    .await
                    .map_err(store_sqlx_error)?;
                }
                graph.leaf_node_id.clone()
            }
        };
        // Count live nodes after the graph delta so the head records the
        // post-commit size.
        let graph_node_count: i64 = sqlx::query_scalar(
            "SELECT COUNT(*) FROM lash_graph_nodes WHERE session_id = $1 AND tombstoned = FALSE",
        )
        .bind(&commit.session_id)
        .fetch_one(&mut *tx)
        .await
        .map_err(store_sqlx_error)?;
        let next_revision = actual_revision + 1;
        // The token ledger is left empty in the stored head; usage is kept in
        // `lash_usage_deltas` rows written above.
        let meta = SessionHeadMeta {
            session_id: commit.session_id.clone(),
            head_revision: next_revision,
            config: commit.config.clone(),
            agent_frames: commit.agent_frames.clone(),
            current_agent_frame_id: commit.current_agent_frame_id.clone(),
            checkpoint_ref: Some(checkpoint_ref.clone()),
            leaf_node_id,
            graph_node_count: graph_node_count as usize,
            token_ledger: Vec::new(),
        };
        // Optimistic CAS on the head revision. The `WHERE head_revision = $5`
        // guard makes the write succeed only if no concurrent committer moved the
        // head since our unlocked read above. A brand-new session inserts (no
        // conflict); an existing one updates only when the revision still matches.
        let head_write = sqlx::query(
            "INSERT INTO lash_sessions (session_id, head_revision, head_json, checkpoint_ref)
             VALUES ($1, $2, $3, $4)
             ON CONFLICT (session_id) DO UPDATE SET
                head_revision = EXCLUDED.head_revision,
                head_json = EXCLUDED.head_json,
                checkpoint_ref = EXCLUDED.checkpoint_ref
             WHERE lash_sessions.head_revision = $5",
        )
        .bind(&commit.session_id)
        .bind(next_revision as i64)
        .bind(encode_json(&meta))
        .bind(checkpoint_ref.as_str())
        .bind(actual_revision as i64)
        .execute(&mut *tx)
        .await;
        let head_write = match head_write {
            Ok(result) => result,
            Err(err) if is_contention_error(&err) => {
                // The head row is contended by a concurrent committer (lock
                // timeout / serialization failure / deadlock). That is a conflict,
                // not an opaque backend error: surface it so the caller reloads
                // and retries. The tx is now aborted; returning drops it.
                return Err(StoreError::HeadRevisionConflict {
                    expected: commit.expected_head_revision.or(Some(actual_revision)),
                    actual: actual_revision,
                });
            }
            Err(err) => return Err(store_sqlx_error(err)),
        };
        if head_write.rows_affected() == 0 {
            // A concurrent commit won the race: the head no longer matches the
            // revision we read. Re-read the now-current revision for an accurate
            // report, then drop `tx` (auto-rollback), discarding this attempt's
            // node/usage writes; the caller reloads and retries.
            let actual_now = sqlx::query_scalar::<_, i64>(
                "SELECT head_revision FROM lash_sessions WHERE session_id = $1",
            )
            .bind(&commit.session_id)
            .fetch_optional(&mut *tx)
            .await
            .map_err(store_sqlx_error)?
            .map_or(actual_revision, |revision| revision as u64);
            return Err(StoreError::HeadRevisionConflict {
                expected: commit.expected_head_revision.or(Some(actual_revision)),
                actual: actual_now,
            });
        }
        // The head moved: remove the batches whose claims this commit completed.
        // The number of deleted rows is not checked here.
        for completed in &commit.completed_queue_claims {
            for batch_id in &completed.batch_ids {
                sqlx::query(
                    "DELETE FROM lash_queued_work_batches
                     WHERE session_id = $1 AND batch_id = $2 AND claim_id = $3 AND claim_token = $4",
                )
                .bind(&completed.session_id)
                .bind(batch_id)
                .bind(&completed.claim_id)
                .bind(&completed.lease_token)
                .execute(&mut *tx)
                .await
                .map_err(store_sqlx_error)?;
            }
        }
        commit_attachment_refs_tx(
            &mut tx,
            &commit.session_id,
            &commit.committed_attachment_ids,
        )
        .await?;
        let result = RuntimeCommitResult {
            head_revision: next_revision,
            checkpoint_ref,
            manifest,
        };
        // Persist the result under the turn id so a replay of the same turn
        // commit can return it (see the hash comparison near the top).
        if let Some(completed) = &commit.turn_commit {
            sqlx::query(
                "INSERT INTO lash_runtime_turn_commits (
                    session_id, turn_id, turn_commit_hash, result_json, committed_at_ms
                 )
                 VALUES ($1, $2, $3, $4, $5)",
            )
            .bind(&completed.session_id)
            .bind(&completed.turn_id)
            .bind(&completed.turn_commit_hash)
            .bind(encode_json(&result))
            .bind(current_epoch_ms() as i64)
            .execute(&mut *tx)
            .await
            .map_err(store_sqlx_error)?;
        }
        tx.commit().await.map_err(store_sqlx_error)?;
        Ok(result)
    }
1329
1330    async fn enqueue_queued_work(
1331        &self,
1332        batch: QueuedWorkBatchDraft,
1333    ) -> Result<QueuedWorkBatch, StoreError> {
1334        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1335        if let Some(source_key) = batch.source_key.as_deref() {
1336            let existing_id: Option<String> = sqlx::query_scalar(
1337                "SELECT batch_id FROM lash_queued_work_batches
1338                 WHERE session_id = $1 AND source_key = $2",
1339            )
1340            .bind(&batch.session_id)
1341            .bind(source_key)
1342            .fetch_optional(&mut *tx)
1343            .await
1344            .map_err(store_sqlx_error)?;
1345            if let Some(batch_id) = existing_id {
1346                let existing = load_queued_batch(&mut tx, &batch_id)
1347                    .await?
1348                    .ok_or_else(|| {
1349                        StoreError::Backend("queued work source row disappeared".to_string())
1350                    })?;
1351                tx.commit().await.map_err(store_sqlx_error)?;
1352                return Ok(existing);
1353            }
1354        }
1355        let now = current_epoch_ms();
1356        let batch_id = format!(
1357            "qwb:{:x}",
1358            Sha256::digest(format!("{}:{:?}:{now}", batch.session_id, batch.source_key).as_bytes())
1359        );
1360        let row = sqlx::query_scalar::<_, i64>(
1361            "INSERT INTO lash_queued_work_batches (
1362                batch_id, session_id, source_key, delivery_policy, slot_policy,
1363                merge_key_json, available_at_ms, enqueued_at_ms
1364             )
1365             VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
1366             RETURNING enqueue_seq",
1367        )
1368        .bind(&batch_id)
1369        .bind(&batch.session_id)
1370        .bind(&batch.source_key)
1371        .bind(batch.delivery_policy.as_str())
1372        .bind(batch.slot_policy.as_str())
1373        .bind(encode_json(&batch.merge_key))
1374        .bind(batch.available_at_ms as i64)
1375        .bind(now as i64)
1376        .fetch_one(&mut *tx)
1377        .await
1378        .map_err(store_sqlx_error)?;
1379        for (index, payload) in batch.payloads.iter().enumerate() {
1380            let item_id = format!("{batch_id}:item:{index}");
1381            sqlx::query(
1382                "INSERT INTO lash_queued_work_items (batch_id, item_index, item_id, payload_json)
1383                 VALUES ($1, $2, $3, $4)",
1384            )
1385            .bind(&batch_id)
1386            .bind(index as i32)
1387            .bind(item_id)
1388            .bind(encode_json(payload))
1389            .execute(&mut *tx)
1390            .await
1391            .map_err(store_sqlx_error)?;
1392        }
1393        let queued = load_queued_batch(&mut tx, &batch_id)
1394            .await?
1395            .ok_or_else(|| StoreError::Backend("queued work insert disappeared".to_string()))?;
1396        debug_assert_eq!(queued.enqueue_seq, row as u64);
1397        tx.commit().await.map_err(store_sqlx_error)?;
1398        Ok(queued)
1399    }
1400
1401    async fn claim_ready_queued_work(
1402        &self,
1403        session_id: &str,
1404        owner_id: &str,
1405        boundary: QueuedWorkClaimBoundary,
1406        lease_ttl_ms: u64,
1407        max_batches: usize,
1408    ) -> Result<Option<QueuedWorkClaim>, StoreError> {
1409        if max_batches == 0 {
1410            return Ok(None);
1411        }
1412        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1413        let now = current_epoch_ms();
1414        let rows = sqlx::query(
1415            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1416                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1417                    claim_fencing_token
1418             FROM lash_queued_work_batches
1419             WHERE session_id = $1
1420               AND available_at_ms <= $2
1421               AND (claim_token IS NULL OR claim_expires_at_ms <= $2)
1422             ORDER BY enqueue_seq ASC
1423             LIMIT $3
1424             FOR UPDATE SKIP LOCKED",
1425        )
1426        .bind(session_id)
1427        .bind(now as i64)
1428        .bind((max_batches as i64).saturating_add(32))
1429        .fetch_all(&mut *tx)
1430        .await
1431        .map_err(store_sqlx_error)?;
1432        let mut candidates = Vec::new();
1433        for row in rows {
1434            candidates.push(queued_batch_row(row)?);
1435        }
1436        let Some(first_row) = candidates.first() else {
1437            tx.commit().await.map_err(store_sqlx_error)?;
1438            return Ok(None);
1439        };
1440        if boundary == QueuedWorkClaimBoundary::ActiveTurnCheckpoint
1441            && first_row.delivery_policy != DeliveryPolicy::EarliestSafeBoundary
1442        {
1443            tx.commit().await.map_err(store_sqlx_error)?;
1444            return Ok(None);
1445        }
1446        let first_slot = first_row.slot_policy;
1447        let first_delivery = first_row.delivery_policy;
1448        let first_merge = first_row.merge_key.clone();
1449        let mut selected = Vec::new();
1450        for row in candidates {
1451            if selected.len() >= max_batches {
1452                break;
1453            }
1454            if selected.is_empty() {
1455                selected.push(row);
1456                if first_slot == SlotPolicy::Exclusive {
1457                    break;
1458                }
1459                continue;
1460            }
1461            if first_slot != SlotPolicy::Join
1462                || row.slot_policy != SlotPolicy::Join
1463                || row.delivery_policy != first_delivery
1464                || row.merge_key != first_merge
1465            {
1466                break;
1467            }
1468            selected.push(row);
1469        }
1470        let Some(first) = selected.first() else {
1471            tx.commit().await.map_err(store_sqlx_error)?;
1472            return Ok(None);
1473        };
1474        let fencing_token = first.claim_fencing_token.saturating_add(1);
1475        let claim_id = format!("qwc:{}:{fencing_token}", first.enqueue_seq);
1476        let lease_token = format!(
1477            "{:x}",
1478            Sha256::digest(format!("{session_id}:{owner_id}:{claim_id}:{now}").as_bytes())
1479        );
1480        let expires_at = now.saturating_add(lease_ttl_ms);
1481        for row in &selected {
1482            let changed = sqlx::query(
1483                "UPDATE lash_queued_work_batches
1484                 SET claim_id = $3,
1485                     claim_owner_id = $4,
1486                     claim_token = $5,
1487                     claim_fencing_token = claim_fencing_token + 1,
1488                     claim_claimed_at_ms = $6,
1489                     claim_expires_at_ms = $7
1490                 WHERE session_id = $1
1491                   AND batch_id = $2
1492                   AND (claim_token IS NULL OR claim_expires_at_ms <= $6)",
1493            )
1494            .bind(session_id)
1495            .bind(&row.batch_id)
1496            .bind(&claim_id)
1497            .bind(owner_id)
1498            .bind(&lease_token)
1499            .bind(now as i64)
1500            .bind(expires_at as i64)
1501            .execute(&mut *tx)
1502            .await
1503            .map_err(store_sqlx_error)?
1504            .rows_affected();
1505            if changed == 0 {
1506                tx.rollback().await.map_err(store_sqlx_error)?;
1507                return Ok(None);
1508            }
1509        }
1510        let mut batches = Vec::new();
1511        for row in selected {
1512            batches.push(queued_work_batch_from_row(&mut tx, row).await?);
1513        }
1514        tx.commit().await.map_err(store_sqlx_error)?;
1515        Ok(Some(QueuedWorkClaim {
1516            session_id: session_id.to_string(),
1517            claim_id,
1518            owner_id: owner_id.to_string(),
1519            lease_token,
1520            fencing_token,
1521            claimed_at_epoch_ms: now,
1522            expires_at_epoch_ms: expires_at,
1523            batches,
1524        }))
1525    }
1526
1527    async fn renew_queued_work_claim(
1528        &self,
1529        claim: &QueuedWorkClaim,
1530        lease_ttl_ms: u64,
1531    ) -> Result<QueuedWorkClaim, StoreError> {
1532        let expires_at = current_epoch_ms().saturating_add(lease_ttl_ms);
1533        let changed = sqlx::query(
1534            "UPDATE lash_queued_work_batches
1535             SET claim_expires_at_ms = $4
1536             WHERE session_id = $1 AND claim_id = $2 AND claim_token = $3",
1537        )
1538        .bind(&claim.session_id)
1539        .bind(&claim.claim_id)
1540        .bind(&claim.lease_token)
1541        .bind(expires_at as i64)
1542        .execute(&self.pool)
1543        .await
1544        .map_err(store_sqlx_error)?
1545        .rows_affected();
1546        if changed as usize != claim.batches.len() {
1547            return Err(StoreError::QueuedWorkClaimExpired {
1548                session_id: claim.session_id.clone(),
1549                claim_id: claim.claim_id.clone(),
1550            });
1551        }
1552        Ok(QueuedWorkClaim {
1553            expires_at_epoch_ms: expires_at,
1554            ..claim.clone()
1555        })
1556    }
1557
1558    async fn abandon_queued_work_claim(&self, claim: &QueuedWorkClaim) -> Result<(), StoreError> {
1559        sqlx::query(
1560            "UPDATE lash_queued_work_batches
1561             SET claim_id = NULL,
1562                 claim_owner_id = NULL,
1563                 claim_token = NULL,
1564                 claim_claimed_at_ms = 0,
1565                 claim_expires_at_ms = 0
1566             WHERE session_id = $1 AND claim_id = $2 AND claim_token = $3",
1567        )
1568        .bind(&claim.session_id)
1569        .bind(&claim.claim_id)
1570        .bind(&claim.lease_token)
1571        .execute(&self.pool)
1572        .await
1573        .map_err(store_sqlx_error)?;
1574        Ok(())
1575    }
1576
1577    async fn cancel_queued_work_batch(
1578        &self,
1579        session_id: &str,
1580        batch_id: &str,
1581    ) -> Result<Option<QueuedWorkBatch>, StoreError> {
1582        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1583        let now = current_epoch_ms();
1584        let row = sqlx::query(
1585            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1586                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1587                    claim_fencing_token
1588             FROM lash_queued_work_batches
1589             WHERE session_id = $1
1590               AND batch_id = $2
1591               AND (claim_token IS NULL OR claim_expires_at_ms <= $3)
1592             FOR UPDATE",
1593        )
1594        .bind(session_id)
1595        .bind(batch_id)
1596        .bind(now as i64)
1597        .fetch_optional(&mut *tx)
1598        .await
1599        .map_err(store_sqlx_error)?;
1600        let Some(row) = row else {
1601            tx.commit().await.map_err(store_sqlx_error)?;
1602            return Ok(None);
1603        };
1604        let batch = queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?;
1605        sqlx::query("DELETE FROM lash_queued_work_batches WHERE batch_id = $1")
1606            .bind(batch_id)
1607            .execute(&mut *tx)
1608            .await
1609            .map_err(store_sqlx_error)?;
1610        tx.commit().await.map_err(store_sqlx_error)?;
1611        Ok(Some(batch))
1612    }
1613
1614    async fn list_queued_work(&self, session_id: &str) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1615        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1616        let rows = sqlx::query(
1617            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1618                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1619                    claim_fencing_token
1620             FROM lash_queued_work_batches
1621             WHERE session_id = $1
1622             ORDER BY enqueue_seq ASC",
1623        )
1624        .bind(session_id)
1625        .fetch_all(&mut *tx)
1626        .await
1627        .map_err(store_sqlx_error)?;
1628        let mut batches = Vec::new();
1629        for row in rows {
1630            batches.push(queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?);
1631        }
1632        tx.commit().await.map_err(store_sqlx_error)?;
1633        Ok(batches)
1634    }
1635
1636    async fn list_pending_queued_work(
1637        &self,
1638        session_id: &str,
1639    ) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1640        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1641        let now = current_epoch_ms();
1642        let rows = sqlx::query(
1643            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1644                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1645                    claim_fencing_token
1646             FROM lash_queued_work_batches
1647             WHERE session_id = $1
1648               AND (claim_token IS NULL OR claim_expires_at_ms <= $2)
1649             ORDER BY enqueue_seq ASC",
1650        )
1651        .bind(session_id)
1652        .bind(now as i64)
1653        .fetch_all(&mut *tx)
1654        .await
1655        .map_err(store_sqlx_error)?;
1656        let mut batches = Vec::new();
1657        for row in rows {
1658            batches.push(queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?);
1659        }
1660        tx.commit().await.map_err(store_sqlx_error)?;
1661        Ok(batches)
1662    }
1663
1664    async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError> {
1665        sqlx::query(
1666            "INSERT INTO lash_session_meta (session_id, meta_json)
1667             VALUES ($1, $2)
1668             ON CONFLICT (session_id) DO UPDATE SET meta_json = EXCLUDED.meta_json",
1669        )
1670        .bind(&meta.session_id)
1671        .bind(encode_json(&meta))
1672        .execute(&self.pool)
1673        .await
1674        .map_err(store_sqlx_error)?;
1675        Ok(())
1676    }
1677
1678    async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError> {
1679        let json: Option<String> = if let Some(session_id) = &self.session_id {
1680            sqlx::query_scalar("SELECT meta_json FROM lash_session_meta WHERE session_id = $1")
1681                .bind(session_id)
1682                .fetch_optional(&self.pool)
1683                .await
1684                .map_err(store_sqlx_error)?
1685        } else {
1686            sqlx::query_scalar(
1687                "SELECT meta_json FROM lash_session_meta ORDER BY session_id ASC LIMIT 1",
1688            )
1689            .fetch_optional(&self.pool)
1690            .await
1691            .map_err(store_sqlx_error)?
1692        };
1693        json.map(|json| store_decode_json(&json, "session meta"))
1694            .transpose()
1695    }
1696
1697    async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError> {
1698        for id in ids {
1699            if let Some(session_id) = &self.session_id {
1700                sqlx::query(
1701                    "UPDATE lash_graph_nodes
1702                     SET tombstoned = TRUE
1703                     WHERE session_id = $1 AND node_id = $2",
1704                )
1705                .bind(session_id)
1706                .bind(id)
1707                .execute(&self.pool)
1708                .await
1709                .map_err(store_sqlx_error)?;
1710            } else {
1711                sqlx::query(
1712                    "UPDATE lash_graph_nodes
1713                     SET tombstoned = TRUE
1714                     WHERE node_id = $1",
1715                )
1716                .bind(id)
1717                .execute(&self.pool)
1718                .await
1719                .map_err(store_sqlx_error)?;
1720            }
1721        }
1722        Ok(())
1723    }
1724
1725    async fn vacuum(&self) -> Result<VacuumReport, StoreError> {
1726        let removed = if let Some(session_id) = &self.session_id {
1727            sqlx::query("DELETE FROM lash_graph_nodes WHERE session_id = $1 AND tombstoned = TRUE")
1728                .bind(session_id)
1729                .execute(&self.pool)
1730                .await
1731                .map_err(store_sqlx_error)?
1732                .rows_affected()
1733        } else {
1734            sqlx::query("DELETE FROM lash_graph_nodes WHERE tombstoned = TRUE")
1735                .execute(&self.pool)
1736                .await
1737                .map_err(store_sqlx_error)?
1738                .rows_affected()
1739        };
1740        Ok(VacuumReport {
1741            removed_node_count: removed as usize,
1742        })
1743    }
1744
1745    async fn gc_unreachable(&self) -> Result<GcReport, StoreError> {
1746        Ok(GcReport::default())
1747    }
1748}
1749
1750fn process_status_label(record: &ProcessRecord) -> &'static str {
1751    record.status.label()
1752}
1753
1754async fn load_process_tx(
1755    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1756    process_id: &str,
1757) -> Result<Option<ProcessRecord>, PluginError> {
1758    let json: Option<String> = sqlx::query_scalar(
1759        "SELECT record_json
1760             FROM lash_processes
1761             WHERE process_id = $1
1762             FOR UPDATE",
1763    )
1764    .bind(process_id)
1765    .fetch_optional(&mut **tx)
1766    .await
1767    .map_err(plugin_sqlx_error)?;
1768    json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
1769        .transpose()
1770}
1771
1772async fn load_process(
1773    pool: &PgPool,
1774    process_id: &str,
1775) -> Result<Option<ProcessRecord>, PluginError> {
1776    let json: Option<String> =
1777        sqlx::query_scalar("SELECT record_json FROM lash_processes WHERE process_id = $1")
1778            .bind(process_id)
1779            .fetch_optional(pool)
1780            .await
1781            .map_err(plugin_sqlx_error)?;
1782    json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
1783        .transpose()
1784}
1785
1786async fn save_process_tx(
1787    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1788    record: &ProcessRecord,
1789) -> Result<(), PluginError> {
1790    sqlx::query(
1791        "UPDATE lash_processes
1792         SET updated_at_ms = $2, status = $3, record_json = $4
1793         WHERE process_id = $1",
1794    )
1795    .bind(&record.id)
1796    .bind(record.updated_at_ms as i64)
1797    .bind(process_status_label(record))
1798    .bind(serde_json::to_string(record).map_err(process_decode_error)?)
1799    .execute(&mut **tx)
1800    .await
1801    .map_err(plugin_sqlx_error)?;
1802    Ok(())
1803}
1804
1805async fn load_event_by_key_tx(
1806    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1807    process_id: &str,
1808    replay_key: &str,
1809) -> Result<Option<(String, ProcessEvent)>, PluginError> {
1810    let row = sqlx::query(
1811        "SELECT payload_hash, event_json
1812         FROM lash_process_events
1813         WHERE process_id = $1 AND idempotency_key = $2",
1814    )
1815    .bind(process_id)
1816    .bind(replay_key)
1817    .fetch_optional(&mut **tx)
1818    .await
1819    .map_err(plugin_sqlx_error)?;
1820    row.map(|row| {
1821        let hash: String = row.get(0);
1822        let json: String = row.get(1);
1823        serde_json::from_str(&json)
1824            .map(|event| (hash, event))
1825            .map_err(process_decode_error)
1826    })
1827    .transpose()
1828}
1829
1830async fn load_process_lease_tx(
1831    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1832    process_id: &str,
1833) -> Result<Option<ProcessLease>, PluginError> {
1834    let row = sqlx::query(
1835        "SELECT lease_owner_id, lease_token, lease_fencing_token,
1836                lease_claimed_at_ms, lease_expires_at_ms
1837         FROM lash_process_leases
1838         WHERE process_id = $1",
1839    )
1840    .bind(process_id)
1841    .fetch_optional(&mut **tx)
1842    .await
1843    .map_err(plugin_sqlx_error)?;
1844    let Some(row) = row else {
1845        return Ok(None);
1846    };
1847    let owner_id: Option<String> = row.get(0);
1848    let lease_token: Option<String> = row.get(1);
1849    let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
1850        return Ok(None);
1851    };
1852    Ok(Some(ProcessLease {
1853        schema_version: PROCESS_LEASE_SCHEMA_VERSION,
1854        process_id: process_id.to_string(),
1855        owner_id,
1856        lease_token,
1857        fencing_token: row.get::<i64, _>(2) as u64,
1858        claimed_at_epoch_ms: row.get::<i64, _>(3) as u64,
1859        expires_at_epoch_ms: row.get::<i64, _>(4) as u64,
1860    }))
1861}
1862
1863fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> PluginError {
1864    PluginError::Session(format!(
1865        "process `{process_id}` is already leased by `{}` until {}",
1866        current.owner_id, current.expires_at_epoch_ms
1867    ))
1868}
1869
1870fn process_lease_expired(process_id: &str) -> PluginError {
1871    PluginError::Session(format!(
1872        "process lease for `{process_id}` is missing or expired"
1873    ))
1874}
1875
1876fn guard_lease(current: Option<&ProcessLease>, lease_token: &str, now: u64) -> bool {
1877    current
1878        .map(|current| current.lease_token == lease_token && current.expires_at_epoch_ms > now)
1879        .unwrap_or(false)
1880}
1881
1882async fn list_grants_for_scope(
1883    pool: &PgPool,
1884    owner_scope: &ProcessScope,
1885    live_only: bool,
1886) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
1887    let status_clause = if live_only {
1888        "AND p.status = 'running'"
1889    } else {
1890        ""
1891    };
1892    let sql = format!(
1893        "SELECT g.process_id, g.descriptor_json, p.record_json
1894         FROM lash_process_handle_grants g
1895         JOIN lash_processes p ON p.process_id = g.process_id
1896         WHERE g.scope_id = $1 {status_clause}
1897         ORDER BY g.process_id ASC"
1898    );
1899    let rows = sqlx::query(&sql)
1900        .bind(owner_scope.id().as_str())
1901        .fetch_all(pool)
1902        .await
1903        .map_err(plugin_sqlx_error)?;
1904    let mut entries = Vec::new();
1905    for row in rows {
1906        let process_id: String = row.get(0);
1907        let descriptor_json: String = row.get(1);
1908        let record_json: String = row.get(2);
1909        let descriptor: ProcessHandleDescriptor =
1910            serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
1911        let record: ProcessRecord =
1912            serde_json::from_str(&record_json).map_err(process_decode_error)?;
1913        entries.push((
1914            ProcessHandleGrant {
1915                session_id: owner_scope.session_id.clone(),
1916                process_id,
1917                descriptor,
1918            },
1919            record,
1920        ));
1921    }
1922    Ok(entries)
1923}
1924
1925#[async_trait::async_trait]
1926impl ProcessRegistry for PostgresProcessRegistry {
1927    fn durability_tier(&self) -> DurabilityTier {
1928        DurabilityTier::Durable
1929    }
1930
1931    async fn register_process(
1932        &self,
1933        registration: ProcessRegistration,
1934    ) -> Result<ProcessRecord, PluginError> {
1935        let (registration, registration_hash) =
1936            lash_core::runtime::prepare_process_registration(registration)?;
1937        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1938        if let Some(existing) = load_process_tx(&mut tx, &registration.id).await? {
1939            if existing.registration_hash == registration_hash {
1940                tx.commit().await.map_err(plugin_sqlx_error)?;
1941                return Ok(existing);
1942            }
1943            return Err(PluginError::Session(format!(
1944                "process `{}` registration hash conflict: existing {}, new {}",
1945                registration.id, existing.registration_hash, registration_hash
1946            )));
1947        }
1948        let now = current_epoch_ms();
1949        let record =
1950            ProcessRecord::from_prepared_registration(registration, registration_hash, now);
1951        let record_json = serde_json::to_string(&record).map_err(process_decode_error)?;
1952        sqlx::query(
1953            "INSERT INTO lash_processes (
1954                process_id, registration_hash, owner_scope_id, host_profile_id,
1955                created_at_ms, updated_at_ms, status, record_json
1956             )
1957             VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
1958        )
1959        .bind(&record.id)
1960        .bind(&record.registration_hash)
1961        .bind(record.owner_scope_id().as_str())
1962        .bind(record.host_profile_id())
1963        .bind(record.created_at_ms as i64)
1964        .bind(record.updated_at_ms as i64)
1965        .bind(process_status_label(&record))
1966        .bind(record_json)
1967        .execute(&mut *tx)
1968        .await
1969        .map_err(plugin_sqlx_error)?;
1970        tx.commit().await.map_err(plugin_sqlx_error)?;
1971        Ok(record)
1972    }
1973
1974    async fn set_external_ref(
1975        &self,
1976        process_id: &str,
1977        external_ref: ProcessExternalRef,
1978    ) -> Result<ProcessRecord, PluginError> {
1979        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1980        let mut record = load_process_tx(&mut tx, process_id)
1981            .await?
1982            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
1983        record.external_ref = Some(external_ref);
1984        record.updated_at_ms = current_epoch_ms();
1985        save_process_tx(&mut tx, &record).await?;
1986        tx.commit().await.map_err(plugin_sqlx_error)?;
1987        Ok(record)
1988    }
1989
1990    async fn grant_handle(
1991        &self,
1992        owner_scope: &ProcessScope,
1993        process_id: &str,
1994        descriptor: ProcessHandleDescriptor,
1995    ) -> Result<ProcessHandleGrant, PluginError> {
1996        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1997        if load_process_tx(&mut tx, process_id).await?.is_none() {
1998            return Err(PluginError::Session(format!(
1999                "unknown process `{process_id}`"
2000            )));
2001        }
2002        sqlx::query(
2003            "INSERT INTO lash_process_handle_grants (session_id, scope_id, process_id, descriptor_json)
2004             VALUES ($1, $2, $3, $4)
2005             ON CONFLICT (scope_id, process_id) DO UPDATE SET
2006                session_id = EXCLUDED.session_id,
2007                descriptor_json = EXCLUDED.descriptor_json",
2008        )
2009        .bind(&owner_scope.session_id)
2010        .bind(owner_scope.id().as_str())
2011        .bind(process_id)
2012        .bind(serde_json::to_string(&descriptor).map_err(process_decode_error)?)
2013        .execute(&mut *tx)
2014        .await
2015        .map_err(plugin_sqlx_error)?;
2016        tx.commit().await.map_err(plugin_sqlx_error)?;
2017        Ok(ProcessHandleGrant {
2018            session_id: owner_scope.session_id.clone(),
2019            process_id: process_id.to_string(),
2020            descriptor,
2021        })
2022    }
2023
2024    async fn revoke_handle(
2025        &self,
2026        owner_scope: &ProcessScope,
2027        process_id: &str,
2028    ) -> Result<(), PluginError> {
2029        sqlx::query(
2030            "DELETE FROM lash_process_handle_grants WHERE scope_id = $1 AND process_id = $2",
2031        )
2032        .bind(owner_scope.id().as_str())
2033        .bind(process_id)
2034        .execute(&self.pool)
2035        .await
2036        .map_err(plugin_sqlx_error)?;
2037        Ok(())
2038    }
2039
2040    async fn transfer_handle_grants(
2041        &self,
2042        from_scope: &ProcessScope,
2043        to_scope: &ProcessScope,
2044        process_ids: &[String],
2045    ) -> Result<(), PluginError> {
2046        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2047        for process_id in process_ids {
2048            let descriptor_json: Option<String> = sqlx::query_scalar(
2049                "SELECT descriptor_json FROM lash_process_handle_grants
2050                 WHERE scope_id = $1 AND process_id = $2",
2051            )
2052            .bind(from_scope.id().as_str())
2053            .bind(process_id)
2054            .fetch_optional(&mut *tx)
2055            .await
2056            .map_err(plugin_sqlx_error)?;
2057            let Some(descriptor_json) = descriptor_json else {
2058                return Err(PluginError::Session(format!(
2059                    "process handle `{process_id}` is not granted to session `{}`",
2060                    from_scope.session_id
2061                )));
2062            };
2063            sqlx::query(
2064                "DELETE FROM lash_process_handle_grants WHERE scope_id = $1 AND process_id = $2",
2065            )
2066            .bind(from_scope.id().as_str())
2067            .bind(process_id)
2068            .execute(&mut *tx)
2069            .await
2070            .map_err(plugin_sqlx_error)?;
2071            sqlx::query(
2072                "INSERT INTO lash_process_handle_grants (session_id, scope_id, process_id, descriptor_json)
2073                 VALUES ($1, $2, $3, $4)
2074                 ON CONFLICT (scope_id, process_id) DO UPDATE SET
2075                    session_id = EXCLUDED.session_id,
2076                    descriptor_json = EXCLUDED.descriptor_json",
2077            )
2078            .bind(&to_scope.session_id)
2079            .bind(to_scope.id().as_str())
2080            .bind(process_id)
2081            .bind(descriptor_json)
2082            .execute(&mut *tx)
2083            .await
2084            .map_err(plugin_sqlx_error)?;
2085        }
2086        tx.commit().await.map_err(plugin_sqlx_error)
2087    }
2088
2089    async fn list_handle_grants(
2090        &self,
2091        owner_scope: &ProcessScope,
2092    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
2093        list_grants_for_scope(&self.pool, owner_scope, false).await
2094    }
2095
2096    async fn list_live_handle_grants(
2097        &self,
2098        owner_scope: &ProcessScope,
2099    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
2100        list_grants_for_scope(&self.pool, owner_scope, true).await
2101    }
2102
2103    async fn has_handle_grant(
2104        &self,
2105        owner_scope: &ProcessScope,
2106        process_id: &str,
2107    ) -> Result<bool, PluginError> {
2108        let exists: Option<i64> = sqlx::query_scalar(
2109            "SELECT 1::BIGINT FROM lash_process_handle_grants
2110             WHERE scope_id = $1 AND process_id = $2
2111             LIMIT 1",
2112        )
2113        .bind(owner_scope.id().as_str())
2114        .bind(process_id)
2115        .fetch_optional(&self.pool)
2116        .await
2117        .map_err(plugin_sqlx_error)?;
2118        Ok(exists.is_some())
2119    }
2120
2121    async fn handle_grants_for_process(
2122        &self,
2123        process_id: &str,
2124    ) -> Result<Vec<ProcessHandleGrant>, PluginError> {
2125        if load_process(&self.pool, process_id).await?.is_none() {
2126            return Err(PluginError::Session(format!(
2127                "unknown process `{process_id}`"
2128            )));
2129        }
2130        let rows = sqlx::query(
2131            "SELECT session_id, descriptor_json
2132             FROM lash_process_handle_grants
2133             WHERE process_id = $1
2134             ORDER BY session_id ASC, scope_id ASC",
2135        )
2136        .bind(process_id)
2137        .fetch_all(&self.pool)
2138        .await
2139        .map_err(plugin_sqlx_error)?;
2140        let mut grants = Vec::new();
2141        for row in rows {
2142            let descriptor_json: String = row.get(1);
2143            grants.push(ProcessHandleGrant {
2144                session_id: row.get(0),
2145                process_id: process_id.to_string(),
2146                descriptor: serde_json::from_str(&descriptor_json).map_err(process_decode_error)?,
2147            });
2148        }
2149        Ok(grants)
2150    }
2151
2152    async fn delete_session_process_state(
2153        &self,
2154        session_id: &str,
2155    ) -> Result<lash_core::ProcessSessionDeleteReport, PluginError> {
2156        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2157        let rows = sqlx::query(
2158            "SELECT g.process_id, p.record_json
2159             FROM lash_process_handle_grants g
2160             JOIN lash_processes p ON p.process_id = g.process_id
2161             WHERE g.session_id = $1
2162             ORDER BY g.process_id ASC",
2163        )
2164        .bind(session_id)
2165        .fetch_all(&mut *tx)
2166        .await
2167        .map_err(plugin_sqlx_error)?;
2168        let mut removed = Vec::new();
2169        for row in rows {
2170            let process_id: String = row.get(0);
2171            let record_json: String = row.get(1);
2172            let record: ProcessRecord =
2173                serde_json::from_str(&record_json).map_err(process_decode_error)?;
2174            removed.push((process_id, record));
2175        }
2176        let revoked = sqlx::query("DELETE FROM lash_process_handle_grants WHERE session_id = $1")
2177            .bind(session_id)
2178            .execute(&mut *tx)
2179            .await
2180            .map_err(plugin_sqlx_error)?
2181            .rows_affected() as usize;
2182        let mut cancel_process_ids = Vec::new();
2183        let mut preserved_process_ids = Vec::new();
2184        for (process_id, record) in removed {
2185            if record.is_terminal() {
2186                continue;
2187            }
2188            let remaining: i64 = sqlx::query_scalar(
2189                "SELECT COUNT(*) FROM lash_process_handle_grants WHERE process_id = $1",
2190            )
2191            .bind(&process_id)
2192            .fetch_one(&mut *tx)
2193            .await
2194            .map_err(plugin_sqlx_error)?;
2195            if remaining == 0 {
2196                cancel_process_ids.push(process_id);
2197            } else {
2198                preserved_process_ids.push(process_id);
2199            }
2200        }
2201        tx.commit().await.map_err(plugin_sqlx_error)?;
2202        cancel_process_ids.sort();
2203        cancel_process_ids.dedup();
2204        preserved_process_ids.sort();
2205        preserved_process_ids.dedup();
2206        Ok(lash_core::ProcessSessionDeleteReport {
2207            session_id: session_id.to_string(),
2208            revoked_handle_count: revoked,
2209            deleted_wake_count: 0,
2210            cancel_process_ids,
2211            preserved_process_ids,
2212        })
2213    }
2214
2215    async fn append_event(
2216        &self,
2217        process_id: &str,
2218        request: ProcessEventAppendRequest,
2219    ) -> Result<ProcessEventAppendResult, PluginError> {
2220        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2221        let mut record = load_process_tx(&mut tx, process_id)
2222            .await?
2223            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
2224        let replay_lookup =
2225            if let Some(replay_key) = request.replay.as_ref().map(|r| r.key.as_str()) {
2226                load_event_by_key_tx(&mut tx, process_id, replay_key).await?
2227            } else {
2228                None
2229            };
2230        let sequence: i64 = sqlx::query_scalar(
2231            "SELECT COALESCE(MAX(sequence), 0) + 1 FROM lash_process_events WHERE process_id = $1",
2232        )
2233        .bind(process_id)
2234        .fetch_one(&mut *tx)
2235        .await
2236        .map_err(plugin_sqlx_error)?;
2237        let occurred_at_ms = current_epoch_ms();
2238        let prepared = lash_core::runtime::prepare_process_event_append(
2239            &record,
2240            request,
2241            sequence as u64,
2242            replay_lookup,
2243            occurred_at_ms,
2244        )?;
2245        if prepared.replayed {
2246            let repaired = if let Some(status) = prepared.status_update.clone() {
2247                record.status = status;
2248                record.updated_at_ms = prepared.occurred_at_ms;
2249                save_process_tx(&mut tx, &record).await?;
2250                true
2251            } else {
2252                false
2253            };
2254            tx.commit().await.map_err(plugin_sqlx_error)?;
2255            if repaired {
2256                self.notify.notify_waiters();
2257            }
2258            return Ok(ProcessEventAppendResult {
2259                event: prepared.event,
2260                wake_delivery: prepared.wake_delivery,
2261            });
2262        }
2263        let event = prepared.event;
2264        sqlx::query(
2265            "INSERT INTO lash_process_events (
2266                process_id, sequence, event_type, payload_hash, idempotency_key,
2267                occurred_at_ms, event_json
2268             )
2269             VALUES ($1, $2, $3, $4, $5, $6, $7)",
2270        )
2271        .bind(process_id)
2272        .bind(sequence)
2273        .bind(event.event_type.as_str())
2274        .bind(&prepared.payload_hash)
2275        .bind(event.invocation.replay_key())
2276        .bind(prepared.occurred_at_ms as i64)
2277        .bind(serde_json::to_string(&event).map_err(process_decode_error)?)
2278        .execute(&mut *tx)
2279        .await
2280        .map_err(plugin_sqlx_error)?;
2281        if let Some(status) = prepared.status_update.clone() {
2282            record.status = status;
2283        }
2284        record.updated_at_ms = prepared.occurred_at_ms;
2285        save_process_tx(&mut tx, &record).await?;
2286        tx.commit().await.map_err(plugin_sqlx_error)?;
2287        self.notify.notify_waiters();
2288        Ok(ProcessEventAppendResult {
2289            event,
2290            wake_delivery: prepared.wake_delivery,
2291        })
2292    }
2293
2294    async fn events_after(
2295        &self,
2296        process_id: &str,
2297        after_sequence: u64,
2298    ) -> Result<Vec<ProcessEvent>, PluginError> {
2299        if load_process(&self.pool, process_id).await?.is_none() {
2300            return Err(PluginError::Session(format!(
2301                "unknown process `{process_id}`"
2302            )));
2303        }
2304        let rows = sqlx::query(
2305            "SELECT event_json FROM lash_process_events
2306             WHERE process_id = $1 AND sequence > $2
2307             ORDER BY sequence ASC",
2308        )
2309        .bind(process_id)
2310        .bind(after_sequence as i64)
2311        .fetch_all(&self.pool)
2312        .await
2313        .map_err(plugin_sqlx_error)?;
2314        let mut events = Vec::new();
2315        for row in rows {
2316            let json: String = row.get(0);
2317            events.push(serde_json::from_str(&json).map_err(process_decode_error)?);
2318        }
2319        Ok(events)
2320    }
2321
2322    async fn wake_events_after(
2323        &self,
2324        process_id: &str,
2325        after_sequence: u64,
2326    ) -> Result<Vec<ProcessEvent>, PluginError> {
2327        let rows = sqlx::query("SELECT sequence FROM lash_process_wake_acks WHERE process_id = $1")
2328            .bind(process_id)
2329            .fetch_all(&self.pool)
2330            .await
2331            .map_err(plugin_sqlx_error)?;
2332        let acked = rows
2333            .into_iter()
2334            .map(|row| row.get::<i64, _>(0) as u64)
2335            .collect::<HashSet<_>>();
2336        Ok(self
2337            .events_after(process_id, after_sequence)
2338            .await?
2339            .into_iter()
2340            .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
2341            .collect())
2342    }
2343
2344    async fn wait_event_after(
2345        &self,
2346        process_id: &str,
2347        event_type: &str,
2348        after_sequence: u64,
2349    ) -> Result<ProcessEvent, PluginError> {
2350        loop {
2351            if let Some(event) = self
2352                .events_after(process_id, after_sequence)
2353                .await?
2354                .into_iter()
2355                .find(|event| event.event_type == event_type)
2356            {
2357                return Ok(event);
2358            }
2359            tokio::select! {
2360                _ = self.notify.notified() => {}
2361                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
2362            }
2363        }
2364    }
2365
2366    async fn await_process(&self, process_id: &str) -> Result<ProcessAwaitOutput, PluginError> {
2367        loop {
2368            let record = load_process(&self.pool, process_id)
2369                .await?
2370                .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
2371            if let Some(await_output) = record.status.await_output() {
2372                return Ok(await_output.clone());
2373            }
2374            tokio::select! {
2375                _ = self.notify.notified() => {}
2376                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
2377            }
2378        }
2379    }
2380
2381    async fn complete_process(
2382        &self,
2383        process_id: &str,
2384        await_output: ProcessAwaitOutput,
2385    ) -> Result<ProcessRecord, PluginError> {
2386        let event_type = match await_output.terminal_state() {
2387            lash_core::ProcessTerminalState::Completed => "process.completed",
2388            lash_core::ProcessTerminalState::Failed => "process.failed",
2389            lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
2390        };
2391        self.append_event(
2392            process_id,
2393            ProcessEventAppendRequest::new(
2394                event_type,
2395                serde_json::json!({ "await_output": await_output }),
2396            )
2397            .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
2398        )
2399        .await?;
2400        load_process(&self.pool, process_id).await?.ok_or_else(|| {
2401            PluginError::Session(format!(
2402                "unknown process `{process_id}` after terminal event"
2403            ))
2404        })
2405    }
2406
2407    async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
2408        load_process(&self.pool, process_id).await.ok().flatten()
2409    }
2410
2411    async fn ack_wake(&self, process_id: &str, sequence: u64) -> Result<(), PluginError> {
2412        if load_process(&self.pool, process_id).await?.is_none() {
2413            return Err(PluginError::Session(format!(
2414                "unknown process `{process_id}`"
2415            )));
2416        }
2417        sqlx::query(
2418            "INSERT INTO lash_process_wake_acks (process_id, sequence)
2419             VALUES ($1, $2)
2420             ON CONFLICT DO NOTHING",
2421        )
2422        .bind(process_id)
2423        .bind(sequence as i64)
2424        .execute(&self.pool)
2425        .await
2426        .map_err(plugin_sqlx_error)?;
2427        Ok(())
2428    }
2429
2430    async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, PluginError> {
2431        let rows = sqlx::query(
2432            "SELECT record_json FROM lash_processes
2433             WHERE status = 'running'
2434             ORDER BY process_id ASC",
2435        )
2436        .fetch_all(&self.pool)
2437        .await
2438        .map_err(plugin_sqlx_error)?;
2439        let mut records = Vec::new();
2440        for row in rows {
2441            let json: String = row.get(0);
2442            records.push(serde_json::from_str(&json).map_err(process_decode_error)?);
2443        }
2444        Ok(records)
2445    }
2446
2447    async fn claim_process_lease(
2448        &self,
2449        process_id: &str,
2450        owner_id: &str,
2451        lease_ttl_ms: u64,
2452    ) -> Result<ProcessLease, PluginError> {
2453        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2454        if load_process_tx(&mut tx, process_id).await?.is_none() {
2455            return Err(PluginError::Session(format!(
2456                "unknown process `{process_id}`"
2457            )));
2458        }
2459        let now = current_epoch_ms();
2460        let current = load_process_lease_tx(&mut tx, process_id).await?;
2461        if let Some(current) = current.as_ref()
2462            && current.expires_at_epoch_ms > now
2463            && current.owner_id != owner_id
2464        {
2465            return Err(process_lease_conflict(process_id, current));
2466        }
2467        let existing_fence: Option<i64> = sqlx::query_scalar(
2468            "SELECT lease_fencing_token FROM lash_process_leases WHERE process_id = $1 FOR UPDATE",
2469        )
2470        .bind(process_id)
2471        .fetch_optional(&mut *tx)
2472        .await
2473        .map_err(plugin_sqlx_error)?;
2474        let fencing_token = existing_fence.unwrap_or(0) as u64 + 1;
2475        let lease = ProcessLease {
2476            schema_version: PROCESS_LEASE_SCHEMA_VERSION,
2477            process_id: process_id.to_string(),
2478            owner_id: owner_id.to_string(),
2479            lease_token: format!(
2480                "{:x}",
2481                Sha256::digest(format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes())
2482            ),
2483            fencing_token,
2484            claimed_at_epoch_ms: now,
2485            expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
2486        };
2487        sqlx::query(
2488            "INSERT INTO lash_process_leases (
2489                process_id, lease_owner_id, lease_token, lease_fencing_token,
2490                lease_claimed_at_ms, lease_expires_at_ms
2491             )
2492             VALUES ($1, $2, $3, $4, $5, $6)
2493             ON CONFLICT (process_id) DO UPDATE SET
2494                lease_owner_id = EXCLUDED.lease_owner_id,
2495                lease_token = EXCLUDED.lease_token,
2496                lease_fencing_token = EXCLUDED.lease_fencing_token,
2497                lease_claimed_at_ms = EXCLUDED.lease_claimed_at_ms,
2498                lease_expires_at_ms = EXCLUDED.lease_expires_at_ms",
2499        )
2500        .bind(&lease.process_id)
2501        .bind(&lease.owner_id)
2502        .bind(&lease.lease_token)
2503        .bind(lease.fencing_token as i64)
2504        .bind(lease.claimed_at_epoch_ms as i64)
2505        .bind(lease.expires_at_epoch_ms as i64)
2506        .execute(&mut *tx)
2507        .await
2508        .map_err(plugin_sqlx_error)?;
2509        tx.commit().await.map_err(plugin_sqlx_error)?;
2510        Ok(lease)
2511    }
2512
2513    async fn renew_process_lease(
2514        &self,
2515        lease: &ProcessLease,
2516        lease_ttl_ms: u64,
2517    ) -> Result<ProcessLease, PluginError> {
2518        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2519        let now = current_epoch_ms();
2520        let current = load_process_lease_tx(&mut tx, &lease.process_id).await?;
2521        if !guard_lease(current.as_ref(), &lease.lease_token, now) {
2522            return Err(process_lease_expired(&lease.process_id));
2523        }
2524        let renewed = ProcessLease {
2525            expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
2526            ..lease.clone()
2527        };
2528        sqlx::query(
2529            "UPDATE lash_process_leases
2530             SET lease_expires_at_ms = $2
2531             WHERE process_id = $1 AND lease_token = $3",
2532        )
2533        .bind(&renewed.process_id)
2534        .bind(renewed.expires_at_epoch_ms as i64)
2535        .bind(&renewed.lease_token)
2536        .execute(&mut *tx)
2537        .await
2538        .map_err(plugin_sqlx_error)?;
2539        tx.commit().await.map_err(plugin_sqlx_error)?;
2540        Ok(renewed)
2541    }
2542
2543    async fn complete_process_lease(
2544        &self,
2545        completion: &ProcessLeaseCompletion,
2546    ) -> Result<(), PluginError> {
2547        sqlx::query(
2548            "UPDATE lash_process_leases
2549             SET lease_owner_id = NULL,
2550                 lease_token = NULL,
2551                 lease_claimed_at_ms = 0,
2552                 lease_expires_at_ms = 0
2553             WHERE process_id = $1 AND lease_token = $2",
2554        )
2555        .bind(&completion.process_id)
2556        .bind(&completion.lease_token)
2557        .execute(&self.pool)
2558        .await
2559        .map_err(plugin_sqlx_error)?;
2560        Ok(())
2561    }
2562}
2563
2564#[async_trait::async_trait]
2565impl HostEventStore for PostgresHostEventStore {
2566    fn durability_tier(&self) -> DurabilityTier {
2567        DurabilityTier::Durable
2568    }
2569
2570    async fn register_subscription(
2571        &self,
2572        draft: TriggerSubscriptionDraft,
2573    ) -> Result<TriggerSubscriptionRecord, PluginError> {
2574        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2575        let seq: i64 = sqlx::query_scalar("SELECT nextval('lash_host_event_subscription_seq')")
2576            .fetch_one(&mut *tx)
2577            .await
2578            .map_err(plugin_sqlx_error)?;
2579        let handle = format!("trigger:{seq}");
2580        let subscription_id = format!("subscription:{seq}");
2581        let now = current_epoch_ms();
2582        let record = TriggerSubscriptionRecord {
2583            subscription_id: subscription_id.clone(),
2584            session_id: draft.session_id,
2585            handle,
2586            name: draft.name,
2587            source_type: draft.source_type,
2588            source_key: draft.source_key,
2589            source: draft.source,
2590            event_ty: draft.event_ty,
2591            module_ref: draft.module_ref,
2592            required_surface_ref: draft.required_surface_ref,
2593            process_ref: draft.process_ref,
2594            process_name: draft.process_name,
2595            input_template: draft.input_template,
2596            enabled: true,
2597            created_at_ms: now,
2598            updated_at_ms: now,
2599        };
2600        sqlx::query(
2601            "INSERT INTO lash_host_event_trigger_subscriptions (
2602                subscription_id, session_id, handle, source_type, source_key,
2603                enabled, created_at_ms, updated_at_ms, record_json
2604             )
2605             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
2606        )
2607        .bind(&record.subscription_id)
2608        .bind(&record.session_id)
2609        .bind(&record.handle)
2610        .bind(&record.source_type)
2611        .bind(&record.source_key)
2612        .bind(record.enabled)
2613        .bind(record.created_at_ms as i64)
2614        .bind(record.updated_at_ms as i64)
2615        .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2616        .execute(&mut *tx)
2617        .await
2618        .map_err(plugin_sqlx_error)?;
2619        tx.commit().await.map_err(plugin_sqlx_error)?;
2620        Ok(record)
2621    }
2622
2623    async fn list_subscriptions(
2624        &self,
2625        filter: TriggerSubscriptionFilter,
2626    ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError> {
2627        let rows = sqlx::query(
2628            "SELECT record_json FROM lash_host_event_trigger_subscriptions
2629             ORDER BY session_id ASC, handle ASC",
2630        )
2631        .fetch_all(&self.pool)
2632        .await
2633        .map_err(plugin_sqlx_error)?;
2634        let mut records = Vec::new();
2635        for row in rows {
2636            let json: String = row.get(0);
2637            let record: TriggerSubscriptionRecord =
2638                serde_json::from_str(&json).map_err(process_decode_error)?;
2639            if filter.matches(&record) {
2640                records.push(record);
2641            }
2642        }
2643        Ok(records)
2644    }
2645
2646    async fn cancel_subscription(
2647        &self,
2648        session_id: &str,
2649        handle: &str,
2650    ) -> Result<bool, PluginError> {
2651        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2652        let json: Option<String> = sqlx::query_scalar(
2653            "SELECT record_json FROM lash_host_event_trigger_subscriptions
2654             WHERE session_id = $1 AND handle = $2
2655             FOR UPDATE",
2656        )
2657        .bind(session_id)
2658        .bind(handle)
2659        .fetch_optional(&mut *tx)
2660        .await
2661        .map_err(plugin_sqlx_error)?;
2662        let Some(json) = json else {
2663            tx.commit().await.map_err(plugin_sqlx_error)?;
2664            return Ok(false);
2665        };
2666        let mut record: TriggerSubscriptionRecord =
2667            serde_json::from_str(&json).map_err(process_decode_error)?;
2668        let changed = record.enabled;
2669        record.enabled = false;
2670        record.updated_at_ms = current_epoch_ms();
2671        sqlx::query(
2672            "UPDATE lash_host_event_trigger_subscriptions
2673             SET enabled = $3, updated_at_ms = $4, record_json = $5
2674             WHERE session_id = $1 AND handle = $2",
2675        )
2676        .bind(session_id)
2677        .bind(handle)
2678        .bind(record.enabled)
2679        .bind(record.updated_at_ms as i64)
2680        .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2681        .execute(&mut *tx)
2682        .await
2683        .map_err(plugin_sqlx_error)?;
2684        tx.commit().await.map_err(plugin_sqlx_error)?;
2685        Ok(changed)
2686    }
2687
2688    async fn record_occurrence(
2689        &self,
2690        request: HostEventOccurrenceRequest,
2691    ) -> Result<HostEventOccurrenceRecord, PluginError> {
2692        lash_core::validate_host_event_occurrence_request(&request)?;
2693        let request_hash = lash_core::host_event_occurrence_request_hash(&request)?;
2694        let occurrence_id = lash_core::deterministic_occurrence_id(&request)?;
2695        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2696        let existing = sqlx::query(
2697            "SELECT request_hash, record_json
2698             FROM lash_host_event_occurrences
2699             WHERE idempotency_key = $1
2700             FOR UPDATE",
2701        )
2702        .bind(&request.idempotency_key)
2703        .fetch_optional(&mut *tx)
2704        .await
2705        .map_err(plugin_sqlx_error)?;
2706        if let Some(row) = existing {
2707            let existing_hash: String = row.get(0);
2708            let existing_json: String = row.get(1);
2709            if existing_hash != request_hash {
2710                return Err(PluginError::Session(format!(
2711                    "host event occurrence idempotency conflict for `{}`",
2712                    request.idempotency_key
2713                )));
2714            }
2715            let record = serde_json::from_str(&existing_json).map_err(process_decode_error)?;
2716            tx.commit().await.map_err(plugin_sqlx_error)?;
2717            return Ok(record);
2718        }
2719        let record = HostEventOccurrenceRecord {
2720            occurrence_id: occurrence_id.clone(),
2721            source_type: request.source_type,
2722            source_key: request.source_key,
2723            payload: request.payload,
2724            idempotency_key: request.idempotency_key.clone(),
2725            source: request.source,
2726            occurred_at_ms: current_epoch_ms(),
2727        };
2728        sqlx::query(
2729            "INSERT INTO lash_host_event_occurrences (
2730                occurrence_id, idempotency_key, request_hash, source_type, source_key,
2731                occurred_at_ms, record_json
2732             )
2733             VALUES ($1, $2, $3, $4, $5, $6, $7)",
2734        )
2735        .bind(&record.occurrence_id)
2736        .bind(&record.idempotency_key)
2737        .bind(&request_hash)
2738        .bind(&record.source_type)
2739        .bind(&record.source_key)
2740        .bind(record.occurred_at_ms as i64)
2741        .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2742        .execute(&mut *tx)
2743        .await
2744        .map_err(plugin_sqlx_error)?;
2745        tx.commit().await.map_err(plugin_sqlx_error)?;
2746        Ok(record)
2747    }
2748
2749    async fn reserve_matching_deliveries(
2750        &self,
2751        occurrence_id: &str,
2752    ) -> Result<Vec<TriggerDeliveryReservation>, PluginError> {
2753        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2754        let occurrence_json: Option<String> = sqlx::query_scalar(
2755            "SELECT record_json FROM lash_host_event_occurrences WHERE occurrence_id = $1",
2756        )
2757        .bind(occurrence_id)
2758        .fetch_optional(&mut *tx)
2759        .await
2760        .map_err(plugin_sqlx_error)?;
2761        let Some(occurrence_json) = occurrence_json else {
2762            return Err(PluginError::Session(format!(
2763                "unknown host event occurrence `{occurrence_id}`"
2764            )));
2765        };
2766        let occurrence: HostEventOccurrenceRecord =
2767            serde_json::from_str(&occurrence_json).map_err(process_decode_error)?;
2768        let rows = sqlx::query(
2769            "SELECT record_json FROM lash_host_event_trigger_subscriptions
2770             WHERE enabled = TRUE AND source_type = $1 AND source_key = $2
2771             ORDER BY session_id ASC, handle ASC",
2772        )
2773        .bind(&occurrence.source_type)
2774        .bind(&occurrence.source_key)
2775        .fetch_all(&mut *tx)
2776        .await
2777        .map_err(plugin_sqlx_error)?;
2778        let mut deliveries = Vec::new();
2779        for row in rows {
2780            let json: String = row.get(0);
2781            let subscription: TriggerSubscriptionRecord =
2782                serde_json::from_str(&json).map_err(process_decode_error)?;
2783            let process_id = lash_core::deterministic_delivery_process_id(
2784                &occurrence.occurrence_id,
2785                &subscription.subscription_id,
2786            )?;
2787            let inserted = sqlx::query(
2788                "INSERT INTO lash_host_event_deliveries (
2789                    occurrence_id, subscription_id, process_id, created_at_ms
2790                 )
2791                 VALUES ($1, $2, $3, $4)
2792                 ON CONFLICT DO NOTHING",
2793            )
2794            .bind(&occurrence.occurrence_id)
2795            .bind(&subscription.subscription_id)
2796            .bind(&process_id)
2797            .bind(current_epoch_ms() as i64)
2798            .execute(&mut *tx)
2799            .await
2800            .map_err(plugin_sqlx_error)?
2801            .rows_affected();
2802            if inserted == 0 {
2803                continue;
2804            }
2805            deliveries.push(TriggerDeliveryReservation {
2806                occurrence: occurrence.clone(),
2807                subscription,
2808                process_id,
2809            });
2810        }
2811        tx.commit().await.map_err(plugin_sqlx_error)?;
2812        Ok(deliveries)
2813    }
2814}
2815
2816#[async_trait::async_trait]
2817impl lashlang::LashlangArtifactStore for PostgresLashlangArtifactStore {
2818    fn durability_tier(&self) -> DurabilityTier {
2819        DurabilityTier::Durable
2820    }
2821
2822    async fn put_module_artifact(
2823        &self,
2824        artifact: &lashlang::ModuleArtifact,
2825    ) -> Result<(), lashlang::ArtifactStoreError> {
2826        let bytes = artifact
2827            .to_store_bytes()
2828            .map_err(lashlang::ArtifactStoreError::from)?;
2829        sqlx::query(
2830            "INSERT INTO lash_lashlang_artifacts (module_ref, artifact_bytes)
2831             VALUES ($1, $2)
2832             ON CONFLICT (module_ref) DO UPDATE SET artifact_bytes = EXCLUDED.artifact_bytes",
2833        )
2834        .bind(artifact.module_ref.as_str())
2835        .bind(bytes)
2836        .execute(&self.pool)
2837        .await
2838        .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
2839        Ok(())
2840    }
2841
2842    async fn get_module_artifact(
2843        &self,
2844        module_ref: &lashlang::ModuleRef,
2845    ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
2846        let bytes: Option<Vec<u8>> = sqlx::query_scalar(
2847            "SELECT artifact_bytes FROM lash_lashlang_artifacts WHERE module_ref = $1",
2848        )
2849        .bind(module_ref.as_str())
2850        .fetch_optional(&self.pool)
2851        .await
2852        .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
2853        bytes
2854            .map(|bytes| {
2855                lashlang::ModuleArtifact::from_store_bytes(&bytes)
2856                    .map(Arc::new)
2857                    .map_err(lashlang::ArtifactStoreError::from)
2858            })
2859            .transpose()
2860    }
2861}