1use std::collections::HashSet;
8use std::sync::{Arc, OnceLock};
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use lash_core::runtime::{
12 ProcessHandleGrantEntry, QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim,
13 QueuedWorkClaimBoundary, QueuedWorkCompletion, QueuedWorkItem,
14};
15use lash_core::store::{
16 GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
17 RuntimeCommitResult, SessionCheckpoint, SessionHeadMeta,
18};
19use lash_core::{
20 AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
21 DeliveryPolicy, DurabilityTier, GcReport, MergeKey, ProcessAwaitOutput, ProcessEvent,
22 ProcessEventAppendRequest, ProcessEventAppendResult, ProcessExternalRef,
23 ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease, ProcessLeaseCompletion,
24 ProcessRecord, ProcessRegistration, ProcessRegistry, ProcessScope, RuntimePersistence,
25 SessionMeta, SessionNodeRecord, SessionReadScope, SessionStoreCreateRequest,
26 SessionStoreFactory, SlotPolicy, StoreError, TokenLedgerEntry, VacuumReport,
27};
28use lash_core::{
29 HostEventOccurrenceRecord, HostEventOccurrenceRequest, HostEventStore, PluginError,
30 TriggerDeliveryReservation, TriggerSubscriptionDraft, TriggerSubscriptionFilter,
31 TriggerSubscriptionRecord,
32};
33use sha2::{Digest, Sha256};
34use sqlx::postgres::{PgPool, PgPoolOptions, PgRow};
35use sqlx::{Executor, Row};
36
37const SCHEMA_COMPONENT: &str = "lash-postgres-store";
38const SCHEMA_VERSION: i32 = 1;
39const PROCESS_LEASE_SCHEMA_VERSION: u32 = lash_core::PROCESS_LEASE_SCHEMA_VERSION;
40
41#[derive(Clone)]
42pub struct PostgresStorage {
43 pool: PgPool,
44}
45
46#[derive(Clone)]
47pub struct PostgresSessionStoreFactory {
48 pool: PgPool,
49}
50
51#[derive(Clone)]
52pub struct PostgresSessionStore {
53 pool: PgPool,
54 session_id: Option<String>,
56 bound_session: Arc<OnceLock<String>>,
62}
63
64#[derive(Clone)]
65pub struct PostgresProcessRegistry {
66 pool: PgPool,
67 notify: Arc<tokio::sync::Notify>,
68}
69
70#[derive(Clone)]
71pub struct PostgresHostEventStore {
72 pool: PgPool,
73}
74
75#[derive(Clone)]
76pub struct PostgresLashlangArtifactStore {
77 pool: PgPool,
78}
79
/// Connection-pool and per-connection timeout settings used by
/// `PostgresStorage::connect_with`.
#[derive(Debug, Clone)]
pub struct PostgresStoreConfig {
    /// Upper bound on the number of pooled connections.
    pub max_connections: u32,
    /// Number of connections the pool tries to keep open.
    pub min_connections: u32,
    /// How long to wait when acquiring a connection from the pool.
    pub acquire_timeout: Duration,
    /// Idle timeout handed to the pool; `None` leaves the pool's own setting untouched.
    pub idle_timeout: Option<Duration>,
    /// Maximum connection lifetime handed to the pool; `None` leaves the pool's
    /// own setting untouched.
    pub max_lifetime: Option<Duration>,
    /// Value applied as `SET lock_timeout` on each new connection, when set.
    pub lock_timeout: Option<Duration>,
    /// Value applied as `SET statement_timeout` on each new connection, when set.
    pub statement_timeout: Option<Duration>,
}
105
106impl Default for PostgresStoreConfig {
107 fn default() -> Self {
108 Self {
109 max_connections: 16,
110 min_connections: 0,
111 acquire_timeout: Duration::from_secs(30),
112 idle_timeout: Some(Duration::from_secs(600)),
113 max_lifetime: Some(Duration::from_secs(1800)),
114 lock_timeout: Some(Duration::from_secs(10)),
115 statement_timeout: None,
116 }
117 }
118}
119
120impl PostgresStorage {
121 pub async fn connect(database_url: &str) -> Result<Self, StoreError> {
123 Self::connect_with(database_url, PostgresStoreConfig::default()).await
124 }
125
126 pub async fn connect_with(
128 database_url: &str,
129 config: PostgresStoreConfig,
130 ) -> Result<Self, StoreError> {
131 let lock_ms = config.lock_timeout.map(|d| d.as_millis().max(1) as u64);
132 let statement_ms = config
133 .statement_timeout
134 .map(|d| d.as_millis().max(1) as u64);
135 let mut options = PgPoolOptions::new()
136 .max_connections(config.max_connections)
137 .min_connections(config.min_connections)
138 .acquire_timeout(config.acquire_timeout);
139 if let Some(timeout) = config.idle_timeout {
140 options = options.idle_timeout(timeout);
141 }
142 if let Some(timeout) = config.max_lifetime {
143 options = options.max_lifetime(timeout);
144 }
145 let pool = options
146 .after_connect(move |conn, _meta| {
147 Box::pin(async move {
148 if let Some(ms) = lock_ms {
149 conn.execute(format!("SET lock_timeout = {ms}").as_str())
150 .await?;
151 }
152 if let Some(ms) = statement_ms {
153 conn.execute(format!("SET statement_timeout = {ms}").as_str())
154 .await?;
155 }
156 Ok(())
157 })
158 })
159 .connect(database_url)
160 .await
161 .map_err(store_sqlx_error)?;
162 ensure_schema(&pool).await?;
163 Ok(Self { pool })
164 }
165
166 pub fn from_pool(pool: PgPool) -> Self {
167 Self { pool }
168 }
169
170 pub fn pool(&self) -> &PgPool {
171 &self.pool
172 }
173
174 pub fn session_store_factory(&self) -> PostgresSessionStoreFactory {
175 PostgresSessionStoreFactory {
176 pool: self.pool.clone(),
177 }
178 }
179
180 pub fn session_store(&self, session_id: impl Into<String>) -> PostgresSessionStore {
181 PostgresSessionStore {
182 pool: self.pool.clone(),
183 session_id: Some(session_id.into()),
184 bound_session: Arc::new(OnceLock::new()),
185 }
186 }
187
188 pub fn unbound_session_store(&self) -> PostgresSessionStore {
189 PostgresSessionStore {
190 pool: self.pool.clone(),
191 session_id: None,
192 bound_session: Arc::new(OnceLock::new()),
193 }
194 }
195
196 pub fn process_registry(&self) -> PostgresProcessRegistry {
197 PostgresProcessRegistry {
198 pool: self.pool.clone(),
199 notify: Arc::new(tokio::sync::Notify::new()),
200 }
201 }
202
203 pub fn host_event_store(&self) -> PostgresHostEventStore {
204 PostgresHostEventStore {
205 pool: self.pool.clone(),
206 }
207 }
208
209 pub fn lashlang_artifact_store(&self) -> PostgresLashlangArtifactStore {
210 PostgresLashlangArtifactStore {
211 pool: self.pool.clone(),
212 }
213 }
214}
215
216impl PostgresSessionStoreFactory {
217 pub fn new(storage: &PostgresStorage) -> Self {
218 storage.session_store_factory()
219 }
220}
221
222impl PostgresSessionStore {
223 pub fn unbound(storage: &PostgresStorage) -> Self {
224 storage.unbound_session_store()
225 }
226
227 async fn selected_session_id(&self) -> Result<Option<String>, StoreError> {
228 if let Some(session_id) = &self.session_id {
229 return Ok(Some(session_id.clone()));
230 }
231 sqlx::query_scalar("SELECT session_id FROM lash_sessions ORDER BY session_id ASC LIMIT 1")
232 .fetch_optional(&self.pool)
233 .await
234 .map_err(store_sqlx_error)
235 }
236}
237
238async fn ensure_schema(pool: &PgPool) -> Result<(), StoreError> {
239 let mut tx = pool.begin().await.map_err(store_sqlx_error)?;
240 tx.execute("SELECT pg_advisory_xact_lock(715421, 907001)")
241 .await
242 .map_err(store_sqlx_error)?;
243 tx.execute(
244 r#"
245 CREATE TABLE IF NOT EXISTS lash_schema_versions (
246 component TEXT PRIMARY KEY,
247 version INTEGER NOT NULL
248 );
249
250 CREATE TABLE IF NOT EXISTS lash_blobs (
251 hash TEXT PRIMARY KEY,
252 content BYTEA NOT NULL
253 );
254
255 CREATE TABLE IF NOT EXISTS lash_sessions (
256 session_id TEXT PRIMARY KEY,
257 head_revision BIGINT NOT NULL DEFAULT 0,
258 head_json TEXT NOT NULL,
259 checkpoint_ref TEXT
260 );
261
262 CREATE TABLE IF NOT EXISTS lash_graph_nodes (
263 session_id TEXT NOT NULL,
264 seq BIGSERIAL,
265 node_id TEXT NOT NULL,
266 node_json TEXT NOT NULL,
267 tombstoned BOOLEAN NOT NULL DEFAULT FALSE,
268 PRIMARY KEY (session_id, node_id)
269 );
270 CREATE INDEX IF NOT EXISTS idx_lash_graph_nodes_seq
271 ON lash_graph_nodes(session_id, seq);
272
273 CREATE TABLE IF NOT EXISTS lash_usage_deltas (
274 seq BIGSERIAL PRIMARY KEY,
275 session_id TEXT NOT NULL,
276 entry_json TEXT NOT NULL
277 );
278
279 CREATE TABLE IF NOT EXISTS lash_session_meta (
280 session_id TEXT PRIMARY KEY,
281 meta_json TEXT NOT NULL
282 );
283
284 CREATE TABLE IF NOT EXISTS lash_runtime_turn_commits (
285 session_id TEXT NOT NULL,
286 turn_id TEXT NOT NULL,
287 turn_commit_hash TEXT NOT NULL,
288 result_json TEXT NOT NULL,
289 committed_at_ms BIGINT NOT NULL,
290 PRIMARY KEY (session_id, turn_id)
291 );
292
293 CREATE TABLE IF NOT EXISTS lash_queued_work_batches (
294 enqueue_seq BIGSERIAL PRIMARY KEY,
295 batch_id TEXT NOT NULL UNIQUE,
296 session_id TEXT NOT NULL,
297 source_key TEXT,
298 delivery_policy TEXT NOT NULL,
299 slot_policy TEXT NOT NULL,
300 merge_key_json TEXT NOT NULL,
301 available_at_ms BIGINT NOT NULL,
302 enqueued_at_ms BIGINT NOT NULL,
303 claim_id TEXT,
304 claim_owner_id TEXT,
305 claim_token TEXT,
306 claim_fencing_token BIGINT NOT NULL DEFAULT 0,
307 claim_claimed_at_ms BIGINT NOT NULL DEFAULT 0,
308 claim_expires_at_ms BIGINT NOT NULL DEFAULT 0,
309 UNIQUE (session_id, source_key)
310 );
311 CREATE INDEX IF NOT EXISTS idx_lash_queued_work_ready
312 ON lash_queued_work_batches(session_id, available_at_ms, enqueue_seq);
313
314 CREATE TABLE IF NOT EXISTS lash_queued_work_items (
315 batch_id TEXT NOT NULL REFERENCES lash_queued_work_batches(batch_id) ON DELETE CASCADE,
316 item_index INTEGER NOT NULL,
317 item_id TEXT NOT NULL,
318 payload_json TEXT NOT NULL,
319 PRIMARY KEY (batch_id, item_index)
320 );
321
322 CREATE TABLE IF NOT EXISTS lash_attachment_manifest (
323 attachment_id TEXT PRIMARY KEY,
324 session_id TEXT NOT NULL,
325 canonical_uri TEXT NOT NULL,
326 intent_at_ms BIGINT NOT NULL,
327 committed_at_ms BIGINT
328 );
329 CREATE INDEX IF NOT EXISTS idx_lash_attachment_manifest_uncommitted
330 ON lash_attachment_manifest(committed_at_ms)
331 WHERE committed_at_ms IS NULL;
332
333 CREATE TABLE IF NOT EXISTS lash_processes (
334 process_id TEXT PRIMARY KEY,
335 registration_hash TEXT NOT NULL,
336 owner_scope_id TEXT NOT NULL,
337 host_profile_id TEXT NOT NULL,
338 created_at_ms BIGINT NOT NULL,
339 updated_at_ms BIGINT NOT NULL,
340 status TEXT NOT NULL,
341 record_json TEXT NOT NULL
342 );
343 CREATE INDEX IF NOT EXISTS idx_lash_processes_status
344 ON lash_processes(status);
345
346 CREATE TABLE IF NOT EXISTS lash_process_events (
347 process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
348 sequence BIGINT NOT NULL,
349 event_type TEXT NOT NULL,
350 payload_hash TEXT NOT NULL,
351 idempotency_key TEXT,
352 occurred_at_ms BIGINT NOT NULL,
353 event_json TEXT NOT NULL,
354 PRIMARY KEY (process_id, sequence)
355 );
356 CREATE UNIQUE INDEX IF NOT EXISTS idx_lash_process_events_key
357 ON lash_process_events(process_id, idempotency_key)
358 WHERE idempotency_key IS NOT NULL;
359
360 CREATE TABLE IF NOT EXISTS lash_process_wake_acks (
361 process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
362 sequence BIGINT NOT NULL,
363 PRIMARY KEY (process_id, sequence)
364 );
365
366 CREATE TABLE IF NOT EXISTS lash_process_handle_grants (
367 session_id TEXT NOT NULL,
368 scope_id TEXT NOT NULL,
369 process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
370 descriptor_json TEXT NOT NULL,
371 PRIMARY KEY (scope_id, process_id)
372 );
373 CREATE INDEX IF NOT EXISTS idx_lash_process_handle_grants_session
374 ON lash_process_handle_grants(session_id);
375 CREATE INDEX IF NOT EXISTS idx_lash_process_handle_grants_process
376 ON lash_process_handle_grants(process_id);
377
378 CREATE TABLE IF NOT EXISTS lash_process_leases (
379 process_id TEXT PRIMARY KEY REFERENCES lash_processes(process_id) ON DELETE CASCADE,
380 lease_owner_id TEXT,
381 lease_token TEXT,
382 lease_fencing_token BIGINT NOT NULL DEFAULT 0,
383 lease_claimed_at_ms BIGINT NOT NULL DEFAULT 0,
384 lease_expires_at_ms BIGINT NOT NULL DEFAULT 0
385 );
386
387 CREATE SEQUENCE IF NOT EXISTS lash_host_event_subscription_seq;
388 CREATE TABLE IF NOT EXISTS lash_host_event_trigger_subscriptions (
389 subscription_id TEXT PRIMARY KEY,
390 session_id TEXT NOT NULL,
391 handle TEXT NOT NULL,
392 source_type TEXT NOT NULL,
393 source_key TEXT NOT NULL,
394 enabled BOOLEAN NOT NULL,
395 created_at_ms BIGINT NOT NULL,
396 updated_at_ms BIGINT NOT NULL,
397 record_json TEXT NOT NULL,
398 UNIQUE(session_id, handle)
399 );
400 CREATE INDEX IF NOT EXISTS idx_lash_host_event_subscriptions_source
401 ON lash_host_event_trigger_subscriptions(source_type, source_key, enabled);
402
403 CREATE TABLE IF NOT EXISTS lash_host_event_occurrences (
404 occurrence_id TEXT PRIMARY KEY,
405 idempotency_key TEXT NOT NULL UNIQUE,
406 request_hash TEXT NOT NULL,
407 source_type TEXT NOT NULL,
408 source_key TEXT NOT NULL,
409 occurred_at_ms BIGINT NOT NULL,
410 record_json TEXT NOT NULL
411 );
412
413 CREATE TABLE IF NOT EXISTS lash_host_event_deliveries (
414 occurrence_id TEXT NOT NULL REFERENCES lash_host_event_occurrences(occurrence_id) ON DELETE CASCADE,
415 subscription_id TEXT NOT NULL REFERENCES lash_host_event_trigger_subscriptions(subscription_id) ON DELETE CASCADE,
416 process_id TEXT NOT NULL,
417 created_at_ms BIGINT NOT NULL,
418 PRIMARY KEY (occurrence_id, subscription_id)
419 );
420
421 CREATE TABLE IF NOT EXISTS lash_lashlang_artifacts (
422 module_ref TEXT PRIMARY KEY,
423 artifact_bytes BYTEA NOT NULL
424 );
425 "#,
426 )
427 .await
428 .map_err(store_sqlx_error)?;
429
430 let existing: Option<i32> =
431 sqlx::query_scalar("SELECT version FROM lash_schema_versions WHERE component = $1")
432 .bind(SCHEMA_COMPONENT)
433 .fetch_optional(&mut *tx)
434 .await
435 .map_err(store_sqlx_error)?;
436 match existing {
437 Some(version) if version == SCHEMA_VERSION => {}
438 Some(version) => {
439 return Err(StoreError::Backend(format!(
440 "Postgres schema component `{SCHEMA_COMPONENT}` has version {version}, expected {SCHEMA_VERSION}"
441 )));
442 }
443 None => {
444 sqlx::query("INSERT INTO lash_schema_versions (component, version) VALUES ($1, $2)")
445 .bind(SCHEMA_COMPONENT)
446 .bind(SCHEMA_VERSION)
447 .execute(&mut *tx)
448 .await
449 .map_err(store_sqlx_error)?;
450 }
451 }
452 tx.commit().await.map_err(store_sqlx_error)
453}
454
/// Milliseconds elapsed since the Unix epoch according to the system clock.
///
/// Yields 0 if the clock reports a time before the epoch.
fn current_epoch_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
461
/// Formats the current system time as `unix:<seconds since the Unix epoch>`.
///
/// Yields `unix:0` if the clock reports a time before the epoch.
fn current_timestamp_string() -> String {
    let seconds = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    };
    format!("unix:{seconds}")
}
468
469fn store_sqlx_error(err: sqlx::Error) -> StoreError {
470 StoreError::Backend(err.to_string())
471}
472
473fn plugin_sqlx_error(err: sqlx::Error) -> PluginError {
474 PluginError::Session(err.to_string())
475}
476
477fn process_decode_error(err: serde_json::Error) -> PluginError {
478 PluginError::Session(format!("failed to decode process registry row: {err}"))
479}
480
481fn store_decode_json<T: serde::de::DeserializeOwned>(
482 json: &str,
483 what: &str,
484) -> Result<T, StoreError> {
485 serde_json::from_str(json)
486 .map_err(|err| StoreError::Backend(format!("failed to decode {what}: {err}")))
487}
488
489fn encode_json<T: serde::Serialize>(value: &T) -> String {
490 serde_json::to_string(value).expect("persisted state should serialize")
491}
492
493fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
494 let mut buf = Vec::with_capacity(1024);
495 rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
496 buf
497}
498
499fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
500 rmp_serde::from_slice(bytes).ok()
501}
502
503fn block_on_detached<T: Send + 'static>(
504 future: impl std::future::Future<Output = T> + Send + 'static,
505) -> T {
506 std::thread::spawn(move || {
507 tokio::runtime::Builder::new_current_thread()
508 .enable_all()
509 .build()
510 .expect("postgres manifest runtime")
511 .block_on(future)
512 })
513 .join()
514 .expect("postgres manifest thread")
515}
516
517fn merge_token_ledger_entries(entries: Vec<TokenLedgerEntry>) -> Vec<TokenLedgerEntry> {
518 let mut merged = Vec::<TokenLedgerEntry>::new();
519 for entry in entries {
520 if entry.usage.total() == 0 {
521 continue;
522 }
523 if let Some(existing) = merged
524 .iter_mut()
525 .find(|existing| existing.source == entry.source && existing.model == entry.model)
526 {
527 existing.usage.input_tokens += entry.usage.input_tokens;
528 existing.usage.output_tokens += entry.usage.output_tokens;
529 existing.usage.cached_input_tokens += entry.usage.cached_input_tokens;
530 existing.usage.reasoning_tokens += entry.usage.reasoning_tokens;
531 } else {
532 merged.push(entry);
533 }
534 }
535 merged
536}
537
538#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
539struct SessionCheckpointEnvelope {
540 manifest: SessionCheckpoint,
541 tool_state: Option<lash_core::ToolState>,
542 plugin_snapshot: Option<lash_core::PluginSessionSnapshot>,
543 execution_state: Option<Vec<u8>>,
544}
545
546async fn put_blob_tx(
547 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
548 content: &[u8],
549) -> Result<BlobRef, StoreError> {
550 let hash = format!("{:x}", Sha256::digest(content));
551 sqlx::query(
552 "INSERT INTO lash_blobs (hash, content)
553 VALUES ($1, $2)
554 ON CONFLICT (hash) DO NOTHING",
555 )
556 .bind(&hash)
557 .bind(content)
558 .execute(&mut **tx)
559 .await
560 .map_err(store_sqlx_error)?;
561 Ok(BlobRef(hash))
562}
563
564async fn get_blob(pool: &PgPool, blob_ref: &BlobRef) -> Option<Vec<u8>> {
565 sqlx::query_scalar::<_, Vec<u8>>("SELECT content FROM lash_blobs WHERE hash = $1")
566 .bind(blob_ref.as_str())
567 .fetch_optional(pool)
568 .await
569 .ok()
570 .flatten()
571}
572
573async fn put_checkpoint_tx(
574 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
575 checkpoint: &HydratedSessionCheckpoint,
576) -> Result<(BlobRef, SessionCheckpoint), StoreError> {
577 let manifest = SessionCheckpoint {
578 turn_state: checkpoint.turn_state.clone(),
579 tool_state_ref: checkpoint.tool_state_ref.clone(),
580 plugin_snapshot_ref: checkpoint.plugin_snapshot_ref.clone(),
581 plugin_snapshot_revision: checkpoint.plugin_snapshot_revision,
582 execution_state_ref: checkpoint.execution_state_ref.clone(),
583 };
584 let envelope = SessionCheckpointEnvelope {
585 manifest: manifest.clone(),
586 tool_state: checkpoint.tool_state.clone(),
587 plugin_snapshot: checkpoint.plugin_snapshot.clone(),
588 execution_state: checkpoint.execution_state.clone(),
589 };
590 let bytes = encode_msgpack(&envelope);
591 let checkpoint_ref = put_blob_tx(tx, &bytes).await?;
592 Ok((checkpoint_ref, manifest))
593}
594
595async fn get_checkpoint(pool: &PgPool, blob_ref: &BlobRef) -> Option<HydratedSessionCheckpoint> {
596 let bytes = get_blob(pool, blob_ref).await?;
597 let envelope: SessionCheckpointEnvelope = decode_msgpack(&bytes)?;
598 Some(HydratedSessionCheckpoint {
599 turn_state: envelope.manifest.turn_state,
600 tool_state_ref: envelope.manifest.tool_state_ref,
601 tool_state: envelope.tool_state,
602 plugin_snapshot_ref: envelope.manifest.plugin_snapshot_ref,
603 plugin_snapshot: envelope.plugin_snapshot,
604 plugin_snapshot_revision: envelope.manifest.plugin_snapshot_revision,
605 execution_state_ref: envelope.manifest.execution_state_ref,
606 execution_state: envelope.execution_state,
607 })
608}
609
610async fn load_session_head_meta_tx(
611 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
612 session_id: &str,
613 for_update: bool,
614) -> Result<Option<SessionHeadMeta>, StoreError> {
615 let sql = if for_update {
616 "SELECT head_json, head_revision FROM lash_sessions WHERE session_id = $1 FOR UPDATE"
617 } else {
618 "SELECT head_json, head_revision FROM lash_sessions WHERE session_id = $1"
619 };
620 let row = sqlx::query(sql)
621 .bind(session_id)
622 .fetch_optional(&mut **tx)
623 .await
624 .map_err(store_sqlx_error)?;
625 let Some(row) = row else {
626 return Ok(None);
627 };
628 let head_json: String = row.get(0);
629 let head_revision: i64 = row.get(1);
630 let mut meta: SessionHeadMeta = store_decode_json(&head_json, "session head")?;
631 meta.head_revision = head_revision as u64;
632 Ok(Some(meta))
633}
634
635async fn load_usage_deltas(pool: &PgPool, session_id: &str) -> Vec<TokenLedgerEntry> {
636 let rows = sqlx::query(
637 "SELECT entry_json FROM lash_usage_deltas WHERE session_id = $1 ORDER BY seq ASC",
638 )
639 .bind(session_id)
640 .fetch_all(pool)
641 .await
642 .unwrap_or_default();
643 rows.into_iter()
644 .filter_map(|row| {
645 let json: String = row.get(0);
646 serde_json::from_str(&json).ok()
647 })
648 .collect()
649}
650
651async fn load_graph(
652 pool: &PgPool,
653 session_id: &str,
654 leaf_node_id: Option<String>,
655 active_path: bool,
656) -> Result<lash_core::SessionGraph, StoreError> {
657 let rows = sqlx::query(
658 "SELECT node_json FROM lash_graph_nodes
659 WHERE session_id = $1 AND tombstoned = FALSE
660 ORDER BY seq ASC",
661 )
662 .bind(session_id)
663 .fetch_all(pool)
664 .await
665 .map_err(store_sqlx_error)?;
666 let mut nodes = Vec::<SessionNodeRecord>::new();
667 for row in rows {
668 let json: String = row.get(0);
669 nodes.push(store_decode_json(&json, "session graph node")?);
670 }
671 if active_path {
672 if let Some(leaf) = leaf_node_id.clone() {
673 let wanted = active_path_node_ids(&nodes, &leaf);
674 nodes.retain(|node| wanted.contains(&node.node_id));
675 }
676 }
677 Ok(lash_core::SessionGraph::from_nodes(nodes, leaf_node_id))
678}
679
680fn active_path_node_ids(nodes: &[SessionNodeRecord], leaf_node_id: &str) -> HashSet<String> {
681 let mut parent_by_id = std::collections::BTreeMap::new();
682 for node in nodes {
683 parent_by_id.insert(node.node_id.clone(), node.parent_node_id.clone());
684 }
685 let mut wanted = HashSet::new();
686 let mut cursor = Some(leaf_node_id.to_string());
687 while let Some(node_id) = cursor {
688 if !wanted.insert(node_id.clone()) {
689 break;
690 }
691 cursor = parent_by_id.get(&node_id).cloned().flatten();
692 }
693 wanted
694}
695
696async fn commit_attachment_refs_tx(
697 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
698 session_id: &str,
699 attachment_ids: &[AttachmentId],
700) -> Result<(), StoreError> {
701 if attachment_ids.is_empty() {
702 return Ok(());
703 }
704 let now = current_epoch_ms() as i64;
705 for id in attachment_ids {
706 sqlx::query(
707 "UPDATE lash_attachment_manifest
708 SET committed_at_ms = COALESCE(committed_at_ms, $1)
709 WHERE attachment_id = $2 AND session_id = $3",
710 )
711 .bind(now)
712 .bind(id.as_str())
713 .bind(session_id)
714 .execute(&mut **tx)
715 .await
716 .map_err(store_sqlx_error)?;
717 }
718 Ok(())
719}
720
721impl AttachmentManifest for PostgresSessionStore {
722 fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError> {
723 let pool = self.pool.clone();
724 block_on_detached(async move {
725 sqlx::query(
726 "INSERT INTO lash_attachment_manifest (
727 attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
728 )
729 VALUES ($1, $2, $3, $4, NULL)
730 ON CONFLICT (attachment_id) DO UPDATE SET
731 session_id = EXCLUDED.session_id,
732 canonical_uri = EXCLUDED.canonical_uri,
733 intent_at_ms = EXCLUDED.intent_at_ms",
734 )
735 .bind(intent.attachment_id.as_str())
736 .bind(intent.session_id)
737 .bind(intent.canonical_uri)
738 .bind(intent.intent_at_epoch_ms as i64)
739 .execute(&pool)
740 .await
741 .map(|_| ())
742 .map_err(store_sqlx_error)
743 })
744 }
745
746 fn commit_refs(
747 &self,
748 session_id: &str,
749 attachment_ids: &[AttachmentId],
750 ) -> Result<(), StoreError> {
751 let pool = self.pool.clone();
752 let session_id = session_id.to_string();
753 let attachment_ids = attachment_ids.to_vec();
754 block_on_detached(async move {
755 let mut tx = pool.begin().await.map_err(store_sqlx_error)?;
756 commit_attachment_refs_tx(&mut tx, &session_id, &attachment_ids).await?;
757 tx.commit().await.map_err(store_sqlx_error)
758 })
759 }
760
761 fn list_uncommitted(
762 &self,
763 older_than_epoch_ms: u64,
764 ) -> Result<Vec<AttachmentManifestEntry>, StoreError> {
765 let pool = self.pool.clone();
766 block_on_detached(async move {
767 let rows = sqlx::query(
768 "SELECT attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
769 FROM lash_attachment_manifest
770 WHERE committed_at_ms IS NULL AND intent_at_ms <= $1
771 ORDER BY attachment_id ASC",
772 )
773 .bind(older_than_epoch_ms as i64)
774 .fetch_all(&pool)
775 .await
776 .map_err(store_sqlx_error)?;
777 Ok(rows
778 .into_iter()
779 .map(|row| AttachmentManifestEntry {
780 attachment_id: AttachmentId::new(row.get::<String, _>(0)),
781 session_id: row.get(1),
782 canonical_uri: row.get(2),
783 intent_at_epoch_ms: row.get::<i64, _>(3) as u64,
784 committed_at_epoch_ms: row.get::<Option<i64>, _>(4).map(|value| value as u64),
785 })
786 .collect())
787 })
788 }
789
790 fn forget(&self, attachment_id: &AttachmentId) -> Result<(), StoreError> {
791 let pool = self.pool.clone();
792 let attachment_id = attachment_id.to_string();
793 block_on_detached(async move {
794 sqlx::query("DELETE FROM lash_attachment_manifest WHERE attachment_id = $1")
795 .bind(attachment_id)
796 .execute(&pool)
797 .await
798 .map(|_| ())
799 .map_err(store_sqlx_error)
800 })
801 }
802}
803
804#[async_trait::async_trait]
805impl SessionStoreFactory for PostgresSessionStoreFactory {
806 fn durability_tier(&self) -> DurabilityTier {
807 DurabilityTier::Durable
808 }
809
810 async fn create_store(
811 &self,
812 request: &SessionStoreCreateRequest,
813 ) -> Result<Arc<dyn RuntimePersistence>, String> {
814 let store = PostgresSessionStore {
815 pool: self.pool.clone(),
816 session_id: Some(request.session_id.clone()),
817 bound_session: Arc::new(OnceLock::new()),
818 };
819 if store
820 .load_session_meta()
821 .await
822 .map_err(|err| err.to_string())?
823 .is_none()
824 {
825 store
826 .save_session_meta(SessionMeta {
827 session_id: request.session_id.clone(),
828 session_name: request.session_id.clone(),
829 created_at: current_timestamp_string(),
830 model: request.policy.model.id.clone(),
831 cwd: std::env::current_dir()
832 .ok()
833 .and_then(|path| path.to_str().map(str::to_string)),
834 relation: request.relation.clone(),
835 })
836 .await
837 .map_err(|err| err.to_string())?;
838 }
839 Ok(Arc::new(store))
840 }
841
842 async fn delete_session(&self, session_id: &str) -> Result<(), String> {
843 let mut tx = self.pool.begin().await.map_err(|err| err.to_string())?;
844 for sql in [
845 "DELETE FROM lash_queued_work_items WHERE batch_id IN (SELECT batch_id FROM lash_queued_work_batches WHERE session_id = $1)",
846 "DELETE FROM lash_queued_work_batches WHERE session_id = $1",
847 "DELETE FROM lash_usage_deltas WHERE session_id = $1",
848 "DELETE FROM lash_graph_nodes WHERE session_id = $1",
849 "DELETE FROM lash_runtime_turn_commits WHERE session_id = $1",
850 "DELETE FROM lash_session_meta WHERE session_id = $1",
851 "DELETE FROM lash_sessions WHERE session_id = $1",
852 "DELETE FROM lash_attachment_manifest WHERE session_id = $1",
853 ] {
854 sqlx::query(sql)
855 .bind(session_id)
856 .execute(&mut *tx)
857 .await
858 .map_err(|err| err.to_string())?;
859 }
860 tx.commit().await.map_err(|err| err.to_string())
861 }
862}
863
864#[derive(Clone, Debug)]
865struct QueuedBatchRow {
866 enqueue_seq: u64,
867 batch_id: String,
868 session_id: String,
869 source_key: Option<String>,
870 delivery_policy: DeliveryPolicy,
871 slot_policy: SlotPolicy,
872 merge_key: MergeKey,
873 available_at_ms: u64,
874 enqueued_at_ms: u64,
875 claim_fencing_token: u64,
876}
877
878fn queued_batch_row(row: PgRow) -> Result<QueuedBatchRow, StoreError> {
879 let delivery_policy =
880 DeliveryPolicy::from_wire_str(row.get::<String, _>("delivery_policy").as_str())
881 .ok_or_else(|| {
882 StoreError::Backend("invalid queued work delivery policy".to_string())
883 })?;
884 let slot_policy = SlotPolicy::from_wire_str(row.get::<String, _>("slot_policy").as_str())
885 .ok_or_else(|| StoreError::Backend("invalid queued work slot policy".to_string()))?;
886 let merge_json: String = row.get("merge_key_json");
887 Ok(QueuedBatchRow {
888 enqueue_seq: row.get::<i64, _>("enqueue_seq") as u64,
889 batch_id: row.get("batch_id"),
890 session_id: row.get("session_id"),
891 source_key: row.get("source_key"),
892 delivery_policy,
893 slot_policy,
894 merge_key: store_decode_json(&merge_json, "queued work merge key")?,
895 available_at_ms: row.get::<i64, _>("available_at_ms") as u64,
896 enqueued_at_ms: row.get::<i64, _>("enqueued_at_ms") as u64,
897 claim_fencing_token: row.get::<i64, _>("claim_fencing_token") as u64,
898 })
899}
900
901async fn load_queued_batch(
902 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
903 batch_id: &str,
904) -> Result<Option<QueuedWorkBatch>, StoreError> {
905 let row = sqlx::query(
906 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
907 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
908 claim_fencing_token
909 FROM lash_queued_work_batches
910 WHERE batch_id = $1",
911 )
912 .bind(batch_id)
913 .fetch_optional(&mut **tx)
914 .await
915 .map_err(store_sqlx_error)?;
916 let Some(row) = row else {
917 return Ok(None);
918 };
919 let row = queued_batch_row(row)?;
920 queued_work_batch_from_row(tx, row).await.map(Some)
921}
922
923async fn queued_work_batch_from_row(
924 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
925 row: QueuedBatchRow,
926) -> Result<QueuedWorkBatch, StoreError> {
927 let item_rows = sqlx::query(
928 "SELECT item_id, payload_json
929 FROM lash_queued_work_items
930 WHERE batch_id = $1
931 ORDER BY item_index ASC",
932 )
933 .bind(&row.batch_id)
934 .fetch_all(&mut **tx)
935 .await
936 .map_err(store_sqlx_error)?;
937 let mut items = Vec::new();
938 for item in item_rows {
939 let payload_json: String = item.get(1);
940 items.push(QueuedWorkItem {
941 item_id: item.get(0),
942 payload: store_decode_json(&payload_json, "queued work payload")?,
943 });
944 }
945 Ok(QueuedWorkBatch {
946 batch_id: row.batch_id,
947 session_id: row.session_id,
948 enqueue_seq: row.enqueue_seq,
949 source_key: row.source_key,
950 delivery_policy: row.delivery_policy,
951 slot_policy: row.slot_policy,
952 merge_key: row.merge_key,
953 available_at_ms: row.available_at_ms,
954 enqueued_at_ms: row.enqueued_at_ms,
955 items,
956 })
957}
958
959async fn ensure_queued_work_completion_tx(
960 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
961 completed: &QueuedWorkCompletion,
962) -> Result<(), StoreError> {
963 for batch_id in &completed.batch_ids {
964 let exists: Option<i64> = sqlx::query_scalar(
965 "SELECT 1::BIGINT FROM lash_queued_work_batches
966 WHERE session_id = $1
967 AND batch_id = $2
968 AND claim_id = $3
969 AND claim_token = $4
970 LIMIT 1",
971 )
972 .bind(&completed.session_id)
973 .bind(batch_id)
974 .bind(&completed.claim_id)
975 .bind(&completed.lease_token)
976 .fetch_optional(&mut **tx)
977 .await
978 .map_err(store_sqlx_error)?;
979 if exists.is_none() {
980 return Err(StoreError::QueuedWorkClaimExpired {
981 session_id: completed.session_id.clone(),
982 claim_id: completed.claim_id.clone(),
983 });
984 }
985 }
986 Ok(())
987}
988
989#[async_trait::async_trait]
990impl RuntimePersistence for PostgresSessionStore {
991 fn durability_tier(&self) -> DurabilityTier {
992 DurabilityTier::Durable
993 }
994
995 async fn load_session(
996 &self,
997 scope: SessionReadScope,
998 ) -> Result<Option<PersistedSessionRead>, StoreError> {
999 let Some(session_id) = self.selected_session_id().await? else {
1000 return Ok(None);
1001 };
1002 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1003 let Some(meta) = load_session_head_meta_tx(&mut tx, &session_id, false).await? else {
1004 return Ok(None);
1005 };
1006 tx.commit().await.map_err(store_sqlx_error)?;
1007 let leaf_node_id = match &scope {
1008 SessionReadScope::FullGraph => meta.leaf_node_id.clone(),
1009 SessionReadScope::ActivePath { leaf_node_id } => {
1010 leaf_node_id.clone().or_else(|| meta.leaf_node_id.clone())
1011 }
1012 };
1013 let graph = load_graph(
1014 &self.pool,
1015 &session_id,
1016 leaf_node_id.clone(),
1017 matches!(scope, SessionReadScope::ActivePath { .. }),
1018 )
1019 .await?;
1020 let checkpoint = match meta.checkpoint_ref.as_ref() {
1021 Some(blob_ref) => get_checkpoint(&self.pool, blob_ref).await,
1022 None => None,
1023 };
1024 Ok(Some(PersistedSessionRead {
1025 session_id: meta.session_id,
1026 head_revision: meta.head_revision,
1027 config: meta.config,
1028 agent_frames: meta.agent_frames,
1029 current_agent_frame_id: meta.current_agent_frame_id,
1030 graph,
1031 checkpoint_ref: meta.checkpoint_ref,
1032 checkpoint,
1033 token_ledger: merge_token_ledger_entries(
1034 load_usage_deltas(&self.pool, &session_id).await,
1035 ),
1036 }))
1037 }
1038
1039 async fn load_node(&self, node_id: &str) -> Result<Option<SessionNodeRecord>, StoreError> {
1040 let json: Option<String> = if let Some(session_id) = &self.session_id {
1041 sqlx::query_scalar(
1042 "SELECT node_json FROM lash_graph_nodes
1043 WHERE session_id = $1 AND node_id = $2 AND tombstoned = FALSE",
1044 )
1045 .bind(session_id)
1046 .bind(node_id)
1047 .fetch_optional(&self.pool)
1048 .await
1049 .map_err(store_sqlx_error)?
1050 } else {
1051 sqlx::query_scalar(
1052 "SELECT node_json FROM lash_graph_nodes
1053 WHERE node_id = $1 AND tombstoned = FALSE
1054 ORDER BY session_id ASC
1055 LIMIT 1",
1056 )
1057 .bind(node_id)
1058 .fetch_optional(&self.pool)
1059 .await
1060 .map_err(store_sqlx_error)?
1061 };
1062 json.map(|json| store_decode_json(&json, "session graph node"))
1063 .transpose()
1064 }
1065
    /// Atomically commits one runtime state transition for `commit.session_id`.
    ///
    /// Everything below runs in a single transaction: the session-binding checks, the
    /// idempotent turn-commit lookup, the head-revision check, validation and deletion of
    /// completed queue claims, the checkpoint write, usage deltas, graph changes, the head
    /// upsert, attachment references and the turn-commit record.
    ///
    /// Returns the new head revision together with the checkpoint reference and manifest.
    /// When `commit.turn_commit` names a turn that was already committed with the same
    /// hash, the previously stored result is returned instead of committing again.
    ///
    /// # Errors
    ///
    /// * [`StoreError::SessionBindingMismatch`] when the commit targets a different session
    ///   than the one this store (or the stored head) is bound to.
    /// * [`StoreError::RuntimeTurnCommitConflict`] when the turn commit names another
    ///   session, or the turn was already committed with a different hash.
    /// * [`StoreError::HeadRevisionConflict`] when the expected revision does not match, or
    ///   the conditional head upsert affects no row.
    /// * [`StoreError::QueuedWorkClaimExpired`] when a completed claim names another session
    ///   or fails `ensure_queued_work_completion_tx`.
    /// * Any other [`StoreError`] produced by a failing database operation.
    async fn commit_runtime_state(
        &self,
        commit: RuntimeCommit,
    ) -> Result<RuntimeCommitResult, StoreError> {
        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
        let existing = load_session_head_meta_tx(&mut tx, &commit.session_id, false).await?;
        // Reject the commit if the stored head names a different session than the one being
        // committed.
        if let Some(bound_session_id) = existing.as_ref().map(|meta| meta.session_id.as_str())
            && bound_session_id != commit.session_id
        {
            return Err(StoreError::SessionBindingMismatch {
                bound_session_id: bound_session_id.to_string(),
                attempted_session_id: commit.session_id,
            });
        }
        // The store's binding is its explicit `session_id`, or else whatever session was
        // latched into `bound_session` earlier.
        let effective_binding = self
            .session_id
            .clone()
            .or_else(|| self.bound_session.get().cloned());
        if let Some(bound_session_id) = &effective_binding
            && commit.session_id != *bound_session_id
        {
            return Err(StoreError::SessionBindingMismatch {
                bound_session_id: bound_session_id.clone(),
                attempted_session_id: commit.session_id,
            });
        }
        // A store without an explicit session latches onto the first session that passes the
        // checks above; the result of `set` is ignored, so an existing latch is kept.
        if self.session_id.is_none() {
            let _ = self.bound_session.set(commit.session_id.clone());
        }
        if let Some(completed) = &commit.turn_commit {
            if completed.session_id != commit.session_id {
                return Err(StoreError::RuntimeTurnCommitConflict {
                    session_id: completed.session_id.clone(),
                    turn_id: completed.turn_id.clone(),
                });
            }
            // Idempotency: look for a commit already recorded for this turn.
            let prior = sqlx::query(
                "SELECT turn_commit_hash, result_json
                 FROM lash_runtime_turn_commits
                 WHERE session_id = $1 AND turn_id = $2",
            )
            .bind(&completed.session_id)
            .bind(&completed.turn_id)
            .fetch_optional(&mut *tx)
            .await
            .map_err(store_sqlx_error)?;
            if let Some(row) = prior {
                let hash: String = row.get(0);
                let result_json: String = row.get(1);
                // Same hash: replay the stored result. Different hash: conflicting commit.
                if hash == completed.turn_commit_hash {
                    return store_decode_json(&result_json, "runtime turn commit result");
                }
                return Err(StoreError::RuntimeTurnCommitConflict {
                    session_id: completed.session_id.clone(),
                    turn_id: completed.turn_id.clone(),
                });
            }
        }
        // A session without a head row is treated as being at revision 0.
        let actual_revision = existing.as_ref().map_or(0, |meta| meta.head_revision);
        // The revision is only checked when the caller supplied an expectation.
        if commit.expected_head_revision.is_some()
            && commit.expected_head_revision != Some(actual_revision)
        {
            return Err(StoreError::HeadRevisionConflict {
                expected: commit.expected_head_revision,
                actual: actual_revision,
            });
        }
        // Validate every completed claim before writing anything for this commit.
        for completed in &commit.completed_queue_claims {
            if completed.session_id != commit.session_id {
                return Err(StoreError::QueuedWorkClaimExpired {
                    session_id: completed.session_id.clone(),
                    claim_id: completed.claim_id.clone(),
                });
            }
            ensure_queued_work_completion_tx(&mut tx, completed).await?;
        }
        let (checkpoint_ref, manifest) = put_checkpoint_tx(&mut tx, &commit.checkpoint).await?;
        for entry in &commit.usage_deltas {
            sqlx::query("INSERT INTO lash_usage_deltas (session_id, entry_json) VALUES ($1, $2)")
                .bind(&commit.session_id)
                .bind(encode_json(entry))
                .execute(&mut *tx)
                .await
                .map_err(store_sqlx_error)?;
        }
        // Apply the graph delta and determine the leaf recorded in the new head.
        let leaf_node_id = match &commit.graph {
            GraphCommitDelta::Unchanged { leaf_node_id } => leaf_node_id.clone(),
            GraphCommitDelta::Append {
                nodes,
                leaf_node_id,
            } => {
                for node in nodes {
                    // Upsert: re-appending an existing node overwrites its JSON and clears
                    // its tombstone flag.
                    sqlx::query(
                        "INSERT INTO lash_graph_nodes (session_id, node_id, node_json)
                         VALUES ($1, $2, $3)
                         ON CONFLICT (session_id, node_id) DO UPDATE SET
                             node_json = EXCLUDED.node_json,
                             tombstoned = FALSE",
                    )
                    .bind(&commit.session_id)
                    .bind(&node.node_id)
                    .bind(encode_json(node))
                    .execute(&mut *tx)
                    .await
                    .map_err(store_sqlx_error)?;
                }
                leaf_node_id.clone()
            }
            GraphCommitDelta::ReplaceFull(graph) => {
                // Drop every stored node of the session, then insert the replacement graph.
                sqlx::query("DELETE FROM lash_graph_nodes WHERE session_id = $1")
                    .bind(&commit.session_id)
                    .execute(&mut *tx)
                    .await
                    .map_err(store_sqlx_error)?;
                for node in &graph.nodes {
                    sqlx::query(
                        "INSERT INTO lash_graph_nodes (session_id, node_id, node_json)
                         VALUES ($1, $2, $3)",
                    )
                    .bind(&commit.session_id)
                    .bind(&node.node_id)
                    .bind(encode_json(node))
                    .execute(&mut *tx)
                    .await
                    .map_err(store_sqlx_error)?;
                }
                graph.leaf_node_id.clone()
            }
        };
        // Count the live nodes after the delta so the head records the resulting graph size.
        let graph_node_count: i64 = sqlx::query_scalar(
            "SELECT COUNT(*) FROM lash_graph_nodes WHERE session_id = $1 AND tombstoned = FALSE",
        )
        .bind(&commit.session_id)
        .fetch_one(&mut *tx)
        .await
        .map_err(store_sqlx_error)?;
        let next_revision = actual_revision + 1;
        let meta = SessionHeadMeta {
            session_id: commit.session_id.clone(),
            head_revision: next_revision,
            config: commit.config.clone(),
            agent_frames: commit.agent_frames.clone(),
            current_agent_frame_id: commit.current_agent_frame_id.clone(),
            checkpoint_ref: Some(checkpoint_ref.clone()),
            leaf_node_id,
            graph_node_count: graph_node_count as usize,
            token_ledger: Vec::new(),
        };
        // Insert the head row, or update it only if it is still at the revision read above
        // ($5). An existing row at another revision leaves zero rows affected.
        let head_write = sqlx::query(
            "INSERT INTO lash_sessions (session_id, head_revision, head_json, checkpoint_ref)
             VALUES ($1, $2, $3, $4)
             ON CONFLICT (session_id) DO UPDATE SET
                 head_revision = EXCLUDED.head_revision,
                 head_json = EXCLUDED.head_json,
                 checkpoint_ref = EXCLUDED.checkpoint_ref
             WHERE lash_sessions.head_revision = $5",
        )
        .bind(&commit.session_id)
        .bind(next_revision as i64)
        .bind(encode_json(&meta))
        .bind(checkpoint_ref.as_str())
        .bind(actual_revision as i64)
        .execute(&mut *tx)
        .await
        .map_err(store_sqlx_error)?;
        if head_write.rows_affected() == 0 {
            return Err(StoreError::HeadRevisionConflict {
                expected: commit.expected_head_revision.or(Some(actual_revision)),
                actual: actual_revision,
            });
        }
        // Remove the batches covered by the completed claims; each delete is fenced by the
        // claim id and lease token.
        for completed in &commit.completed_queue_claims {
            for batch_id in &completed.batch_ids {
                sqlx::query(
                    "DELETE FROM lash_queued_work_batches
                     WHERE session_id = $1 AND batch_id = $2 AND claim_id = $3 AND claim_token = $4",
                )
                .bind(&completed.session_id)
                .bind(batch_id)
                .bind(&completed.claim_id)
                .bind(&completed.lease_token)
                .execute(&mut *tx)
                .await
                .map_err(store_sqlx_error)?;
            }
        }
        commit_attachment_refs_tx(
            &mut tx,
            &commit.session_id,
            &commit.committed_attachment_ids,
        )
        .await?;
        let result = RuntimeCommitResult {
            head_revision: next_revision,
            checkpoint_ref,
            manifest,
        };
        // Record the result under the turn id so a retry with the same hash can replay it.
        if let Some(completed) = &commit.turn_commit {
            sqlx::query(
                "INSERT INTO lash_runtime_turn_commits (
                     session_id, turn_id, turn_commit_hash, result_json, committed_at_ms
                 )
                 VALUES ($1, $2, $3, $4, $5)",
            )
            .bind(&completed.session_id)
            .bind(&completed.turn_id)
            .bind(&completed.turn_commit_hash)
            .bind(encode_json(&result))
            .bind(current_epoch_ms() as i64)
            .execute(&mut *tx)
            .await
            .map_err(store_sqlx_error)?;
        }
        tx.commit().await.map_err(store_sqlx_error)?;
        Ok(result)
    }
