1use std::collections::HashSet;
8use std::sync::{Arc, OnceLock};
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use lash_core::runtime::{
12 ProcessHandleGrantEntry, QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim,
13 QueuedWorkClaimBoundary, QueuedWorkCompletion, QueuedWorkItem,
14};
15use lash_core::store::{
16 GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
17 RuntimeCommitResult, SessionCheckpoint, SessionHeadMeta,
18};
19use lash_core::{
20 AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
21 DeliveryPolicy, DurabilityTier, GcReport, MergeKey, ProcessAwaitOutput, ProcessEvent,
22 ProcessEventAppendRequest, ProcessEventAppendResult, ProcessExternalRef,
23 ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease, ProcessLeaseCompletion,
24 ProcessRecord, ProcessRegistration, ProcessRegistry, ProcessScope, RuntimePersistence,
25 SessionMeta, SessionNodeRecord, SessionReadScope, SessionStoreCreateRequest,
26 SessionStoreFactory, SlotPolicy, StoreError, TokenLedgerEntry, VacuumReport,
27};
28use lash_core::{
29 HostEventOccurrenceRecord, HostEventOccurrenceRequest, HostEventStore, PluginError,
30 TriggerDeliveryReservation, TriggerSubscriptionDraft, TriggerSubscriptionFilter,
31 TriggerSubscriptionRecord,
32};
33use sha2::{Digest, Sha256};
34use sqlx::postgres::{PgPool, PgPoolOptions, PgRow};
35use sqlx::{Executor, Row};
36
/// Key under which this store records its schema version in `lash_schema_versions`.
const SCHEMA_COMPONENT: &str = "lash-postgres-store";
/// Schema version this build expects; `ensure_schema` fails if a different version is recorded.
const SCHEMA_VERSION: i32 = 1;
/// Local alias for the process-lease schema version defined by `lash_core`.
const PROCESS_LEASE_SCHEMA_VERSION: u32 = lash_core::PROCESS_LEASE_SCHEMA_VERSION;
40
/// Entry point for the Postgres backend: owns the connection pool and hands out
/// the per-concern store handles (session stores, process registry, host event
/// store, lashlang artifact store), each sharing a clone of the same pool.
#[derive(Clone)]
pub struct PostgresStorage {
    /// Shared sqlx connection pool; cloned into every handle this type creates.
    pool: PgPool,
}
45
/// `SessionStoreFactory` implementation that creates `PostgresSessionStore`
/// instances bound to a requested session id and deletes a session's rows.
#[derive(Clone)]
pub struct PostgresSessionStoreFactory {
    /// Shared sqlx connection pool.
    pool: PgPool,
}
50
/// Session-scoped persistence handle backed by Postgres. Implements
/// `RuntimePersistence` and `AttachmentManifest`.
#[derive(Clone)]
pub struct PostgresSessionStore {
    /// Shared sqlx connection pool.
    pool: PgPool,
    /// Session this store was explicitly created for; `None` for an unbound store.
    session_id: Option<String>,
    /// Fallback binding consulted by `commit_runtime_state` when `session_id` is `None`.
    /// Shared across clones through the `Arc`.
    /// NOTE(review): presumably set on the first commit of an unbound store — the
    /// code that initialises it is outside this view; confirm.
    bound_session: Arc<OnceLock<String>>,
}
63
/// Process-registry handle backed by Postgres, created by
/// `PostgresStorage::process_registry`.
#[derive(Clone)]
pub struct PostgresProcessRegistry {
    /// Shared sqlx connection pool.
    pool: PgPool,
    /// In-process notifier shared by clones of this registry.
    /// NOTE(review): presumably used to wake local waiters when process events
    /// are appended — the usage is outside this view; confirm.
    notify: Arc<tokio::sync::Notify>,
}
69
/// Host-event store handle backed by Postgres, created by
/// `PostgresStorage::host_event_store`.
#[derive(Clone)]
pub struct PostgresHostEventStore {
    /// Shared sqlx connection pool.
    pool: PgPool,
}
74
/// Lashlang artifact store handle backed by Postgres, created by
/// `PostgresStorage::lashlang_artifact_store`.
#[derive(Clone)]
pub struct PostgresLashlangArtifactStore {
    /// Shared sqlx connection pool.
    pool: PgPool,
}
79
/// Pool and per-connection settings applied by `PostgresStorage::connect_with`.
#[derive(Clone, Debug)]
pub struct PostgresStoreConfig {
    /// Upper bound on pooled connections.
    pub max_connections: u32,
    /// Number of connections the pool tries to keep open.
    pub min_connections: u32,
    /// How long to wait for a connection from the pool before giving up.
    pub acquire_timeout: Duration,
    /// Idle time after which a pooled connection is closed; `None` leaves the pool default.
    pub idle_timeout: Option<Duration>,
    /// Maximum age of a pooled connection; `None` leaves the pool default.
    pub max_lifetime: Option<Duration>,
    /// Value for the session-level `lock_timeout`; `None` leaves the server setting untouched.
    pub lock_timeout: Option<Duration>,
    /// Value for the session-level `statement_timeout`; `None` leaves the server setting untouched.
    pub statement_timeout: Option<Duration>,
}

