1use std::collections::HashSet;
8use std::sync::{Arc, OnceLock};
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use lash_core::runtime::{
12 ProcessHandleGrantEntry, QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim,
13 QueuedWorkClaimBoundary, QueuedWorkCompletion, QueuedWorkItem,
14};
15use lash_core::store::queued_work::{
16 ClaimCandidate, QueuedWorkClaimLease, claim_scan_limit, derive_batch_id, renewed_claim,
17 select_claim_prefix,
18};
19use lash_core::store::{
20 GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
21 RuntimeCommitResult, SessionCheckpoint, SessionHeadMeta,
22};
23use lash_core::{
24 AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
25 DeliveryPolicy, DurabilityTier, GcReport, MergeKey, ProcessAwaitOutput, ProcessEvent,
26 ProcessEventAppendRequest, ProcessEventAppendResult, ProcessExternalRef,
27 ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease, ProcessLeaseCompletion,
28 ProcessRecord, ProcessRegistration, ProcessRegistry, ProcessScope, RuntimePersistence,
29 SessionMeta, SessionNodeRecord, SessionReadScope, SessionStoreCreateRequest,
30 SessionStoreFactory, SlotPolicy, StoreError, TokenLedgerEntry, VacuumReport,
31};
32use lash_core::{
33 HostEventOccurrenceRecord, HostEventOccurrenceRequest, HostEventStore, PluginError,
34 TriggerDeliveryReservation, TriggerSubscriptionDraft, TriggerSubscriptionFilter,
35 TriggerSubscriptionRecord,
36};
37use sha2::{Digest, Sha256};
38use sqlx::postgres::{PgPool, PgPoolOptions, PgRow};
39use sqlx::{Executor, Row};
40
/// Component key under which this store records its schema version in the
/// `lash_schema_versions` table (see `ensure_schema`).
const SCHEMA_COMPONENT: &str = "lash-postgres-store";
/// Schema version this build expects; `ensure_schema` returns an error when the
/// database already records a different version for [`SCHEMA_COMPONENT`].
const SCHEMA_VERSION: i32 = 1;
/// Local alias for the process-lease schema version defined by `lash_core`.
// NOTE(review): its use sites are outside the portion of the file reviewed here.
const PROCESS_LEASE_SCHEMA_VERSION: u32 = lash_core::PROCESS_LEASE_SCHEMA_VERSION;
44
/// Entry point of the Postgres backend: wraps the connection pool and hands out
/// the individual stores, each of which receives its own clone of that pool.
#[derive(Clone)]
pub struct PostgresStorage {
    // Pool cloned into every store created from this handle.
    pool: PgPool,
}
49
/// Factory that creates a [`PostgresSessionStore`] per session and can delete a
/// session's persisted rows (see its `SessionStoreFactory` implementation).
#[derive(Clone)]
pub struct PostgresSessionStoreFactory {
    // Pool cloned into every session store this factory creates.
    pool: PgPool,
}
54
/// Session-level persistence handle backed by Postgres.
#[derive(Clone)]
pub struct PostgresSessionStore {
    // Pool used for every query issued by this store.
    pool: PgPool,
    // Session this store was created for; `None` for an "unbound" store
    // (see `PostgresStorage::unbound_session_store`).
    session_id: Option<String>,
    // Write-once session binding shared by all clones of this store.
    // `commit_runtime_state` falls back to it when `session_id` is `None`.
    // NOTE(review): the code that sets it is outside the reviewed portion — confirm.
    bound_session: Arc<OnceLock<String>>,
}
67
/// Postgres-backed process registry handle, created by
/// `PostgresStorage::process_registry`.
#[derive(Clone)]
pub struct PostgresProcessRegistry {
    // Pool used for registry queries.
    pool: PgPool,
    // Notifier shared by all clones of this registry handle.
    // NOTE(review): presumably wakes in-process waiters on registry changes;
    // the use sites are outside the reviewed portion — confirm.
    notify: Arc<tokio::sync::Notify>,
}
73
/// Postgres-backed host event store handle, created by
/// `PostgresStorage::host_event_store`.
#[derive(Clone)]
pub struct PostgresHostEventStore {
    // Pool used for host-event queries.
    pool: PgPool,
}
78
/// Postgres-backed lashlang artifact store handle, created by
/// `PostgresStorage::lashlang_artifact_store`.
#[derive(Clone)]
pub struct PostgresLashlangArtifactStore {
    // Pool used for artifact queries.
    pool: PgPool,
}
83
/// Connection-pool and per-connection timeout settings consumed by
/// `PostgresStorage::connect_with`.
#[derive(Clone, Debug)]
pub struct PostgresStoreConfig {
    /// Forwarded to `PgPoolOptions::max_connections`.
    pub max_connections: u32,
    /// Forwarded to `PgPoolOptions::min_connections`.
    pub min_connections: u32,
    /// Forwarded to `PgPoolOptions::acquire_timeout`.
    pub acquire_timeout: Duration,
    /// Forwarded to `PgPoolOptions::idle_timeout` when `Some`; otherwise the
    /// pool builder's own setting is left untouched.
    pub idle_timeout: Option<Duration>,
    /// Forwarded to `PgPoolOptions::max_lifetime` when `Some`; otherwise the
    /// pool builder's own setting is left untouched.
    pub max_lifetime: Option<Duration>,
    /// When `Some`, every new connection runs `SET lock_timeout = <ms>`, with the
    /// value rounded up to at least 1 ms. `None` issues no `SET`.
    pub lock_timeout: Option<Duration>,
    /// When `Some`, every new connection runs `SET statement_timeout = <ms>`, with
    /// the value rounded up to at least 1 ms. `None` issues no `SET`.
    pub statement_timeout: Option<Duration>,
}
110
111impl Default for PostgresStoreConfig {
112 fn default() -> Self {
113 Self {
114 max_connections: 16,
115 min_connections: 0,
116 acquire_timeout: Duration::from_secs(30),
117 idle_timeout: Some(Duration::from_secs(600)),
118 max_lifetime: Some(Duration::from_secs(1800)),
119 lock_timeout: Some(Duration::from_secs(10)),
120 statement_timeout: Some(Duration::from_secs(30)),
121 }
122 }
123}
124
125impl PostgresStorage {
126 pub async fn connect(database_url: &str) -> Result<Self, StoreError> {
128 Self::connect_with(database_url, PostgresStoreConfig::default()).await
129 }
130
131 pub async fn connect_with(
133 database_url: &str,
134 config: PostgresStoreConfig,
135 ) -> Result<Self, StoreError> {
136 let lock_ms = config.lock_timeout.map(|d| d.as_millis().max(1) as u64);
137 let statement_ms = config
138 .statement_timeout
139 .map(|d| d.as_millis().max(1) as u64);
140 let mut options = PgPoolOptions::new()
141 .max_connections(config.max_connections)
142 .min_connections(config.min_connections)
143 .acquire_timeout(config.acquire_timeout);
144 if let Some(timeout) = config.idle_timeout {
145 options = options.idle_timeout(timeout);
146 }
147 if let Some(timeout) = config.max_lifetime {
148 options = options.max_lifetime(timeout);
149 }
150 let pool = options
151 .after_connect(move |conn, _meta| {
152 Box::pin(async move {
153 if let Some(ms) = lock_ms {
154 conn.execute(format!("SET lock_timeout = {ms}").as_str())
155 .await?;
156 }
157 if let Some(ms) = statement_ms {
158 conn.execute(format!("SET statement_timeout = {ms}").as_str())
159 .await?;
160 }
161 Ok(())
162 })
163 })
164 .connect(database_url)
165 .await
166 .map_err(store_sqlx_error)?;
167 ensure_schema(&pool).await?;
168 Ok(Self { pool })
169 }
170
171 pub fn from_pool(pool: PgPool) -> Self {
172 Self { pool }
173 }
174
175 pub fn pool(&self) -> &PgPool {
176 &self.pool
177 }
178
179 pub fn session_store_factory(&self) -> PostgresSessionStoreFactory {
180 PostgresSessionStoreFactory {
181 pool: self.pool.clone(),
182 }
183 }
184
185 pub fn session_store(&self, session_id: impl Into<String>) -> PostgresSessionStore {
186 PostgresSessionStore {
187 pool: self.pool.clone(),
188 session_id: Some(session_id.into()),
189 bound_session: Arc::new(OnceLock::new()),
190 }
191 }
192
193 pub fn unbound_session_store(&self) -> PostgresSessionStore {
194 PostgresSessionStore {
195 pool: self.pool.clone(),
196 session_id: None,
197 bound_session: Arc::new(OnceLock::new()),
198 }
199 }
200
201 pub fn process_registry(&self) -> PostgresProcessRegistry {
202 PostgresProcessRegistry {
203 pool: self.pool.clone(),
204 notify: Arc::new(tokio::sync::Notify::new()),
205 }
206 }
207
208 pub fn host_event_store(&self) -> PostgresHostEventStore {
209 PostgresHostEventStore {
210 pool: self.pool.clone(),
211 }
212 }
213
214 pub fn lashlang_artifact_store(&self) -> PostgresLashlangArtifactStore {
215 PostgresLashlangArtifactStore {
216 pool: self.pool.clone(),
217 }
218 }
219}
220
221impl PostgresSessionStoreFactory {
222 pub fn new(storage: &PostgresStorage) -> Self {
223 storage.session_store_factory()
224 }
225}
226
227impl PostgresSessionStore {
228 pub fn unbound(storage: &PostgresStorage) -> Self {
229 storage.unbound_session_store()
230 }
231
232 async fn selected_session_id(&self) -> Result<Option<String>, StoreError> {
233 if let Some(session_id) = &self.session_id {
234 return Ok(Some(session_id.clone()));
235 }
236 sqlx::query_scalar("SELECT session_id FROM lash_sessions ORDER BY session_id ASC LIMIT 1")
237 .fetch_optional(&self.pool)
238 .await
239 .map_err(store_sqlx_error)
240 }
241}
242
/// Creates every table, index and sequence used by this store (all guarded by
/// `IF NOT EXISTS`) and records or verifies the schema version, all inside a
/// single transaction.
///
/// # Errors
/// Returns [`StoreError::Backend`] when any statement fails, or when
/// `lash_schema_versions` already holds a version for [`SCHEMA_COMPONENT`] that
/// differs from [`SCHEMA_VERSION`]. On that early return the transaction is
/// dropped without being committed.
async fn ensure_schema(pool: &PgPool) -> Result<(), StoreError> {
    let mut tx = pool.begin().await.map_err(store_sqlx_error)?;
    // Transaction-scoped advisory lock: concurrent initializers run one at a
    // time, and the lock goes away when this transaction ends.
    tx.execute("SELECT pg_advisory_xact_lock(715421, 907001)")
        .await
        .map_err(store_sqlx_error)?;
    // One multi-statement batch holding the whole DDL.
    tx.execute(
        r#"
        CREATE TABLE IF NOT EXISTS lash_schema_versions (
            component TEXT PRIMARY KEY,
            version INTEGER NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_blobs (
            hash TEXT PRIMARY KEY,
            content BYTEA NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_sessions (
            session_id TEXT PRIMARY KEY,
            head_revision BIGINT NOT NULL DEFAULT 0,
            head_json TEXT NOT NULL,
            checkpoint_ref TEXT
        );

        CREATE TABLE IF NOT EXISTS lash_graph_nodes (
            session_id TEXT NOT NULL,
            seq BIGSERIAL,
            node_id TEXT NOT NULL,
            node_json TEXT NOT NULL,
            tombstoned BOOLEAN NOT NULL DEFAULT FALSE,
            PRIMARY KEY (session_id, node_id)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_graph_nodes_seq
            ON lash_graph_nodes(session_id, seq);

        CREATE TABLE IF NOT EXISTS lash_usage_deltas (
            seq BIGSERIAL PRIMARY KEY,
            session_id TEXT NOT NULL,
            entry_json TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_session_meta (
            session_id TEXT PRIMARY KEY,
            meta_json TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_runtime_turn_commits (
            session_id TEXT NOT NULL,
            turn_id TEXT NOT NULL,
            turn_commit_hash TEXT NOT NULL,
            result_json TEXT NOT NULL,
            committed_at_ms BIGINT NOT NULL,
            PRIMARY KEY (session_id, turn_id)
        );

        CREATE TABLE IF NOT EXISTS lash_queued_work_batches (
            enqueue_seq BIGSERIAL PRIMARY KEY,
            batch_id TEXT NOT NULL UNIQUE,
            session_id TEXT NOT NULL,
            source_key TEXT,
            delivery_policy TEXT NOT NULL,
            slot_policy TEXT NOT NULL,
            merge_key_json TEXT NOT NULL,
            available_at_ms BIGINT NOT NULL,
            enqueued_at_ms BIGINT NOT NULL,
            claim_id TEXT,
            claim_owner_id TEXT,
            claim_token TEXT,
            claim_fencing_token BIGINT NOT NULL DEFAULT 0,
            claim_claimed_at_ms BIGINT NOT NULL DEFAULT 0,
            claim_expires_at_ms BIGINT NOT NULL DEFAULT 0,
            UNIQUE (session_id, source_key)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_queued_work_ready
            ON lash_queued_work_batches(session_id, available_at_ms, enqueue_seq);

        CREATE TABLE IF NOT EXISTS lash_queued_work_items (
            batch_id TEXT NOT NULL REFERENCES lash_queued_work_batches(batch_id) ON DELETE CASCADE,
            item_index INTEGER NOT NULL,
            item_id TEXT NOT NULL,
            payload_json TEXT NOT NULL,
            PRIMARY KEY (batch_id, item_index)
        );

        CREATE TABLE IF NOT EXISTS lash_attachment_manifest (
            attachment_id TEXT PRIMARY KEY,
            session_id TEXT NOT NULL,
            canonical_uri TEXT NOT NULL,
            intent_at_ms BIGINT NOT NULL,
            committed_at_ms BIGINT
        );
        CREATE INDEX IF NOT EXISTS idx_lash_attachment_manifest_uncommitted
            ON lash_attachment_manifest(committed_at_ms)
            WHERE committed_at_ms IS NULL;

        CREATE TABLE IF NOT EXISTS lash_processes (
            process_id TEXT PRIMARY KEY,
            registration_hash TEXT NOT NULL,
            owner_scope_id TEXT NOT NULL,
            host_profile_id TEXT NOT NULL,
            created_at_ms BIGINT NOT NULL,
            updated_at_ms BIGINT NOT NULL,
            status TEXT NOT NULL,
            record_json TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_lash_processes_status
            ON lash_processes(status);

        CREATE TABLE IF NOT EXISTS lash_process_events (
            process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            sequence BIGINT NOT NULL,
            event_type TEXT NOT NULL,
            payload_hash TEXT NOT NULL,
            idempotency_key TEXT,
            occurred_at_ms BIGINT NOT NULL,
            event_json TEXT NOT NULL,
            PRIMARY KEY (process_id, sequence)
        );
        CREATE UNIQUE INDEX IF NOT EXISTS idx_lash_process_events_key
            ON lash_process_events(process_id, idempotency_key)
            WHERE idempotency_key IS NOT NULL;

        CREATE TABLE IF NOT EXISTS lash_process_wake_acks (
            process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            sequence BIGINT NOT NULL,
            PRIMARY KEY (process_id, sequence)
        );

        CREATE TABLE IF NOT EXISTS lash_process_handle_grants (
            session_id TEXT NOT NULL,
            scope_id TEXT NOT NULL,
            process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            descriptor_json TEXT NOT NULL,
            PRIMARY KEY (scope_id, process_id)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_process_handle_grants_session
            ON lash_process_handle_grants(session_id);
        CREATE INDEX IF NOT EXISTS idx_lash_process_handle_grants_process
            ON lash_process_handle_grants(process_id);

        CREATE TABLE IF NOT EXISTS lash_process_leases (
            process_id TEXT PRIMARY KEY REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            lease_owner_id TEXT,
            lease_token TEXT,
            lease_fencing_token BIGINT NOT NULL DEFAULT 0,
            lease_claimed_at_ms BIGINT NOT NULL DEFAULT 0,
            lease_expires_at_ms BIGINT NOT NULL DEFAULT 0
        );

        CREATE SEQUENCE IF NOT EXISTS lash_host_event_subscription_seq;
        CREATE TABLE IF NOT EXISTS lash_host_event_trigger_subscriptions (
            subscription_id TEXT PRIMARY KEY,
            session_id TEXT NOT NULL,
            handle TEXT NOT NULL,
            source_type TEXT NOT NULL,
            source_key TEXT NOT NULL,
            enabled BOOLEAN NOT NULL,
            created_at_ms BIGINT NOT NULL,
            updated_at_ms BIGINT NOT NULL,
            record_json TEXT NOT NULL,
            UNIQUE(session_id, handle)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_host_event_subscriptions_source
            ON lash_host_event_trigger_subscriptions(source_type, source_key, enabled);

        CREATE TABLE IF NOT EXISTS lash_host_event_occurrences (
            occurrence_id TEXT PRIMARY KEY,
            idempotency_key TEXT NOT NULL UNIQUE,
            request_hash TEXT NOT NULL,
            source_type TEXT NOT NULL,
            source_key TEXT NOT NULL,
            occurred_at_ms BIGINT NOT NULL,
            record_json TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_host_event_deliveries (
            occurrence_id TEXT NOT NULL REFERENCES lash_host_event_occurrences(occurrence_id) ON DELETE CASCADE,
            subscription_id TEXT NOT NULL REFERENCES lash_host_event_trigger_subscriptions(subscription_id) ON DELETE CASCADE,
            process_id TEXT NOT NULL,
            created_at_ms BIGINT NOT NULL,
            PRIMARY KEY (occurrence_id, subscription_id)
        );

        CREATE TABLE IF NOT EXISTS lash_lashlang_artifacts (
            module_ref TEXT PRIMARY KEY,
            artifact_bytes BYTEA NOT NULL
        );
        "#,
    )
    .await
    .map_err(store_sqlx_error)?;

    // Read the version previously recorded for this component, if any.
    let existing: Option<i32> =
        sqlx::query_scalar("SELECT version FROM lash_schema_versions WHERE component = $1")
            .bind(SCHEMA_COMPONENT)
            .fetch_optional(&mut *tx)
            .await
            .map_err(store_sqlx_error)?;
    match existing {
        // Already at the expected version: nothing to record.
        Some(version) if version == SCHEMA_VERSION => {}
        // Any other recorded version is rejected; no migration is attempted here.
        Some(version) => {
            return Err(StoreError::Backend(format!(
                "Postgres schema component `{SCHEMA_COMPONENT}` has version {version}, expected {SCHEMA_VERSION}"
            )));
        }
        // First initialization: record the version this build created.
        None => {
            sqlx::query("INSERT INTO lash_schema_versions (component, version) VALUES ($1, $2)")
                .bind(SCHEMA_COMPONENT)
                .bind(SCHEMA_VERSION)
                .execute(&mut *tx)
                .await
                .map_err(store_sqlx_error)?;
        }
    }
    tx.commit().await.map_err(store_sqlx_error)
}
459
/// Milliseconds elapsed since the Unix epoch according to the system clock.
///
/// Yields `0` when the clock reports a time before the epoch.
fn current_epoch_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
466
/// Formats the current time as `unix:<seconds since the Unix epoch>`.
///
/// A clock reading before the epoch is rendered as `unix:0`.
fn current_timestamp_string() -> String {
    let seconds = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0);
    format!("unix:{seconds}")
}
473
474fn store_sqlx_error(err: sqlx::Error) -> StoreError {
475 StoreError::Backend(err.to_string())
476}
477
478fn is_contention_error(err: &sqlx::Error) -> bool {
483 matches!(
484 err.as_database_error().and_then(|db| db.code()).as_deref(),
485 Some("40001" | "40P01" | "55P03")
486 )
487}
488
489fn plugin_sqlx_error(err: sqlx::Error) -> PluginError {
490 PluginError::Session(err.to_string())
491}
492
493fn process_decode_error(err: serde_json::Error) -> PluginError {
494 PluginError::Session(format!("failed to decode process registry row: {err}"))
495}
496
497fn store_decode_json<T: serde::de::DeserializeOwned>(
498 json: &str,
499 what: &str,
500) -> Result<T, StoreError> {
501 serde_json::from_str(json)
502 .map_err(|err| StoreError::Backend(format!("failed to decode {what}: {err}")))
503}
504
505fn encode_json<T: serde::Serialize>(value: &T) -> String {
506 serde_json::to_string(value).expect("persisted state should serialize")
507}
508
509fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
510 let mut buf = Vec::with_capacity(1024);
511 rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
512 buf
513}
514
515fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
516 rmp_serde::from_slice(bytes).ok()
517}
518
519fn block_on_detached<T: Send + 'static>(
520 future: impl std::future::Future<Output = T> + Send + 'static,
521) -> T {
522 std::thread::spawn(move || {
523 tokio::runtime::Builder::new_current_thread()
524 .enable_all()
525 .build()
526 .expect("postgres manifest runtime")
527 .block_on(future)
528 })
529 .join()
530 .expect("postgres manifest thread")
531}
532
533fn merge_token_ledger_entries(entries: Vec<TokenLedgerEntry>) -> Vec<TokenLedgerEntry> {
534 let mut merged = Vec::<TokenLedgerEntry>::new();
535 for entry in entries {
536 if entry.usage.total() == 0 {
537 continue;
538 }
539 if let Some(existing) = merged
540 .iter_mut()
541 .find(|existing| existing.source == entry.source && existing.model == entry.model)
542 {
543 existing.usage.input_tokens += entry.usage.input_tokens;
544 existing.usage.output_tokens += entry.usage.output_tokens;
545 existing.usage.cached_input_tokens += entry.usage.cached_input_tokens;
546 existing.usage.reasoning_tokens += entry.usage.reasoning_tokens;
547 } else {
548 merged.push(entry);
549 }
550 }
551 merged
552}
553
/// Blob payload written by `put_checkpoint_tx` and read back by `get_checkpoint`:
/// the checkpoint manifest together with the inline state that accompanies it.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct SessionCheckpointEnvelope {
    // Manifest part of the checkpoint (turn state plus the state references).
    manifest: SessionCheckpoint,
    // Inline copy of the checkpoint's tool state, when present.
    tool_state: Option<lash_core::ToolState>,
    // Inline copy of the checkpoint's plugin snapshot, when present.
    plugin_snapshot: Option<lash_core::PluginSessionSnapshot>,
    // Inline copy of the checkpoint's execution state bytes, when present.
    execution_state: Option<Vec<u8>>,
}
561
562async fn put_blob_tx(
563 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
564 content: &[u8],
565) -> Result<BlobRef, StoreError> {
566 let hash = format!("{:x}", Sha256::digest(content));
567 sqlx::query(
568 "INSERT INTO lash_blobs (hash, content)
569 VALUES ($1, $2)
570 ON CONFLICT (hash) DO NOTHING",
571 )
572 .bind(&hash)
573 .bind(content)
574 .execute(&mut **tx)
575 .await
576 .map_err(store_sqlx_error)?;
577 Ok(BlobRef(hash))
578}
579
580async fn get_blob(pool: &PgPool, blob_ref: &BlobRef) -> Option<Vec<u8>> {
581 sqlx::query_scalar::<_, Vec<u8>>("SELECT content FROM lash_blobs WHERE hash = $1")
582 .bind(blob_ref.as_str())
583 .fetch_optional(pool)
584 .await
585 .ok()
586 .flatten()
587}
588
589async fn put_checkpoint_tx(
590 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
591 checkpoint: &HydratedSessionCheckpoint,
592) -> Result<(BlobRef, SessionCheckpoint), StoreError> {
593 let manifest = SessionCheckpoint {
594 turn_state: checkpoint.turn_state.clone(),
595 tool_state_ref: checkpoint.tool_state_ref.clone(),
596 plugin_snapshot_ref: checkpoint.plugin_snapshot_ref.clone(),
597 plugin_snapshot_revision: checkpoint.plugin_snapshot_revision,
598 execution_state_ref: checkpoint.execution_state_ref.clone(),
599 };
600 let envelope = SessionCheckpointEnvelope {
601 manifest: manifest.clone(),
602 tool_state: checkpoint.tool_state.clone(),
603 plugin_snapshot: checkpoint.plugin_snapshot.clone(),
604 execution_state: checkpoint.execution_state.clone(),
605 };
606 let bytes = encode_msgpack(&envelope);
607 let checkpoint_ref = put_blob_tx(tx, &bytes).await?;
608 Ok((checkpoint_ref, manifest))
609}
610
611async fn get_checkpoint(pool: &PgPool, blob_ref: &BlobRef) -> Option<HydratedSessionCheckpoint> {
612 let bytes = get_blob(pool, blob_ref).await?;
613 let envelope: SessionCheckpointEnvelope = decode_msgpack(&bytes)?;
614 Some(HydratedSessionCheckpoint {
615 turn_state: envelope.manifest.turn_state,
616 tool_state_ref: envelope.manifest.tool_state_ref,
617 tool_state: envelope.tool_state,
618 plugin_snapshot_ref: envelope.manifest.plugin_snapshot_ref,
619 plugin_snapshot: envelope.plugin_snapshot,
620 plugin_snapshot_revision: envelope.manifest.plugin_snapshot_revision,
621 execution_state_ref: envelope.manifest.execution_state_ref,
622 execution_state: envelope.execution_state,
623 })
624}
625
626async fn load_session_head_meta_tx(
627 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
628 session_id: &str,
629 for_update: bool,
630) -> Result<Option<SessionHeadMeta>, StoreError> {
631 let sql = if for_update {
632 "SELECT head_json, head_revision FROM lash_sessions WHERE session_id = $1 FOR UPDATE"
633 } else {
634 "SELECT head_json, head_revision FROM lash_sessions WHERE session_id = $1"
635 };
636 let row = sqlx::query(sql)
637 .bind(session_id)
638 .fetch_optional(&mut **tx)
639 .await
640 .map_err(store_sqlx_error)?;
641 let Some(row) = row else {
642 return Ok(None);
643 };
644 let head_json: String = row.get(0);
645 let head_revision: i64 = row.get(1);
646 let mut meta: SessionHeadMeta = store_decode_json(&head_json, "session head")?;
647 meta.head_revision = head_revision as u64;
648 Ok(Some(meta))
649}
650
651async fn load_usage_deltas(pool: &PgPool, session_id: &str) -> Vec<TokenLedgerEntry> {
652 let rows = sqlx::query(
653 "SELECT entry_json FROM lash_usage_deltas WHERE session_id = $1 ORDER BY seq ASC",
654 )
655 .bind(session_id)
656 .fetch_all(pool)
657 .await
658 .unwrap_or_default();
659 rows.into_iter()
660 .filter_map(|row| {
661 let json: String = row.get(0);
662 serde_json::from_str(&json).ok()
663 })
664 .collect()
665}
666
667async fn load_graph(
668 pool: &PgPool,
669 session_id: &str,
670 leaf_node_id: Option<String>,
671 active_path: bool,
672) -> Result<lash_core::SessionGraph, StoreError> {
673 let rows = sqlx::query(
674 "SELECT node_json FROM lash_graph_nodes
675 WHERE session_id = $1 AND tombstoned = FALSE
676 ORDER BY seq ASC",
677 )
678 .bind(session_id)
679 .fetch_all(pool)
680 .await
681 .map_err(store_sqlx_error)?;
682 let mut nodes = Vec::<SessionNodeRecord>::new();
683 for row in rows {
684 let json: String = row.get(0);
685 nodes.push(store_decode_json(&json, "session graph node")?);
686 }
687 if active_path && let Some(leaf) = leaf_node_id.clone() {
688 let wanted = active_path_node_ids(&nodes, &leaf);
689 nodes.retain(|node| wanted.contains(&node.node_id));
690 }
691 Ok(lash_core::SessionGraph::from_nodes(nodes, leaf_node_id))
692}
693
694fn active_path_node_ids(nodes: &[SessionNodeRecord], leaf_node_id: &str) -> HashSet<String> {
695 let mut parent_by_id = std::collections::BTreeMap::new();
696 for node in nodes {
697 parent_by_id.insert(node.node_id.clone(), node.parent_node_id.clone());
698 }
699 let mut wanted = HashSet::new();
700 let mut cursor = Some(leaf_node_id.to_string());
701 while let Some(node_id) = cursor {
702 if !wanted.insert(node_id.clone()) {
703 break;
704 }
705 cursor = parent_by_id.get(&node_id).cloned().flatten();
706 }
707 wanted
708}
709
710async fn commit_attachment_refs_tx(
711 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
712 session_id: &str,
713 attachment_ids: &[AttachmentId],
714) -> Result<(), StoreError> {
715 if attachment_ids.is_empty() {
716 return Ok(());
717 }
718 let now = current_epoch_ms() as i64;
719 for id in attachment_ids {
720 sqlx::query(
721 "UPDATE lash_attachment_manifest
722 SET committed_at_ms = COALESCE(committed_at_ms, $1)
723 WHERE attachment_id = $2 AND session_id = $3",
724 )
725 .bind(now)
726 .bind(id.as_str())
727 .bind(session_id)
728 .execute(&mut **tx)
729 .await
730 .map_err(store_sqlx_error)?;
731 }
732 Ok(())
733}
734
735impl AttachmentManifest for PostgresSessionStore {
736 fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError> {
737 let pool = self.pool.clone();
738 block_on_detached(async move {
739 sqlx::query(
740 "INSERT INTO lash_attachment_manifest (
741 attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
742 )
743 VALUES ($1, $2, $3, $4, NULL)
744 ON CONFLICT (attachment_id) DO UPDATE SET
745 session_id = EXCLUDED.session_id,
746 canonical_uri = EXCLUDED.canonical_uri,
747 intent_at_ms = EXCLUDED.intent_at_ms",
748 )
749 .bind(intent.attachment_id.as_str())
750 .bind(intent.session_id)
751 .bind(intent.canonical_uri)
752 .bind(intent.intent_at_epoch_ms as i64)
753 .execute(&pool)
754 .await
755 .map(|_| ())
756 .map_err(store_sqlx_error)
757 })
758 }
759
760 fn commit_refs(
761 &self,
762 session_id: &str,
763 attachment_ids: &[AttachmentId],
764 ) -> Result<(), StoreError> {
765 let pool = self.pool.clone();
766 let session_id = session_id.to_string();
767 let attachment_ids = attachment_ids.to_vec();
768 block_on_detached(async move {
769 let mut tx = pool.begin().await.map_err(store_sqlx_error)?;
770 commit_attachment_refs_tx(&mut tx, &session_id, &attachment_ids).await?;
771 tx.commit().await.map_err(store_sqlx_error)
772 })
773 }
774
775 fn list_uncommitted(
776 &self,
777 older_than_epoch_ms: u64,
778 ) -> Result<Vec<AttachmentManifestEntry>, StoreError> {
779 let pool = self.pool.clone();
780 block_on_detached(async move {
781 let rows = sqlx::query(
782 "SELECT attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
783 FROM lash_attachment_manifest
784 WHERE committed_at_ms IS NULL AND intent_at_ms <= $1
785 ORDER BY attachment_id ASC",
786 )
787 .bind(older_than_epoch_ms as i64)
788 .fetch_all(&pool)
789 .await
790 .map_err(store_sqlx_error)?;
791 Ok(rows
792 .into_iter()
793 .map(|row| AttachmentManifestEntry {
794 attachment_id: AttachmentId::new(row.get::<String, _>(0)),
795 session_id: row.get(1),
796 canonical_uri: row.get(2),
797 intent_at_epoch_ms: row.get::<i64, _>(3) as u64,
798 committed_at_epoch_ms: row.get::<Option<i64>, _>(4).map(|value| value as u64),
799 })
800 .collect())
801 })
802 }
803
804 fn forget(&self, attachment_id: &AttachmentId) -> Result<(), StoreError> {
805 let pool = self.pool.clone();
806 let attachment_id = attachment_id.to_string();
807 block_on_detached(async move {
808 sqlx::query("DELETE FROM lash_attachment_manifest WHERE attachment_id = $1")
809 .bind(attachment_id)
810 .execute(&pool)
811 .await
812 .map(|_| ())
813 .map_err(store_sqlx_error)
814 })
815 }
816}
817
818#[async_trait::async_trait]
819impl SessionStoreFactory for PostgresSessionStoreFactory {
820 fn durability_tier(&self) -> DurabilityTier {
821 DurabilityTier::Durable
822 }
823
824 async fn create_store(
825 &self,
826 request: &SessionStoreCreateRequest,
827 ) -> Result<Arc<dyn RuntimePersistence>, String> {
828 let store = PostgresSessionStore {
829 pool: self.pool.clone(),
830 session_id: Some(request.session_id.clone()),
831 bound_session: Arc::new(OnceLock::new()),
832 };
833 if store
834 .load_session_meta()
835 .await
836 .map_err(|err| err.to_string())?
837 .is_none()
838 {
839 store
840 .save_session_meta(SessionMeta {
841 session_id: request.session_id.clone(),
842 session_name: request.session_id.clone(),
843 created_at: current_timestamp_string(),
844 model: request.policy.model.id.clone(),
845 cwd: std::env::current_dir()
846 .ok()
847 .and_then(|path| path.to_str().map(str::to_string)),
848 relation: request.relation.clone(),
849 })
850 .await
851 .map_err(|err| err.to_string())?;
852 }
853 Ok(Arc::new(store))
854 }
855
856 async fn delete_session(&self, session_id: &str) -> Result<(), String> {
857 let mut tx = self.pool.begin().await.map_err(|err| err.to_string())?;
858 for sql in [
859 "DELETE FROM lash_queued_work_items WHERE batch_id IN (SELECT batch_id FROM lash_queued_work_batches WHERE session_id = $1)",
860 "DELETE FROM lash_queued_work_batches WHERE session_id = $1",
861 "DELETE FROM lash_usage_deltas WHERE session_id = $1",
862 "DELETE FROM lash_graph_nodes WHERE session_id = $1",
863 "DELETE FROM lash_runtime_turn_commits WHERE session_id = $1",
864 "DELETE FROM lash_session_meta WHERE session_id = $1",
865 "DELETE FROM lash_sessions WHERE session_id = $1",
866 "DELETE FROM lash_attachment_manifest WHERE session_id = $1",
867 ] {
868 sqlx::query(sql)
869 .bind(session_id)
870 .execute(&mut *tx)
871 .await
872 .map_err(|err| err.to_string())?;
873 }
874 tx.commit().await.map_err(|err| err.to_string())
875 }
876}
877
/// Decoded subset of a `lash_queued_work_batches` row (its items are loaded
/// separately); produced by `queued_batch_row`.
#[derive(Clone, Debug)]
struct QueuedBatchRow {
    // `enqueue_seq` column (BIGINT) cast to `u64`.
    enqueue_seq: u64,
    // `batch_id` column.
    batch_id: String,
    // `session_id` column.
    session_id: String,
    // `source_key` column; nullable.
    source_key: Option<String>,
    // Parsed from the `delivery_policy` wire string.
    delivery_policy: DeliveryPolicy,
    // Parsed from the `slot_policy` wire string.
    slot_policy: SlotPolicy,
    // Decoded from the `merge_key_json` column.
    merge_key: MergeKey,
    // `available_at_ms` column (BIGINT) cast to `u64`.
    available_at_ms: u64,
    // `enqueued_at_ms` column (BIGINT) cast to `u64`.
    enqueued_at_ms: u64,
    // `claim_fencing_token` column (BIGINT) cast to `u64`.
    claim_fencing_token: u64,
}
891
892fn queued_batch_row(row: PgRow) -> Result<QueuedBatchRow, StoreError> {
893 let delivery_policy =
894 DeliveryPolicy::from_wire_str(row.get::<String, _>("delivery_policy").as_str())
895 .ok_or_else(|| {
896 StoreError::Backend("invalid queued work delivery policy".to_string())
897 })?;
898 let slot_policy = SlotPolicy::from_wire_str(row.get::<String, _>("slot_policy").as_str())
899 .ok_or_else(|| StoreError::Backend("invalid queued work slot policy".to_string()))?;
900 let merge_json: String = row.get("merge_key_json");
901 Ok(QueuedBatchRow {
902 enqueue_seq: row.get::<i64, _>("enqueue_seq") as u64,
903 batch_id: row.get("batch_id"),
904 session_id: row.get("session_id"),
905 source_key: row.get("source_key"),
906 delivery_policy,
907 slot_policy,
908 merge_key: store_decode_json(&merge_json, "queued work merge key")?,
909 available_at_ms: row.get::<i64, _>("available_at_ms") as u64,
910 enqueued_at_ms: row.get::<i64, _>("enqueued_at_ms") as u64,
911 claim_fencing_token: row.get::<i64, _>("claim_fencing_token") as u64,
912 })
913}
914
915async fn load_queued_batch(
916 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
917 batch_id: &str,
918) -> Result<Option<QueuedWorkBatch>, StoreError> {
919 let row = sqlx::query(
920 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
921 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
922 claim_fencing_token
923 FROM lash_queued_work_batches
924 WHERE batch_id = $1",
925 )
926 .bind(batch_id)
927 .fetch_optional(&mut **tx)
928 .await
929 .map_err(store_sqlx_error)?;
930 let Some(row) = row else {
931 return Ok(None);
932 };
933 let row = queued_batch_row(row)?;
934 queued_work_batch_from_row(tx, row).await.map(Some)
935}
936
937async fn queued_work_batch_from_row(
938 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
939 row: QueuedBatchRow,
940) -> Result<QueuedWorkBatch, StoreError> {
941 let item_rows = sqlx::query(
942 "SELECT item_id, payload_json
943 FROM lash_queued_work_items
944 WHERE batch_id = $1
945 ORDER BY item_index ASC",
946 )
947 .bind(&row.batch_id)
948 .fetch_all(&mut **tx)
949 .await
950 .map_err(store_sqlx_error)?;
951 let mut items = Vec::new();
952 for item in item_rows {
953 let payload_json: String = item.get(1);
954 items.push(QueuedWorkItem {
955 item_id: item.get(0),
956 payload: store_decode_json(&payload_json, "queued work payload")?,
957 });
958 }
959 Ok(QueuedWorkBatch {
960 batch_id: row.batch_id,
961 session_id: row.session_id,
962 enqueue_seq: row.enqueue_seq,
963 source_key: row.source_key,
964 delivery_policy: row.delivery_policy,
965 slot_policy: row.slot_policy,
966 merge_key: row.merge_key,
967 available_at_ms: row.available_at_ms,
968 enqueued_at_ms: row.enqueued_at_ms,
969 items,
970 })
971}
972
973async fn ensure_queued_work_completion_tx(
974 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
975 completed: &QueuedWorkCompletion,
976) -> Result<(), StoreError> {
977 for batch_id in &completed.batch_ids {
978 let exists: Option<i64> = sqlx::query_scalar(
979 "SELECT 1::BIGINT FROM lash_queued_work_batches
980 WHERE session_id = $1
981 AND batch_id = $2
982 AND claim_id = $3
983 AND claim_token = $4
984 LIMIT 1",
985 )
986 .bind(&completed.session_id)
987 .bind(batch_id)
988 .bind(&completed.claim_id)
989 .bind(&completed.lease_token)
990 .fetch_optional(&mut **tx)
991 .await
992 .map_err(store_sqlx_error)?;
993 if exists.is_none() {
994 return Err(StoreError::QueuedWorkClaimExpired {
995 session_id: completed.session_id.clone(),
996 claim_id: completed.claim_id.clone(),
997 });
998 }
999 }
1000 Ok(())
1001}
1002
1003#[async_trait::async_trait]
1004impl RuntimePersistence for PostgresSessionStore {
1005 fn durability_tier(&self) -> DurabilityTier {
1006 DurabilityTier::Durable
1007 }
1008
1009 async fn load_session(
1010 &self,
1011 scope: SessionReadScope,
1012 ) -> Result<Option<PersistedSessionRead>, StoreError> {
1013 let Some(session_id) = self.selected_session_id().await? else {
1014 return Ok(None);
1015 };
1016 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1017 let Some(meta) = load_session_head_meta_tx(&mut tx, &session_id, false).await? else {
1018 return Ok(None);
1019 };
1020 tx.commit().await.map_err(store_sqlx_error)?;
1021 let leaf_node_id = match &scope {
1022 SessionReadScope::FullGraph => meta.leaf_node_id.clone(),
1023 SessionReadScope::ActivePath { leaf_node_id } => {
1024 leaf_node_id.clone().or_else(|| meta.leaf_node_id.clone())
1025 }
1026 };
1027 let graph = load_graph(
1028 &self.pool,
1029 &session_id,
1030 leaf_node_id.clone(),
1031 matches!(scope, SessionReadScope::ActivePath { .. }),
1032 )
1033 .await?;
1034 let checkpoint = match meta.checkpoint_ref.as_ref() {
1035 Some(blob_ref) => get_checkpoint(&self.pool, blob_ref).await,
1036 None => None,
1037 };
1038 Ok(Some(PersistedSessionRead {
1039 session_id: meta.session_id,
1040 head_revision: meta.head_revision,
1041 config: meta.config,
1042 agent_frames: meta.agent_frames,
1043 current_agent_frame_id: meta.current_agent_frame_id,
1044 graph,
1045 checkpoint_ref: meta.checkpoint_ref,
1046 checkpoint,
1047 token_ledger: merge_token_ledger_entries(
1048 load_usage_deltas(&self.pool, &session_id).await,
1049 ),
1050 }))
1051 }
1052
1053 async fn load_node(&self, node_id: &str) -> Result<Option<SessionNodeRecord>, StoreError> {
1054 let json: Option<String> = if let Some(session_id) = &self.session_id {
1055 sqlx::query_scalar(
1056 "SELECT node_json FROM lash_graph_nodes
1057 WHERE session_id = $1 AND node_id = $2 AND tombstoned = FALSE",
1058 )
1059 .bind(session_id)
1060 .bind(node_id)
1061 .fetch_optional(&self.pool)
1062 .await
1063 .map_err(store_sqlx_error)?
1064 } else {
1065 sqlx::query_scalar(
1066 "SELECT node_json FROM lash_graph_nodes
1067 WHERE node_id = $1 AND tombstoned = FALSE
1068 ORDER BY session_id ASC
1069 LIMIT 1",
1070 )
1071 .bind(node_id)
1072 .fetch_optional(&self.pool)
1073 .await
1074 .map_err(store_sqlx_error)?
1075 };
1076 json.map(|json| store_decode_json(&json, "session graph node"))
1077 .transpose()
1078 }
1079
    /// Applies one runtime commit inside a single database transaction.
    ///
    /// In order, the commit: validates the session binding, replays or rejects
    /// an already-recorded turn commit, checks the expected head revision,
    /// verifies the completed queue claims, stores the checkpoint, appends
    /// usage deltas, applies the graph delta, writes the new session head,
    /// deletes the completed queue batches, commits attachment refs and, when
    /// a turn commit is present, records its result.
    ///
    /// Every early `return` leaves the function without calling `tx.commit()`,
    /// so the transaction is dropped uncommitted (sqlx rolls back on drop).
    ///
    /// # Errors
    ///
    /// * [`StoreError::SessionBindingMismatch`] when the commit targets a
    ///   session other than the one this store is bound to.
    /// * [`StoreError::RuntimeTurnCommitConflict`] when the turn commit names a
    ///   different session, or a row for the same turn exists with a different
    ///   hash.
    /// * [`StoreError::HeadRevisionConflict`] when the expected revision does
    ///   not match, or the guarded head write does not apply.
    /// * [`StoreError::QueuedWorkClaimExpired`] when a completed claim belongs
    ///   to another session or is no longer held.
    /// * Other database failures, mapped through `store_sqlx_error`.
    async fn commit_runtime_state(
        &self,
        commit: RuntimeCommit,
    ) -> Result<RuntimeCommitResult, StoreError> {
        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
        // NOTE(review): the trailing `false` is an opaque flag of the helper
        // (possibly a row-lock switch) — confirm against its definition.
        let existing = load_session_head_meta_tx(&mut tx, &commit.session_id, false).await?;
        // The stored head must carry the same session id it was looked up by.
        if let Some(bound_session_id) = existing.as_ref().map(|meta| meta.session_id.as_str())
            && bound_session_id != commit.session_id
        {
            return Err(StoreError::SessionBindingMismatch {
                bound_session_id: bound_session_id.to_string(),
                attempted_session_id: commit.session_id,
            });
        }
        // An explicit `session_id` takes precedence over a binding recorded by
        // an earlier commit in `bound_session`.
        let effective_binding = self
            .session_id
            .clone()
            .or_else(|| self.bound_session.get().cloned());
        if let Some(bound_session_id) = &effective_binding
            && commit.session_id != *bound_session_id
        {
            return Err(StoreError::SessionBindingMismatch {
                bound_session_id: bound_session_id.clone(),
                attempted_session_id: commit.session_id,
            });
        }
        // Without an explicit session id, record this commit's session as the
        // binding. The result of `set` is deliberately ignored; note that this
        // happens before the transaction is known to succeed.
        if self.session_id.is_none() {
            let _ = self.bound_session.set(commit.session_id.clone());
        }
        if let Some(completed) = &commit.turn_commit {
            if completed.session_id != commit.session_id {
                return Err(StoreError::RuntimeTurnCommitConflict {
                    session_id: completed.session_id.clone(),
                    turn_id: completed.turn_id.clone(),
                });
            }
            let prior = sqlx::query(
                "SELECT turn_commit_hash, result_json
                 FROM lash_runtime_turn_commits
                 WHERE session_id = $1 AND turn_id = $2",
            )
            .bind(&completed.session_id)
            .bind(&completed.turn_id)
            .fetch_optional(&mut *tx)
            .await
            .map_err(store_sqlx_error)?;
            if let Some(row) = prior {
                let hash: String = row.get(0);
                let result_json: String = row.get(1);
                // Same turn, same hash: hand back the stored result instead of
                // applying the commit a second time.
                if hash == completed.turn_commit_hash {
                    return store_decode_json(&result_json, "runtime turn commit result");
                }
                return Err(StoreError::RuntimeTurnCommitConflict {
                    session_id: completed.session_id.clone(),
                    turn_id: completed.turn_id.clone(),
                });
            }
        }
        // A session without a stored head counts as revision 0.
        let actual_revision = existing.as_ref().map_or(0, |meta| meta.head_revision);
        // The revision check is skipped when the caller supplies no expectation.
        if commit.expected_head_revision.is_some()
            && commit.expected_head_revision != Some(actual_revision)
        {
            return Err(StoreError::HeadRevisionConflict {
                expected: commit.expected_head_revision,
                actual: actual_revision,
            });
        }
        for completed in &commit.completed_queue_claims {
            if completed.session_id != commit.session_id {
                return Err(StoreError::QueuedWorkClaimExpired {
                    session_id: completed.session_id.clone(),
                    claim_id: completed.claim_id.clone(),
                });
            }
            ensure_queued_work_completion_tx(&mut tx, completed).await?;
        }
        let (checkpoint_ref, manifest) = put_checkpoint_tx(&mut tx, &commit.checkpoint).await?;
        for entry in &commit.usage_deltas {
            sqlx::query("INSERT INTO lash_usage_deltas (session_id, entry_json) VALUES ($1, $2)")
                .bind(&commit.session_id)
                .bind(encode_json(entry))
                .execute(&mut *tx)
                .await
                .map_err(store_sqlx_error)?;
        }
        let leaf_node_id = match &commit.graph {
            GraphCommitDelta::Unchanged { leaf_node_id } => leaf_node_id.clone(),
            GraphCommitDelta::Append {
                nodes,
                leaf_node_id,
            } => {
                // Appending upserts: an existing node is overwritten and its
                // tombstone flag cleared.
                for node in nodes {
                    sqlx::query(
                        "INSERT INTO lash_graph_nodes (session_id, node_id, node_json)
                         VALUES ($1, $2, $3)
                         ON CONFLICT (session_id, node_id) DO UPDATE SET
                            node_json = EXCLUDED.node_json,
                            tombstoned = FALSE",
                    )
                    .bind(&commit.session_id)
                    .bind(&node.node_id)
                    .bind(encode_json(node))
                    .execute(&mut *tx)
                    .await
                    .map_err(store_sqlx_error)?;
                }
                leaf_node_id.clone()
            }
            GraphCommitDelta::ReplaceFull(graph) => {
                // Replace drops every stored node of the session (tombstoned or
                // not) before inserting the new set.
                sqlx::query("DELETE FROM lash_graph_nodes WHERE session_id = $1")
                    .bind(&commit.session_id)
                    .execute(&mut *tx)
                    .await
                    .map_err(store_sqlx_error)?;
                for node in &graph.nodes {
                    sqlx::query(
                        "INSERT INTO lash_graph_nodes (session_id, node_id, node_json)
                         VALUES ($1, $2, $3)",
                    )
                    .bind(&commit.session_id)
                    .bind(&node.node_id)
                    .bind(encode_json(node))
                    .execute(&mut *tx)
                    .await
                    .map_err(store_sqlx_error)?;
                }
                graph.leaf_node_id.clone()
            }
        };
        // Counted after the delta so the head records the post-commit size.
        let graph_node_count: i64 = sqlx::query_scalar(
            "SELECT COUNT(*) FROM lash_graph_nodes WHERE session_id = $1 AND tombstoned = FALSE",
        )
        .bind(&commit.session_id)
        .fetch_one(&mut *tx)
        .await
        .map_err(store_sqlx_error)?;
        let next_revision = actual_revision + 1;
        // The head JSON is written with an empty token ledger; usage entries
        // are stored as `lash_usage_deltas` rows above.
        let meta = SessionHeadMeta {
            session_id: commit.session_id.clone(),
            head_revision: next_revision,
            config: commit.config.clone(),
            agent_frames: commit.agent_frames.clone(),
            current_agent_frame_id: commit.current_agent_frame_id.clone(),
            checkpoint_ref: Some(checkpoint_ref.clone()),
            leaf_node_id,
            graph_node_count: graph_node_count as usize,
            token_ledger: Vec::new(),
        };
        // Guarded upsert: a new session is inserted; an existing row is only
        // updated while its stored revision still equals the one read at the
        // start of this transaction ($5).
        let head_write = sqlx::query(
            "INSERT INTO lash_sessions (session_id, head_revision, head_json, checkpoint_ref)
             VALUES ($1, $2, $3, $4)
             ON CONFLICT (session_id) DO UPDATE SET
                head_revision = EXCLUDED.head_revision,
                head_json = EXCLUDED.head_json,
                checkpoint_ref = EXCLUDED.checkpoint_ref
             WHERE lash_sessions.head_revision = $5",
        )
        .bind(&commit.session_id)
        .bind(next_revision as i64)
        .bind(encode_json(&meta))
        .bind(checkpoint_ref.as_str())
        .bind(actual_revision as i64)
        .execute(&mut *tx)
        .await;
        let head_write = match head_write {
            Ok(result) => result,
            // NOTE(review): `is_contention_error` is defined elsewhere;
            // presumably it recognises serialization/lock failures — confirm.
            // Such errors are reported as a revision conflict using the
            // revision read earlier, since no fresher value is fetched here.
            Err(err) if is_contention_error(&err) => {
                return Err(StoreError::HeadRevisionConflict {
                    expected: commit.expected_head_revision.or(Some(actual_revision)),
                    actual: actual_revision,
                });
            }
            Err(err) => return Err(store_sqlx_error(err)),
        };
        // Zero affected rows means the revision guard rejected the update;
        // re-read the stored revision so the error reports the current value.
        if head_write.rows_affected() == 0 {
            let actual_now = sqlx::query_scalar::<_, i64>(
                "SELECT head_revision FROM lash_sessions WHERE session_id = $1",
            )
            .bind(&commit.session_id)
            .fetch_optional(&mut *tx)
            .await
            .map_err(store_sqlx_error)?
            .map_or(actual_revision, |revision| revision as u64);
            return Err(StoreError::HeadRevisionConflict {
                expected: commit.expected_head_revision.or(Some(actual_revision)),
                actual: actual_now,
            });
        }
        // Remove the batches whose claims were verified above. The number of
        // deleted rows is not checked here.
        for completed in &commit.completed_queue_claims {
            for batch_id in &completed.batch_ids {
                sqlx::query(
                    "DELETE FROM lash_queued_work_batches
                     WHERE session_id = $1 AND batch_id = $2 AND claim_id = $3 AND claim_token = $4",
                )
                .bind(&completed.session_id)
                .bind(batch_id)
                .bind(&completed.claim_id)
                .bind(&completed.lease_token)
                .execute(&mut *tx)
                .await
                .map_err(store_sqlx_error)?;
            }
        }
        commit_attachment_refs_tx(
            &mut tx,
            &commit.session_id,
            &commit.committed_attachment_ids,
        )
        .await?;
        let result = RuntimeCommitResult {
            head_revision: next_revision,
            checkpoint_ref,
            manifest,
        };
        // Persist the result next to the turn id so a retry with the same hash
        // can be answered from the replay branch near the top.
        if let Some(completed) = &commit.turn_commit {
            sqlx::query(
                "INSERT INTO lash_runtime_turn_commits (
                    session_id, turn_id, turn_commit_hash, result_json, committed_at_ms
                 )
                 VALUES ($1, $2, $3, $4, $5)",
            )
            .bind(&completed.session_id)
            .bind(&completed.turn_id)
            .bind(&completed.turn_commit_hash)
            .bind(encode_json(&result))
            .bind(current_epoch_ms() as i64)
            .execute(&mut *tx)
            .await
            .map_err(store_sqlx_error)?;
        }
        tx.commit().await.map_err(store_sqlx_error)?;
        Ok(result)
    }
1331
1332 async fn enqueue_queued_work(
1333 &self,
1334 batch: QueuedWorkBatchDraft,
1335 ) -> Result<QueuedWorkBatch, StoreError> {
1336 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1337 if let Some(source_key) = batch.source_key.as_deref() {
1338 let existing_id: Option<String> = sqlx::query_scalar(
1339 "SELECT batch_id FROM lash_queued_work_batches
1340 WHERE session_id = $1 AND source_key = $2",
1341 )
1342 .bind(&batch.session_id)
1343 .bind(source_key)
1344 .fetch_optional(&mut *tx)
1345 .await
1346 .map_err(store_sqlx_error)?;
1347 if let Some(batch_id) = existing_id {
1348 let existing = load_queued_batch(&mut tx, &batch_id)
1349 .await?
1350 .ok_or_else(|| {
1351 StoreError::Backend("queued work source row disappeared".to_string())
1352 })?;
1353 tx.commit().await.map_err(store_sqlx_error)?;
1354 return Ok(existing);
1355 }
1356 }
1357 let now = current_epoch_ms();
1358 let batch_id = derive_batch_id(&batch.session_id, batch.source_key.as_deref(), now, None);
1359 let row = sqlx::query_scalar::<_, i64>(
1360 "INSERT INTO lash_queued_work_batches (
1361 batch_id, session_id, source_key, delivery_policy, slot_policy,
1362 merge_key_json, available_at_ms, enqueued_at_ms
1363 )
1364 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
1365 RETURNING enqueue_seq",
1366 )
1367 .bind(&batch_id)
1368 .bind(&batch.session_id)
1369 .bind(&batch.source_key)
1370 .bind(batch.delivery_policy.as_str())
1371 .bind(batch.slot_policy.as_str())
1372 .bind(encode_json(&batch.merge_key))
1373 .bind(batch.available_at_ms as i64)
1374 .bind(now as i64)
1375 .fetch_one(&mut *tx)
1376 .await
1377 .map_err(store_sqlx_error)?;
1378 for (index, payload) in batch.payloads.iter().enumerate() {
1379 let item_id = format!("{batch_id}:item:{index}");
1380 sqlx::query(
1381 "INSERT INTO lash_queued_work_items (batch_id, item_index, item_id, payload_json)
1382 VALUES ($1, $2, $3, $4)",
1383 )
1384 .bind(&batch_id)
1385 .bind(index as i32)
1386 .bind(item_id)
1387 .bind(encode_json(payload))
1388 .execute(&mut *tx)
1389 .await
1390 .map_err(store_sqlx_error)?;
1391 }
1392 let queued = load_queued_batch(&mut tx, &batch_id)
1393 .await?
1394 .ok_or_else(|| StoreError::Backend("queued work insert disappeared".to_string()))?;
1395 debug_assert_eq!(queued.enqueue_seq, row as u64);
1396 tx.commit().await.map_err(store_sqlx_error)?;
1397 Ok(queued)
1398 }
1399
1400 async fn claim_ready_queued_work(
1401 &self,
1402 session_id: &str,
1403 owner_id: &str,
1404 boundary: QueuedWorkClaimBoundary,
1405 lease_ttl_ms: u64,
1406 max_batches: usize,
1407 ) -> Result<Option<QueuedWorkClaim>, StoreError> {
1408 if max_batches == 0 {
1409 return Ok(None);
1410 }
1411 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1412 let now = current_epoch_ms();
1413 let rows = sqlx::query(
1414 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1415 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1416 claim_fencing_token
1417 FROM lash_queued_work_batches
1418 WHERE session_id = $1
1419 AND available_at_ms <= $2
1420 AND (claim_token IS NULL OR claim_expires_at_ms <= $2)
1421 ORDER BY enqueue_seq ASC
1422 LIMIT $3
1423 FOR UPDATE SKIP LOCKED",
1424 )
1425 .bind(session_id)
1426 .bind(now as i64)
1427 .bind(claim_scan_limit(max_batches))
1428 .fetch_all(&mut *tx)
1429 .await
1430 .map_err(store_sqlx_error)?;
1431 let mut selected = Vec::new();
1432 for row in rows {
1433 selected.push(queued_batch_row(row)?);
1434 }
1435 let candidates = selected
1436 .iter()
1437 .map(|row| ClaimCandidate {
1438 enqueue_seq: row.enqueue_seq,
1439 claim_fencing_token: row.claim_fencing_token,
1440 delivery_policy: row.delivery_policy,
1441 slot_policy: row.slot_policy,
1442 merge_key: row.merge_key.clone(),
1443 })
1444 .collect::<Vec<_>>();
1445 let selected_len = select_claim_prefix(&candidates, boundary, max_batches);
1446 if selected_len == 0 {
1447 tx.commit().await.map_err(store_sqlx_error)?;
1448 return Ok(None);
1449 }
1450 selected.truncate(selected_len);
1451 let lease =
1452 QueuedWorkClaimLease::derive(&candidates[0], session_id, owner_id, now, lease_ttl_ms);
1453 for row in &selected {
1454 let changed = sqlx::query(
1455 "UPDATE lash_queued_work_batches
1456 SET claim_id = $3,
1457 claim_owner_id = $4,
1458 claim_token = $5,
1459 claim_fencing_token = claim_fencing_token + 1,
1460 claim_claimed_at_ms = $6,
1461 claim_expires_at_ms = $7
1462 WHERE session_id = $1
1463 AND batch_id = $2
1464 AND (claim_token IS NULL OR claim_expires_at_ms <= $6)",
1465 )
1466 .bind(session_id)
1467 .bind(&row.batch_id)
1468 .bind(&lease.claim_id)
1469 .bind(owner_id)
1470 .bind(&lease.lease_token)
1471 .bind(now as i64)
1472 .bind(lease.expires_at_epoch_ms as i64)
1473 .execute(&mut *tx)
1474 .await
1475 .map_err(store_sqlx_error)?
1476 .rows_affected();
1477 if changed == 0 {
1478 tx.rollback().await.map_err(store_sqlx_error)?;
1479 return Ok(None);
1480 }
1481 }
1482 let mut batches = Vec::new();
1483 for row in selected {
1484 batches.push(queued_work_batch_from_row(&mut tx, row).await?);
1485 }
1486 tx.commit().await.map_err(store_sqlx_error)?;
1487 Ok(Some(QueuedWorkClaim {
1488 session_id: session_id.to_string(),
1489 claim_id: lease.claim_id,
1490 owner_id: owner_id.to_string(),
1491 lease_token: lease.lease_token,
1492 fencing_token: lease.fencing_token,
1493 claimed_at_epoch_ms: lease.claimed_at_epoch_ms,
1494 expires_at_epoch_ms: lease.expires_at_epoch_ms,
1495 batches,
1496 }))
1497 }
1498
1499 async fn renew_queued_work_claim(
1500 &self,
1501 claim: &QueuedWorkClaim,
1502 lease_ttl_ms: u64,
1503 ) -> Result<QueuedWorkClaim, StoreError> {
1504 let expires_at = current_epoch_ms().saturating_add(lease_ttl_ms);
1505 let changed = sqlx::query(
1506 "UPDATE lash_queued_work_batches
1507 SET claim_expires_at_ms = $4
1508 WHERE session_id = $1 AND claim_id = $2 AND claim_token = $3",
1509 )
1510 .bind(&claim.session_id)
1511 .bind(&claim.claim_id)
1512 .bind(&claim.lease_token)
1513 .bind(expires_at as i64)
1514 .execute(&self.pool)
1515 .await
1516 .map_err(store_sqlx_error)?
1517 .rows_affected();
1518 renewed_claim(claim, changed as usize, expires_at)
1519 }
1520
1521 async fn abandon_queued_work_claim(&self, claim: &QueuedWorkClaim) -> Result<(), StoreError> {
1522 sqlx::query(
1523 "UPDATE lash_queued_work_batches
1524 SET claim_id = NULL,
1525 claim_owner_id = NULL,
1526 claim_token = NULL,
1527 claim_claimed_at_ms = 0,
1528 claim_expires_at_ms = 0
1529 WHERE session_id = $1 AND claim_id = $2 AND claim_token = $3",
1530 )
1531 .bind(&claim.session_id)
1532 .bind(&claim.claim_id)
1533 .bind(&claim.lease_token)
1534 .execute(&self.pool)
1535 .await
1536 .map_err(store_sqlx_error)?;
1537 Ok(())
1538 }
1539
1540 async fn cancel_queued_work_batch(
1541 &self,
1542 session_id: &str,
1543 batch_id: &str,
1544 ) -> Result<Option<QueuedWorkBatch>, StoreError> {
1545 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1546 let now = current_epoch_ms();
1547 let row = sqlx::query(
1548 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1549 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1550 claim_fencing_token
1551 FROM lash_queued_work_batches
1552 WHERE session_id = $1
1553 AND batch_id = $2
1554 AND (claim_token IS NULL OR claim_expires_at_ms <= $3)
1555 FOR UPDATE",
1556 )
1557 .bind(session_id)
1558 .bind(batch_id)
1559 .bind(now as i64)
1560 .fetch_optional(&mut *tx)
1561 .await
1562 .map_err(store_sqlx_error)?;
1563 let Some(row) = row else {
1564 tx.commit().await.map_err(store_sqlx_error)?;
1565 return Ok(None);
1566 };
1567 let batch = queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?;
1568 sqlx::query("DELETE FROM lash_queued_work_batches WHERE batch_id = $1")
1569 .bind(batch_id)
1570 .execute(&mut *tx)
1571 .await
1572 .map_err(store_sqlx_error)?;
1573 tx.commit().await.map_err(store_sqlx_error)?;
1574 Ok(Some(batch))
1575 }
1576
1577 async fn list_queued_work(&self, session_id: &str) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1578 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1579 let rows = sqlx::query(
1580 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1581 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1582 claim_fencing_token
1583 FROM lash_queued_work_batches
1584 WHERE session_id = $1
1585 ORDER BY enqueue_seq ASC",
1586 )
1587 .bind(session_id)
1588 .fetch_all(&mut *tx)
1589 .await
1590 .map_err(store_sqlx_error)?;
1591 let mut batches = Vec::new();
1592 for row in rows {
1593 batches.push(queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?);
1594 }
1595 tx.commit().await.map_err(store_sqlx_error)?;
1596 Ok(batches)
1597 }
1598
1599 async fn list_pending_queued_work(
1600 &self,
1601 session_id: &str,
1602 ) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1603 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1604 let now = current_epoch_ms();
1605 let rows = sqlx::query(
1606 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1607 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1608 claim_fencing_token
1609 FROM lash_queued_work_batches
1610 WHERE session_id = $1
1611 AND (claim_token IS NULL OR claim_expires_at_ms <= $2)
1612 ORDER BY enqueue_seq ASC",
1613 )
1614 .bind(session_id)
1615 .bind(now as i64)
1616 .fetch_all(&mut *tx)
1617 .await
1618 .map_err(store_sqlx_error)?;
1619 let mut batches = Vec::new();
1620 for row in rows {
1621 batches.push(queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?);
1622 }
1623 tx.commit().await.map_err(store_sqlx_error)?;
1624 Ok(batches)
1625 }
1626
1627 async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError> {
1628 sqlx::query(
1629 "INSERT INTO lash_session_meta (session_id, meta_json)
1630 VALUES ($1, $2)
1631 ON CONFLICT (session_id) DO UPDATE SET meta_json = EXCLUDED.meta_json",
1632 )
1633 .bind(&meta.session_id)
1634 .bind(encode_json(&meta))
1635 .execute(&self.pool)
1636 .await
1637 .map_err(store_sqlx_error)?;
1638 Ok(())
1639 }
1640
1641 async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError> {
1642 let json: Option<String> = if let Some(session_id) = &self.session_id {
1643 sqlx::query_scalar("SELECT meta_json FROM lash_session_meta WHERE session_id = $1")
1644 .bind(session_id)
1645 .fetch_optional(&self.pool)
1646 .await
1647 .map_err(store_sqlx_error)?
1648 } else {
1649 sqlx::query_scalar(
1650 "SELECT meta_json FROM lash_session_meta ORDER BY session_id ASC LIMIT 1",
1651 )
1652 .fetch_optional(&self.pool)
1653 .await
1654 .map_err(store_sqlx_error)?
1655 };
1656 json.map(|json| store_decode_json(&json, "session meta"))
1657 .transpose()
1658 }
1659
1660 async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError> {
1661 for id in ids {
1662 if let Some(session_id) = &self.session_id {
1663 sqlx::query(
1664 "UPDATE lash_graph_nodes
1665 SET tombstoned = TRUE
1666 WHERE session_id = $1 AND node_id = $2",
1667 )
1668 .bind(session_id)
1669 .bind(id)
1670 .execute(&self.pool)
1671 .await
1672 .map_err(store_sqlx_error)?;
1673 } else {
1674 sqlx::query(
1675 "UPDATE lash_graph_nodes
1676 SET tombstoned = TRUE
1677 WHERE node_id = $1",
1678 )
1679 .bind(id)
1680 .execute(&self.pool)
1681 .await
1682 .map_err(store_sqlx_error)?;
1683 }
1684 }
1685 Ok(())
1686 }
1687
1688 async fn vacuum(&self) -> Result<VacuumReport, StoreError> {
1689 let removed = if let Some(session_id) = &self.session_id {
1690 sqlx::query("DELETE FROM lash_graph_nodes WHERE session_id = $1 AND tombstoned = TRUE")
1691 .bind(session_id)
1692 .execute(&self.pool)
1693 .await
1694 .map_err(store_sqlx_error)?
1695 .rows_affected()
1696 } else {
1697 sqlx::query("DELETE FROM lash_graph_nodes WHERE tombstoned = TRUE")
1698 .execute(&self.pool)
1699 .await
1700 .map_err(store_sqlx_error)?
1701 .rows_affected()
1702 };
1703 Ok(VacuumReport {
1704 removed_node_count: removed as usize,
1705 })
1706 }
1707
1708 async fn gc_unreachable(&self) -> Result<GcReport, StoreError> {
1709 Ok(GcReport::default())
1710 }
1711}
1712
1713fn process_status_label(record: &ProcessRecord) -> &'static str {
1714 record.status.label()
1715}
1716
1717async fn load_process_tx(
1718 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1719 process_id: &str,
1720) -> Result<Option<ProcessRecord>, PluginError> {
1721 let json: Option<String> = sqlx::query_scalar(
1722 "SELECT record_json
1723 FROM lash_processes
1724 WHERE process_id = $1
1725 FOR UPDATE",
1726 )
1727 .bind(process_id)
1728 .fetch_optional(&mut **tx)
1729 .await
1730 .map_err(plugin_sqlx_error)?;
1731 json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
1732 .transpose()
1733}
1734
1735async fn load_process(
1736 pool: &PgPool,
1737 process_id: &str,
1738) -> Result<Option<ProcessRecord>, PluginError> {
1739 let json: Option<String> =
1740 sqlx::query_scalar("SELECT record_json FROM lash_processes WHERE process_id = $1")
1741 .bind(process_id)
1742 .fetch_optional(pool)
1743 .await
1744 .map_err(plugin_sqlx_error)?;
1745 json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
1746 .transpose()
1747}
1748
1749async fn save_process_tx(
1750 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1751 record: &ProcessRecord,
1752) -> Result<(), PluginError> {
1753 sqlx::query(
1754 "UPDATE lash_processes
1755 SET updated_at_ms = $2, status = $3, record_json = $4
1756 WHERE process_id = $1",
1757 )
1758 .bind(&record.id)
1759 .bind(record.updated_at_ms as i64)
1760 .bind(process_status_label(record))
1761 .bind(serde_json::to_string(record).map_err(process_decode_error)?)
1762 .execute(&mut **tx)
1763 .await
1764 .map_err(plugin_sqlx_error)?;
1765 Ok(())
1766}
1767
1768async fn load_event_by_key_tx(
1769 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1770 process_id: &str,
1771 replay_key: &str,
1772) -> Result<Option<(String, ProcessEvent)>, PluginError> {
1773 let row = sqlx::query(
1774 "SELECT payload_hash, event_json
1775 FROM lash_process_events
1776 WHERE process_id = $1 AND idempotency_key = $2",
1777 )
1778 .bind(process_id)
1779 .bind(replay_key)
1780 .fetch_optional(&mut **tx)
1781 .await
1782 .map_err(plugin_sqlx_error)?;
1783 row.map(|row| {
1784 let hash: String = row.get(0);
1785 let json: String = row.get(1);
1786 serde_json::from_str(&json)
1787 .map(|event| (hash, event))
1788 .map_err(process_decode_error)
1789 })
1790 .transpose()
1791}
1792
1793async fn load_process_lease_tx(
1794 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1795 process_id: &str,
1796) -> Result<Option<ProcessLease>, PluginError> {
1797 let row = sqlx::query(
1798 "SELECT lease_owner_id, lease_token, lease_fencing_token,
1799 lease_claimed_at_ms, lease_expires_at_ms
1800 FROM lash_process_leases
1801 WHERE process_id = $1",
1802 )
1803 .bind(process_id)
1804 .fetch_optional(&mut **tx)
1805 .await
1806 .map_err(plugin_sqlx_error)?;
1807 let Some(row) = row else {
1808 return Ok(None);
1809 };
1810 let owner_id: Option<String> = row.get(0);
1811 let lease_token: Option<String> = row.get(1);
1812 let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
1813 return Ok(None);
1814 };
1815 Ok(Some(ProcessLease {
1816 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
1817 process_id: process_id.to_string(),
1818 owner_id,
1819 lease_token,
1820 fencing_token: row.get::<i64, _>(2) as u64,
1821 claimed_at_epoch_ms: row.get::<i64, _>(3) as u64,
1822 expires_at_epoch_ms: row.get::<i64, _>(4) as u64,
1823 }))
1824}
1825
1826fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> PluginError {
1827 PluginError::Session(format!(
1828 "process `{process_id}` is already leased by `{}` until {}",
1829 current.owner_id, current.expires_at_epoch_ms
1830 ))
1831}
1832
1833fn process_lease_expired(process_id: &str) -> PluginError {
1834 PluginError::Session(format!(
1835 "process lease for `{process_id}` is missing or expired"
1836 ))
1837}
1838
1839fn guard_lease(current: Option<&ProcessLease>, lease_token: &str, now: u64) -> bool {
1840 current
1841 .map(|current| current.lease_token == lease_token && current.expires_at_epoch_ms > now)
1842 .unwrap_or(false)
1843}
1844
1845async fn list_grants_for_scope(
1846 pool: &PgPool,
1847 owner_scope: &ProcessScope,
1848 live_only: bool,
1849) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
1850 let status_clause = if live_only {
1851 "AND p.status = 'running'"
1852 } else {
1853 ""
1854 };
1855 let sql = format!(
1856 "SELECT g.process_id, g.descriptor_json, p.record_json
1857 FROM lash_process_handle_grants g
1858 JOIN lash_processes p ON p.process_id = g.process_id
1859 WHERE g.scope_id = $1 {status_clause}
1860 ORDER BY g.process_id ASC"
1861 );
1862 let rows = sqlx::query(&sql)
1863 .bind(owner_scope.id().as_str())
1864 .fetch_all(pool)
1865 .await
1866 .map_err(plugin_sqlx_error)?;
1867 let mut entries = Vec::new();
1868 for row in rows {
1869 let process_id: String = row.get(0);
1870 let descriptor_json: String = row.get(1);
1871 let record_json: String = row.get(2);
1872 let descriptor: ProcessHandleDescriptor =
1873 serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
1874 let record: ProcessRecord =
1875 serde_json::from_str(&record_json).map_err(process_decode_error)?;
1876 entries.push((
1877 ProcessHandleGrant {
1878 session_id: owner_scope.session_id.clone(),
1879 process_id,
1880 descriptor,
1881 },
1882 record,
1883 ));
1884 }
1885 Ok(entries)
1886}
1887
1888#[async_trait::async_trait]
1889impl ProcessRegistry for PostgresProcessRegistry {
    /// Reports the durability tier of this registry; always
    /// [`DurabilityTier::Durable`].
    fn durability_tier(&self) -> DurabilityTier {
        DurabilityTier::Durable
    }
1893
1894 async fn register_process(
1895 &self,
1896 registration: ProcessRegistration,
1897 ) -> Result<ProcessRecord, PluginError> {
1898 let (registration, registration_hash) =
1899 lash_core::runtime::prepare_process_registration(registration)?;
1900 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1901 if let Some(existing) = load_process_tx(&mut tx, ®istration.id).await? {
1902 if existing.registration_hash == registration_hash {
1903 tx.commit().await.map_err(plugin_sqlx_error)?;
1904 return Ok(existing);
1905 }
1906 return Err(PluginError::Session(format!(
1907 "process `{}` registration hash conflict: existing {}, new {}",
1908 registration.id, existing.registration_hash, registration_hash
1909 )));
1910 }
1911 let now = current_epoch_ms();
1912 let record =
1913 ProcessRecord::from_prepared_registration(registration, registration_hash, now);
1914 let record_json = serde_json::to_string(&record).map_err(process_decode_error)?;
1915 sqlx::query(
1916 "INSERT INTO lash_processes (
1917 process_id, registration_hash, owner_scope_id, host_profile_id,
1918 created_at_ms, updated_at_ms, status, record_json
1919 )
1920 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
1921 )
1922 .bind(&record.id)
1923 .bind(&record.registration_hash)
1924 .bind(record.owner_scope_id().as_str())
1925 .bind(record.host_profile_id())
1926 .bind(record.created_at_ms as i64)
1927 .bind(record.updated_at_ms as i64)
1928 .bind(process_status_label(&record))
1929 .bind(record_json)
1930 .execute(&mut *tx)
1931 .await
1932 .map_err(plugin_sqlx_error)?;
1933 tx.commit().await.map_err(plugin_sqlx_error)?;
1934 Ok(record)
1935 }
1936
1937 async fn set_external_ref(
1938 &self,
1939 process_id: &str,
1940 external_ref: ProcessExternalRef,
1941 ) -> Result<ProcessRecord, PluginError> {
1942 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1943 let mut record = load_process_tx(&mut tx, process_id)
1944 .await?
1945 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
1946 record.external_ref = Some(external_ref);
1947 record.updated_at_ms = current_epoch_ms();
1948 save_process_tx(&mut tx, &record).await?;
1949 tx.commit().await.map_err(plugin_sqlx_error)?;
1950 Ok(record)
1951 }
1952
1953 async fn grant_handle(
1954 &self,
1955 owner_scope: &ProcessScope,
1956 process_id: &str,
1957 descriptor: ProcessHandleDescriptor,
1958 ) -> Result<ProcessHandleGrant, PluginError> {
1959 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1960 if load_process_tx(&mut tx, process_id).await?.is_none() {
1961 return Err(PluginError::Session(format!(
1962 "unknown process `{process_id}`"
1963 )));
1964 }
1965 sqlx::query(
1966 "INSERT INTO lash_process_handle_grants (session_id, scope_id, process_id, descriptor_json)
1967 VALUES ($1, $2, $3, $4)
1968 ON CONFLICT (scope_id, process_id) DO UPDATE SET
1969 session_id = EXCLUDED.session_id,
1970 descriptor_json = EXCLUDED.descriptor_json",
1971 )
1972 .bind(&owner_scope.session_id)
1973 .bind(owner_scope.id().as_str())
1974 .bind(process_id)
1975 .bind(serde_json::to_string(&descriptor).map_err(process_decode_error)?)
1976 .execute(&mut *tx)
1977 .await
1978 .map_err(plugin_sqlx_error)?;
1979 tx.commit().await.map_err(plugin_sqlx_error)?;
1980 Ok(ProcessHandleGrant {
1981 session_id: owner_scope.session_id.clone(),
1982 process_id: process_id.to_string(),
1983 descriptor,
1984 })
1985 }
1986
1987 async fn revoke_handle(
1988 &self,
1989 owner_scope: &ProcessScope,
1990 process_id: &str,
1991 ) -> Result<(), PluginError> {
1992 sqlx::query(
1993 "DELETE FROM lash_process_handle_grants WHERE scope_id = $1 AND process_id = $2",
1994 )
1995 .bind(owner_scope.id().as_str())
1996 .bind(process_id)
1997 .execute(&self.pool)
1998 .await
1999 .map_err(plugin_sqlx_error)?;
2000 Ok(())
2001 }
2002
2003 async fn transfer_handle_grants(
2004 &self,
2005 from_scope: &ProcessScope,
2006 to_scope: &ProcessScope,
2007 process_ids: &[String],
2008 ) -> Result<(), PluginError> {
2009 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2010 for process_id in process_ids {
2011 let descriptor_json: Option<String> = sqlx::query_scalar(
2012 "SELECT descriptor_json FROM lash_process_handle_grants
2013 WHERE scope_id = $1 AND process_id = $2",
2014 )
2015 .bind(from_scope.id().as_str())
2016 .bind(process_id)
2017 .fetch_optional(&mut *tx)
2018 .await
2019 .map_err(plugin_sqlx_error)?;
2020 let Some(descriptor_json) = descriptor_json else {
2021 return Err(PluginError::Session(format!(
2022 "process handle `{process_id}` is not granted to session `{}`",
2023 from_scope.session_id
2024 )));
2025 };
2026 sqlx::query(
2027 "DELETE FROM lash_process_handle_grants WHERE scope_id = $1 AND process_id = $2",
2028 )
2029 .bind(from_scope.id().as_str())
2030 .bind(process_id)
2031 .execute(&mut *tx)
2032 .await
2033 .map_err(plugin_sqlx_error)?;
2034 sqlx::query(
2035 "INSERT INTO lash_process_handle_grants (session_id, scope_id, process_id, descriptor_json)
2036 VALUES ($1, $2, $3, $4)
2037 ON CONFLICT (scope_id, process_id) DO UPDATE SET
2038 session_id = EXCLUDED.session_id,
2039 descriptor_json = EXCLUDED.descriptor_json",
2040 )
2041 .bind(&to_scope.session_id)
2042 .bind(to_scope.id().as_str())
2043 .bind(process_id)
2044 .bind(descriptor_json)
2045 .execute(&mut *tx)
2046 .await
2047 .map_err(plugin_sqlx_error)?;
2048 }
2049 tx.commit().await.map_err(plugin_sqlx_error)
2050 }
2051
2052 async fn list_handle_grants(
2053 &self,
2054 owner_scope: &ProcessScope,
2055 ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
2056 list_grants_for_scope(&self.pool, owner_scope, false).await
2057 }
2058
2059 async fn list_live_handle_grants(
2060 &self,
2061 owner_scope: &ProcessScope,
2062 ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
2063 list_grants_for_scope(&self.pool, owner_scope, true).await
2064 }
2065
2066 async fn has_handle_grant(
2067 &self,
2068 owner_scope: &ProcessScope,
2069 process_id: &str,
2070 ) -> Result<bool, PluginError> {
2071 let exists: Option<i64> = sqlx::query_scalar(
2072 "SELECT 1::BIGINT FROM lash_process_handle_grants
2073 WHERE scope_id = $1 AND process_id = $2
2074 LIMIT 1",
2075 )
2076 .bind(owner_scope.id().as_str())
2077 .bind(process_id)
2078 .fetch_optional(&self.pool)
2079 .await
2080 .map_err(plugin_sqlx_error)?;
2081 Ok(exists.is_some())
2082 }
2083
2084 async fn handle_grants_for_process(
2085 &self,
2086 process_id: &str,
2087 ) -> Result<Vec<ProcessHandleGrant>, PluginError> {
2088 if load_process(&self.pool, process_id).await?.is_none() {
2089 return Err(PluginError::Session(format!(
2090 "unknown process `{process_id}`"
2091 )));
2092 }
2093 let rows = sqlx::query(
2094 "SELECT session_id, descriptor_json
2095 FROM lash_process_handle_grants
2096 WHERE process_id = $1
2097 ORDER BY session_id ASC, scope_id ASC",
2098 )
2099 .bind(process_id)
2100 .fetch_all(&self.pool)
2101 .await
2102 .map_err(plugin_sqlx_error)?;
2103 let mut grants = Vec::new();
2104 for row in rows {
2105 let descriptor_json: String = row.get(1);
2106 grants.push(ProcessHandleGrant {
2107 session_id: row.get(0),
2108 process_id: process_id.to_string(),
2109 descriptor: serde_json::from_str(&descriptor_json).map_err(process_decode_error)?,
2110 });
2111 }
2112 Ok(grants)
2113 }
2114
2115 async fn delete_session_process_state(
2116 &self,
2117 session_id: &str,
2118 ) -> Result<lash_core::ProcessSessionDeleteReport, PluginError> {
2119 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2120 let rows = sqlx::query(
2121 "SELECT g.process_id, p.record_json
2122 FROM lash_process_handle_grants g
2123 JOIN lash_processes p ON p.process_id = g.process_id
2124 WHERE g.session_id = $1
2125 ORDER BY g.process_id ASC",
2126 )
2127 .bind(session_id)
2128 .fetch_all(&mut *tx)
2129 .await
2130 .map_err(plugin_sqlx_error)?;
2131 let mut removed = Vec::new();
2132 for row in rows {
2133 let process_id: String = row.get(0);
2134 let record_json: String = row.get(1);
2135 let record: ProcessRecord =
2136 serde_json::from_str(&record_json).map_err(process_decode_error)?;
2137 removed.push((process_id, record));
2138 }
2139 let revoked = sqlx::query("DELETE FROM lash_process_handle_grants WHERE session_id = $1")
2140 .bind(session_id)
2141 .execute(&mut *tx)
2142 .await
2143 .map_err(plugin_sqlx_error)?
2144 .rows_affected() as usize;
2145 let mut cancel_process_ids = Vec::new();
2146 let mut preserved_process_ids = Vec::new();
2147 for (process_id, record) in removed {
2148 if record.is_terminal() {
2149 continue;
2150 }
2151 let remaining: i64 = sqlx::query_scalar(
2152 "SELECT COUNT(*) FROM lash_process_handle_grants WHERE process_id = $1",
2153 )
2154 .bind(&process_id)
2155 .fetch_one(&mut *tx)
2156 .await
2157 .map_err(plugin_sqlx_error)?;
2158 if remaining == 0 {
2159 cancel_process_ids.push(process_id);
2160 } else {
2161 preserved_process_ids.push(process_id);
2162 }
2163 }
2164 tx.commit().await.map_err(plugin_sqlx_error)?;
2165 cancel_process_ids.sort();
2166 cancel_process_ids.dedup();
2167 preserved_process_ids.sort();
2168 preserved_process_ids.dedup();
2169 Ok(lash_core::ProcessSessionDeleteReport {
2170 session_id: session_id.to_string(),
2171 revoked_handle_count: revoked,
2172 deleted_wake_count: 0,
2173 cancel_process_ids,
2174 preserved_process_ids,
2175 })
2176 }
2177
2178 async fn append_event(
2179 &self,
2180 process_id: &str,
2181 request: ProcessEventAppendRequest,
2182 ) -> Result<ProcessEventAppendResult, PluginError> {
2183 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2184 let mut record = load_process_tx(&mut tx, process_id)
2185 .await?
2186 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
2187 let replay_lookup =
2188 if let Some(replay_key) = request.replay.as_ref().map(|r| r.key.as_str()) {
2189 load_event_by_key_tx(&mut tx, process_id, replay_key).await?
2190 } else {
2191 None
2192 };
2193 let sequence: i64 = sqlx::query_scalar(
2194 "SELECT COALESCE(MAX(sequence), 0) + 1 FROM lash_process_events WHERE process_id = $1",
2195 )
2196 .bind(process_id)
2197 .fetch_one(&mut *tx)
2198 .await
2199 .map_err(plugin_sqlx_error)?;
2200 let occurred_at_ms = current_epoch_ms();
2201 let prepared = lash_core::runtime::prepare_process_event_append(
2202 &record,
2203 request,
2204 sequence as u64,
2205 replay_lookup,
2206 occurred_at_ms,
2207 )?;
2208 if prepared.replayed {
2209 let repaired = if let Some(status) = prepared.status_update.clone() {
2210 record.status = status;
2211 record.updated_at_ms = prepared.occurred_at_ms;
2212 save_process_tx(&mut tx, &record).await?;
2213 true
2214 } else {
2215 false
2216 };
2217 tx.commit().await.map_err(plugin_sqlx_error)?;
2218 if repaired {
2219 self.notify.notify_waiters();
2220 }
2221 return Ok(ProcessEventAppendResult {
2222 event: prepared.event,
2223 wake_delivery: prepared.wake_delivery,
2224 });
2225 }
2226 let event = prepared.event;
2227 sqlx::query(
2228 "INSERT INTO lash_process_events (
2229 process_id, sequence, event_type, payload_hash, idempotency_key,
2230 occurred_at_ms, event_json
2231 )
2232 VALUES ($1, $2, $3, $4, $5, $6, $7)",
2233 )
2234 .bind(process_id)
2235 .bind(sequence)
2236 .bind(event.event_type.as_str())
2237 .bind(&prepared.payload_hash)
2238 .bind(event.invocation.replay_key())
2239 .bind(prepared.occurred_at_ms as i64)
2240 .bind(serde_json::to_string(&event).map_err(process_decode_error)?)
2241 .execute(&mut *tx)
2242 .await
2243 .map_err(plugin_sqlx_error)?;
2244 if let Some(status) = prepared.status_update.clone() {
2245 record.status = status;
2246 }
2247 record.updated_at_ms = prepared.occurred_at_ms;
2248 save_process_tx(&mut tx, &record).await?;
2249 tx.commit().await.map_err(plugin_sqlx_error)?;
2250 self.notify.notify_waiters();
2251 Ok(ProcessEventAppendResult {
2252 event,
2253 wake_delivery: prepared.wake_delivery,
2254 })
2255 }
2256
2257 async fn events_after(
2258 &self,
2259 process_id: &str,
2260 after_sequence: u64,
2261 ) -> Result<Vec<ProcessEvent>, PluginError> {
2262 if load_process(&self.pool, process_id).await?.is_none() {
2263 return Err(PluginError::Session(format!(
2264 "unknown process `{process_id}`"
2265 )));
2266 }
2267 let rows = sqlx::query(
2268 "SELECT event_json FROM lash_process_events
2269 WHERE process_id = $1 AND sequence > $2
2270 ORDER BY sequence ASC",
2271 )
2272 .bind(process_id)
2273 .bind(after_sequence as i64)
2274 .fetch_all(&self.pool)
2275 .await
2276 .map_err(plugin_sqlx_error)?;
2277 let mut events = Vec::new();
2278 for row in rows {
2279 let json: String = row.get(0);
2280 events.push(serde_json::from_str(&json).map_err(process_decode_error)?);
2281 }
2282 Ok(events)
2283 }
2284
2285 async fn wake_events_after(
2286 &self,
2287 process_id: &str,
2288 after_sequence: u64,
2289 ) -> Result<Vec<ProcessEvent>, PluginError> {
2290 let rows = sqlx::query("SELECT sequence FROM lash_process_wake_acks WHERE process_id = $1")
2291 .bind(process_id)
2292 .fetch_all(&self.pool)
2293 .await
2294 .map_err(plugin_sqlx_error)?;
2295 let acked = rows
2296 .into_iter()
2297 .map(|row| row.get::<i64, _>(0) as u64)
2298 .collect::<HashSet<_>>();
2299 Ok(self
2300 .events_after(process_id, after_sequence)
2301 .await?
2302 .into_iter()
2303 .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
2304 .collect())
2305 }
2306
2307 async fn wait_event_after(
2308 &self,
2309 process_id: &str,
2310 event_type: &str,
2311 after_sequence: u64,
2312 ) -> Result<ProcessEvent, PluginError> {
2313 loop {
2314 if let Some(event) = self
2315 .events_after(process_id, after_sequence)
2316 .await?
2317 .into_iter()
2318 .find(|event| event.event_type == event_type)
2319 {
2320 return Ok(event);
2321 }
2322 tokio::select! {
2323 _ = self.notify.notified() => {}
2324 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
2325 }
2326 }
2327 }
2328
2329 async fn await_process(&self, process_id: &str) -> Result<ProcessAwaitOutput, PluginError> {
2330 loop {
2331 let record = load_process(&self.pool, process_id)
2332 .await?
2333 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
2334 if let Some(await_output) = record.status.await_output() {
2335 return Ok(await_output.clone());
2336 }
2337 tokio::select! {
2338 _ = self.notify.notified() => {}
2339 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
2340 }
2341 }
2342 }
2343
2344 async fn complete_process(
2345 &self,
2346 process_id: &str,
2347 await_output: ProcessAwaitOutput,
2348 ) -> Result<ProcessRecord, PluginError> {
2349 let event_type = match await_output.terminal_state() {
2350 lash_core::ProcessTerminalState::Completed => "process.completed",
2351 lash_core::ProcessTerminalState::Failed => "process.failed",
2352 lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
2353 };
2354 self.append_event(
2355 process_id,
2356 ProcessEventAppendRequest::new(
2357 event_type,
2358 serde_json::json!({ "await_output": await_output }),
2359 )
2360 .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
2361 )
2362 .await?;
2363 load_process(&self.pool, process_id).await?.ok_or_else(|| {
2364 PluginError::Session(format!(
2365 "unknown process `{process_id}` after terminal event"
2366 ))
2367 })
2368 }
2369
2370 async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
2371 load_process(&self.pool, process_id).await.ok().flatten()
2372 }
2373
2374 async fn ack_wake(&self, process_id: &str, sequence: u64) -> Result<(), PluginError> {
2375 if load_process(&self.pool, process_id).await?.is_none() {
2376 return Err(PluginError::Session(format!(
2377 "unknown process `{process_id}`"
2378 )));
2379 }
2380 sqlx::query(
2381 "INSERT INTO lash_process_wake_acks (process_id, sequence)
2382 VALUES ($1, $2)
2383 ON CONFLICT DO NOTHING",
2384 )
2385 .bind(process_id)
2386 .bind(sequence as i64)
2387 .execute(&self.pool)
2388 .await
2389 .map_err(plugin_sqlx_error)?;
2390 Ok(())
2391 }
2392
2393 async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, PluginError> {
2394 let rows = sqlx::query(
2395 "SELECT record_json FROM lash_processes
2396 WHERE status = 'running'
2397 ORDER BY process_id ASC",
2398 )
2399 .fetch_all(&self.pool)
2400 .await
2401 .map_err(plugin_sqlx_error)?;
2402 let mut records = Vec::new();
2403 for row in rows {
2404 let json: String = row.get(0);
2405 records.push(serde_json::from_str(&json).map_err(process_decode_error)?);
2406 }
2407 Ok(records)
2408 }
2409
2410 async fn claim_process_lease(
2411 &self,
2412 process_id: &str,
2413 owner_id: &str,
2414 lease_ttl_ms: u64,
2415 ) -> Result<ProcessLease, PluginError> {
2416 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2417 if load_process_tx(&mut tx, process_id).await?.is_none() {
2418 return Err(PluginError::Session(format!(
2419 "unknown process `{process_id}`"
2420 )));
2421 }
2422 let now = current_epoch_ms();
2423 let current = load_process_lease_tx(&mut tx, process_id).await?;
2424 if let Some(current) = current.as_ref()
2425 && current.expires_at_epoch_ms > now
2426 && current.owner_id != owner_id
2427 {
2428 return Err(process_lease_conflict(process_id, current));
2429 }
2430 let existing_fence: Option<i64> = sqlx::query_scalar(
2431 "SELECT lease_fencing_token FROM lash_process_leases WHERE process_id = $1 FOR UPDATE",
2432 )
2433 .bind(process_id)
2434 .fetch_optional(&mut *tx)
2435 .await
2436 .map_err(plugin_sqlx_error)?;
2437 let fencing_token = existing_fence.unwrap_or(0) as u64 + 1;
2438 let lease = ProcessLease {
2439 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
2440 process_id: process_id.to_string(),
2441 owner_id: owner_id.to_string(),
2442 lease_token: format!(
2443 "{:x}",
2444 Sha256::digest(format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes())
2445 ),
2446 fencing_token,
2447 claimed_at_epoch_ms: now,
2448 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
2449 };
2450 sqlx::query(
2451 "INSERT INTO lash_process_leases (
2452 process_id, lease_owner_id, lease_token, lease_fencing_token,
2453 lease_claimed_at_ms, lease_expires_at_ms
2454 )
2455 VALUES ($1, $2, $3, $4, $5, $6)
2456 ON CONFLICT (process_id) DO UPDATE SET
2457 lease_owner_id = EXCLUDED.lease_owner_id,
2458 lease_token = EXCLUDED.lease_token,
2459 lease_fencing_token = EXCLUDED.lease_fencing_token,
2460 lease_claimed_at_ms = EXCLUDED.lease_claimed_at_ms,
2461 lease_expires_at_ms = EXCLUDED.lease_expires_at_ms",
2462 )
2463 .bind(&lease.process_id)
2464 .bind(&lease.owner_id)
2465 .bind(&lease.lease_token)
2466 .bind(lease.fencing_token as i64)
2467 .bind(lease.claimed_at_epoch_ms as i64)
2468 .bind(lease.expires_at_epoch_ms as i64)
2469 .execute(&mut *tx)
2470 .await
2471 .map_err(plugin_sqlx_error)?;
2472 tx.commit().await.map_err(plugin_sqlx_error)?;
2473 Ok(lease)
2474 }
2475
2476 async fn renew_process_lease(
2477 &self,
2478 lease: &ProcessLease,
2479 lease_ttl_ms: u64,
2480 ) -> Result<ProcessLease, PluginError> {
2481 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2482 let now = current_epoch_ms();
2483 let current = load_process_lease_tx(&mut tx, &lease.process_id).await?;
2484 if !guard_lease(current.as_ref(), &lease.lease_token, now) {
2485 return Err(process_lease_expired(&lease.process_id));
2486 }
2487 let renewed = ProcessLease {
2488 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
2489 ..lease.clone()
2490 };
2491 sqlx::query(
2492 "UPDATE lash_process_leases
2493 SET lease_expires_at_ms = $2
2494 WHERE process_id = $1 AND lease_token = $3",
2495 )
2496 .bind(&renewed.process_id)
2497 .bind(renewed.expires_at_epoch_ms as i64)
2498 .bind(&renewed.lease_token)
2499 .execute(&mut *tx)
2500 .await
2501 .map_err(plugin_sqlx_error)?;
2502 tx.commit().await.map_err(plugin_sqlx_error)?;
2503 Ok(renewed)
2504 }
2505
2506 async fn complete_process_lease(
2507 &self,
2508 completion: &ProcessLeaseCompletion,
2509 ) -> Result<(), PluginError> {
2510 sqlx::query(
2511 "UPDATE lash_process_leases
2512 SET lease_owner_id = NULL,
2513 lease_token = NULL,
2514 lease_claimed_at_ms = 0,
2515 lease_expires_at_ms = 0
2516 WHERE process_id = $1 AND lease_token = $2",
2517 )
2518 .bind(&completion.process_id)
2519 .bind(&completion.lease_token)
2520 .execute(&self.pool)
2521 .await
2522 .map_err(plugin_sqlx_error)?;
2523 Ok(())
2524 }
2525}
2526
2527#[async_trait::async_trait]
2528impl HostEventStore for PostgresHostEventStore {
2529 fn durability_tier(&self) -> DurabilityTier {
2530 DurabilityTier::Durable
2531 }
2532
2533 async fn register_subscription(
2534 &self,
2535 draft: TriggerSubscriptionDraft,
2536 ) -> Result<TriggerSubscriptionRecord, PluginError> {
2537 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2538 let seq: i64 = sqlx::query_scalar("SELECT nextval('lash_host_event_subscription_seq')")
2539 .fetch_one(&mut *tx)
2540 .await
2541 .map_err(plugin_sqlx_error)?;
2542 let handle = format!("trigger:{seq}");
2543 let subscription_id = format!("subscription:{seq}");
2544 let now = current_epoch_ms();
2545 let record = TriggerSubscriptionRecord {
2546 subscription_id: subscription_id.clone(),
2547 session_id: draft.session_id,
2548 handle,
2549 name: draft.name,
2550 source_type: draft.source_type,
2551 source_key: draft.source_key,
2552 source: draft.source,
2553 event_ty: draft.event_ty,
2554 module_ref: draft.module_ref,
2555 required_surface_ref: draft.required_surface_ref,
2556 process_ref: draft.process_ref,
2557 process_name: draft.process_name,
2558 input_template: draft.input_template,
2559 enabled: true,
2560 created_at_ms: now,
2561 updated_at_ms: now,
2562 };
2563 sqlx::query(
2564 "INSERT INTO lash_host_event_trigger_subscriptions (
2565 subscription_id, session_id, handle, source_type, source_key,
2566 enabled, created_at_ms, updated_at_ms, record_json
2567 )
2568 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
2569 )
2570 .bind(&record.subscription_id)
2571 .bind(&record.session_id)
2572 .bind(&record.handle)
2573 .bind(&record.source_type)
2574 .bind(&record.source_key)
2575 .bind(record.enabled)
2576 .bind(record.created_at_ms as i64)
2577 .bind(record.updated_at_ms as i64)
2578 .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2579 .execute(&mut *tx)
2580 .await
2581 .map_err(plugin_sqlx_error)?;
2582 tx.commit().await.map_err(plugin_sqlx_error)?;
2583 Ok(record)
2584 }
2585
2586 async fn list_subscriptions(
2587 &self,
2588 filter: TriggerSubscriptionFilter,
2589 ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError> {
2590 let rows = sqlx::query(
2591 "SELECT record_json FROM lash_host_event_trigger_subscriptions
2592 ORDER BY session_id ASC, handle ASC",
2593 )
2594 .fetch_all(&self.pool)
2595 .await
2596 .map_err(plugin_sqlx_error)?;
2597 let mut records = Vec::new();
2598 for row in rows {
2599 let json: String = row.get(0);
2600 let record: TriggerSubscriptionRecord =
2601 serde_json::from_str(&json).map_err(process_decode_error)?;
2602 if filter.matches(&record) {
2603 records.push(record);
2604 }
2605 }
2606 Ok(records)
2607 }
2608
2609 async fn cancel_subscription(
2610 &self,
2611 session_id: &str,
2612 handle: &str,
2613 ) -> Result<bool, PluginError> {
2614 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2615 let json: Option<String> = sqlx::query_scalar(
2616 "SELECT record_json FROM lash_host_event_trigger_subscriptions
2617 WHERE session_id = $1 AND handle = $2
2618 FOR UPDATE",
2619 )
2620 .bind(session_id)
2621 .bind(handle)
2622 .fetch_optional(&mut *tx)
2623 .await
2624 .map_err(plugin_sqlx_error)?;
2625 let Some(json) = json else {
2626 tx.commit().await.map_err(plugin_sqlx_error)?;
2627 return Ok(false);
2628 };
2629 let mut record: TriggerSubscriptionRecord =
2630 serde_json::from_str(&json).map_err(process_decode_error)?;
2631 let changed = record.enabled;
2632 record.enabled = false;
2633 record.updated_at_ms = current_epoch_ms();
2634 sqlx::query(
2635 "UPDATE lash_host_event_trigger_subscriptions
2636 SET enabled = $3, updated_at_ms = $4, record_json = $5
2637 WHERE session_id = $1 AND handle = $2",
2638 )
2639 .bind(session_id)
2640 .bind(handle)
2641 .bind(record.enabled)
2642 .bind(record.updated_at_ms as i64)
2643 .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2644 .execute(&mut *tx)
2645 .await
2646 .map_err(plugin_sqlx_error)?;
2647 tx.commit().await.map_err(plugin_sqlx_error)?;
2648 Ok(changed)
2649 }
2650
2651 async fn record_occurrence(
2652 &self,
2653 request: HostEventOccurrenceRequest,
2654 ) -> Result<HostEventOccurrenceRecord, PluginError> {
2655 lash_core::validate_host_event_occurrence_request(&request)?;
2656 let request_hash = lash_core::host_event_occurrence_request_hash(&request)?;
2657 let occurrence_id = lash_core::deterministic_occurrence_id(&request)?;
2658 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2659 let existing = sqlx::query(
2660 "SELECT request_hash, record_json
2661 FROM lash_host_event_occurrences
2662 WHERE idempotency_key = $1
2663 FOR UPDATE",
2664 )
2665 .bind(&request.idempotency_key)
2666 .fetch_optional(&mut *tx)
2667 .await
2668 .map_err(plugin_sqlx_error)?;
2669 if let Some(row) = existing {
2670 let existing_hash: String = row.get(0);
2671 let existing_json: String = row.get(1);
2672 if existing_hash != request_hash {
2673 return Err(PluginError::Session(format!(
2674 "host event occurrence idempotency conflict for `{}`",
2675 request.idempotency_key
2676 )));
2677 }
2678 let record = serde_json::from_str(&existing_json).map_err(process_decode_error)?;
2679 tx.commit().await.map_err(plugin_sqlx_error)?;
2680 return Ok(record);
2681 }
2682 let record = HostEventOccurrenceRecord {
2683 occurrence_id: occurrence_id.clone(),
2684 source_type: request.source_type,
2685 source_key: request.source_key,
2686 payload: request.payload,
2687 idempotency_key: request.idempotency_key.clone(),
2688 source: request.source,
2689 occurred_at_ms: current_epoch_ms(),
2690 };
2691 sqlx::query(
2692 "INSERT INTO lash_host_event_occurrences (
2693 occurrence_id, idempotency_key, request_hash, source_type, source_key,
2694 occurred_at_ms, record_json
2695 )
2696 VALUES ($1, $2, $3, $4, $5, $6, $7)",
2697 )
2698 .bind(&record.occurrence_id)
2699 .bind(&record.idempotency_key)
2700 .bind(&request_hash)
2701 .bind(&record.source_type)
2702 .bind(&record.source_key)
2703 .bind(record.occurred_at_ms as i64)
2704 .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2705 .execute(&mut *tx)
2706 .await
2707 .map_err(plugin_sqlx_error)?;
2708 tx.commit().await.map_err(plugin_sqlx_error)?;
2709 Ok(record)
2710 }
2711
2712 async fn reserve_matching_deliveries(
2713 &self,
2714 occurrence_id: &str,
2715 ) -> Result<Vec<TriggerDeliveryReservation>, PluginError> {
2716 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2717 let occurrence_json: Option<String> = sqlx::query_scalar(
2718 "SELECT record_json FROM lash_host_event_occurrences WHERE occurrence_id = $1",
2719 )
2720 .bind(occurrence_id)
2721 .fetch_optional(&mut *tx)
2722 .await
2723 .map_err(plugin_sqlx_error)?;
2724 let Some(occurrence_json) = occurrence_json else {
2725 return Err(PluginError::Session(format!(
2726 "unknown host event occurrence `{occurrence_id}`"
2727 )));
2728 };
2729 let occurrence: HostEventOccurrenceRecord =
2730 serde_json::from_str(&occurrence_json).map_err(process_decode_error)?;
2731 let rows = sqlx::query(
2732 "SELECT record_json FROM lash_host_event_trigger_subscriptions
2733 WHERE enabled = TRUE AND source_type = $1 AND source_key = $2
2734 ORDER BY session_id ASC, handle ASC",
2735 )
2736 .bind(&occurrence.source_type)
2737 .bind(&occurrence.source_key)
2738 .fetch_all(&mut *tx)
2739 .await
2740 .map_err(plugin_sqlx_error)?;
2741 let mut deliveries = Vec::new();
2742 for row in rows {
2743 let json: String = row.get(0);
2744 let subscription: TriggerSubscriptionRecord =
2745 serde_json::from_str(&json).map_err(process_decode_error)?;
2746 let process_id = lash_core::deterministic_delivery_process_id(
2747 &occurrence.occurrence_id,
2748 &subscription.subscription_id,
2749 )?;
2750 let inserted = sqlx::query(
2751 "INSERT INTO lash_host_event_deliveries (
2752 occurrence_id, subscription_id, process_id, created_at_ms
2753 )
2754 VALUES ($1, $2, $3, $4)
2755 ON CONFLICT DO NOTHING",
2756 )
2757 .bind(&occurrence.occurrence_id)
2758 .bind(&subscription.subscription_id)
2759 .bind(&process_id)
2760 .bind(current_epoch_ms() as i64)
2761 .execute(&mut *tx)
2762 .await
2763 .map_err(plugin_sqlx_error)?
2764 .rows_affected();
2765 if inserted == 0 {
2766 continue;
2767 }
2768 deliveries.push(TriggerDeliveryReservation {
2769 occurrence: occurrence.clone(),
2770 subscription,
2771 process_id,
2772 });
2773 }
2774 tx.commit().await.map_err(plugin_sqlx_error)?;
2775 Ok(deliveries)
2776 }
2777}
2778
2779#[async_trait::async_trait]
2780impl lashlang::LashlangArtifactStore for PostgresLashlangArtifactStore {
2781 fn durability_tier(&self) -> DurabilityTier {
2782 DurabilityTier::Durable
2783 }
2784
2785 async fn put_module_artifact(
2786 &self,
2787 artifact: &lashlang::ModuleArtifact,
2788 ) -> Result<(), lashlang::ArtifactStoreError> {
2789 let bytes = artifact
2790 .to_store_bytes()
2791 .map_err(lashlang::ArtifactStoreError::from)?;
2792 sqlx::query(
2793 "INSERT INTO lash_lashlang_artifacts (module_ref, artifact_bytes)
2794 VALUES ($1, $2)
2795 ON CONFLICT (module_ref) DO UPDATE SET artifact_bytes = EXCLUDED.artifact_bytes",
2796 )
2797 .bind(artifact.module_ref.as_str())
2798 .bind(bytes)
2799 .execute(&self.pool)
2800 .await
2801 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
2802 Ok(())
2803 }
2804
2805 async fn get_module_artifact(
2806 &self,
2807 module_ref: &lashlang::ModuleRef,
2808 ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
2809 let bytes: Option<Vec<u8>> = sqlx::query_scalar(
2810 "SELECT artifact_bytes FROM lash_lashlang_artifacts WHERE module_ref = $1",
2811 )
2812 .bind(module_ref.as_str())
2813 .fetch_optional(&self.pool)
2814 .await
2815 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
2816 bytes
2817 .map(|bytes| {
2818 lashlang::ModuleArtifact::from_store_bytes(&bytes)
2819 .map(Arc::new)
2820 .map_err(lashlang::ArtifactStoreError::from)
2821 })
2822 .transpose()
2823 }
2824}