1295
1296 async fn enqueue_queued_work(
1297 &self,
1298 batch: QueuedWorkBatchDraft,
1299 ) -> Result<QueuedWorkBatch, StoreError> {
1300 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1301 if let Some(source_key) = batch.source_key.as_deref() {
1302 let existing_id: Option<String> = sqlx::query_scalar(
1303 "SELECT batch_id FROM lash_queued_work_batches
1304 WHERE session_id = $1 AND source_key = $2",
1305 )
1306 .bind(&batch.session_id)
1307 .bind(source_key)
1308 .fetch_optional(&mut *tx)
1309 .await
1310 .map_err(store_sqlx_error)?;
1311 if let Some(batch_id) = existing_id {
1312 let existing = load_queued_batch(&mut tx, &batch_id)
1313 .await?
1314 .ok_or_else(|| {
1315 StoreError::Backend("queued work source row disappeared".to_string())
1316 })?;
1317 tx.commit().await.map_err(store_sqlx_error)?;
1318 return Ok(existing);
1319 }
1320 }
1321 let now = current_epoch_ms();
1322 let batch_id = format!(
1323 "qwb:{:x}",
1324 Sha256::digest(format!("{}:{:?}:{now}", batch.session_id, batch.source_key).as_bytes())
1325 );
1326 let row = sqlx::query_scalar::<_, i64>(
1327 "INSERT INTO lash_queued_work_batches (
1328 batch_id, session_id, source_key, delivery_policy, slot_policy,
1329 merge_key_json, available_at_ms, enqueued_at_ms
1330 )
1331 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
1332 RETURNING enqueue_seq",
1333 )
1334 .bind(&batch_id)
1335 .bind(&batch.session_id)
1336 .bind(&batch.source_key)
1337 .bind(batch.delivery_policy.as_str())
1338 .bind(batch.slot_policy.as_str())
1339 .bind(encode_json(&batch.merge_key))
1340 .bind(batch.available_at_ms as i64)
1341 .bind(now as i64)
1342 .fetch_one(&mut *tx)
1343 .await
1344 .map_err(store_sqlx_error)?;
1345 for (index, payload) in batch.payloads.iter().enumerate() {
1346 let item_id = format!("{batch_id}:item:{index}");
1347 sqlx::query(
1348 "INSERT INTO lash_queued_work_items (batch_id, item_index, item_id, payload_json)
1349 VALUES ($1, $2, $3, $4)",
1350 )
1351 .bind(&batch_id)
1352 .bind(index as i32)
1353 .bind(item_id)
1354 .bind(encode_json(payload))
1355 .execute(&mut *tx)
1356 .await
1357 .map_err(store_sqlx_error)?;
1358 }
1359 let queued = load_queued_batch(&mut tx, &batch_id)
1360 .await?
1361 .ok_or_else(|| StoreError::Backend("queued work insert disappeared".to_string()))?;
1362 debug_assert_eq!(queued.enqueue_seq, row as u64);
1363 tx.commit().await.map_err(store_sqlx_error)?;
1364 Ok(queued)
1365 }
1366
1367 async fn claim_ready_queued_work(
1368 &self,
1369 session_id: &str,
1370 owner_id: &str,
1371 boundary: QueuedWorkClaimBoundary,
1372 lease_ttl_ms: u64,
1373 max_batches: usize,
1374 ) -> Result<Option<QueuedWorkClaim>, StoreError> {
1375 if max_batches == 0 {
1376 return Ok(None);
1377 }
1378 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1379 let now = current_epoch_ms();
1380 let rows = sqlx::query(
1381 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1382 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1383 claim_fencing_token
1384 FROM lash_queued_work_batches
1385 WHERE session_id = $1
1386 AND available_at_ms <= $2
1387 AND (claim_token IS NULL OR claim_expires_at_ms <= $2)
1388 ORDER BY enqueue_seq ASC
1389 LIMIT $3
1390 FOR UPDATE SKIP LOCKED",
1391 )
1392 .bind(session_id)
1393 .bind(now as i64)
1394 .bind((max_batches as i64).saturating_add(32))
1395 .fetch_all(&mut *tx)
1396 .await
1397 .map_err(store_sqlx_error)?;
1398 let mut candidates = Vec::new();
1399 for row in rows {
1400 candidates.push(queued_batch_row(row)?);
1401 }
1402 let Some(first_row) = candidates.first() else {
1403 tx.commit().await.map_err(store_sqlx_error)?;
1404 return Ok(None);
1405 };
1406 if boundary == QueuedWorkClaimBoundary::ActiveTurnCheckpoint
1407 && first_row.delivery_policy != DeliveryPolicy::EarliestSafeBoundary
1408 {
1409 tx.commit().await.map_err(store_sqlx_error)?;
1410 return Ok(None);
1411 }
1412 let first_slot = first_row.slot_policy;
1413 let first_delivery = first_row.delivery_policy;
1414 let first_merge = first_row.merge_key.clone();
1415 let mut selected = Vec::new();
1416 for row in candidates {
1417 if selected.len() >= max_batches {
1418 break;
1419 }
1420 if selected.is_empty() {
1421 selected.push(row);
1422 if first_slot == SlotPolicy::Exclusive {
1423 break;
1424 }
1425 continue;
1426 }
1427 if first_slot != SlotPolicy::Join
1428 || row.slot_policy != SlotPolicy::Join
1429 || row.delivery_policy != first_delivery
1430 || row.merge_key != first_merge
1431 {
1432 break;
1433 }
1434 selected.push(row);
1435 }
1436 let Some(first) = selected.first() else {
1437 tx.commit().await.map_err(store_sqlx_error)?;
1438 return Ok(None);
1439 };
1440 let fencing_token = first.claim_fencing_token.saturating_add(1);
1441 let claim_id = format!("qwc:{}:{fencing_token}", first.enqueue_seq);
1442 let lease_token = format!(
1443 "{:x}",
1444 Sha256::digest(format!("{session_id}:{owner_id}:{claim_id}:{now}").as_bytes())
1445 );
1446 let expires_at = now.saturating_add(lease_ttl_ms);
1447 for row in &selected {
1448 let changed = sqlx::query(
1449 "UPDATE lash_queued_work_batches
1450 SET claim_id = $3,
1451 claim_owner_id = $4,
1452 claim_token = $5,
1453 claim_fencing_token = claim_fencing_token + 1,
1454 claim_claimed_at_ms = $6,
1455 claim_expires_at_ms = $7
1456 WHERE session_id = $1
1457 AND batch_id = $2
1458 AND (claim_token IS NULL OR claim_expires_at_ms <= $6)",
1459 )
1460 .bind(session_id)
1461 .bind(&row.batch_id)
1462 .bind(&claim_id)
1463 .bind(owner_id)
1464 .bind(&lease_token)
1465 .bind(now as i64)
1466 .bind(expires_at as i64)
1467 .execute(&mut *tx)
1468 .await
1469 .map_err(store_sqlx_error)?
1470 .rows_affected();
1471 if changed == 0 {
1472 tx.rollback().await.map_err(store_sqlx_error)?;
1473 return Ok(None);
1474 }
1475 }
1476 let mut batches = Vec::new();
1477 for row in selected {
1478 batches.push(queued_work_batch_from_row(&mut tx, row).await?);
1479 }
1480 tx.commit().await.map_err(store_sqlx_error)?;
1481 Ok(Some(QueuedWorkClaim {
1482 session_id: session_id.to_string(),
1483 claim_id,
1484 owner_id: owner_id.to_string(),
1485 lease_token,
1486 fencing_token,
1487 claimed_at_epoch_ms: now,
1488 expires_at_epoch_ms: expires_at,
1489 batches,
1490 }))
1491 }
1492
1493 async fn renew_queued_work_claim(
1494 &self,
1495 claim: &QueuedWorkClaim,
1496 lease_ttl_ms: u64,
1497 ) -> Result<QueuedWorkClaim, StoreError> {
1498 let expires_at = current_epoch_ms().saturating_add(lease_ttl_ms);
1499 let changed = sqlx::query(
1500 "UPDATE lash_queued_work_batches
1501 SET claim_expires_at_ms = $4
1502 WHERE session_id = $1 AND claim_id = $2 AND claim_token = $3",
1503 )
1504 .bind(&claim.session_id)
1505 .bind(&claim.claim_id)
1506 .bind(&claim.lease_token)
1507 .bind(expires_at as i64)
1508 .execute(&self.pool)
1509 .await
1510 .map_err(store_sqlx_error)?
1511 .rows_affected();
1512 if changed as usize != claim.batches.len() {
1513 return Err(StoreError::QueuedWorkClaimExpired {
1514 session_id: claim.session_id.clone(),
1515 claim_id: claim.claim_id.clone(),
1516 });
1517 }
1518 Ok(QueuedWorkClaim {
1519 expires_at_epoch_ms: expires_at,
1520 ..claim.clone()
1521 })
1522 }
1523
1524 async fn abandon_queued_work_claim(&self, claim: &QueuedWorkClaim) -> Result<(), StoreError> {
1525 sqlx::query(
1526 "UPDATE lash_queued_work_batches
1527 SET claim_id = NULL,
1528 claim_owner_id = NULL,
1529 claim_token = NULL,
1530 claim_claimed_at_ms = 0,
1531 claim_expires_at_ms = 0
1532 WHERE session_id = $1 AND claim_id = $2 AND claim_token = $3",
1533 )
1534 .bind(&claim.session_id)
1535 .bind(&claim.claim_id)
1536 .bind(&claim.lease_token)
1537 .execute(&self.pool)
1538 .await
1539 .map_err(store_sqlx_error)?;
1540 Ok(())
1541 }
1542
1543 async fn cancel_queued_work_batch(
1544 &self,
1545 session_id: &str,
1546 batch_id: &str,
1547 ) -> Result<Option<QueuedWorkBatch>, StoreError> {
1548 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1549 let now = current_epoch_ms();
1550 let row = sqlx::query(
1551 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1552 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1553 claim_fencing_token
1554 FROM lash_queued_work_batches
1555 WHERE session_id = $1
1556 AND batch_id = $2
1557 AND (claim_token IS NULL OR claim_expires_at_ms <= $3)
1558 FOR UPDATE",
1559 )
1560 .bind(session_id)
1561 .bind(batch_id)
1562 .bind(now as i64)
1563 .fetch_optional(&mut *tx)
1564 .await
1565 .map_err(store_sqlx_error)?;
1566 let Some(row) = row else {
1567 tx.commit().await.map_err(store_sqlx_error)?;
1568 return Ok(None);
1569 };
1570 let batch = queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?;
1571 sqlx::query("DELETE FROM lash_queued_work_batches WHERE batch_id = $1")
1572 .bind(batch_id)
1573 .execute(&mut *tx)
1574 .await
1575 .map_err(store_sqlx_error)?;
1576 tx.commit().await.map_err(store_sqlx_error)?;
1577 Ok(Some(batch))
1578 }
1579
1580 async fn list_queued_work(&self, session_id: &str) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1581 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1582 let rows = sqlx::query(
1583 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1584 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1585 claim_fencing_token
1586 FROM lash_queued_work_batches
1587 WHERE session_id = $1
1588 ORDER BY enqueue_seq ASC",
1589 )
1590 .bind(session_id)
1591 .fetch_all(&mut *tx)
1592 .await
1593 .map_err(store_sqlx_error)?;
1594 let mut batches = Vec::new();
1595 for row in rows {
1596 batches.push(queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?);
1597 }
1598 tx.commit().await.map_err(store_sqlx_error)?;
1599 Ok(batches)
1600 }
1601
1602 async fn list_pending_queued_work(
1603 &self,
1604 session_id: &str,
1605 ) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1606 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1607 let now = current_epoch_ms();
1608 let rows = sqlx::query(
1609 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1610 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1611 claim_fencing_token
1612 FROM lash_queued_work_batches
1613 WHERE session_id = $1
1614 AND (claim_token IS NULL OR claim_expires_at_ms <= $2)
1615 ORDER BY enqueue_seq ASC",
1616 )
1617 .bind(session_id)
1618 .bind(now as i64)
1619 .fetch_all(&mut *tx)
1620 .await
1621 .map_err(store_sqlx_error)?;
1622 let mut batches = Vec::new();
1623 for row in rows {
1624 batches.push(queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?);
1625 }
1626 tx.commit().await.map_err(store_sqlx_error)?;
1627 Ok(batches)
1628 }
1629
1630 async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError> {
1631 sqlx::query(
1632 "INSERT INTO lash_session_meta (session_id, meta_json)
1633 VALUES ($1, $2)
1634 ON CONFLICT (session_id) DO UPDATE SET meta_json = EXCLUDED.meta_json",
1635 )
1636 .bind(&meta.session_id)
1637 .bind(encode_json(&meta))
1638 .execute(&self.pool)
1639 .await
1640 .map_err(store_sqlx_error)?;
1641 Ok(())
1642 }
1643
1644 async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError> {
1645 let json: Option<String> = if let Some(session_id) = &self.session_id {
1646 sqlx::query_scalar("SELECT meta_json FROM lash_session_meta WHERE session_id = $1")
1647 .bind(session_id)
1648 .fetch_optional(&self.pool)
1649 .await
1650 .map_err(store_sqlx_error)?
1651 } else {
1652 sqlx::query_scalar(
1653 "SELECT meta_json FROM lash_session_meta ORDER BY session_id ASC LIMIT 1",
1654 )
1655 .fetch_optional(&self.pool)
1656 .await
1657 .map_err(store_sqlx_error)?
1658 };
1659 json.map(|json| store_decode_json(&json, "session meta"))
1660 .transpose()
1661 }
1662
1663 async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError> {
1664 for id in ids {
1665 if let Some(session_id) = &self.session_id {
1666 sqlx::query(
1667 "UPDATE lash_graph_nodes
1668 SET tombstoned = TRUE
1669 WHERE session_id = $1 AND node_id = $2",
1670 )
1671 .bind(session_id)
1672 .bind(id)
1673 .execute(&self.pool)
1674 .await
1675 .map_err(store_sqlx_error)?;
1676 } else {
1677 sqlx::query(
1678 "UPDATE lash_graph_nodes
1679 SET tombstoned = TRUE
1680 WHERE node_id = $1",
1681 )
1682 .bind(id)
1683 .execute(&self.pool)
1684 .await
1685 .map_err(store_sqlx_error)?;
1686 }
1687 }
1688 Ok(())
1689 }
1690
1691 async fn vacuum(&self) -> Result<VacuumReport, StoreError> {
1692 let removed = if let Some(session_id) = &self.session_id {
1693 sqlx::query("DELETE FROM lash_graph_nodes WHERE session_id = $1 AND tombstoned = TRUE")
1694 .bind(session_id)
1695 .execute(&self.pool)
1696 .await
1697 .map_err(store_sqlx_error)?
1698 .rows_affected()
1699 } else {
1700 sqlx::query("DELETE FROM lash_graph_nodes WHERE tombstoned = TRUE")
1701 .execute(&self.pool)
1702 .await
1703 .map_err(store_sqlx_error)?
1704 .rows_affected()
1705 };
1706 Ok(VacuumReport {
1707 removed_node_count: removed as usize,
1708 })
1709 }
1710
    /// Returns a default [`GcReport`] without touching the database; this store performs no
    /// work for unreachable-node collection here.
    async fn gc_unreachable(&self) -> Result<GcReport, StoreError> {
        Ok(GcReport::default())
    }