impl Default for PostgresStoreConfig {
    /// 16 connections max, none kept warm, 30 s acquire timeout, 10 min idle
    /// timeout, 30 min max lifetime, 10 s lock timeout, 30 s statement timeout.
    fn default() -> Self {
        let seconds = Duration::from_secs;
        let minutes = |count: u64| Duration::from_secs(count * 60);
        Self {
            statement_timeout: Some(seconds(30)),
            lock_timeout: Some(seconds(10)),
            max_lifetime: Some(minutes(30)),
            idle_timeout: Some(minutes(10)),
            acquire_timeout: seconds(30),
            min_connections: 0,
            max_connections: 16,
        }
    }
}
120
121impl PostgresStorage {
122 pub async fn connect(database_url: &str) -> Result<Self, StoreError> {
124 Self::connect_with(database_url, PostgresStoreConfig::default()).await
125 }
126
127 pub async fn connect_with(
129 database_url: &str,
130 config: PostgresStoreConfig,
131 ) -> Result<Self, StoreError> {
132 let lock_ms = config.lock_timeout.map(|d| d.as_millis().max(1) as u64);
133 let statement_ms = config
134 .statement_timeout
135 .map(|d| d.as_millis().max(1) as u64);
136 let mut options = PgPoolOptions::new()
137 .max_connections(config.max_connections)
138 .min_connections(config.min_connections)
139 .acquire_timeout(config.acquire_timeout);
140 if let Some(timeout) = config.idle_timeout {
141 options = options.idle_timeout(timeout);
142 }
143 if let Some(timeout) = config.max_lifetime {
144 options = options.max_lifetime(timeout);
145 }
146 let pool = options
147 .after_connect(move |conn, _meta| {
148 Box::pin(async move {
149 if let Some(ms) = lock_ms {
150 conn.execute(format!("SET lock_timeout = {ms}").as_str())
151 .await?;
152 }
153 if let Some(ms) = statement_ms {
154 conn.execute(format!("SET statement_timeout = {ms}").as_str())
155 .await?;
156 }
157 Ok(())
158 })
159 })
160 .connect(database_url)
161 .await
162 .map_err(store_sqlx_error)?;
163 ensure_schema(&pool).await?;
164 Ok(Self { pool })
165 }
166
167 pub fn from_pool(pool: PgPool) -> Self {
168 Self { pool }
169 }
170
171 pub fn pool(&self) -> &PgPool {
172 &self.pool
173 }
174
175 pub fn session_store_factory(&self) -> PostgresSessionStoreFactory {
176 PostgresSessionStoreFactory {
177 pool: self.pool.clone(),
178 }
179 }
180
181 pub fn session_store(&self, session_id: impl Into<String>) -> PostgresSessionStore {
182 PostgresSessionStore {
183 pool: self.pool.clone(),
184 session_id: Some(session_id.into()),
185 bound_session: Arc::new(OnceLock::new()),
186 }
187 }
188
189 pub fn unbound_session_store(&self) -> PostgresSessionStore {
190 PostgresSessionStore {
191 pool: self.pool.clone(),
192 session_id: None,
193 bound_session: Arc::new(OnceLock::new()),
194 }
195 }
196
197 pub fn process_registry(&self) -> PostgresProcessRegistry {
198 PostgresProcessRegistry {
199 pool: self.pool.clone(),
200 notify: Arc::new(tokio::sync::Notify::new()),
201 }
202 }
203
204 pub fn host_event_store(&self) -> PostgresHostEventStore {
205 PostgresHostEventStore {
206 pool: self.pool.clone(),
207 }
208 }
209
210 pub fn lashlang_artifact_store(&self) -> PostgresLashlangArtifactStore {
211 PostgresLashlangArtifactStore {
212 pool: self.pool.clone(),
213 }
214 }
215}
216
217impl PostgresSessionStoreFactory {
218 pub fn new(storage: &PostgresStorage) -> Self {
219 storage.session_store_factory()
220 }
221}
222
223impl PostgresSessionStore {
224 pub fn unbound(storage: &PostgresStorage) -> Self {
225 storage.unbound_session_store()
226 }
227
228 async fn selected_session_id(&self) -> Result<Option<String>, StoreError> {
229 if let Some(session_id) = &self.session_id {
230 return Ok(Some(session_id.clone()));
231 }
232 sqlx::query_scalar("SELECT session_id FROM lash_sessions ORDER BY session_id ASC LIMIT 1")
233 .fetch_optional(&self.pool)
234 .await
235 .map_err(store_sqlx_error)
236 }
237}
238
/// Creates every table, index and sequence this store uses (idempotently, via
/// `IF NOT EXISTS`) and records or verifies the schema version.
///
/// Everything runs in a single transaction.
///
/// # Errors
/// Returns `StoreError::Backend` when any statement fails, or when
/// `lash_schema_versions` already holds a version for `SCHEMA_COMPONENT` that
/// differs from `SCHEMA_VERSION`.
async fn ensure_schema(pool: &PgPool) -> Result<(), StoreError> {
    let mut tx = pool.begin().await.map_err(store_sqlx_error)?;
    // Transaction-scoped advisory lock: concurrent callers queue here, and the
    // lock is released automatically when the transaction ends.
    tx.execute("SELECT pg_advisory_xact_lock(715421, 907001)")
        .await
        .map_err(store_sqlx_error)?;
    tx.execute(
        r#"
        CREATE TABLE IF NOT EXISTS lash_schema_versions (
            component TEXT PRIMARY KEY,
            version INTEGER NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_blobs (
            hash TEXT PRIMARY KEY,
            content BYTEA NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_sessions (
            session_id TEXT PRIMARY KEY,
            head_revision BIGINT NOT NULL DEFAULT 0,
            head_json TEXT NOT NULL,
            checkpoint_ref TEXT
        );

        CREATE TABLE IF NOT EXISTS lash_graph_nodes (
            session_id TEXT NOT NULL,
            seq BIGSERIAL,
            node_id TEXT NOT NULL,
            node_json TEXT NOT NULL,
            tombstoned BOOLEAN NOT NULL DEFAULT FALSE,
            PRIMARY KEY (session_id, node_id)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_graph_nodes_seq
            ON lash_graph_nodes(session_id, seq);

        CREATE TABLE IF NOT EXISTS lash_usage_deltas (
            seq BIGSERIAL PRIMARY KEY,
            session_id TEXT NOT NULL,
            entry_json TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_session_meta (
            session_id TEXT PRIMARY KEY,
            meta_json TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_runtime_turn_commits (
            session_id TEXT NOT NULL,
            turn_id TEXT NOT NULL,
            turn_commit_hash TEXT NOT NULL,
            result_json TEXT NOT NULL,
            committed_at_ms BIGINT NOT NULL,
            PRIMARY KEY (session_id, turn_id)
        );

        CREATE TABLE IF NOT EXISTS lash_queued_work_batches (
            enqueue_seq BIGSERIAL PRIMARY KEY,
            batch_id TEXT NOT NULL UNIQUE,
            session_id TEXT NOT NULL,
            source_key TEXT,
            delivery_policy TEXT NOT NULL,
            slot_policy TEXT NOT NULL,
            merge_key_json TEXT NOT NULL,
            available_at_ms BIGINT NOT NULL,
            enqueued_at_ms BIGINT NOT NULL,
            claim_id TEXT,
            claim_owner_id TEXT,
            claim_token TEXT,
            claim_fencing_token BIGINT NOT NULL DEFAULT 0,
            claim_claimed_at_ms BIGINT NOT NULL DEFAULT 0,
            claim_expires_at_ms BIGINT NOT NULL DEFAULT 0,
            UNIQUE (session_id, source_key)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_queued_work_ready
            ON lash_queued_work_batches(session_id, available_at_ms, enqueue_seq);

        CREATE TABLE IF NOT EXISTS lash_queued_work_items (
            batch_id TEXT NOT NULL REFERENCES lash_queued_work_batches(batch_id) ON DELETE CASCADE,
            item_index INTEGER NOT NULL,
            item_id TEXT NOT NULL,
            payload_json TEXT NOT NULL,
            PRIMARY KEY (batch_id, item_index)
        );

        CREATE TABLE IF NOT EXISTS lash_attachment_manifest (
            attachment_id TEXT PRIMARY KEY,
            session_id TEXT NOT NULL,
            canonical_uri TEXT NOT NULL,
            intent_at_ms BIGINT NOT NULL,
            committed_at_ms BIGINT
        );
        CREATE INDEX IF NOT EXISTS idx_lash_attachment_manifest_uncommitted
            ON lash_attachment_manifest(committed_at_ms)
            WHERE committed_at_ms IS NULL;

        CREATE TABLE IF NOT EXISTS lash_processes (
            process_id TEXT PRIMARY KEY,
            registration_hash TEXT NOT NULL,
            owner_scope_id TEXT NOT NULL,
            host_profile_id TEXT NOT NULL,
            created_at_ms BIGINT NOT NULL,
            updated_at_ms BIGINT NOT NULL,
            status TEXT NOT NULL,
            record_json TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_lash_processes_status
            ON lash_processes(status);

        CREATE TABLE IF NOT EXISTS lash_process_events (
            process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            sequence BIGINT NOT NULL,
            event_type TEXT NOT NULL,
            payload_hash TEXT NOT NULL,
            idempotency_key TEXT,
            occurred_at_ms BIGINT NOT NULL,
            event_json TEXT NOT NULL,
            PRIMARY KEY (process_id, sequence)
        );
        CREATE UNIQUE INDEX IF NOT EXISTS idx_lash_process_events_key
            ON lash_process_events(process_id, idempotency_key)
            WHERE idempotency_key IS NOT NULL;

        CREATE TABLE IF NOT EXISTS lash_process_wake_acks (
            process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            sequence BIGINT NOT NULL,
            PRIMARY KEY (process_id, sequence)
        );

        CREATE TABLE IF NOT EXISTS lash_process_handle_grants (
            session_id TEXT NOT NULL,
            scope_id TEXT NOT NULL,
            process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            descriptor_json TEXT NOT NULL,
            PRIMARY KEY (scope_id, process_id)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_process_handle_grants_session
            ON lash_process_handle_grants(session_id);
        CREATE INDEX IF NOT EXISTS idx_lash_process_handle_grants_process
            ON lash_process_handle_grants(process_id);

        CREATE TABLE IF NOT EXISTS lash_process_leases (
            process_id TEXT PRIMARY KEY REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            lease_owner_id TEXT,
            lease_token TEXT,
            lease_fencing_token BIGINT NOT NULL DEFAULT 0,
            lease_claimed_at_ms BIGINT NOT NULL DEFAULT 0,
            lease_expires_at_ms BIGINT NOT NULL DEFAULT 0
        );

        CREATE SEQUENCE IF NOT EXISTS lash_host_event_subscription_seq;
        CREATE TABLE IF NOT EXISTS lash_host_event_trigger_subscriptions (
            subscription_id TEXT PRIMARY KEY,
            session_id TEXT NOT NULL,
            handle TEXT NOT NULL,
            source_type TEXT NOT NULL,
            source_key TEXT NOT NULL,
            enabled BOOLEAN NOT NULL,
            created_at_ms BIGINT NOT NULL,
            updated_at_ms BIGINT NOT NULL,
            record_json TEXT NOT NULL,
            UNIQUE(session_id, handle)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_host_event_subscriptions_source
            ON lash_host_event_trigger_subscriptions(source_type, source_key, enabled);

        CREATE TABLE IF NOT EXISTS lash_host_event_occurrences (
            occurrence_id TEXT PRIMARY KEY,
            idempotency_key TEXT NOT NULL UNIQUE,
            request_hash TEXT NOT NULL,
            source_type TEXT NOT NULL,
            source_key TEXT NOT NULL,
            occurred_at_ms BIGINT NOT NULL,
            record_json TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_host_event_deliveries (
            occurrence_id TEXT NOT NULL REFERENCES lash_host_event_occurrences(occurrence_id) ON DELETE CASCADE,
            subscription_id TEXT NOT NULL REFERENCES lash_host_event_trigger_subscriptions(subscription_id) ON DELETE CASCADE,
            process_id TEXT NOT NULL,
            created_at_ms BIGINT NOT NULL,
            PRIMARY KEY (occurrence_id, subscription_id)
        );

        CREATE TABLE IF NOT EXISTS lash_lashlang_artifacts (
            module_ref TEXT PRIMARY KEY,
            artifact_bytes BYTEA NOT NULL
        );
        "#,
    )
    .await
    .map_err(store_sqlx_error)?;

    // Version gate: accept a matching version, reject any other, and record the
    // current version on first use.
    let existing: Option<i32> =
        sqlx::query_scalar("SELECT version FROM lash_schema_versions WHERE component = $1")
            .bind(SCHEMA_COMPONENT)
            .fetch_optional(&mut *tx)
            .await
            .map_err(store_sqlx_error)?;
    match existing {
        Some(version) if version == SCHEMA_VERSION => {}
        Some(version) => {
            // Returning here drops `tx` without committing it.
            return Err(StoreError::Backend(format!(
                "Postgres schema component `{SCHEMA_COMPONENT}` has version {version}, expected {SCHEMA_VERSION}"
            )));
        }
        None => {
            sqlx::query("INSERT INTO lash_schema_versions (component, version) VALUES ($1, $2)")
                .bind(SCHEMA_COMPONENT)
                .bind(SCHEMA_VERSION)
                .execute(&mut *tx)
                .await
                .map_err(store_sqlx_error)?;
        }
    }
    tx.commit().await.map_err(store_sqlx_error)
}
455
/// Milliseconds elapsed since the Unix epoch according to the system clock.
/// Yields 0 if the clock reports a time before the epoch.
fn current_epoch_ms() -> u64 {
    let since_epoch = UNIX_EPOCH.elapsed().unwrap_or_default();
    since_epoch.as_millis() as u64
}
462
/// Formats the current time as `unix:<seconds since the Unix epoch>`.
/// Yields `unix:0` if the clock reports a time before the epoch.
fn current_timestamp_string() -> String {
    let seconds = UNIX_EPOCH.elapsed().unwrap_or_default().as_secs();
    format!("unix:{seconds}")
}
469
470fn store_sqlx_error(err: sqlx::Error) -> StoreError {
471 StoreError::Backend(err.to_string())
472}
473
474fn is_contention_error(err: &sqlx::Error) -> bool {
479 matches!(
480 err.as_database_error()
481 .and_then(|db| db.code())
482 .as_deref(),
483 Some("40001" | "40P01" | "55P03")
484 )
485}
486
487fn plugin_sqlx_error(err: sqlx::Error) -> PluginError {
488 PluginError::Session(err.to_string())
489}
490
491fn process_decode_error(err: serde_json::Error) -> PluginError {
492 PluginError::Session(format!("failed to decode process registry row: {err}"))
493}
494
495fn store_decode_json<T: serde::de::DeserializeOwned>(
496 json: &str,
497 what: &str,
498) -> Result<T, StoreError> {
499 serde_json::from_str(json)
500 .map_err(|err| StoreError::Backend(format!("failed to decode {what}: {err}")))
501}
502
503fn encode_json<T: serde::Serialize>(value: &T) -> String {
504 serde_json::to_string(value).expect("persisted state should serialize")
505}
506
507fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
508 let mut buf = Vec::with_capacity(1024);
509 rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
510 buf
511}
512
513fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
514 rmp_serde::from_slice(bytes).ok()
515}
516
517fn block_on_detached<T: Send + 'static>(
518 future: impl std::future::Future<Output = T> + Send + 'static,
519) -> T {
520 std::thread::spawn(move || {
521 tokio::runtime::Builder::new_current_thread()
522 .enable_all()
523 .build()
524 .expect("postgres manifest runtime")
525 .block_on(future)
526 })
527 .join()
528 .expect("postgres manifest thread")
529}
530
531fn merge_token_ledger_entries(entries: Vec<TokenLedgerEntry>) -> Vec<TokenLedgerEntry> {
532 let mut merged = Vec::<TokenLedgerEntry>::new();
533 for entry in entries {
534 if entry.usage.total() == 0 {
535 continue;
536 }
537 if let Some(existing) = merged
538 .iter_mut()
539 .find(|existing| existing.source == entry.source && existing.model == entry.model)
540 {
541 existing.usage.input_tokens += entry.usage.input_tokens;
542 existing.usage.output_tokens += entry.usage.output_tokens;
543 existing.usage.cached_input_tokens += entry.usage.cached_input_tokens;
544 existing.usage.reasoning_tokens += entry.usage.reasoning_tokens;
545 } else {
546 merged.push(entry);
547 }
548 }
549 merged
550}
551
/// Serialized form of a checkpoint as stored in `lash_blobs`: the checkpoint
/// manifest plus the hydrated payloads, encoded together as one MessagePack blob.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct SessionCheckpointEnvelope {
    /// The checkpoint manifest (turn state and payload references).
    manifest: SessionCheckpoint,
    /// Hydrated tool state, if the checkpoint carried one.
    tool_state: Option<lash_core::ToolState>,
    /// Hydrated plugin snapshot, if the checkpoint carried one.
    plugin_snapshot: Option<lash_core::PluginSessionSnapshot>,
    /// Hydrated execution state bytes, if the checkpoint carried them.
    execution_state: Option<Vec<u8>>,
}
559
560async fn put_blob_tx(
561 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
562 content: &[u8],
563) -> Result<BlobRef, StoreError> {
564 let hash = format!("{:x}", Sha256::digest(content));
565 sqlx::query(
566 "INSERT INTO lash_blobs (hash, content)
567 VALUES ($1, $2)
568 ON CONFLICT (hash) DO NOTHING",
569 )
570 .bind(&hash)
571 .bind(content)
572 .execute(&mut **tx)
573 .await
574 .map_err(store_sqlx_error)?;
575 Ok(BlobRef(hash))
576}
577
578async fn get_blob(pool: &PgPool, blob_ref: &BlobRef) -> Option<Vec<u8>> {
579 sqlx::query_scalar::<_, Vec<u8>>("SELECT content FROM lash_blobs WHERE hash = $1")
580 .bind(blob_ref.as_str())
581 .fetch_optional(pool)
582 .await
583 .ok()
584 .flatten()
585}
586
587async fn put_checkpoint_tx(
588 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
589 checkpoint: &HydratedSessionCheckpoint,
590) -> Result<(BlobRef, SessionCheckpoint), StoreError> {
591 let manifest = SessionCheckpoint {
592 turn_state: checkpoint.turn_state.clone(),
593 tool_state_ref: checkpoint.tool_state_ref.clone(),
594 plugin_snapshot_ref: checkpoint.plugin_snapshot_ref.clone(),
595 plugin_snapshot_revision: checkpoint.plugin_snapshot_revision,
596 execution_state_ref: checkpoint.execution_state_ref.clone(),
597 };
598 let envelope = SessionCheckpointEnvelope {
599 manifest: manifest.clone(),
600 tool_state: checkpoint.tool_state.clone(),
601 plugin_snapshot: checkpoint.plugin_snapshot.clone(),
602 execution_state: checkpoint.execution_state.clone(),
603 };
604 let bytes = encode_msgpack(&envelope);
605 let checkpoint_ref = put_blob_tx(tx, &bytes).await?;
606 Ok((checkpoint_ref, manifest))
607}
608
609async fn get_checkpoint(pool: &PgPool, blob_ref: &BlobRef) -> Option<HydratedSessionCheckpoint> {
610 let bytes = get_blob(pool, blob_ref).await?;
611 let envelope: SessionCheckpointEnvelope = decode_msgpack(&bytes)?;
612 Some(HydratedSessionCheckpoint {
613 turn_state: envelope.manifest.turn_state,
614 tool_state_ref: envelope.manifest.tool_state_ref,
615 tool_state: envelope.tool_state,
616 plugin_snapshot_ref: envelope.manifest.plugin_snapshot_ref,
617 plugin_snapshot: envelope.plugin_snapshot,
618 plugin_snapshot_revision: envelope.manifest.plugin_snapshot_revision,
619 execution_state_ref: envelope.manifest.execution_state_ref,
620 execution_state: envelope.execution_state,
621 })
622}
623
624async fn load_session_head_meta_tx(
625 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
626 session_id: &str,
627 for_update: bool,
628) -> Result<Option<SessionHeadMeta>, StoreError> {
629 let sql = if for_update {
630 "SELECT head_json, head_revision FROM lash_sessions WHERE session_id = $1 FOR UPDATE"
631 } else {
632 "SELECT head_json, head_revision FROM lash_sessions WHERE session_id = $1"
633 };
634 let row = sqlx::query(sql)
635 .bind(session_id)
636 .fetch_optional(&mut **tx)
637 .await
638 .map_err(store_sqlx_error)?;
639 let Some(row) = row else {
640 return Ok(None);
641 };
642 let head_json: String = row.get(0);
643 let head_revision: i64 = row.get(1);
644 let mut meta: SessionHeadMeta = store_decode_json(&head_json, "session head")?;
645 meta.head_revision = head_revision as u64;
646 Ok(Some(meta))
647}
648
649async fn load_usage_deltas(pool: &PgPool, session_id: &str) -> Vec<TokenLedgerEntry> {
650 let rows = sqlx::query(
651 "SELECT entry_json FROM lash_usage_deltas WHERE session_id = $1 ORDER BY seq ASC",
652 )
653 .bind(session_id)
654 .fetch_all(pool)
655 .await
656 .unwrap_or_default();
657 rows.into_iter()
658 .filter_map(|row| {
659 let json: String = row.get(0);
660 serde_json::from_str(&json).ok()
661 })
662 .collect()
663}
664
665async fn load_graph(
666 pool: &PgPool,
667 session_id: &str,
668 leaf_node_id: Option<String>,
669 active_path: bool,
670) -> Result<lash_core::SessionGraph, StoreError> {
671 let rows = sqlx::query(
672 "SELECT node_json FROM lash_graph_nodes
673 WHERE session_id = $1 AND tombstoned = FALSE
674 ORDER BY seq ASC",
675 )
676 .bind(session_id)
677 .fetch_all(pool)
678 .await
679 .map_err(store_sqlx_error)?;
680 let mut nodes = Vec::<SessionNodeRecord>::new();
681 for row in rows {
682 let json: String = row.get(0);
683 nodes.push(store_decode_json(&json, "session graph node")?);
684 }
685 if active_path {
686 if let Some(leaf) = leaf_node_id.clone() {
687 let wanted = active_path_node_ids(&nodes, &leaf);
688 nodes.retain(|node| wanted.contains(&node.node_id));
689 }
690 }
691 Ok(lash_core::SessionGraph::from_nodes(nodes, leaf_node_id))
692}
693
694fn active_path_node_ids(nodes: &[SessionNodeRecord], leaf_node_id: &str) -> HashSet<String> {
695 let mut parent_by_id = std::collections::BTreeMap::new();
696 for node in nodes {
697 parent_by_id.insert(node.node_id.clone(), node.parent_node_id.clone());
698 }
699 let mut wanted = HashSet::new();
700 let mut cursor = Some(leaf_node_id.to_string());
701 while let Some(node_id) = cursor {
702 if !wanted.insert(node_id.clone()) {
703 break;
704 }
705 cursor = parent_by_id.get(&node_id).cloned().flatten();
706 }
707 wanted
708}
709
710async fn commit_attachment_refs_tx(
711 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
712 session_id: &str,
713 attachment_ids: &[AttachmentId],
714) -> Result<(), StoreError> {
715 if attachment_ids.is_empty() {
716 return Ok(());
717 }
718 let now = current_epoch_ms() as i64;
719 for id in attachment_ids {
720 sqlx::query(
721 "UPDATE lash_attachment_manifest
722 SET committed_at_ms = COALESCE(committed_at_ms, $1)
723 WHERE attachment_id = $2 AND session_id = $3",
724 )
725 .bind(now)
726 .bind(id.as_str())
727 .bind(session_id)
728 .execute(&mut **tx)
729 .await
730 .map_err(store_sqlx_error)?;
731 }
732 Ok(())
733}
734
735impl AttachmentManifest for PostgresSessionStore {
736 fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError> {
737 let pool = self.pool.clone();
738 block_on_detached(async move {
739 sqlx::query(
740 "INSERT INTO lash_attachment_manifest (
741 attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
742 )
743 VALUES ($1, $2, $3, $4, NULL)
744 ON CONFLICT (attachment_id) DO UPDATE SET
745 session_id = EXCLUDED.session_id,
746 canonical_uri = EXCLUDED.canonical_uri,
747 intent_at_ms = EXCLUDED.intent_at_ms",
748 )
749 .bind(intent.attachment_id.as_str())
750 .bind(intent.session_id)
751 .bind(intent.canonical_uri)
752 .bind(intent.intent_at_epoch_ms as i64)
753 .execute(&pool)
754 .await
755 .map(|_| ())
756 .map_err(store_sqlx_error)
757 })
758 }
759
760 fn commit_refs(
761 &self,
762 session_id: &str,
763 attachment_ids: &[AttachmentId],
764 ) -> Result<(), StoreError> {
765 let pool = self.pool.clone();
766 let session_id = session_id.to_string();
767 let attachment_ids = attachment_ids.to_vec();
768 block_on_detached(async move {
769 let mut tx = pool.begin().await.map_err(store_sqlx_error)?;
770 commit_attachment_refs_tx(&mut tx, &session_id, &attachment_ids).await?;
771 tx.commit().await.map_err(store_sqlx_error)
772 })
773 }
774
775 fn list_uncommitted(
776 &self,
777 older_than_epoch_ms: u64,
778 ) -> Result<Vec<AttachmentManifestEntry>, StoreError> {
779 let pool = self.pool.clone();
780 block_on_detached(async move {
781 let rows = sqlx::query(
782 "SELECT attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
783 FROM lash_attachment_manifest
784 WHERE committed_at_ms IS NULL AND intent_at_ms <= $1
785 ORDER BY attachment_id ASC",
786 )
787 .bind(older_than_epoch_ms as i64)
788 .fetch_all(&pool)
789 .await
790 .map_err(store_sqlx_error)?;
791 Ok(rows
792 .into_iter()
793 .map(|row| AttachmentManifestEntry {
794 attachment_id: AttachmentId::new(row.get::<String, _>(0)),
795 session_id: row.get(1),
796 canonical_uri: row.get(2),
797 intent_at_epoch_ms: row.get::<i64, _>(3) as u64,
798 committed_at_epoch_ms: row.get::<Option<i64>, _>(4).map(|value| value as u64),
799 })
800 .collect())
801 })
802 }
803
804 fn forget(&self, attachment_id: &AttachmentId) -> Result<(), StoreError> {
805 let pool = self.pool.clone();
806 let attachment_id = attachment_id.to_string();
807 block_on_detached(async move {
808 sqlx::query("DELETE FROM lash_attachment_manifest WHERE attachment_id = $1")
809 .bind(attachment_id)
810 .execute(&pool)
811 .await
812 .map(|_| ())
813 .map_err(store_sqlx_error)
814 })
815 }
816}
817
818#[async_trait::async_trait]
819impl SessionStoreFactory for PostgresSessionStoreFactory {
820 fn durability_tier(&self) -> DurabilityTier {
821 DurabilityTier::Durable
822 }
823
824 async fn create_store(
825 &self,
826 request: &SessionStoreCreateRequest,
827 ) -> Result<Arc<dyn RuntimePersistence>, String> {
828 let store = PostgresSessionStore {
829 pool: self.pool.clone(),
830 session_id: Some(request.session_id.clone()),
831 bound_session: Arc::new(OnceLock::new()),
832 };
833 if store
834 .load_session_meta()
835 .await
836 .map_err(|err| err.to_string())?
837 .is_none()
838 {
839 store
840 .save_session_meta(SessionMeta {
841 session_id: request.session_id.clone(),
842 session_name: request.session_id.clone(),
843 created_at: current_timestamp_string(),
844 model: request.policy.model.id.clone(),
845 cwd: std::env::current_dir()
846 .ok()
847 .and_then(|path| path.to_str().map(str::to_string)),
848 relation: request.relation.clone(),
849 })
850 .await
851 .map_err(|err| err.to_string())?;
852 }
853 Ok(Arc::new(store))
854 }
855
856 async fn delete_session(&self, session_id: &str) -> Result<(), String> {
857 let mut tx = self.pool.begin().await.map_err(|err| err.to_string())?;
858 for sql in [
859 "DELETE FROM lash_queued_work_items WHERE batch_id IN (SELECT batch_id FROM lash_queued_work_batches WHERE session_id = $1)",
860 "DELETE FROM lash_queued_work_batches WHERE session_id = $1",
861 "DELETE FROM lash_usage_deltas WHERE session_id = $1",
862 "DELETE FROM lash_graph_nodes WHERE session_id = $1",
863 "DELETE FROM lash_runtime_turn_commits WHERE session_id = $1",
864 "DELETE FROM lash_session_meta WHERE session_id = $1",
865 "DELETE FROM lash_sessions WHERE session_id = $1",
866 "DELETE FROM lash_attachment_manifest WHERE session_id = $1",
867 ] {
868 sqlx::query(sql)
869 .bind(session_id)
870 .execute(&mut *tx)
871 .await
872 .map_err(|err| err.to_string())?;
873 }
874 tx.commit().await.map_err(|err| err.to_string())
875 }
876}
877
/// Decoded columns of one `lash_queued_work_batches` row, before its items are
/// loaded. Produced by `queued_batch_row`.
#[derive(Clone, Debug)]
struct QueuedBatchRow {
    /// `enqueue_seq` column (BIGSERIAL primary key).
    enqueue_seq: u64,
    /// `batch_id` column.
    batch_id: String,
    /// `session_id` column.
    session_id: String,
    /// `source_key` column; nullable.
    source_key: Option<String>,
    /// Parsed from the `delivery_policy` wire string.
    delivery_policy: DeliveryPolicy,
    /// Parsed from the `slot_policy` wire string.
    slot_policy: SlotPolicy,
    /// Decoded from `merge_key_json`.
    merge_key: MergeKey,
    /// `available_at_ms` column.
    available_at_ms: u64,
    /// `enqueued_at_ms` column.
    enqueued_at_ms: u64,
    /// `claim_fencing_token` column.
    claim_fencing_token: u64,
}
891
892fn queued_batch_row(row: PgRow) -> Result<QueuedBatchRow, StoreError> {
893 let delivery_policy =
894 DeliveryPolicy::from_wire_str(row.get::<String, _>("delivery_policy").as_str())
895 .ok_or_else(|| {
896 StoreError::Backend("invalid queued work delivery policy".to_string())
897 })?;
898 let slot_policy = SlotPolicy::from_wire_str(row.get::<String, _>("slot_policy").as_str())
899 .ok_or_else(|| StoreError::Backend("invalid queued work slot policy".to_string()))?;
900 let merge_json: String = row.get("merge_key_json");
901 Ok(QueuedBatchRow {
902 enqueue_seq: row.get::<i64, _>("enqueue_seq") as u64,
903 batch_id: row.get("batch_id"),
904 session_id: row.get("session_id"),
905 source_key: row.get("source_key"),
906 delivery_policy,
907 slot_policy,
908 merge_key: store_decode_json(&merge_json, "queued work merge key")?,
909 available_at_ms: row.get::<i64, _>("available_at_ms") as u64,
910 enqueued_at_ms: row.get::<i64, _>("enqueued_at_ms") as u64,
911 claim_fencing_token: row.get::<i64, _>("claim_fencing_token") as u64,
912 })
913}
914
915async fn load_queued_batch(
916 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
917 batch_id: &str,
918) -> Result<Option<QueuedWorkBatch>, StoreError> {
919 let row = sqlx::query(
920 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
921 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
922 claim_fencing_token
923 FROM lash_queued_work_batches
924 WHERE batch_id = $1",
925 )
926 .bind(batch_id)
927 .fetch_optional(&mut **tx)
928 .await
929 .map_err(store_sqlx_error)?;
930 let Some(row) = row else {
931 return Ok(None);
932 };
933 let row = queued_batch_row(row)?;
934 queued_work_batch_from_row(tx, row).await.map(Some)
935}
936
937async fn queued_work_batch_from_row(
938 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
939 row: QueuedBatchRow,
940) -> Result<QueuedWorkBatch, StoreError> {
941 let item_rows = sqlx::query(
942 "SELECT item_id, payload_json
943 FROM lash_queued_work_items
944 WHERE batch_id = $1
945 ORDER BY item_index ASC",
946 )
947 .bind(&row.batch_id)
948 .fetch_all(&mut **tx)
949 .await
950 .map_err(store_sqlx_error)?;
951 let mut items = Vec::new();
952 for item in item_rows {
953 let payload_json: String = item.get(1);
954 items.push(QueuedWorkItem {
955 item_id: item.get(0),
956 payload: store_decode_json(&payload_json, "queued work payload")?,
957 });
958 }
959 Ok(QueuedWorkBatch {
960 batch_id: row.batch_id,
961 session_id: row.session_id,
962 enqueue_seq: row.enqueue_seq,
963 source_key: row.source_key,
964 delivery_policy: row.delivery_policy,
965 slot_policy: row.slot_policy,
966 merge_key: row.merge_key,
967 available_at_ms: row.available_at_ms,
968 enqueued_at_ms: row.enqueued_at_ms,
969 items,
970 })
971}
972
973async fn ensure_queued_work_completion_tx(
974 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
975 completed: &QueuedWorkCompletion,
976) -> Result<(), StoreError> {
977 for batch_id in &completed.batch_ids {
978 let exists: Option<i64> = sqlx::query_scalar(
979 "SELECT 1::BIGINT FROM lash_queued_work_batches
980 WHERE session_id = $1
981 AND batch_id = $2
982 AND claim_id = $3
983 AND claim_token = $4
984 LIMIT 1",
985 )
986 .bind(&completed.session_id)
987 .bind(batch_id)
988 .bind(&completed.claim_id)
989 .bind(&completed.lease_token)
990 .fetch_optional(&mut **tx)
991 .await
992 .map_err(store_sqlx_error)?;
993 if exists.is_none() {
994 return Err(StoreError::QueuedWorkClaimExpired {
995 session_id: completed.session_id.clone(),
996 claim_id: completed.claim_id.clone(),
997 });
998 }
999 }
1000 Ok(())
1001}
1002
1003#[async_trait::async_trait]
1004impl RuntimePersistence for PostgresSessionStore {
    /// This backend reports itself as durable.
    fn durability_tier(&self) -> DurabilityTier {
        DurabilityTier::Durable
    }
1008
1009 async fn load_session(
1010 &self,
1011 scope: SessionReadScope,
1012 ) -> Result<Option<PersistedSessionRead>, StoreError> {
1013 let Some(session_id) = self.selected_session_id().await? else {
1014 return Ok(None);
1015 };
1016 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1017 let Some(meta) = load_session_head_meta_tx(&mut tx, &session_id, false).await? else {
1018 return Ok(None);
1019 };
1020 tx.commit().await.map_err(store_sqlx_error)?;
1021 let leaf_node_id = match &scope {
1022 SessionReadScope::FullGraph => meta.leaf_node_id.clone(),
1023 SessionReadScope::ActivePath { leaf_node_id } => {
1024 leaf_node_id.clone().or_else(|| meta.leaf_node_id.clone())
1025 }
1026 };
1027 let graph = load_graph(
1028 &self.pool,
1029 &session_id,
1030 leaf_node_id.clone(),
1031 matches!(scope, SessionReadScope::ActivePath { .. }),
1032 )
1033 .await?;
1034 let checkpoint = match meta.checkpoint_ref.as_ref() {
1035 Some(blob_ref) => get_checkpoint(&self.pool, blob_ref).await,
1036 None => None,
1037 };
1038 Ok(Some(PersistedSessionRead {
1039 session_id: meta.session_id,
1040 head_revision: meta.head_revision,
1041 config: meta.config,
1042 agent_frames: meta.agent_frames,
1043 current_agent_frame_id: meta.current_agent_frame_id,
1044 graph,
1045 checkpoint_ref: meta.checkpoint_ref,
1046 checkpoint,
1047 token_ledger: merge_token_ledger_entries(
1048 load_usage_deltas(&self.pool, &session_id).await,
1049 ),
1050 }))
1051 }
1052
1053 async fn load_node(&self, node_id: &str) -> Result<Option<SessionNodeRecord>, StoreError> {
1054 let json: Option<String> = if let Some(session_id) = &self.session_id {
1055 sqlx::query_scalar(
1056 "SELECT node_json FROM lash_graph_nodes
1057 WHERE session_id = $1 AND node_id = $2 AND tombstoned = FALSE",
1058 )
1059 .bind(session_id)
1060 .bind(node_id)
1061 .fetch_optional(&self.pool)
1062 .await
1063 .map_err(store_sqlx_error)?
1064 } else {
1065 sqlx::query_scalar(
1066 "SELECT node_json FROM lash_graph_nodes
1067 WHERE node_id = $1 AND tombstoned = FALSE
1068 ORDER BY session_id ASC
1069 LIMIT 1",
1070 )
1071 .bind(node_id)
1072 .fetch_optional(&self.pool)
1073 .await
1074 .map_err(store_sqlx_error)?
1075 };
1076 json.map(|json| store_decode_json(&json, "session graph node"))
1077 .transpose()
1078 }
1079
    /// Applies one runtime commit to the session inside a single transaction.
    ///
    /// In order, the commit:
    /// 1. verifies the commit targets the session this store is bound to
    ///    (binding the store lazily when it has no explicit `session_id`);
    /// 2. replays the stored result when the same turn commit (same
    ///    `turn_commit_hash`) was already recorded, or fails with
    ///    `RuntimeTurnCommitConflict` when the hash differs;
    /// 3. checks `expected_head_revision` against the stored head revision;
    /// 4. validates every completed queue claim;
    /// 5. writes the checkpoint, usage deltas and graph delta;
    /// 6. writes the new head row guarded by the previously read revision;
    /// 7. deletes the completed queue batches, commits attachment refs and
    ///    records the turn commit result.
    ///
    /// # Errors
    /// Returns `SessionBindingMismatch`, `RuntimeTurnCommitConflict`,
    /// `HeadRevisionConflict`, `QueuedWorkClaimExpired`, or a backend error
    /// mapped through `store_sqlx_error`. Any early return drops the
    /// transaction without committing it.
    async fn commit_runtime_state(
        &self,
        commit: RuntimeCommit,
    ) -> Result<RuntimeCommitResult, StoreError> {
        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
        let existing = load_session_head_meta_tx(&mut tx, &commit.session_id, false).await?;
        // Reject a head row whose recorded session id differs from the
        // session id the commit claims to target.
        if let Some(bound_session_id) = existing.as_ref().map(|meta| meta.session_id.as_str())
            && bound_session_id != commit.session_id
        {
            return Err(StoreError::SessionBindingMismatch {
                bound_session_id: bound_session_id.to_string(),
                attempted_session_id: commit.session_id,
            });
        }
        // The explicit session id wins; otherwise use the session this store
        // was lazily bound to by an earlier commit.
        let effective_binding = self
            .session_id
            .clone()
            .or_else(|| self.bound_session.get().cloned());
        if let Some(bound_session_id) = &effective_binding
            && commit.session_id != *bound_session_id
        {
            return Err(StoreError::SessionBindingMismatch {
                bound_session_id: bound_session_id.clone(),
                attempted_session_id: commit.session_id,
            });
        }
        // First commit on an unpinned store binds it to this session. The
        // `set` result is ignored: an already-set value was checked above.
        // NOTE(review): the binding is recorded before the transaction
        // commits, so it persists even if this commit later fails — confirm
        // that is intended.
        if self.session_id.is_none() {
            let _ = self.bound_session.set(commit.session_id.clone());
        }
        if let Some(completed) = &commit.turn_commit {
            if completed.session_id != commit.session_id {
                return Err(StoreError::RuntimeTurnCommitConflict {
                    session_id: completed.session_id.clone(),
                    turn_id: completed.turn_id.clone(),
                });
            }
            // Look for an earlier commit of the same turn.
            let prior = sqlx::query(
                "SELECT turn_commit_hash, result_json
                 FROM lash_runtime_turn_commits
                 WHERE session_id = $1 AND turn_id = $2",
            )
            .bind(&completed.session_id)
            .bind(&completed.turn_id)
            .fetch_optional(&mut *tx)
            .await
            .map_err(store_sqlx_error)?;
            if let Some(row) = prior {
                let hash: String = row.get(0);
                let result_json: String = row.get(1);
                // Same hash: return the recorded result instead of
                // committing again.
                if hash == completed.turn_commit_hash {
                    return store_decode_json(&result_json, "runtime turn commit result");
                }
                // Different hash for the same turn id: conflicting commit.
                return Err(StoreError::RuntimeTurnCommitConflict {
                    session_id: completed.session_id.clone(),
                    turn_id: completed.turn_id.clone(),
                });
            }
        }
        // A session without a head row is treated as revision 0.
        let actual_revision = existing.as_ref().map_or(0, |meta| meta.head_revision);
        if commit.expected_head_revision.is_some()
            && commit.expected_head_revision != Some(actual_revision)
        {
            return Err(StoreError::HeadRevisionConflict {
                expected: commit.expected_head_revision,
                actual: actual_revision,
            });
        }
        // Every completed claim must belong to this session and still match
        // a stored batch before anything is written.
        for completed in &commit.completed_queue_claims {
            if completed.session_id != commit.session_id {
                return Err(StoreError::QueuedWorkClaimExpired {
                    session_id: completed.session_id.clone(),
                    claim_id: completed.claim_id.clone(),
                });
            }
            ensure_queued_work_completion_tx(&mut tx, completed).await?;
        }
        let (checkpoint_ref, manifest) = put_checkpoint_tx(&mut tx, &commit.checkpoint).await?;
        for entry in &commit.usage_deltas {
            sqlx::query("INSERT INTO lash_usage_deltas (session_id, entry_json) VALUES ($1, $2)")
                .bind(&commit.session_id)
                .bind(encode_json(entry))
                .execute(&mut *tx)
                .await
                .map_err(store_sqlx_error)?;
        }
        // Apply the graph delta and determine the leaf recorded in the head.
        let leaf_node_id = match &commit.graph {
            GraphCommitDelta::Unchanged { leaf_node_id } => leaf_node_id.clone(),
            GraphCommitDelta::Append {
                nodes,
                leaf_node_id,
            } => {
                for node in nodes {
                    // Upsert: re-appending an existing node id overwrites its
                    // JSON and clears the tombstone flag.
                    sqlx::query(
                        "INSERT INTO lash_graph_nodes (session_id, node_id, node_json)
                         VALUES ($1, $2, $3)
                         ON CONFLICT (session_id, node_id) DO UPDATE SET
                         node_json = EXCLUDED.node_json,
                         tombstoned = FALSE",
                    )
                    .bind(&commit.session_id)
                    .bind(&node.node_id)
                    .bind(encode_json(node))
                    .execute(&mut *tx)
                    .await
                    .map_err(store_sqlx_error)?;
                }
                leaf_node_id.clone()
            }
            GraphCommitDelta::ReplaceFull(graph) => {
                // Drop every stored node of the session, then insert the
                // replacement graph.
                sqlx::query("DELETE FROM lash_graph_nodes WHERE session_id = $1")
                    .bind(&commit.session_id)
                    .execute(&mut *tx)
                    .await
                    .map_err(store_sqlx_error)?;
                for node in &graph.nodes {
                    sqlx::query(
                        "INSERT INTO lash_graph_nodes (session_id, node_id, node_json)
                         VALUES ($1, $2, $3)",
                    )
                    .bind(&commit.session_id)
                    .bind(&node.node_id)
                    .bind(encode_json(node))
                    .execute(&mut *tx)
                    .await
                    .map_err(store_sqlx_error)?;
                }
                graph.leaf_node_id.clone()
            }
        };
        // Count live nodes after the delta so the head records the new size.
        let graph_node_count: i64 = sqlx::query_scalar(
            "SELECT COUNT(*) FROM lash_graph_nodes WHERE session_id = $1 AND tombstoned = FALSE",
        )
        .bind(&commit.session_id)
        .fetch_one(&mut *tx)
        .await
        .map_err(store_sqlx_error)?;
        let next_revision = actual_revision + 1;
        let meta = SessionHeadMeta {
            session_id: commit.session_id.clone(),
            head_revision: next_revision,
            config: commit.config.clone(),
            agent_frames: commit.agent_frames.clone(),
            current_agent_frame_id: commit.current_agent_frame_id.clone(),
            checkpoint_ref: Some(checkpoint_ref.clone()),
            leaf_node_id,
            graph_node_count: graph_node_count as usize,
            token_ledger: Vec::new(),
        };
        // Insert the head row, or update it only while the stored revision
        // still equals the revision read at the start of this transaction
        // ($5). A concurrent writer that advanced the head makes the update
        // affect zero rows.
        let head_write = sqlx::query(
            "INSERT INTO lash_sessions (session_id, head_revision, head_json, checkpoint_ref)
             VALUES ($1, $2, $3, $4)
             ON CONFLICT (session_id) DO UPDATE SET
             head_revision = EXCLUDED.head_revision,
             head_json = EXCLUDED.head_json,
             checkpoint_ref = EXCLUDED.checkpoint_ref
             WHERE lash_sessions.head_revision = $5",
        )
        .bind(&commit.session_id)
        .bind(next_revision as i64)
        .bind(encode_json(&meta))
        .bind(checkpoint_ref.as_str())
        .bind(actual_revision as i64)
        .execute(&mut *tx)
        .await;
        let head_write = match head_write {
            Ok(result) => result,
            // Errors classified by `is_contention_error` are reported as a
            // head revision conflict rather than a backend failure.
            Err(err) if is_contention_error(&err) => {
                return Err(StoreError::HeadRevisionConflict {
                    expected: commit.expected_head_revision.or(Some(actual_revision)),
                    actual: actual_revision,
                });
            }
            Err(err) => return Err(store_sqlx_error(err)),
        };
        if head_write.rows_affected() == 0 {
            // The guarded update matched nothing: re-read the stored
            // revision so the conflict error reports the current value.
            let actual_now = sqlx::query_scalar::<_, i64>(
                "SELECT head_revision FROM lash_sessions WHERE session_id = $1",
            )
            .bind(&commit.session_id)
            .fetch_optional(&mut *tx)
            .await
            .map_err(store_sqlx_error)?
            .map_or(actual_revision, |revision| revision as u64);
            return Err(StoreError::HeadRevisionConflict {
                expected: commit.expected_head_revision.or(Some(actual_revision)),
                actual: actual_now,
            });
        }
        // Remove the batches completed by this commit.
        // NOTE(review): the number of deleted rows is not checked here; a
        // claim lost between the earlier validation and this delete would
        // pass silently — confirm whether that window can occur.
        for completed in &commit.completed_queue_claims {
            for batch_id in &completed.batch_ids {
                sqlx::query(
                    "DELETE FROM lash_queued_work_batches
                     WHERE session_id = $1 AND batch_id = $2 AND claim_id = $3 AND claim_token = $4",
                )
                .bind(&completed.session_id)
                .bind(batch_id)
                .bind(&completed.claim_id)
                .bind(&completed.lease_token)
                .execute(&mut *tx)
                .await
                .map_err(store_sqlx_error)?;
            }
        }
        commit_attachment_refs_tx(
            &mut tx,
            &commit.session_id,
            &commit.committed_attachment_ids,
        )
        .await?;
        let result = RuntimeCommitResult {
            head_revision: next_revision,
            checkpoint_ref,
            manifest,
        };
        // Record the result so a retry of the same turn commit can be
        // answered from the stored row (see the replay check above).
        if let Some(completed) = &commit.turn_commit {
            sqlx::query(
                "INSERT INTO lash_runtime_turn_commits (
                 session_id, turn_id, turn_commit_hash, result_json, committed_at_ms
                 )
                 VALUES ($1, $2, $3, $4, $5)",
            )
            .bind(&completed.session_id)
            .bind(&completed.turn_id)
            .bind(&completed.turn_commit_hash)
            .bind(encode_json(&result))
            .bind(current_epoch_ms() as i64)
            .execute(&mut *tx)
            .await
            .map_err(store_sqlx_error)?;
        }
        tx.commit().await.map_err(store_sqlx_error)?;
        Ok(result)
    }
1331
1332 async fn enqueue_queued_work(
1333 &self,
1334 batch: QueuedWorkBatchDraft,
1335 ) -> Result<QueuedWorkBatch, StoreError> {
1336 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1337 if let Some(source_key) = batch.source_key.as_deref() {
1338 let existing_id: Option<String> = sqlx::query_scalar(
1339 "SELECT batch_id FROM lash_queued_work_batches
1340 WHERE session_id = $1 AND source_key = $2",
1341 )
1342 .bind(&batch.session_id)
1343 .bind(source_key)
1344 .fetch_optional(&mut *tx)
1345 .await
1346 .map_err(store_sqlx_error)?;
1347 if let Some(batch_id) = existing_id {
1348 let existing = load_queued_batch(&mut tx, &batch_id)
1349 .await?
1350 .ok_or_else(|| {
1351 StoreError::Backend("queued work source row disappeared".to_string())
1352 })?;
1353 tx.commit().await.map_err(store_sqlx_error)?;
1354 return Ok(existing);
1355 }
1356 }
1357 let now = current_epoch_ms();
1358 let batch_id = format!(
1359 "qwb:{:x}",
1360 Sha256::digest(format!("{}:{:?}:{now}", batch.session_id, batch.source_key).as_bytes())
1361 );
1362 let row = sqlx::query_scalar::<_, i64>(
1363 "INSERT INTO lash_queued_work_batches (
1364 batch_id, session_id, source_key, delivery_policy, slot_policy,
1365 merge_key_json, available_at_ms, enqueued_at_ms
1366 )
1367 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
1368 RETURNING enqueue_seq",
1369 )
1370 .bind(&batch_id)
1371 .bind(&batch.session_id)
1372 .bind(&batch.source_key)
1373 .bind(batch.delivery_policy.as_str())
1374 .bind(batch.slot_policy.as_str())
1375 .bind(encode_json(&batch.merge_key))
1376 .bind(batch.available_at_ms as i64)
1377 .bind(now as i64)
1378 .fetch_one(&mut *tx)
1379 .await
1380 .map_err(store_sqlx_error)?;
1381 for (index, payload) in batch.payloads.iter().enumerate() {
1382 let item_id = format!("{batch_id}:item:{index}");
1383 sqlx::query(
1384 "INSERT INTO lash_queued_work_items (batch_id, item_index, item_id, payload_json)
1385 VALUES ($1, $2, $3, $4)",
1386 )
1387 .bind(&batch_id)
1388 .bind(index as i32)
1389 .bind(item_id)
1390 .bind(encode_json(payload))
1391 .execute(&mut *tx)
1392 .await
1393 .map_err(store_sqlx_error)?;
1394 }
1395 let queued = load_queued_batch(&mut tx, &batch_id)
1396 .await?
1397 .ok_or_else(|| StoreError::Backend("queued work insert disappeared".to_string()))?;
1398 debug_assert_eq!(queued.enqueue_seq, row as u64);
1399 tx.commit().await.map_err(store_sqlx_error)?;
1400 Ok(queued)
1401 }
1402
1403 async fn claim_ready_queued_work(
1404 &self,
1405 session_id: &str,
1406 owner_id: &str,
1407 boundary: QueuedWorkClaimBoundary,
1408 lease_ttl_ms: u64,
1409 max_batches: usize,
1410 ) -> Result<Option<QueuedWorkClaim>, StoreError> {
1411 if max_batches == 0 {
1412 return Ok(None);
1413 }
1414 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1415 let now = current_epoch_ms();
1416 let rows = sqlx::query(
1417 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1418 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1419 claim_fencing_token
1420 FROM lash_queued_work_batches
1421 WHERE session_id = $1
1422 AND available_at_ms <= $2
1423 AND (claim_token IS NULL OR claim_expires_at_ms <= $2)
1424 ORDER BY enqueue_seq ASC
1425 LIMIT $3
1426 FOR UPDATE SKIP LOCKED",
1427 )
1428 .bind(session_id)
1429 .bind(now as i64)
1430 .bind((max_batches as i64).saturating_add(32))
1431 .fetch_all(&mut *tx)
1432 .await
1433 .map_err(store_sqlx_error)?;
1434 let mut candidates = Vec::new();
1435 for row in rows {
1436 candidates.push(queued_batch_row(row)?);
1437 }
1438 let Some(first_row) = candidates.first() else {
1439 tx.commit().await.map_err(store_sqlx_error)?;
1440 return Ok(None);
1441 };
1442 if boundary == QueuedWorkClaimBoundary::ActiveTurnCheckpoint
1443 && first_row.delivery_policy != DeliveryPolicy::EarliestSafeBoundary
1444 {
1445 tx.commit().await.map_err(store_sqlx_error)?;
1446 return Ok(None);
1447 }
1448 let first_slot = first_row.slot_policy;
1449 let first_delivery = first_row.delivery_policy;
1450 let first_merge = first_row.merge_key.clone();
1451 let mut selected = Vec::new();
1452 for row in candidates {
1453 if selected.len() >= max_batches {
1454 break;
1455 }
1456 if selected.is_empty() {
1457 selected.push(row);
1458 if first_slot == SlotPolicy::Exclusive {
1459 break;
1460 }
1461 continue;
1462 }
1463 if first_slot != SlotPolicy::Join
1464 || row.slot_policy != SlotPolicy::Join
1465 || row.delivery_policy != first_delivery
1466 || row.merge_key != first_merge
1467 {
1468 break;
1469 }
1470 selected.push(row);
1471 }
1472 let Some(first) = selected.first() else {
1473 tx.commit().await.map_err(store_sqlx_error)?;
1474 return Ok(None);
1475 };
1476 let fencing_token = first.claim_fencing_token.saturating_add(1);
1477 let claim_id = format!("qwc:{}:{fencing_token}", first.enqueue_seq);
1478 let lease_token = format!(
1479 "{:x}",
1480 Sha256::digest(format!("{session_id}:{owner_id}:{claim_id}:{now}").as_bytes())
1481 );
1482 let expires_at = now.saturating_add(lease_ttl_ms);
1483 for row in &selected {
1484 let changed = sqlx::query(
1485 "UPDATE lash_queued_work_batches
1486 SET claim_id = $3,
1487 claim_owner_id = $4,
1488 claim_token = $5,
1489 claim_fencing_token = claim_fencing_token + 1,
1490 claim_claimed_at_ms = $6,
1491 claim_expires_at_ms = $7
1492 WHERE session_id = $1
1493 AND batch_id = $2
1494 AND (claim_token IS NULL OR claim_expires_at_ms <= $6)",
1495 )
1496 .bind(session_id)
1497 .bind(&row.batch_id)
1498 .bind(&claim_id)
1499 .bind(owner_id)
1500 .bind(&lease_token)
1501 .bind(now as i64)
1502 .bind(expires_at as i64)
1503 .execute(&mut *tx)
1504 .await
1505 .map_err(store_sqlx_error)?
1506 .rows_affected();
1507 if changed == 0 {
1508 tx.rollback().await.map_err(store_sqlx_error)?;
1509 return Ok(None);
1510 }
1511 }
1512 let mut batches = Vec::new();
1513 for row in selected {
1514 batches.push(queued_work_batch_from_row(&mut tx, row).await?);
1515 }
1516 tx.commit().await.map_err(store_sqlx_error)?;
1517 Ok(Some(QueuedWorkClaim {
1518 session_id: session_id.to_string(),
1519 claim_id,
1520 owner_id: owner_id.to_string(),
1521 lease_token,
1522 fencing_token,
1523 claimed_at_epoch_ms: now,
1524 expires_at_epoch_ms: expires_at,
1525 batches,
1526 }))
1527 }
1528
1529 async fn renew_queued_work_claim(
1530 &self,
1531 claim: &QueuedWorkClaim,
1532 lease_ttl_ms: u64,
1533 ) -> Result<QueuedWorkClaim, StoreError> {
1534 let expires_at = current_epoch_ms().saturating_add(lease_ttl_ms);
1535 let changed = sqlx::query(
1536 "UPDATE lash_queued_work_batches
1537 SET claim_expires_at_ms = $4
1538 WHERE session_id = $1 AND claim_id = $2 AND claim_token = $3",
1539 )
1540 .bind(&claim.session_id)
1541 .bind(&claim.claim_id)
1542 .bind(&claim.lease_token)
1543 .bind(expires_at as i64)
1544 .execute(&self.pool)
1545 .await
1546 .map_err(store_sqlx_error)?
1547 .rows_affected();
1548 if changed as usize != claim.batches.len() {
1549 return Err(StoreError::QueuedWorkClaimExpired {
1550 session_id: claim.session_id.clone(),
1551 claim_id: claim.claim_id.clone(),
1552 });
1553 }
1554 Ok(QueuedWorkClaim {
1555 expires_at_epoch_ms: expires_at,
1556 ..claim.clone()
1557 })
1558 }
1559
1560 async fn abandon_queued_work_claim(&self, claim: &QueuedWorkClaim) -> Result<(), StoreError> {
1561 sqlx::query(
1562 "UPDATE lash_queued_work_batches
1563 SET claim_id = NULL,
1564 claim_owner_id = NULL,
1565 claim_token = NULL,
1566 claim_claimed_at_ms = 0,
1567 claim_expires_at_ms = 0
1568 WHERE session_id = $1 AND claim_id = $2 AND claim_token = $3",
1569 )
1570 .bind(&claim.session_id)
1571 .bind(&claim.claim_id)
1572 .bind(&claim.lease_token)
1573 .execute(&self.pool)
1574 .await
1575 .map_err(store_sqlx_error)?;
1576 Ok(())
1577 }
1578
1579 async fn cancel_queued_work_batch(
1580 &self,
1581 session_id: &str,
1582 batch_id: &str,
1583 ) -> Result<Option<QueuedWorkBatch>, StoreError> {
1584 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1585 let now = current_epoch_ms();
1586 let row = sqlx::query(
1587 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1588 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1589 claim_fencing_token
1590 FROM lash_queued_work_batches
1591 WHERE session_id = $1
1592 AND batch_id = $2
1593 AND (claim_token IS NULL OR claim_expires_at_ms <= $3)
1594 FOR UPDATE",
1595 )
1596 .bind(session_id)
1597 .bind(batch_id)
1598 .bind(now as i64)
1599 .fetch_optional(&mut *tx)
1600 .await
1601 .map_err(store_sqlx_error)?;
1602 let Some(row) = row else {
1603 tx.commit().await.map_err(store_sqlx_error)?;
1604 return Ok(None);
1605 };
1606 let batch = queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?;
1607 sqlx::query("DELETE FROM lash_queued_work_batches WHERE batch_id = $1")
1608 .bind(batch_id)
1609 .execute(&mut *tx)
1610 .await
1611 .map_err(store_sqlx_error)?;
1612 tx.commit().await.map_err(store_sqlx_error)?;
1613 Ok(Some(batch))
1614 }
1615
1616 async fn list_queued_work(&self, session_id: &str) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1617 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1618 let rows = sqlx::query(
1619 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1620 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1621 claim_fencing_token
1622 FROM lash_queued_work_batches
1623 WHERE session_id = $1
1624 ORDER BY enqueue_seq ASC",
1625 )
1626 .bind(session_id)
1627 .fetch_all(&mut *tx)
1628 .await
1629 .map_err(store_sqlx_error)?;
1630 let mut batches = Vec::new();
1631 for row in rows {
1632 batches.push(queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?);
1633 }
1634 tx.commit().await.map_err(store_sqlx_error)?;
1635 Ok(batches)
1636 }
1637
1638 async fn list_pending_queued_work(
1639 &self,
1640 session_id: &str,
1641 ) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1642 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1643 let now = current_epoch_ms();
1644 let rows = sqlx::query(
1645 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1646 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1647 claim_fencing_token
1648 FROM lash_queued_work_batches
1649 WHERE session_id = $1
1650 AND (claim_token IS NULL OR claim_expires_at_ms <= $2)
1651 ORDER BY enqueue_seq ASC",
1652 )
1653 .bind(session_id)
1654 .bind(now as i64)
1655 .fetch_all(&mut *tx)
1656 .await
1657 .map_err(store_sqlx_error)?;
1658 let mut batches = Vec::new();
1659 for row in rows {
1660 batches.push(queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?);
1661 }
1662 tx.commit().await.map_err(store_sqlx_error)?;
1663 Ok(batches)
1664 }
1665
1666 async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError> {
1667 sqlx::query(
1668 "INSERT INTO lash_session_meta (session_id, meta_json)
1669 VALUES ($1, $2)
1670 ON CONFLICT (session_id) DO UPDATE SET meta_json = EXCLUDED.meta_json",
1671 )
1672 .bind(&meta.session_id)
1673 .bind(encode_json(&meta))
1674 .execute(&self.pool)
1675 .await
1676 .map_err(store_sqlx_error)?;
1677 Ok(())
1678 }
1679
1680 async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError> {
1681 let json: Option<String> = if let Some(session_id) = &self.session_id {
1682 sqlx::query_scalar("SELECT meta_json FROM lash_session_meta WHERE session_id = $1")
1683 .bind(session_id)
1684 .fetch_optional(&self.pool)
1685 .await
1686 .map_err(store_sqlx_error)?
1687 } else {
1688 sqlx::query_scalar(
1689 "SELECT meta_json FROM lash_session_meta ORDER BY session_id ASC LIMIT 1",
1690 )
1691 .fetch_optional(&self.pool)
1692 .await
1693 .map_err(store_sqlx_error)?
1694 };
1695 json.map(|json| store_decode_json(&json, "session meta"))
1696 .transpose()
1697 }
1698
1699 async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError> {
1700 for id in ids {
1701 if let Some(session_id) = &self.session_id {
1702 sqlx::query(
1703 "UPDATE lash_graph_nodes
1704 SET tombstoned = TRUE
1705 WHERE session_id = $1 AND node_id = $2",
1706 )
1707 .bind(session_id)
1708 .bind(id)
1709 .execute(&self.pool)
1710 .await
1711 .map_err(store_sqlx_error)?;
1712 } else {
1713 sqlx::query(
1714 "UPDATE lash_graph_nodes
1715 SET tombstoned = TRUE
1716 WHERE node_id = $1",
1717 )
1718 .bind(id)
1719 .execute(&self.pool)
1720 .await
1721 .map_err(store_sqlx_error)?;
1722 }
1723 }
1724 Ok(())
1725 }
1726
1727 async fn vacuum(&self) -> Result<VacuumReport, StoreError> {
1728 let removed = if let Some(session_id) = &self.session_id {
1729 sqlx::query("DELETE FROM lash_graph_nodes WHERE session_id = $1 AND tombstoned = TRUE")
1730 .bind(session_id)
1731 .execute(&self.pool)
1732 .await
1733 .map_err(store_sqlx_error)?
1734 .rows_affected()
1735 } else {
1736 sqlx::query("DELETE FROM lash_graph_nodes WHERE tombstoned = TRUE")
1737 .execute(&self.pool)
1738 .await
1739 .map_err(store_sqlx_error)?
1740 .rows_affected()
1741 };
1742 Ok(VacuumReport {
1743 removed_node_count: removed as usize,
1744 })
1745 }
1746
    /// Performs no work in this implementation and always returns the
    /// default (empty) [`GcReport`].
    async fn gc_unreachable(&self) -> Result<GcReport, StoreError> {
        Ok(GcReport::default())
    }
1750}
1751
/// Returns the label of the record's status, as written to the `status`
/// column of `lash_processes`.
fn process_status_label(record: &ProcessRecord) -> &'static str {
    record.status.label()
}
1755
1756async fn load_process_tx(
1757 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1758 process_id: &str,
1759) -> Result<Option<ProcessRecord>, PluginError> {
1760 let json: Option<String> = sqlx::query_scalar(
1761 "SELECT record_json
1762 FROM lash_processes
1763 WHERE process_id = $1
1764 FOR UPDATE",
1765 )
1766 .bind(process_id)
1767 .fetch_optional(&mut **tx)
1768 .await
1769 .map_err(plugin_sqlx_error)?;
1770 json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
1771 .transpose()
1772}
1773
1774async fn load_process(
1775 pool: &PgPool,
1776 process_id: &str,
1777) -> Result<Option<ProcessRecord>, PluginError> {
1778 let json: Option<String> =
1779 sqlx::query_scalar("SELECT record_json FROM lash_processes WHERE process_id = $1")
1780 .bind(process_id)
1781 .fetch_optional(pool)
1782 .await
1783 .map_err(plugin_sqlx_error)?;
1784 json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
1785 .transpose()
1786}
1787
1788async fn save_process_tx(
1789 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1790 record: &ProcessRecord,
1791) -> Result<(), PluginError> {
1792 sqlx::query(
1793 "UPDATE lash_processes
1794 SET updated_at_ms = $2, status = $3, record_json = $4
1795 WHERE process_id = $1",
1796 )
1797 .bind(&record.id)
1798 .bind(record.updated_at_ms as i64)
1799 .bind(process_status_label(record))
1800 .bind(serde_json::to_string(record).map_err(process_decode_error)?)
1801 .execute(&mut **tx)
1802 .await
1803 .map_err(plugin_sqlx_error)?;
1804 Ok(())
1805}
1806
1807async fn load_event_by_key_tx(
1808 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1809 process_id: &str,
1810 replay_key: &str,
1811) -> Result<Option<(String, ProcessEvent)>, PluginError> {
1812 let row = sqlx::query(
1813 "SELECT payload_hash, event_json
1814 FROM lash_process_events
1815 WHERE process_id = $1 AND idempotency_key = $2",
1816 )
1817 .bind(process_id)
1818 .bind(replay_key)
1819 .fetch_optional(&mut **tx)
1820 .await
1821 .map_err(plugin_sqlx_error)?;
1822 row.map(|row| {
1823 let hash: String = row.get(0);
1824 let json: String = row.get(1);
1825 serde_json::from_str(&json)
1826 .map(|event| (hash, event))
1827 .map_err(process_decode_error)
1828 })
1829 .transpose()
1830}
1831
1832async fn load_process_lease_tx(
1833 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1834 process_id: &str,
1835) -> Result<Option<ProcessLease>, PluginError> {
1836 let row = sqlx::query(
1837 "SELECT lease_owner_id, lease_token, lease_fencing_token,
1838 lease_claimed_at_ms, lease_expires_at_ms
1839 FROM lash_process_leases
1840 WHERE process_id = $1",
1841 )
1842 .bind(process_id)
1843 .fetch_optional(&mut **tx)
1844 .await
1845 .map_err(plugin_sqlx_error)?;
1846 let Some(row) = row else {
1847 return Ok(None);
1848 };
1849 let owner_id: Option<String> = row.get(0);
1850 let lease_token: Option<String> = row.get(1);
1851 let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
1852 return Ok(None);
1853 };
1854 Ok(Some(ProcessLease {
1855 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
1856 process_id: process_id.to_string(),
1857 owner_id,
1858 lease_token,
1859 fencing_token: row.get::<i64, _>(2) as u64,
1860 claimed_at_epoch_ms: row.get::<i64, _>(3) as u64,
1861 expires_at_epoch_ms: row.get::<i64, _>(4) as u64,
1862 }))
1863}
1864
1865fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> PluginError {
1866 PluginError::Session(format!(
1867 "process `{process_id}` is already leased by `{}` until {}",
1868 current.owner_id, current.expires_at_epoch_ms
1869 ))
1870}
1871
1872fn process_lease_expired(process_id: &str) -> PluginError {
1873 PluginError::Session(format!(
1874 "process lease for `{process_id}` is missing or expired"
1875 ))
1876}
1877
1878fn guard_lease(current: Option<&ProcessLease>, lease_token: &str, now: u64) -> bool {
1879 current
1880 .map(|current| current.lease_token == lease_token && current.expires_at_epoch_ms > now)
1881 .unwrap_or(false)
1882}
1883
1884async fn list_grants_for_scope(
1885 pool: &PgPool,
1886 owner_scope: &ProcessScope,
1887 live_only: bool,
1888) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
1889 let status_clause = if live_only {
1890 "AND p.status = 'running'"
1891 } else {
1892 ""
1893 };
1894 let sql = format!(
1895 "SELECT g.process_id, g.descriptor_json, p.record_json
1896 FROM lash_process_handle_grants g
1897 JOIN lash_processes p ON p.process_id = g.process_id
1898 WHERE g.scope_id = $1 {status_clause}
1899 ORDER BY g.process_id ASC"
1900 );
1901 let rows = sqlx::query(&sql)
1902 .bind(owner_scope.id().as_str())
1903 .fetch_all(pool)
1904 .await
1905 .map_err(plugin_sqlx_error)?;
1906 let mut entries = Vec::new();
1907 for row in rows {
1908 let process_id: String = row.get(0);
1909 let descriptor_json: String = row.get(1);
1910 let record_json: String = row.get(2);
1911 let descriptor: ProcessHandleDescriptor =
1912 serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
1913 let record: ProcessRecord =
1914 serde_json::from_str(&record_json).map_err(process_decode_error)?;
1915 entries.push((
1916 ProcessHandleGrant {
1917 session_id: owner_scope.session_id.clone(),
1918 process_id,
1919 descriptor,
1920 },
1921 record,
1922 ));
1923 }
1924 Ok(entries)
1925}
1926
1927#[async_trait::async_trait]
1928impl ProcessRegistry for PostgresProcessRegistry {
    /// Reports the durability tier of this registry, which is always
    /// [`DurabilityTier::Durable`].
    fn durability_tier(&self) -> DurabilityTier {
        DurabilityTier::Durable
    }
1932
1933 async fn register_process(
1934 &self,
1935 registration: ProcessRegistration,
1936 ) -> Result<ProcessRecord, PluginError> {
1937 let (registration, registration_hash) =
1938 lash_core::runtime::prepare_process_registration(registration)?;
1939 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1940 if let Some(existing) = load_process_tx(&mut tx, ®istration.id).await? {
1941 if existing.registration_hash == registration_hash {
1942 tx.commit().await.map_err(plugin_sqlx_error)?;
1943 return Ok(existing);
1944 }
1945 return Err(PluginError::Session(format!(
1946 "process `{}` registration hash conflict: existing {}, new {}",
1947 registration.id, existing.registration_hash, registration_hash
1948 )));
1949 }
1950 let now = current_epoch_ms();
1951 let record =
1952 ProcessRecord::from_prepared_registration(registration, registration_hash, now);
1953 let record_json = serde_json::to_string(&record).map_err(process_decode_error)?;
1954 sqlx::query(
1955 "INSERT INTO lash_processes (
1956 process_id, registration_hash, owner_scope_id, host_profile_id,
1957 created_at_ms, updated_at_ms, status, record_json
1958 )
1959 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
1960 )
1961 .bind(&record.id)
1962 .bind(&record.registration_hash)
1963 .bind(record.owner_scope_id().as_str())
1964 .bind(record.host_profile_id())
1965 .bind(record.created_at_ms as i64)
1966 .bind(record.updated_at_ms as i64)
1967 .bind(process_status_label(&record))
1968 .bind(record_json)
1969 .execute(&mut *tx)
1970 .await
1971 .map_err(plugin_sqlx_error)?;
1972 tx.commit().await.map_err(plugin_sqlx_error)?;
1973 Ok(record)
1974 }
1975
1976 async fn set_external_ref(
1977 &self,
1978 process_id: &str,
1979 external_ref: ProcessExternalRef,
1980 ) -> Result<ProcessRecord, PluginError> {
1981 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1982 let mut record = load_process_tx(&mut tx, process_id)
1983 .await?
1984 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
1985 record.external_ref = Some(external_ref);
1986 record.updated_at_ms = current_epoch_ms();
1987 save_process_tx(&mut tx, &record).await?;
1988 tx.commit().await.map_err(plugin_sqlx_error)?;
1989 Ok(record)
1990 }
1991
1992 async fn grant_handle(
1993 &self,
1994 owner_scope: &ProcessScope,
1995 process_id: &str,
1996 descriptor: ProcessHandleDescriptor,
1997 ) -> Result<ProcessHandleGrant, PluginError> {
1998 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1999 if load_process_tx(&mut tx, process_id).await?.is_none() {
2000 return Err(PluginError::Session(format!(
2001 "unknown process `{process_id}`"
2002 )));
2003 }
2004 sqlx::query(
2005 "INSERT INTO lash_process_handle_grants (session_id, scope_id, process_id, descriptor_json)
2006 VALUES ($1, $2, $3, $4)
2007 ON CONFLICT (scope_id, process_id) DO UPDATE SET
2008 session_id = EXCLUDED.session_id,
2009 descriptor_json = EXCLUDED.descriptor_json",
2010 )
2011 .bind(&owner_scope.session_id)
2012 .bind(owner_scope.id().as_str())
2013 .bind(process_id)
2014 .bind(serde_json::to_string(&descriptor).map_err(process_decode_error)?)
2015 .execute(&mut *tx)
2016 .await
2017 .map_err(plugin_sqlx_error)?;
2018 tx.commit().await.map_err(plugin_sqlx_error)?;
2019 Ok(ProcessHandleGrant {
2020 session_id: owner_scope.session_id.clone(),
2021 process_id: process_id.to_string(),
2022 descriptor,
2023 })
2024 }
2025
2026 async fn revoke_handle(
2027 &self,
2028 owner_scope: &ProcessScope,
2029 process_id: &str,
2030 ) -> Result<(), PluginError> {
2031 sqlx::query(
2032 "DELETE FROM lash_process_handle_grants WHERE scope_id = $1 AND process_id = $2",
2033 )
2034 .bind(owner_scope.id().as_str())
2035 .bind(process_id)
2036 .execute(&self.pool)
2037 .await
2038 .map_err(plugin_sqlx_error)?;
2039 Ok(())
2040 }
2041
2042 async fn transfer_handle_grants(
2043 &self,
2044 from_scope: &ProcessScope,
2045 to_scope: &ProcessScope,
2046 process_ids: &[String],
2047 ) -> Result<(), PluginError> {
2048 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2049 for process_id in process_ids {
2050 let descriptor_json: Option<String> = sqlx::query_scalar(
2051 "SELECT descriptor_json FROM lash_process_handle_grants
2052 WHERE scope_id = $1 AND process_id = $2",
2053 )
2054 .bind(from_scope.id().as_str())
2055 .bind(process_id)
2056 .fetch_optional(&mut *tx)
2057 .await
2058 .map_err(plugin_sqlx_error)?;
2059 let Some(descriptor_json) = descriptor_json else {
2060 return Err(PluginError::Session(format!(
2061 "process handle `{process_id}` is not granted to session `{}`",
2062 from_scope.session_id
2063 )));
2064 };
2065 sqlx::query(
2066 "DELETE FROM lash_process_handle_grants WHERE scope_id = $1 AND process_id = $2",
2067 )
2068 .bind(from_scope.id().as_str())
2069 .bind(process_id)
2070 .execute(&mut *tx)
2071 .await
2072 .map_err(plugin_sqlx_error)?;
2073 sqlx::query(
2074 "INSERT INTO lash_process_handle_grants (session_id, scope_id, process_id, descriptor_json)
2075 VALUES ($1, $2, $3, $4)
2076 ON CONFLICT (scope_id, process_id) DO UPDATE SET
2077 session_id = EXCLUDED.session_id,
2078 descriptor_json = EXCLUDED.descriptor_json",
2079 )
2080 .bind(&to_scope.session_id)
2081 .bind(to_scope.id().as_str())
2082 .bind(process_id)
2083 .bind(descriptor_json)
2084 .execute(&mut *tx)
2085 .await
2086 .map_err(plugin_sqlx_error)?;
2087 }
2088 tx.commit().await.map_err(plugin_sqlx_error)
2089 }
2090
2091 async fn list_handle_grants(
2092 &self,
2093 owner_scope: &ProcessScope,
2094 ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
2095 list_grants_for_scope(&self.pool, owner_scope, false).await
2096 }
2097
2098 async fn list_live_handle_grants(
2099 &self,
2100 owner_scope: &ProcessScope,
2101 ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
2102 list_grants_for_scope(&self.pool, owner_scope, true).await
2103 }
2104
2105 async fn has_handle_grant(
2106 &self,
2107 owner_scope: &ProcessScope,
2108 process_id: &str,
2109 ) -> Result<bool, PluginError> {
2110 let exists: Option<i64> = sqlx::query_scalar(
2111 "SELECT 1::BIGINT FROM lash_process_handle_grants
2112 WHERE scope_id = $1 AND process_id = $2
2113 LIMIT 1",
2114 )
2115 .bind(owner_scope.id().as_str())
2116 .bind(process_id)
2117 .fetch_optional(&self.pool)
2118 .await
2119 .map_err(plugin_sqlx_error)?;
2120 Ok(exists.is_some())
2121 }
2122
2123 async fn handle_grants_for_process(
2124 &self,
2125 process_id: &str,
2126 ) -> Result<Vec<ProcessHandleGrant>, PluginError> {
2127 if load_process(&self.pool, process_id).await?.is_none() {
2128 return Err(PluginError::Session(format!(
2129 "unknown process `{process_id}`"
2130 )));
2131 }
2132 let rows = sqlx::query(
2133 "SELECT session_id, descriptor_json
2134 FROM lash_process_handle_grants
2135 WHERE process_id = $1
2136 ORDER BY session_id ASC, scope_id ASC",
2137 )
2138 .bind(process_id)
2139 .fetch_all(&self.pool)
2140 .await
2141 .map_err(plugin_sqlx_error)?;
2142 let mut grants = Vec::new();
2143 for row in rows {
2144 let descriptor_json: String = row.get(1);
2145 grants.push(ProcessHandleGrant {
2146 session_id: row.get(0),
2147 process_id: process_id.to_string(),
2148 descriptor: serde_json::from_str(&descriptor_json).map_err(process_decode_error)?,
2149 });
2150 }
2151 Ok(grants)
2152 }
2153
2154 async fn delete_session_process_state(
2155 &self,
2156 session_id: &str,
2157 ) -> Result<lash_core::ProcessSessionDeleteReport, PluginError> {
2158 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2159 let rows = sqlx::query(
2160 "SELECT g.process_id, p.record_json
2161 FROM lash_process_handle_grants g
2162 JOIN lash_processes p ON p.process_id = g.process_id
2163 WHERE g.session_id = $1
2164 ORDER BY g.process_id ASC",
2165 )
2166 .bind(session_id)
2167 .fetch_all(&mut *tx)
2168 .await
2169 .map_err(plugin_sqlx_error)?;
2170 let mut removed = Vec::new();
2171 for row in rows {
2172 let process_id: String = row.get(0);
2173 let record_json: String = row.get(1);
2174 let record: ProcessRecord =
2175 serde_json::from_str(&record_json).map_err(process_decode_error)?;
2176 removed.push((process_id, record));
2177 }
2178 let revoked = sqlx::query("DELETE FROM lash_process_handle_grants WHERE session_id = $1")
2179 .bind(session_id)
2180 .execute(&mut *tx)
2181 .await
2182 .map_err(plugin_sqlx_error)?
2183 .rows_affected() as usize;
2184 let mut cancel_process_ids = Vec::new();
2185 let mut preserved_process_ids = Vec::new();
2186 for (process_id, record) in removed {
2187 if record.is_terminal() {
2188 continue;
2189 }
2190 let remaining: i64 = sqlx::query_scalar(
2191 "SELECT COUNT(*) FROM lash_process_handle_grants WHERE process_id = $1",
2192 )
2193 .bind(&process_id)
2194 .fetch_one(&mut *tx)
2195 .await
2196 .map_err(plugin_sqlx_error)?;
2197 if remaining == 0 {
2198 cancel_process_ids.push(process_id);
2199 } else {
2200 preserved_process_ids.push(process_id);
2201 }
2202 }
2203 tx.commit().await.map_err(plugin_sqlx_error)?;
2204 cancel_process_ids.sort();
2205 cancel_process_ids.dedup();
2206 preserved_process_ids.sort();
2207 preserved_process_ids.dedup();
2208 Ok(lash_core::ProcessSessionDeleteReport {
2209 session_id: session_id.to_string(),
2210 revoked_handle_count: revoked,
2211 deleted_wake_count: 0,
2212 cancel_process_ids,
2213 preserved_process_ids,
2214 })
2215 }
2216
2217 async fn append_event(
2218 &self,
2219 process_id: &str,
2220 request: ProcessEventAppendRequest,
2221 ) -> Result<ProcessEventAppendResult, PluginError> {
2222 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2223 let mut record = load_process_tx(&mut tx, process_id)
2224 .await?
2225 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
2226 let replay_lookup =
2227 if let Some(replay_key) = request.replay.as_ref().map(|r| r.key.as_str()) {
2228 load_event_by_key_tx(&mut tx, process_id, replay_key).await?
2229 } else {
2230 None
2231 };
2232 let sequence: i64 = sqlx::query_scalar(
2233 "SELECT COALESCE(MAX(sequence), 0) + 1 FROM lash_process_events WHERE process_id = $1",
2234 )
2235 .bind(process_id)
2236 .fetch_one(&mut *tx)
2237 .await
2238 .map_err(plugin_sqlx_error)?;
2239 let occurred_at_ms = current_epoch_ms();
2240 let prepared = lash_core::runtime::prepare_process_event_append(
2241 &record,
2242 request,
2243 sequence as u64,
2244 replay_lookup,
2245 occurred_at_ms,
2246 )?;
2247 if prepared.replayed {
2248 let repaired = if let Some(status) = prepared.status_update.clone() {
2249 record.status = status;
2250 record.updated_at_ms = prepared.occurred_at_ms;
2251 save_process_tx(&mut tx, &record).await?;
2252 true
2253 } else {
2254 false
2255 };
2256 tx.commit().await.map_err(plugin_sqlx_error)?;
2257 if repaired {
2258 self.notify.notify_waiters();
2259 }
2260 return Ok(ProcessEventAppendResult {
2261 event: prepared.event,
2262 wake_delivery: prepared.wake_delivery,
2263 });
2264 }
2265 let event = prepared.event;
2266 sqlx::query(
2267 "INSERT INTO lash_process_events (
2268 process_id, sequence, event_type, payload_hash, idempotency_key,
2269 occurred_at_ms, event_json
2270 )
2271 VALUES ($1, $2, $3, $4, $5, $6, $7)",
2272 )
2273 .bind(process_id)
2274 .bind(sequence)
2275 .bind(event.event_type.as_str())
2276 .bind(&prepared.payload_hash)
2277 .bind(event.invocation.replay_key())
2278 .bind(prepared.occurred_at_ms as i64)
2279 .bind(serde_json::to_string(&event).map_err(process_decode_error)?)
2280 .execute(&mut *tx)
2281 .await
2282 .map_err(plugin_sqlx_error)?;
2283 if let Some(status) = prepared.status_update.clone() {
2284 record.status = status;
2285 }
2286 record.updated_at_ms = prepared.occurred_at_ms;
2287 save_process_tx(&mut tx, &record).await?;
2288 tx.commit().await.map_err(plugin_sqlx_error)?;
2289 self.notify.notify_waiters();
2290 Ok(ProcessEventAppendResult {
2291 event,
2292 wake_delivery: prepared.wake_delivery,
2293 })
2294 }
2295
2296 async fn events_after(
2297 &self,
2298 process_id: &str,
2299 after_sequence: u64,
2300 ) -> Result<Vec<ProcessEvent>, PluginError> {
2301 if load_process(&self.pool, process_id).await?.is_none() {
2302 return Err(PluginError::Session(format!(
2303 "unknown process `{process_id}`"
2304 )));
2305 }
2306 let rows = sqlx::query(
2307 "SELECT event_json FROM lash_process_events
2308 WHERE process_id = $1 AND sequence > $2
2309 ORDER BY sequence ASC",
2310 )
2311 .bind(process_id)
2312 .bind(after_sequence as i64)
2313 .fetch_all(&self.pool)
2314 .await
2315 .map_err(plugin_sqlx_error)?;
2316 let mut events = Vec::new();
2317 for row in rows {
2318 let json: String = row.get(0);
2319 events.push(serde_json::from_str(&json).map_err(process_decode_error)?);
2320 }
2321 Ok(events)
2322 }
2323
2324 async fn wake_events_after(
2325 &self,
2326 process_id: &str,
2327 after_sequence: u64,
2328 ) -> Result<Vec<ProcessEvent>, PluginError> {
2329 let rows = sqlx::query("SELECT sequence FROM lash_process_wake_acks WHERE process_id = $1")
2330 .bind(process_id)
2331 .fetch_all(&self.pool)
2332 .await
2333 .map_err(plugin_sqlx_error)?;
2334 let acked = rows
2335 .into_iter()
2336 .map(|row| row.get::<i64, _>(0) as u64)
2337 .collect::<HashSet<_>>();
2338 Ok(self
2339 .events_after(process_id, after_sequence)
2340 .await?
2341 .into_iter()
2342 .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
2343 .collect())
2344 }
2345
2346 async fn wait_event_after(
2347 &self,
2348 process_id: &str,
2349 event_type: &str,
2350 after_sequence: u64,
2351 ) -> Result<ProcessEvent, PluginError> {
2352 loop {
2353 if let Some(event) = self
2354 .events_after(process_id, after_sequence)
2355 .await?
2356 .into_iter()
2357 .find(|event| event.event_type == event_type)
2358 {
2359 return Ok(event);
2360 }
2361 tokio::select! {
2362 _ = self.notify.notified() => {}
2363 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
2364 }
2365 }
2366 }
2367
2368 async fn await_process(&self, process_id: &str) -> Result<ProcessAwaitOutput, PluginError> {
2369 loop {
2370 let record = load_process(&self.pool, process_id)
2371 .await?
2372 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
2373 if let Some(await_output) = record.status.await_output() {
2374 return Ok(await_output.clone());
2375 }
2376 tokio::select! {
2377 _ = self.notify.notified() => {}
2378 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
2379 }
2380 }
2381 }
2382
2383 async fn complete_process(
2384 &self,
2385 process_id: &str,
2386 await_output: ProcessAwaitOutput,
2387 ) -> Result<ProcessRecord, PluginError> {
2388 let event_type = match await_output.terminal_state() {
2389 lash_core::ProcessTerminalState::Completed => "process.completed",
2390 lash_core::ProcessTerminalState::Failed => "process.failed",
2391 lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
2392 };
2393 self.append_event(
2394 process_id,
2395 ProcessEventAppendRequest::new(
2396 event_type,
2397 serde_json::json!({ "await_output": await_output }),
2398 )
2399 .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
2400 )
2401 .await?;
2402 load_process(&self.pool, process_id).await?.ok_or_else(|| {
2403 PluginError::Session(format!(
2404 "unknown process `{process_id}` after terminal event"
2405 ))
2406 })
2407 }
2408
2409 async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
2410 load_process(&self.pool, process_id).await.ok().flatten()
2411 }
2412
2413 async fn ack_wake(&self, process_id: &str, sequence: u64) -> Result<(), PluginError> {
2414 if load_process(&self.pool, process_id).await?.is_none() {
2415 return Err(PluginError::Session(format!(
2416 "unknown process `{process_id}`"
2417 )));
2418 }
2419 sqlx::query(
2420 "INSERT INTO lash_process_wake_acks (process_id, sequence)
2421 VALUES ($1, $2)
2422 ON CONFLICT DO NOTHING",
2423 )
2424 .bind(process_id)
2425 .bind(sequence as i64)
2426 .execute(&self.pool)
2427 .await
2428 .map_err(plugin_sqlx_error)?;
2429 Ok(())
2430 }
2431
2432 async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, PluginError> {
2433 let rows = sqlx::query(
2434 "SELECT record_json FROM lash_processes
2435 WHERE status = 'running'
2436 ORDER BY process_id ASC",
2437 )
2438 .fetch_all(&self.pool)
2439 .await
2440 .map_err(plugin_sqlx_error)?;
2441 let mut records = Vec::new();
2442 for row in rows {
2443 let json: String = row.get(0);
2444 records.push(serde_json::from_str(&json).map_err(process_decode_error)?);
2445 }
2446 Ok(records)
2447 }
2448
2449 async fn claim_process_lease(
2450 &self,
2451 process_id: &str,
2452 owner_id: &str,
2453 lease_ttl_ms: u64,
2454 ) -> Result<ProcessLease, PluginError> {
2455 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2456 if load_process_tx(&mut tx, process_id).await?.is_none() {
2457 return Err(PluginError::Session(format!(
2458 "unknown process `{process_id}`"
2459 )));
2460 }
2461 let now = current_epoch_ms();
2462 let current = load_process_lease_tx(&mut tx, process_id).await?;
2463 if let Some(current) = current.as_ref()
2464 && current.expires_at_epoch_ms > now
2465 && current.owner_id != owner_id
2466 {
2467 return Err(process_lease_conflict(process_id, current));
2468 }
2469 let existing_fence: Option<i64> = sqlx::query_scalar(
2470 "SELECT lease_fencing_token FROM lash_process_leases WHERE process_id = $1 FOR UPDATE",
2471 )
2472 .bind(process_id)
2473 .fetch_optional(&mut *tx)
2474 .await
2475 .map_err(plugin_sqlx_error)?;
2476 let fencing_token = existing_fence.unwrap_or(0) as u64 + 1;
2477 let lease = ProcessLease {
2478 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
2479 process_id: process_id.to_string(),
2480 owner_id: owner_id.to_string(),
2481 lease_token: format!(
2482 "{:x}",
2483 Sha256::digest(format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes())
2484 ),
2485 fencing_token,
2486 claimed_at_epoch_ms: now,
2487 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
2488 };
2489 sqlx::query(
2490 "INSERT INTO lash_process_leases (
2491 process_id, lease_owner_id, lease_token, lease_fencing_token,
2492 lease_claimed_at_ms, lease_expires_at_ms
2493 )
2494 VALUES ($1, $2, $3, $4, $5, $6)
2495 ON CONFLICT (process_id) DO UPDATE SET
2496 lease_owner_id = EXCLUDED.lease_owner_id,
2497 lease_token = EXCLUDED.lease_token,
2498 lease_fencing_token = EXCLUDED.lease_fencing_token,
2499 lease_claimed_at_ms = EXCLUDED.lease_claimed_at_ms,
2500 lease_expires_at_ms = EXCLUDED.lease_expires_at_ms",
2501 )
2502 .bind(&lease.process_id)
2503 .bind(&lease.owner_id)
2504 .bind(&lease.lease_token)
2505 .bind(lease.fencing_token as i64)
2506 .bind(lease.claimed_at_epoch_ms as i64)
2507 .bind(lease.expires_at_epoch_ms as i64)
2508 .execute(&mut *tx)
2509 .await
2510 .map_err(plugin_sqlx_error)?;
2511 tx.commit().await.map_err(plugin_sqlx_error)?;
2512 Ok(lease)
2513 }
2514
2515 async fn renew_process_lease(
2516 &self,
2517 lease: &ProcessLease,
2518 lease_ttl_ms: u64,
2519 ) -> Result<ProcessLease, PluginError> {
2520 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2521 let now = current_epoch_ms();
2522 let current = load_process_lease_tx(&mut tx, &lease.process_id).await?;
2523 if !guard_lease(current.as_ref(), &lease.lease_token, now) {
2524 return Err(process_lease_expired(&lease.process_id));
2525 }
2526 let renewed = ProcessLease {
2527 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
2528 ..lease.clone()
2529 };
2530 sqlx::query(
2531 "UPDATE lash_process_leases
2532 SET lease_expires_at_ms = $2
2533 WHERE process_id = $1 AND lease_token = $3",
2534 )
2535 .bind(&renewed.process_id)
2536 .bind(renewed.expires_at_epoch_ms as i64)
2537 .bind(&renewed.lease_token)
2538 .execute(&mut *tx)
2539 .await
2540 .map_err(plugin_sqlx_error)?;
2541 tx.commit().await.map_err(plugin_sqlx_error)?;
2542 Ok(renewed)
2543 }
2544
2545 async fn complete_process_lease(
2546 &self,
2547 completion: &ProcessLeaseCompletion,
2548 ) -> Result<(), PluginError> {
2549 sqlx::query(
2550 "UPDATE lash_process_leases
2551 SET lease_owner_id = NULL,
2552 lease_token = NULL,
2553 lease_claimed_at_ms = 0,
2554 lease_expires_at_ms = 0
2555 WHERE process_id = $1 AND lease_token = $2",
2556 )
2557 .bind(&completion.process_id)
2558 .bind(&completion.lease_token)
2559 .execute(&self.pool)
2560 .await
2561 .map_err(plugin_sqlx_error)?;
2562 Ok(())
2563 }
2564}
2565
2566#[async_trait::async_trait]
2567impl HostEventStore for PostgresHostEventStore {
2568 fn durability_tier(&self) -> DurabilityTier {
2569 DurabilityTier::Durable
2570 }
2571
2572 async fn register_subscription(
2573 &self,
2574 draft: TriggerSubscriptionDraft,
2575 ) -> Result<TriggerSubscriptionRecord, PluginError> {
2576 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2577 let seq: i64 = sqlx::query_scalar("SELECT nextval('lash_host_event_subscription_seq')")
2578 .fetch_one(&mut *tx)
2579 .await
2580 .map_err(plugin_sqlx_error)?;
2581 let handle = format!("trigger:{seq}");
2582 let subscription_id = format!("subscription:{seq}");
2583 let now = current_epoch_ms();
2584 let record = TriggerSubscriptionRecord {
2585 subscription_id: subscription_id.clone(),
2586 session_id: draft.session_id,
2587 handle,
2588 name: draft.name,
2589 source_type: draft.source_type,
2590 source_key: draft.source_key,
2591 source: draft.source,
2592 event_ty: draft.event_ty,
2593 module_ref: draft.module_ref,
2594 required_surface_ref: draft.required_surface_ref,
2595 process_ref: draft.process_ref,
2596 process_name: draft.process_name,
2597 input_template: draft.input_template,
2598 enabled: true,
2599 created_at_ms: now,
2600 updated_at_ms: now,
2601 };
2602 sqlx::query(
2603 "INSERT INTO lash_host_event_trigger_subscriptions (
2604 subscription_id, session_id, handle, source_type, source_key,
2605 enabled, created_at_ms, updated_at_ms, record_json
2606 )
2607 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
2608 )
2609 .bind(&record.subscription_id)
2610 .bind(&record.session_id)
2611 .bind(&record.handle)
2612 .bind(&record.source_type)
2613 .bind(&record.source_key)
2614 .bind(record.enabled)
2615 .bind(record.created_at_ms as i64)
2616 .bind(record.updated_at_ms as i64)
2617 .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2618 .execute(&mut *tx)
2619 .await
2620 .map_err(plugin_sqlx_error)?;
2621 tx.commit().await.map_err(plugin_sqlx_error)?;
2622 Ok(record)
2623 }
2624
2625 async fn list_subscriptions(
2626 &self,
2627 filter: TriggerSubscriptionFilter,
2628 ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError> {
2629 let rows = sqlx::query(
2630 "SELECT record_json FROM lash_host_event_trigger_subscriptions
2631 ORDER BY session_id ASC, handle ASC",
2632 )
2633 .fetch_all(&self.pool)
2634 .await
2635 .map_err(plugin_sqlx_error)?;
2636 let mut records = Vec::new();
2637 for row in rows {
2638 let json: String = row.get(0);
2639 let record: TriggerSubscriptionRecord =
2640 serde_json::from_str(&json).map_err(process_decode_error)?;
2641 if filter.matches(&record) {
2642 records.push(record);
2643 }
2644 }
2645 Ok(records)
2646 }
2647
2648 async fn cancel_subscription(
2649 &self,
2650 session_id: &str,
2651 handle: &str,
2652 ) -> Result<bool, PluginError> {
2653 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2654 let json: Option<String> = sqlx::query_scalar(
2655 "SELECT record_json FROM lash_host_event_trigger_subscriptions
2656 WHERE session_id = $1 AND handle = $2
2657 FOR UPDATE",
2658 )
2659 .bind(session_id)
2660 .bind(handle)
2661 .fetch_optional(&mut *tx)
2662 .await
2663 .map_err(plugin_sqlx_error)?;
2664 let Some(json) = json else {
2665 tx.commit().await.map_err(plugin_sqlx_error)?;
2666 return Ok(false);
2667 };
2668 let mut record: TriggerSubscriptionRecord =
2669 serde_json::from_str(&json).map_err(process_decode_error)?;
2670 let changed = record.enabled;
2671 record.enabled = false;
2672 record.updated_at_ms = current_epoch_ms();
2673 sqlx::query(
2674 "UPDATE lash_host_event_trigger_subscriptions
2675 SET enabled = $3, updated_at_ms = $4, record_json = $5
2676 WHERE session_id = $1 AND handle = $2",
2677 )
2678 .bind(session_id)
2679 .bind(handle)
2680 .bind(record.enabled)
2681 .bind(record.updated_at_ms as i64)
2682 .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2683 .execute(&mut *tx)
2684 .await
2685 .map_err(plugin_sqlx_error)?;
2686 tx.commit().await.map_err(plugin_sqlx_error)?;
2687 Ok(changed)
2688 }
2689
2690 async fn record_occurrence(
2691 &self,
2692 request: HostEventOccurrenceRequest,
2693 ) -> Result<HostEventOccurrenceRecord, PluginError> {
2694 lash_core::validate_host_event_occurrence_request(&request)?;
2695 let request_hash = lash_core::host_event_occurrence_request_hash(&request)?;
2696 let occurrence_id = lash_core::deterministic_occurrence_id(&request)?;
2697 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2698 let existing = sqlx::query(
2699 "SELECT request_hash, record_json
2700 FROM lash_host_event_occurrences
2701 WHERE idempotency_key = $1
2702 FOR UPDATE",
2703 )
2704 .bind(&request.idempotency_key)
2705 .fetch_optional(&mut *tx)
2706 .await
2707 .map_err(plugin_sqlx_error)?;
2708 if let Some(row) = existing {
2709 let existing_hash: String = row.get(0);
2710 let existing_json: String = row.get(1);
2711 if existing_hash != request_hash {
2712 return Err(PluginError::Session(format!(
2713 "host event occurrence idempotency conflict for `{}`",
2714 request.idempotency_key
2715 )));
2716 }
2717 let record = serde_json::from_str(&existing_json).map_err(process_decode_error)?;
2718 tx.commit().await.map_err(plugin_sqlx_error)?;
2719 return Ok(record);
2720 }
2721 let record = HostEventOccurrenceRecord {
2722 occurrence_id: occurrence_id.clone(),
2723 source_type: request.source_type,
2724 source_key: request.source_key,
2725 payload: request.payload,
2726 idempotency_key: request.idempotency_key.clone(),
2727 source: request.source,
2728 occurred_at_ms: current_epoch_ms(),
2729 };
2730 sqlx::query(
2731 "INSERT INTO lash_host_event_occurrences (
2732 occurrence_id, idempotency_key, request_hash, source_type, source_key,
2733 occurred_at_ms, record_json
2734 )
2735 VALUES ($1, $2, $3, $4, $5, $6, $7)",
2736 )
2737 .bind(&record.occurrence_id)
2738 .bind(&record.idempotency_key)
2739 .bind(&request_hash)
2740 .bind(&record.source_type)
2741 .bind(&record.source_key)
2742 .bind(record.occurred_at_ms as i64)
2743 .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2744 .execute(&mut *tx)
2745 .await
2746 .map_err(plugin_sqlx_error)?;
2747 tx.commit().await.map_err(plugin_sqlx_error)?;
2748 Ok(record)
2749 }
2750
2751 async fn reserve_matching_deliveries(
2752 &self,
2753 occurrence_id: &str,
2754 ) -> Result<Vec<TriggerDeliveryReservation>, PluginError> {
2755 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2756 let occurrence_json: Option<String> = sqlx::query_scalar(
2757 "SELECT record_json FROM lash_host_event_occurrences WHERE occurrence_id = $1",
2758 )
2759 .bind(occurrence_id)
2760 .fetch_optional(&mut *tx)
2761 .await
2762 .map_err(plugin_sqlx_error)?;
2763 let Some(occurrence_json) = occurrence_json else {
2764 return Err(PluginError::Session(format!(
2765 "unknown host event occurrence `{occurrence_id}`"
2766 )));
2767 };
2768 let occurrence: HostEventOccurrenceRecord =
2769 serde_json::from_str(&occurrence_json).map_err(process_decode_error)?;
2770 let rows = sqlx::query(
2771 "SELECT record_json FROM lash_host_event_trigger_subscriptions
2772 WHERE enabled = TRUE AND source_type = $1 AND source_key = $2
2773 ORDER BY session_id ASC, handle ASC",
2774 )
2775 .bind(&occurrence.source_type)
2776 .bind(&occurrence.source_key)
2777 .fetch_all(&mut *tx)
2778 .await
2779 .map_err(plugin_sqlx_error)?;
2780 let mut deliveries = Vec::new();
2781 for row in rows {
2782 let json: String = row.get(0);
2783 let subscription: TriggerSubscriptionRecord =
2784 serde_json::from_str(&json).map_err(process_decode_error)?;
2785 let process_id = lash_core::deterministic_delivery_process_id(
2786 &occurrence.occurrence_id,
2787 &subscription.subscription_id,
2788 )?;
2789 let inserted = sqlx::query(
2790 "INSERT INTO lash_host_event_deliveries (
2791 occurrence_id, subscription_id, process_id, created_at_ms
2792 )
2793 VALUES ($1, $2, $3, $4)
2794 ON CONFLICT DO NOTHING",
2795 )
2796 .bind(&occurrence.occurrence_id)
2797 .bind(&subscription.subscription_id)
2798 .bind(&process_id)
2799 .bind(current_epoch_ms() as i64)
2800 .execute(&mut *tx)
2801 .await
2802 .map_err(plugin_sqlx_error)?
2803 .rows_affected();
2804 if inserted == 0 {
2805 continue;
2806 }
2807 deliveries.push(TriggerDeliveryReservation {
2808 occurrence: occurrence.clone(),
2809 subscription,
2810 process_id,
2811 });
2812 }
2813 tx.commit().await.map_err(plugin_sqlx_error)?;
2814 Ok(deliveries)
2815 }
2816}
2817
2818#[async_trait::async_trait]
2819impl lashlang::LashlangArtifactStore for PostgresLashlangArtifactStore {
2820 fn durability_tier(&self) -> DurabilityTier {
2821 DurabilityTier::Durable
2822 }
2823
2824 async fn put_module_artifact(
2825 &self,
2826 artifact: &lashlang::ModuleArtifact,
2827 ) -> Result<(), lashlang::ArtifactStoreError> {
2828 let bytes = artifact
2829 .to_store_bytes()
2830 .map_err(lashlang::ArtifactStoreError::from)?;
2831 sqlx::query(
2832 "INSERT INTO lash_lashlang_artifacts (module_ref, artifact_bytes)
2833 VALUES ($1, $2)
2834 ON CONFLICT (module_ref) DO UPDATE SET artifact_bytes = EXCLUDED.artifact_bytes",
2835 )
2836 .bind(artifact.module_ref.as_str())
2837 .bind(bytes)
2838 .execute(&self.pool)
2839 .await
2840 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
2841 Ok(())
2842 }
2843
2844 async fn get_module_artifact(
2845 &self,
2846 module_ref: &lashlang::ModuleRef,
2847 ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
2848 let bytes: Option<Vec<u8>> = sqlx::query_scalar(
2849 "SELECT artifact_bytes FROM lash_lashlang_artifacts WHERE module_ref = $1",
2850 )
2851 .bind(module_ref.as_str())
2852 .fetch_optional(&self.pool)
2853 .await
2854 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
2855 bytes
2856 .map(|bytes| {
2857 lashlang::ModuleArtifact::from_store_bytes(&bytes)
2858 .map(Arc::new)
2859 .map_err(lashlang::ArtifactStoreError::from)
2860 })
2861 .transpose()
2862 }
2863}