1714}
1715
/// Returns the label of the record's status, as produced by `record.status.label()`.
///
/// The value is what gets written to the `status` column of `lash_processes`.
fn process_status_label(record: &ProcessRecord) -> &'static str {
    record.status.label()
}
1719
1720async fn load_process_tx(
1721 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1722 process_id: &str,
1723) -> Result<Option<ProcessRecord>, PluginError> {
1724 let json: Option<String> = sqlx::query_scalar(
1725 "SELECT record_json
1726 FROM lash_processes
1727 WHERE process_id = $1
1728 FOR UPDATE",
1729 )
1730 .bind(process_id)
1731 .fetch_optional(&mut **tx)
1732 .await
1733 .map_err(plugin_sqlx_error)?;
1734 json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
1735 .transpose()
1736}
1737
1738async fn load_process(
1739 pool: &PgPool,
1740 process_id: &str,
1741) -> Result<Option<ProcessRecord>, PluginError> {
1742 let json: Option<String> =
1743 sqlx::query_scalar("SELECT record_json FROM lash_processes WHERE process_id = $1")
1744 .bind(process_id)
1745 .fetch_optional(pool)
1746 .await
1747 .map_err(plugin_sqlx_error)?;
1748 json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
1749 .transpose()
1750}
1751
1752async fn save_process_tx(
1753 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1754 record: &ProcessRecord,
1755) -> Result<(), PluginError> {
1756 sqlx::query(
1757 "UPDATE lash_processes
1758 SET updated_at_ms = $2, status = $3, record_json = $4
1759 WHERE process_id = $1",
1760 )
1761 .bind(&record.id)
1762 .bind(record.updated_at_ms as i64)
1763 .bind(process_status_label(record))
1764 .bind(serde_json::to_string(record).map_err(process_decode_error)?)
1765 .execute(&mut **tx)
1766 .await
1767 .map_err(plugin_sqlx_error)?;
1768 Ok(())
1769}
1770
1771async fn load_event_by_key_tx(
1772 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1773 process_id: &str,
1774 replay_key: &str,
1775) -> Result<Option<(String, ProcessEvent)>, PluginError> {
1776 let row = sqlx::query(
1777 "SELECT payload_hash, event_json
1778 FROM lash_process_events
1779 WHERE process_id = $1 AND idempotency_key = $2",
1780 )
1781 .bind(process_id)
1782 .bind(replay_key)
1783 .fetch_optional(&mut **tx)
1784 .await
1785 .map_err(plugin_sqlx_error)?;
1786 row.map(|row| {
1787 let hash: String = row.get(0);
1788 let json: String = row.get(1);
1789 serde_json::from_str(&json)
1790 .map(|event| (hash, event))
1791 .map_err(process_decode_error)
1792 })
1793 .transpose()
1794}
1795
1796async fn load_process_lease_tx(
1797 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1798 process_id: &str,
1799) -> Result<Option<ProcessLease>, PluginError> {
1800 let row = sqlx::query(
1801 "SELECT lease_owner_id, lease_token, lease_fencing_token,
1802 lease_claimed_at_ms, lease_expires_at_ms
1803 FROM lash_process_leases
1804 WHERE process_id = $1",
1805 )
1806 .bind(process_id)
1807 .fetch_optional(&mut **tx)
1808 .await
1809 .map_err(plugin_sqlx_error)?;
1810 let Some(row) = row else {
1811 return Ok(None);
1812 };
1813 let owner_id: Option<String> = row.get(0);
1814 let lease_token: Option<String> = row.get(1);
1815 let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
1816 return Ok(None);
1817 };
1818 Ok(Some(ProcessLease {
1819 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
1820 process_id: process_id.to_string(),
1821 owner_id,
1822 lease_token,
1823 fencing_token: row.get::<i64, _>(2) as u64,
1824 claimed_at_epoch_ms: row.get::<i64, _>(3) as u64,
1825 expires_at_epoch_ms: row.get::<i64, _>(4) as u64,
1826 }))
1827}
1828
1829fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> PluginError {
1830 PluginError::Session(format!(
1831 "process `{process_id}` is already leased by `{}` until {}",
1832 current.owner_id, current.expires_at_epoch_ms
1833 ))
1834}
1835
1836fn process_lease_expired(process_id: &str) -> PluginError {
1837 PluginError::Session(format!(
1838 "process lease for `{process_id}` is missing or expired"
1839 ))
1840}
1841
1842fn guard_lease(current: Option<&ProcessLease>, lease_token: &str, now: u64) -> bool {
1843 current
1844 .map(|current| current.lease_token == lease_token && current.expires_at_epoch_ms > now)
1845 .unwrap_or(false)
1846}
1847
1848async fn list_grants_for_scope(
1849 pool: &PgPool,
1850 owner_scope: &ProcessScope,
1851 live_only: bool,
1852) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
1853 let status_clause = if live_only {
1854 "AND p.status = 'running'"
1855 } else {
1856 ""
1857 };
1858 let sql = format!(
1859 "SELECT g.process_id, g.descriptor_json, p.record_json
1860 FROM lash_process_handle_grants g
1861 JOIN lash_processes p ON p.process_id = g.process_id
1862 WHERE g.scope_id = $1 {status_clause}
1863 ORDER BY g.process_id ASC"
1864 );
1865 let rows = sqlx::query(&sql)
1866 .bind(owner_scope.id().as_str())
1867 .fetch_all(pool)
1868 .await
1869 .map_err(plugin_sqlx_error)?;
1870 let mut entries = Vec::new();
1871 for row in rows {
1872 let process_id: String = row.get(0);
1873 let descriptor_json: String = row.get(1);
1874 let record_json: String = row.get(2);
1875 let descriptor: ProcessHandleDescriptor =
1876 serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
1877 let record: ProcessRecord =
1878 serde_json::from_str(&record_json).map_err(process_decode_error)?;
1879 entries.push((
1880 ProcessHandleGrant {
1881 session_id: owner_scope.session_id.clone(),
1882 process_id,
1883 descriptor,
1884 },
1885 record,
1886 ));
1887 }
1888 Ok(entries)
1889}
1890
1891#[async_trait::async_trait]
1892impl ProcessRegistry for PostgresProcessRegistry {
    /// Reports the durability tier of this registry, which is always
    /// [`DurabilityTier::Durable`].
    fn durability_tier(&self) -> DurabilityTier {
        DurabilityTier::Durable
    }
1896
1897 async fn register_process(
1898 &self,
1899 registration: ProcessRegistration,
1900 ) -> Result<ProcessRecord, PluginError> {
1901 let (registration, registration_hash) =
1902 lash_core::runtime::prepare_process_registration(registration)?;
1903 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1904 if let Some(existing) = load_process_tx(&mut tx, ®istration.id).await? {
1905 if existing.registration_hash == registration_hash {
1906 tx.commit().await.map_err(plugin_sqlx_error)?;
1907 return Ok(existing);
1908 }
1909 return Err(PluginError::Session(format!(
1910 "process `{}` registration hash conflict: existing {}, new {}",
1911 registration.id, existing.registration_hash, registration_hash
1912 )));
1913 }
1914 let now = current_epoch_ms();
1915 let record =
1916 ProcessRecord::from_prepared_registration(registration, registration_hash, now);
1917 let record_json = serde_json::to_string(&record).map_err(process_decode_error)?;
1918 sqlx::query(
1919 "INSERT INTO lash_processes (
1920 process_id, registration_hash, owner_scope_id, host_profile_id,
1921 created_at_ms, updated_at_ms, status, record_json
1922 )
1923 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
1924 )
1925 .bind(&record.id)
1926 .bind(&record.registration_hash)
1927 .bind(record.owner_scope_id().as_str())
1928 .bind(record.host_profile_id())
1929 .bind(record.created_at_ms as i64)
1930 .bind(record.updated_at_ms as i64)
1931 .bind(process_status_label(&record))
1932 .bind(record_json)
1933 .execute(&mut *tx)
1934 .await
1935 .map_err(plugin_sqlx_error)?;
1936 tx.commit().await.map_err(plugin_sqlx_error)?;
1937 Ok(record)
1938 }
1939
1940 async fn set_external_ref(
1941 &self,
1942 process_id: &str,
1943 external_ref: ProcessExternalRef,
1944 ) -> Result<ProcessRecord, PluginError> {
1945 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1946 let mut record = load_process_tx(&mut tx, process_id)
1947 .await?
1948 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
1949 record.external_ref = Some(external_ref);
1950 record.updated_at_ms = current_epoch_ms();
1951 save_process_tx(&mut tx, &record).await?;
1952 tx.commit().await.map_err(plugin_sqlx_error)?;
1953 Ok(record)
1954 }
1955
1956 async fn grant_handle(
1957 &self,
1958 owner_scope: &ProcessScope,
1959 process_id: &str,
1960 descriptor: ProcessHandleDescriptor,
1961 ) -> Result<ProcessHandleGrant, PluginError> {
1962 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1963 if load_process_tx(&mut tx, process_id).await?.is_none() {
1964 return Err(PluginError::Session(format!(
1965 "unknown process `{process_id}`"
1966 )));
1967 }
1968 sqlx::query(
1969 "INSERT INTO lash_process_handle_grants (session_id, scope_id, process_id, descriptor_json)
1970 VALUES ($1, $2, $3, $4)
1971 ON CONFLICT (scope_id, process_id) DO UPDATE SET
1972 session_id = EXCLUDED.session_id,
1973 descriptor_json = EXCLUDED.descriptor_json",
1974 )
1975 .bind(&owner_scope.session_id)
1976 .bind(owner_scope.id().as_str())
1977 .bind(process_id)
1978 .bind(serde_json::to_string(&descriptor).map_err(process_decode_error)?)
1979 .execute(&mut *tx)
1980 .await
1981 .map_err(plugin_sqlx_error)?;
1982 tx.commit().await.map_err(plugin_sqlx_error)?;
1983 Ok(ProcessHandleGrant {
1984 session_id: owner_scope.session_id.clone(),
1985 process_id: process_id.to_string(),
1986 descriptor,
1987 })
1988 }
1989
1990 async fn revoke_handle(
1991 &self,
1992 owner_scope: &ProcessScope,
1993 process_id: &str,
1994 ) -> Result<(), PluginError> {
1995 sqlx::query(
1996 "DELETE FROM lash_process_handle_grants WHERE scope_id = $1 AND process_id = $2",
1997 )
1998 .bind(owner_scope.id().as_str())
1999 .bind(process_id)
2000 .execute(&self.pool)
2001 .await
2002 .map_err(plugin_sqlx_error)?;
2003 Ok(())
2004 }
2005
2006 async fn transfer_handle_grants(
2007 &self,
2008 from_scope: &ProcessScope,
2009 to_scope: &ProcessScope,
2010 process_ids: &[String],
2011 ) -> Result<(), PluginError> {
2012 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2013 for process_id in process_ids {
2014 let descriptor_json: Option<String> = sqlx::query_scalar(
2015 "SELECT descriptor_json FROM lash_process_handle_grants
2016 WHERE scope_id = $1 AND process_id = $2",
2017 )
2018 .bind(from_scope.id().as_str())
2019 .bind(process_id)
2020 .fetch_optional(&mut *tx)
2021 .await
2022 .map_err(plugin_sqlx_error)?;
2023 let Some(descriptor_json) = descriptor_json else {
2024 return Err(PluginError::Session(format!(
2025 "process handle `{process_id}` is not granted to session `{}`",
2026 from_scope.session_id
2027 )));
2028 };
2029 sqlx::query(
2030 "DELETE FROM lash_process_handle_grants WHERE scope_id = $1 AND process_id = $2",
2031 )
2032 .bind(from_scope.id().as_str())
2033 .bind(process_id)
2034 .execute(&mut *tx)
2035 .await
2036 .map_err(plugin_sqlx_error)?;
2037 sqlx::query(
2038 "INSERT INTO lash_process_handle_grants (session_id, scope_id, process_id, descriptor_json)
2039 VALUES ($1, $2, $3, $4)
2040 ON CONFLICT (scope_id, process_id) DO UPDATE SET
2041 session_id = EXCLUDED.session_id,
2042 descriptor_json = EXCLUDED.descriptor_json",
2043 )
2044 .bind(&to_scope.session_id)
2045 .bind(to_scope.id().as_str())
2046 .bind(process_id)
2047 .bind(descriptor_json)
2048 .execute(&mut *tx)
2049 .await
2050 .map_err(plugin_sqlx_error)?;
2051 }
2052 tx.commit().await.map_err(plugin_sqlx_error)
2053 }
2054
2055 async fn list_handle_grants(
2056 &self,
2057 owner_scope: &ProcessScope,
2058 ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
2059 list_grants_for_scope(&self.pool, owner_scope, false).await
2060 }
2061
2062 async fn list_live_handle_grants(
2063 &self,
2064 owner_scope: &ProcessScope,
2065 ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
2066 list_grants_for_scope(&self.pool, owner_scope, true).await
2067 }
2068
2069 async fn has_handle_grant(
2070 &self,
2071 owner_scope: &ProcessScope,
2072 process_id: &str,
2073 ) -> Result<bool, PluginError> {
2074 let exists: Option<i64> = sqlx::query_scalar(
2075 "SELECT 1::BIGINT FROM lash_process_handle_grants
2076 WHERE scope_id = $1 AND process_id = $2
2077 LIMIT 1",
2078 )
2079 .bind(owner_scope.id().as_str())
2080 .bind(process_id)
2081 .fetch_optional(&self.pool)
2082 .await
2083 .map_err(plugin_sqlx_error)?;
2084 Ok(exists.is_some())
2085 }
2086
2087 async fn handle_grants_for_process(
2088 &self,
2089 process_id: &str,
2090 ) -> Result<Vec<ProcessHandleGrant>, PluginError> {
2091 if load_process(&self.pool, process_id).await?.is_none() {
2092 return Err(PluginError::Session(format!(
2093 "unknown process `{process_id}`"
2094 )));
2095 }
2096 let rows = sqlx::query(
2097 "SELECT session_id, descriptor_json
2098 FROM lash_process_handle_grants
2099 WHERE process_id = $1
2100 ORDER BY session_id ASC, scope_id ASC",
2101 )
2102 .bind(process_id)
2103 .fetch_all(&self.pool)
2104 .await
2105 .map_err(plugin_sqlx_error)?;
2106 let mut grants = Vec::new();
2107 for row in rows {
2108 let descriptor_json: String = row.get(1);
2109 grants.push(ProcessHandleGrant {
2110 session_id: row.get(0),
2111 process_id: process_id.to_string(),
2112 descriptor: serde_json::from_str(&descriptor_json).map_err(process_decode_error)?,
2113 });
2114 }
2115 Ok(grants)
2116 }
2117
2118 async fn delete_session_process_state(
2119 &self,
2120 session_id: &str,
2121 ) -> Result<lash_core::ProcessSessionDeleteReport, PluginError> {
2122 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2123 let rows = sqlx::query(
2124 "SELECT g.process_id, p.record_json
2125 FROM lash_process_handle_grants g
2126 JOIN lash_processes p ON p.process_id = g.process_id
2127 WHERE g.session_id = $1
2128 ORDER BY g.process_id ASC",
2129 )
2130 .bind(session_id)
2131 .fetch_all(&mut *tx)
2132 .await
2133 .map_err(plugin_sqlx_error)?;
2134 let mut removed = Vec::new();
2135 for row in rows {
2136 let process_id: String = row.get(0);
2137 let record_json: String = row.get(1);
2138 let record: ProcessRecord =
2139 serde_json::from_str(&record_json).map_err(process_decode_error)?;
2140 removed.push((process_id, record));
2141 }
2142 let revoked = sqlx::query("DELETE FROM lash_process_handle_grants WHERE session_id = $1")
2143 .bind(session_id)
2144 .execute(&mut *tx)
2145 .await
2146 .map_err(plugin_sqlx_error)?
2147 .rows_affected() as usize;
2148 let mut cancel_process_ids = Vec::new();
2149 let mut preserved_process_ids = Vec::new();
2150 for (process_id, record) in removed {
2151 if record.is_terminal() {
2152 continue;
2153 }
2154 let remaining: i64 = sqlx::query_scalar(
2155 "SELECT COUNT(*) FROM lash_process_handle_grants WHERE process_id = $1",
2156 )
2157 .bind(&process_id)
2158 .fetch_one(&mut *tx)
2159 .await
2160 .map_err(plugin_sqlx_error)?;
2161 if remaining == 0 {
2162 cancel_process_ids.push(process_id);
2163 } else {
2164 preserved_process_ids.push(process_id);
2165 }
2166 }
2167 tx.commit().await.map_err(plugin_sqlx_error)?;
2168 cancel_process_ids.sort();
2169 cancel_process_ids.dedup();
2170 preserved_process_ids.sort();
2171 preserved_process_ids.dedup();
2172 Ok(lash_core::ProcessSessionDeleteReport {
2173 session_id: session_id.to_string(),
2174 revoked_handle_count: revoked,
2175 deleted_wake_count: 0,
2176 cancel_process_ids,
2177 preserved_process_ids,
2178 })
2179 }
2180
2181 async fn append_event(
2182 &self,
2183 process_id: &str,
2184 request: ProcessEventAppendRequest,
2185 ) -> Result<ProcessEventAppendResult, PluginError> {
2186 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2187 let mut record = load_process_tx(&mut tx, process_id)
2188 .await?
2189 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
2190 let replay_lookup =
2191 if let Some(replay_key) = request.replay.as_ref().map(|r| r.key.as_str()) {
2192 load_event_by_key_tx(&mut tx, process_id, replay_key).await?
2193 } else {
2194 None
2195 };
2196 let sequence: i64 = sqlx::query_scalar(
2197 "SELECT COALESCE(MAX(sequence), 0) + 1 FROM lash_process_events WHERE process_id = $1",
2198 )
2199 .bind(process_id)
2200 .fetch_one(&mut *tx)
2201 .await
2202 .map_err(plugin_sqlx_error)?;
2203 let occurred_at_ms = current_epoch_ms();
2204 let prepared = lash_core::runtime::prepare_process_event_append(
2205 &record,
2206 request,
2207 sequence as u64,
2208 replay_lookup,
2209 occurred_at_ms,
2210 )?;
2211 if prepared.replayed {
2212 let repaired = if let Some(status) = prepared.status_update.clone() {
2213 record.status = status;
2214 record.updated_at_ms = prepared.occurred_at_ms;
2215 save_process_tx(&mut tx, &record).await?;
2216 true
2217 } else {
2218 false
2219 };
2220 tx.commit().await.map_err(plugin_sqlx_error)?;
2221 if repaired {
2222 self.notify.notify_waiters();
2223 }
2224 return Ok(ProcessEventAppendResult {
2225 event: prepared.event,
2226 wake_delivery: prepared.wake_delivery,
2227 });
2228 }
2229 let event = prepared.event;
2230 sqlx::query(
2231 "INSERT INTO lash_process_events (
2232 process_id, sequence, event_type, payload_hash, idempotency_key,
2233 occurred_at_ms, event_json
2234 )
2235 VALUES ($1, $2, $3, $4, $5, $6, $7)",
2236 )
2237 .bind(process_id)
2238 .bind(sequence)
2239 .bind(event.event_type.as_str())
2240 .bind(&prepared.payload_hash)
2241 .bind(event.invocation.replay_key())
2242 .bind(prepared.occurred_at_ms as i64)
2243 .bind(serde_json::to_string(&event).map_err(process_decode_error)?)
2244 .execute(&mut *tx)
2245 .await
2246 .map_err(plugin_sqlx_error)?;
2247 if let Some(status) = prepared.status_update.clone() {
2248 record.status = status;
2249 }
2250 record.updated_at_ms = prepared.occurred_at_ms;
2251 save_process_tx(&mut tx, &record).await?;
2252 tx.commit().await.map_err(plugin_sqlx_error)?;
2253 self.notify.notify_waiters();
2254 Ok(ProcessEventAppendResult {
2255 event,
2256 wake_delivery: prepared.wake_delivery,
2257 })
2258 }
2259
2260 async fn events_after(
2261 &self,
2262 process_id: &str,
2263 after_sequence: u64,
2264 ) -> Result<Vec<ProcessEvent>, PluginError> {
2265 if load_process(&self.pool, process_id).await?.is_none() {
2266 return Err(PluginError::Session(format!(
2267 "unknown process `{process_id}`"
2268 )));
2269 }
2270 let rows = sqlx::query(
2271 "SELECT event_json FROM lash_process_events
2272 WHERE process_id = $1 AND sequence > $2
2273 ORDER BY sequence ASC",
2274 )
2275 .bind(process_id)
2276 .bind(after_sequence as i64)
2277 .fetch_all(&self.pool)
2278 .await
2279 .map_err(plugin_sqlx_error)?;
2280 let mut events = Vec::new();
2281 for row in rows {
2282 let json: String = row.get(0);
2283 events.push(serde_json::from_str(&json).map_err(process_decode_error)?);
2284 }
2285 Ok(events)
2286 }
2287
2288 async fn wake_events_after(
2289 &self,
2290 process_id: &str,
2291 after_sequence: u64,
2292 ) -> Result<Vec<ProcessEvent>, PluginError> {
2293 let rows = sqlx::query("SELECT sequence FROM lash_process_wake_acks WHERE process_id = $1")
2294 .bind(process_id)
2295 .fetch_all(&self.pool)
2296 .await
2297 .map_err(plugin_sqlx_error)?;
2298 let acked = rows
2299 .into_iter()
2300 .map(|row| row.get::<i64, _>(0) as u64)
2301 .collect::<HashSet<_>>();
2302 Ok(self
2303 .events_after(process_id, after_sequence)
2304 .await?
2305 .into_iter()
2306 .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
2307 .collect())
2308 }
2309
2310 async fn wait_event_after(
2311 &self,
2312 process_id: &str,
2313 event_type: &str,
2314 after_sequence: u64,
2315 ) -> Result<ProcessEvent, PluginError> {
2316 loop {
2317 if let Some(event) = self
2318 .events_after(process_id, after_sequence)
2319 .await?
2320 .into_iter()
2321 .find(|event| event.event_type == event_type)
2322 {
2323 return Ok(event);
2324 }
2325 tokio::select! {
2326 _ = self.notify.notified() => {}
2327 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
2328 }
2329 }
2330 }
2331
2332 async fn await_process(&self, process_id: &str) -> Result<ProcessAwaitOutput, PluginError> {
2333 loop {
2334 let record = load_process(&self.pool, process_id)
2335 .await?
2336 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
2337 if let Some(await_output) = record.status.await_output() {
2338 return Ok(await_output.clone());
2339 }
2340 tokio::select! {
2341 _ = self.notify.notified() => {}
2342 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
2343 }
2344 }
2345 }
2346
2347 async fn complete_process(
2348 &self,
2349 process_id: &str,
2350 await_output: ProcessAwaitOutput,
2351 ) -> Result<ProcessRecord, PluginError> {
2352 let event_type = match await_output.terminal_state() {
2353 lash_core::ProcessTerminalState::Completed => "process.completed",
2354 lash_core::ProcessTerminalState::Failed => "process.failed",
2355 lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
2356 };
2357 self.append_event(
2358 process_id,
2359 ProcessEventAppendRequest::new(
2360 event_type,
2361 serde_json::json!({ "await_output": await_output }),
2362 )
2363 .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
2364 )
2365 .await?;
2366 load_process(&self.pool, process_id).await?.ok_or_else(|| {
2367 PluginError::Session(format!(
2368 "unknown process `{process_id}` after terminal event"
2369 ))
2370 })
2371 }
2372
2373 async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
2374 load_process(&self.pool, process_id).await.ok().flatten()
2375 }
2376
2377 async fn ack_wake(&self, process_id: &str, sequence: u64) -> Result<(), PluginError> {
2378 if load_process(&self.pool, process_id).await?.is_none() {
2379 return Err(PluginError::Session(format!(
2380 "unknown process `{process_id}`"
2381 )));
2382 }
2383 sqlx::query(
2384 "INSERT INTO lash_process_wake_acks (process_id, sequence)
2385 VALUES ($1, $2)
2386 ON CONFLICT DO NOTHING",
2387 )
2388 .bind(process_id)
2389 .bind(sequence as i64)
2390 .execute(&self.pool)
2391 .await
2392 .map_err(plugin_sqlx_error)?;
2393 Ok(())
2394 }
2395
2396 async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, PluginError> {
2397 let rows = sqlx::query(
2398 "SELECT record_json FROM lash_processes
2399 WHERE status = 'running'
2400 ORDER BY process_id ASC",
2401 )
2402 .fetch_all(&self.pool)
2403 .await
2404 .map_err(plugin_sqlx_error)?;
2405 let mut records = Vec::new();
2406 for row in rows {
2407 let json: String = row.get(0);
2408 records.push(serde_json::from_str(&json).map_err(process_decode_error)?);
2409 }
2410 Ok(records)
2411 }
2412
2413 async fn claim_process_lease(
2414 &self,
2415 process_id: &str,
2416 owner_id: &str,
2417 lease_ttl_ms: u64,
2418 ) -> Result<ProcessLease, PluginError> {
2419 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2420 if load_process_tx(&mut tx, process_id).await?.is_none() {
2421 return Err(PluginError::Session(format!(
2422 "unknown process `{process_id}`"
2423 )));
2424 }
2425 let now = current_epoch_ms();
2426 let current = load_process_lease_tx(&mut tx, process_id).await?;
2427 if let Some(current) = current.as_ref()
2428 && current.expires_at_epoch_ms > now
2429 && current.owner_id != owner_id
2430 {
2431 return Err(process_lease_conflict(process_id, current));
2432 }
2433 let existing_fence: Option<i64> = sqlx::query_scalar(
2434 "SELECT lease_fencing_token FROM lash_process_leases WHERE process_id = $1 FOR UPDATE",
2435 )
2436 .bind(process_id)
2437 .fetch_optional(&mut *tx)
2438 .await
2439 .map_err(plugin_sqlx_error)?;
2440 let fencing_token = existing_fence.unwrap_or(0) as u64 + 1;
2441 let lease = ProcessLease {
2442 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
2443 process_id: process_id.to_string(),
2444 owner_id: owner_id.to_string(),
2445 lease_token: format!(
2446 "{:x}",
2447 Sha256::digest(format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes())
2448 ),
2449 fencing_token,
2450 claimed_at_epoch_ms: now,
2451 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
2452 };
2453 sqlx::query(
2454 "INSERT INTO lash_process_leases (
2455 process_id, lease_owner_id, lease_token, lease_fencing_token,
2456 lease_claimed_at_ms, lease_expires_at_ms
2457 )
2458 VALUES ($1, $2, $3, $4, $5, $6)
2459 ON CONFLICT (process_id) DO UPDATE SET
2460 lease_owner_id = EXCLUDED.lease_owner_id,
2461 lease_token = EXCLUDED.lease_token,
2462 lease_fencing_token = EXCLUDED.lease_fencing_token,
2463 lease_claimed_at_ms = EXCLUDED.lease_claimed_at_ms,
2464 lease_expires_at_ms = EXCLUDED.lease_expires_at_ms",
2465 )
2466 .bind(&lease.process_id)
2467 .bind(&lease.owner_id)
2468 .bind(&lease.lease_token)
2469 .bind(lease.fencing_token as i64)
2470 .bind(lease.claimed_at_epoch_ms as i64)
2471 .bind(lease.expires_at_epoch_ms as i64)
2472 .execute(&mut *tx)
2473 .await
2474 .map_err(plugin_sqlx_error)?;
2475 tx.commit().await.map_err(plugin_sqlx_error)?;
2476 Ok(lease)
2477 }
2478
2479 async fn renew_process_lease(
2480 &self,
2481 lease: &ProcessLease,
2482 lease_ttl_ms: u64,
2483 ) -> Result<ProcessLease, PluginError> {
2484 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2485 let now = current_epoch_ms();
2486 let current = load_process_lease_tx(&mut tx, &lease.process_id).await?;
2487 if !guard_lease(current.as_ref(), &lease.lease_token, now) {
2488 return Err(process_lease_expired(&lease.process_id));
2489 }
2490 let renewed = ProcessLease {
2491 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
2492 ..lease.clone()
2493 };
2494 sqlx::query(
2495 "UPDATE lash_process_leases
2496 SET lease_expires_at_ms = $2
2497 WHERE process_id = $1 AND lease_token = $3",
2498 )
2499 .bind(&renewed.process_id)
2500 .bind(renewed.expires_at_epoch_ms as i64)
2501 .bind(&renewed.lease_token)
2502 .execute(&mut *tx)
2503 .await
2504 .map_err(plugin_sqlx_error)?;
2505 tx.commit().await.map_err(plugin_sqlx_error)?;
2506 Ok(renewed)
2507 }
2508
2509 async fn complete_process_lease(
2510 &self,
2511 completion: &ProcessLeaseCompletion,
2512 ) -> Result<(), PluginError> {
2513 sqlx::query(
2514 "UPDATE lash_process_leases
2515 SET lease_owner_id = NULL,
2516 lease_token = NULL,
2517 lease_claimed_at_ms = 0,
2518 lease_expires_at_ms = 0
2519 WHERE process_id = $1 AND lease_token = $2",
2520 )
2521 .bind(&completion.process_id)
2522 .bind(&completion.lease_token)
2523 .execute(&self.pool)
2524 .await
2525 .map_err(plugin_sqlx_error)?;
2526 Ok(())
2527 }
2528}
2529
2530#[async_trait::async_trait]
2531impl HostEventStore for PostgresHostEventStore {
    /// Reports this store's durability tier, which is always
    /// `DurabilityTier::Durable`.
    fn durability_tier(&self) -> DurabilityTier {
        DurabilityTier::Durable
    }
2535
2536 async fn register_subscription(
2537 &self,
2538 draft: TriggerSubscriptionDraft,
2539 ) -> Result<TriggerSubscriptionRecord, PluginError> {
2540 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2541 let seq: i64 = sqlx::query_scalar("SELECT nextval('lash_host_event_subscription_seq')")
2542 .fetch_one(&mut *tx)
2543 .await
2544 .map_err(plugin_sqlx_error)?;
2545 let handle = format!("trigger:{seq}");
2546 let subscription_id = format!("subscription:{seq}");
2547 let now = current_epoch_ms();
2548 let record = TriggerSubscriptionRecord {
2549 subscription_id: subscription_id.clone(),
2550 session_id: draft.session_id,
2551 handle,
2552 name: draft.name,
2553 source_type: draft.source_type,
2554 source_key: draft.source_key,
2555 source: draft.source,
2556 event_ty: draft.event_ty,
2557 module_ref: draft.module_ref,
2558 required_surface_ref: draft.required_surface_ref,
2559 process_ref: draft.process_ref,
2560 process_name: draft.process_name,
2561 input_template: draft.input_template,
2562 enabled: true,
2563 created_at_ms: now,
2564 updated_at_ms: now,
2565 };
2566 sqlx::query(
2567 "INSERT INTO lash_host_event_trigger_subscriptions (
2568 subscription_id, session_id, handle, source_type, source_key,
2569 enabled, created_at_ms, updated_at_ms, record_json
2570 )
2571 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
2572 )
2573 .bind(&record.subscription_id)
2574 .bind(&record.session_id)
2575 .bind(&record.handle)
2576 .bind(&record.source_type)
2577 .bind(&record.source_key)
2578 .bind(record.enabled)
2579 .bind(record.created_at_ms as i64)
2580 .bind(record.updated_at_ms as i64)
2581 .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2582 .execute(&mut *tx)
2583 .await
2584 .map_err(plugin_sqlx_error)?;
2585 tx.commit().await.map_err(plugin_sqlx_error)?;
2586 Ok(record)
2587 }
2588
2589 async fn list_subscriptions(
2590 &self,
2591 filter: TriggerSubscriptionFilter,
2592 ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError> {
2593 let rows = sqlx::query(
2594 "SELECT record_json FROM lash_host_event_trigger_subscriptions
2595 ORDER BY session_id ASC, handle ASC",
2596 )
2597 .fetch_all(&self.pool)
2598 .await
2599 .map_err(plugin_sqlx_error)?;
2600 let mut records = Vec::new();
2601 for row in rows {
2602 let json: String = row.get(0);
2603 let record: TriggerSubscriptionRecord =
2604 serde_json::from_str(&json).map_err(process_decode_error)?;
2605 if filter.matches(&record) {
2606 records.push(record);
2607 }
2608 }
2609 Ok(records)
2610 }
2611
2612 async fn cancel_subscription(
2613 &self,
2614 session_id: &str,
2615 handle: &str,
2616 ) -> Result<bool, PluginError> {
2617 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2618 let json: Option<String> = sqlx::query_scalar(
2619 "SELECT record_json FROM lash_host_event_trigger_subscriptions
2620 WHERE session_id = $1 AND handle = $2
2621 FOR UPDATE",
2622 )
2623 .bind(session_id)
2624 .bind(handle)
2625 .fetch_optional(&mut *tx)
2626 .await
2627 .map_err(plugin_sqlx_error)?;
2628 let Some(json) = json else {
2629 tx.commit().await.map_err(plugin_sqlx_error)?;
2630 return Ok(false);
2631 };
2632 let mut record: TriggerSubscriptionRecord =
2633 serde_json::from_str(&json).map_err(process_decode_error)?;
2634 let changed = record.enabled;
2635 record.enabled = false;
2636 record.updated_at_ms = current_epoch_ms();
2637 sqlx::query(
2638 "UPDATE lash_host_event_trigger_subscriptions
2639 SET enabled = $3, updated_at_ms = $4, record_json = $5
2640 WHERE session_id = $1 AND handle = $2",
2641 )
2642 .bind(session_id)
2643 .bind(handle)
2644 .bind(record.enabled)
2645 .bind(record.updated_at_ms as i64)
2646 .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2647 .execute(&mut *tx)
2648 .await
2649 .map_err(plugin_sqlx_error)?;
2650 tx.commit().await.map_err(plugin_sqlx_error)?;
2651 Ok(changed)
2652 }
2653
2654 async fn record_occurrence(
2655 &self,
2656 request: HostEventOccurrenceRequest,
2657 ) -> Result<HostEventOccurrenceRecord, PluginError> {
2658 lash_core::validate_host_event_occurrence_request(&request)?;
2659 let request_hash = lash_core::host_event_occurrence_request_hash(&request)?;
2660 let occurrence_id = lash_core::deterministic_occurrence_id(&request)?;
2661 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2662 let existing = sqlx::query(
2663 "SELECT request_hash, record_json
2664 FROM lash_host_event_occurrences
2665 WHERE idempotency_key = $1
2666 FOR UPDATE",
2667 )
2668 .bind(&request.idempotency_key)
2669 .fetch_optional(&mut *tx)
2670 .await
2671 .map_err(plugin_sqlx_error)?;
2672 if let Some(row) = existing {
2673 let existing_hash: String = row.get(0);
2674 let existing_json: String = row.get(1);
2675 if existing_hash != request_hash {
2676 return Err(PluginError::Session(format!(
2677 "host event occurrence idempotency conflict for `{}`",
2678 request.idempotency_key
2679 )));
2680 }
2681 let record = serde_json::from_str(&existing_json).map_err(process_decode_error)?;
2682 tx.commit().await.map_err(plugin_sqlx_error)?;
2683 return Ok(record);
2684 }
2685 let record = HostEventOccurrenceRecord {
2686 occurrence_id: occurrence_id.clone(),
2687 source_type: request.source_type,
2688 source_key: request.source_key,
2689 payload: request.payload,
2690 idempotency_key: request.idempotency_key.clone(),
2691 source: request.source,
2692 occurred_at_ms: current_epoch_ms(),
2693 };
2694 sqlx::query(
2695 "INSERT INTO lash_host_event_occurrences (
2696 occurrence_id, idempotency_key, request_hash, source_type, source_key,
2697 occurred_at_ms, record_json
2698 )
2699 VALUES ($1, $2, $3, $4, $5, $6, $7)",
2700 )
2701 .bind(&record.occurrence_id)
2702 .bind(&record.idempotency_key)
2703 .bind(&request_hash)
2704 .bind(&record.source_type)
2705 .bind(&record.source_key)
2706 .bind(record.occurred_at_ms as i64)
2707 .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2708 .execute(&mut *tx)
2709 .await
2710 .map_err(plugin_sqlx_error)?;
2711 tx.commit().await.map_err(plugin_sqlx_error)?;
2712 Ok(record)
2713 }
2714
2715 async fn reserve_matching_deliveries(
2716 &self,
2717 occurrence_id: &str,
2718 ) -> Result<Vec<TriggerDeliveryReservation>, PluginError> {
2719 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2720 let occurrence_json: Option<String> = sqlx::query_scalar(
2721 "SELECT record_json FROM lash_host_event_occurrences WHERE occurrence_id = $1",
2722 )
2723 .bind(occurrence_id)
2724 .fetch_optional(&mut *tx)
2725 .await
2726 .map_err(plugin_sqlx_error)?;
2727 let Some(occurrence_json) = occurrence_json else {
2728 return Err(PluginError::Session(format!(
2729 "unknown host event occurrence `{occurrence_id}`"
2730 )));
2731 };
2732 let occurrence: HostEventOccurrenceRecord =
2733 serde_json::from_str(&occurrence_json).map_err(process_decode_error)?;
2734 let rows = sqlx::query(
2735 "SELECT record_json FROM lash_host_event_trigger_subscriptions
2736 WHERE enabled = TRUE AND source_type = $1 AND source_key = $2
2737 ORDER BY session_id ASC, handle ASC",
2738 )
2739 .bind(&occurrence.source_type)
2740 .bind(&occurrence.source_key)
2741 .fetch_all(&mut *tx)
2742 .await
2743 .map_err(plugin_sqlx_error)?;
2744 let mut deliveries = Vec::new();
2745 for row in rows {
2746 let json: String = row.get(0);
2747 let subscription: TriggerSubscriptionRecord =
2748 serde_json::from_str(&json).map_err(process_decode_error)?;
2749 let process_id = lash_core::deterministic_delivery_process_id(
2750 &occurrence.occurrence_id,
2751 &subscription.subscription_id,
2752 )?;
2753 let inserted = sqlx::query(
2754 "INSERT INTO lash_host_event_deliveries (
2755 occurrence_id, subscription_id, process_id, created_at_ms
2756 )
2757 VALUES ($1, $2, $3, $4)
2758 ON CONFLICT DO NOTHING",
2759 )
2760 .bind(&occurrence.occurrence_id)
2761 .bind(&subscription.subscription_id)
2762 .bind(&process_id)
2763 .bind(current_epoch_ms() as i64)
2764 .execute(&mut *tx)
2765 .await
2766 .map_err(plugin_sqlx_error)?
2767 .rows_affected();
2768 if inserted == 0 {
2769 continue;
2770 }
2771 deliveries.push(TriggerDeliveryReservation {
2772 occurrence: occurrence.clone(),
2773 subscription,
2774 process_id,
2775 });
2776 }
2777 tx.commit().await.map_err(plugin_sqlx_error)?;
2778 Ok(deliveries)
2779 }
2780}
2781
2782#[async_trait::async_trait]
2783impl lashlang::LashlangArtifactStore for PostgresLashlangArtifactStore {
2784 fn durability_tier(&self) -> DurabilityTier {
2785 DurabilityTier::Durable
2786 }
2787
2788 async fn put_module_artifact(
2789 &self,
2790 artifact: &lashlang::ModuleArtifact,
2791 ) -> Result<(), lashlang::ArtifactStoreError> {
2792 let bytes = artifact
2793 .to_store_bytes()
2794 .map_err(lashlang::ArtifactStoreError::from)?;
2795 sqlx::query(
2796 "INSERT INTO lash_lashlang_artifacts (module_ref, artifact_bytes)
2797 VALUES ($1, $2)
2798 ON CONFLICT (module_ref) DO UPDATE SET artifact_bytes = EXCLUDED.artifact_bytes",
2799 )
2800 .bind(artifact.module_ref.as_str())
2801 .bind(bytes)
2802 .execute(&self.pool)
2803 .await
2804 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
2805 Ok(())
2806 }
2807
2808 async fn get_module_artifact(
2809 &self,
2810 module_ref: &lashlang::ModuleRef,
2811 ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
2812 let bytes: Option<Vec<u8>> = sqlx::query_scalar(
2813 "SELECT artifact_bytes FROM lash_lashlang_artifacts WHERE module_ref = $1",
2814 )
2815 .bind(module_ref.as_str())
2816 .fetch_optional(&self.pool)
2817 .await
2818 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
2819 bytes
2820 .map(|bytes| {
2821 lashlang::ModuleArtifact::from_store_bytes(&bytes)
2822 .map(Arc::new)
2823 .map_err(lashlang::ArtifactStoreError::from)
2824 })
2825 .transpose()
2826 }
2827}