1use std::collections::HashSet;
8use std::sync::{Arc, OnceLock};
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use lash_core::runtime::{
12 ProcessHandleGrantEntry, QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim,
13 QueuedWorkClaimBoundary, QueuedWorkCompletion, QueuedWorkItem,
14};
15use lash_core::store::{
16 GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
17 RuntimeCommitResult, SessionCheckpoint, SessionHeadMeta,
18};
19use lash_core::{
20 AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
21 DeliveryPolicy, DurabilityTier, GcReport, MergeKey, ProcessAwaitOutput, ProcessEvent,
22 ProcessEventAppendRequest, ProcessEventAppendResult, ProcessExternalRef,
23 ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease, ProcessLeaseCompletion,
24 ProcessRecord, ProcessRegistration, ProcessRegistry, ProcessScope, RuntimePersistence,
25 SessionMeta, SessionNodeRecord, SessionReadScope, SessionStoreCreateRequest,
26 SessionStoreFactory, SlotPolicy, StoreError, TokenLedgerEntry, VacuumReport,
27};
28use lash_core::{
29 HostEventOccurrenceRecord, HostEventOccurrenceRequest, HostEventStore, PluginError,
30 TriggerDeliveryReservation, TriggerSubscriptionDraft, TriggerSubscriptionFilter,
31 TriggerSubscriptionRecord,
32};
33use sha2::{Digest, Sha256};
34use sqlx::postgres::{PgPool, PgPoolOptions, PgRow};
35use sqlx::{Executor, Row};
36
37const SCHEMA_COMPONENT: &str = "lash-postgres-store";
38const SCHEMA_VERSION: i32 = 1;
39const PROCESS_LEASE_SCHEMA_VERSION: u32 = lash_core::PROCESS_LEASE_SCHEMA_VERSION;
40
41#[derive(Clone)]
42pub struct PostgresStorage {
43 pool: PgPool,
44}
45
46#[derive(Clone)]
47pub struct PostgresSessionStoreFactory {
48 pool: PgPool,
49}
50
51#[derive(Clone)]
52pub struct PostgresSessionStore {
53 pool: PgPool,
54 session_id: Option<String>,
56 bound_session: Arc<OnceLock<String>>,
62}
63
64#[derive(Clone)]
65pub struct PostgresProcessRegistry {
66 pool: PgPool,
67 notify: Arc<tokio::sync::Notify>,
68}
69
70#[derive(Clone)]
71pub struct PostgresHostEventStore {
72 pool: PgPool,
73}
74
75#[derive(Clone)]
76pub struct PostgresLashlangArtifactStore {
77 pool: PgPool,
78}
79
80#[derive(Clone, Debug)]
89pub struct PostgresStoreConfig {
90 pub max_connections: u32,
92 pub min_connections: u32,
94 pub acquire_timeout: Duration,
96 pub idle_timeout: Option<Duration>,
98 pub max_lifetime: Option<Duration>,
100 pub lock_timeout: Option<Duration>,
102 pub statement_timeout: Option<Duration>,
105}
106
107impl Default for PostgresStoreConfig {
108 fn default() -> Self {
109 Self {
110 max_connections: 16,
111 min_connections: 0,
112 acquire_timeout: Duration::from_secs(30),
113 idle_timeout: Some(Duration::from_secs(600)),
114 max_lifetime: Some(Duration::from_secs(1800)),
115 lock_timeout: Some(Duration::from_secs(10)),
116 statement_timeout: Some(Duration::from_secs(30)),
117 }
118 }
119}
120
121impl PostgresStorage {
122 pub async fn connect(database_url: &str) -> Result<Self, StoreError> {
124 Self::connect_with(database_url, PostgresStoreConfig::default()).await
125 }
126
127 pub async fn connect_with(
129 database_url: &str,
130 config: PostgresStoreConfig,
131 ) -> Result<Self, StoreError> {
132 let lock_ms = config.lock_timeout.map(|d| d.as_millis().max(1) as u64);
133 let statement_ms = config
134 .statement_timeout
135 .map(|d| d.as_millis().max(1) as u64);
136 let mut options = PgPoolOptions::new()
137 .max_connections(config.max_connections)
138 .min_connections(config.min_connections)
139 .acquire_timeout(config.acquire_timeout);
140 if let Some(timeout) = config.idle_timeout {
141 options = options.idle_timeout(timeout);
142 }
143 if let Some(timeout) = config.max_lifetime {
144 options = options.max_lifetime(timeout);
145 }
146 let pool = options
147 .after_connect(move |conn, _meta| {
148 Box::pin(async move {
149 if let Some(ms) = lock_ms {
150 conn.execute(format!("SET lock_timeout = {ms}").as_str())
151 .await?;
152 }
153 if let Some(ms) = statement_ms {
154 conn.execute(format!("SET statement_timeout = {ms}").as_str())
155 .await?;
156 }
157 Ok(())
158 })
159 })
160 .connect(database_url)
161 .await
162 .map_err(store_sqlx_error)?;
163 ensure_schema(&pool).await?;
164 Ok(Self { pool })
165 }
166
167 pub fn from_pool(pool: PgPool) -> Self {
168 Self { pool }
169 }
170
171 pub fn pool(&self) -> &PgPool {
172 &self.pool
173 }
174
175 pub fn session_store_factory(&self) -> PostgresSessionStoreFactory {
176 PostgresSessionStoreFactory {
177 pool: self.pool.clone(),
178 }
179 }
180
181 pub fn session_store(&self, session_id: impl Into<String>) -> PostgresSessionStore {
182 PostgresSessionStore {
183 pool: self.pool.clone(),
184 session_id: Some(session_id.into()),
185 bound_session: Arc::new(OnceLock::new()),
186 }
187 }
188
189 pub fn unbound_session_store(&self) -> PostgresSessionStore {
190 PostgresSessionStore {
191 pool: self.pool.clone(),
192 session_id: None,
193 bound_session: Arc::new(OnceLock::new()),
194 }
195 }
196
197 pub fn process_registry(&self) -> PostgresProcessRegistry {
198 PostgresProcessRegistry {
199 pool: self.pool.clone(),
200 notify: Arc::new(tokio::sync::Notify::new()),
201 }
202 }
203
204 pub fn host_event_store(&self) -> PostgresHostEventStore {
205 PostgresHostEventStore {
206 pool: self.pool.clone(),
207 }
208 }
209
210 pub fn lashlang_artifact_store(&self) -> PostgresLashlangArtifactStore {
211 PostgresLashlangArtifactStore {
212 pool: self.pool.clone(),
213 }
214 }
215}
216
217impl PostgresSessionStoreFactory {
218 pub fn new(storage: &PostgresStorage) -> Self {
219 storage.session_store_factory()
220 }
221}
222
223impl PostgresSessionStore {
224 pub fn unbound(storage: &PostgresStorage) -> Self {
225 storage.unbound_session_store()
226 }
227
228 async fn selected_session_id(&self) -> Result<Option<String>, StoreError> {
229 if let Some(session_id) = &self.session_id {
230 return Ok(Some(session_id.clone()));
231 }
232 sqlx::query_scalar("SELECT session_id FROM lash_sessions ORDER BY session_id ASC LIMIT 1")
233 .fetch_optional(&self.pool)
234 .await
235 .map_err(store_sqlx_error)
236 }
237}
238
239async fn ensure_schema(pool: &PgPool) -> Result<(), StoreError> {
240 let mut tx = pool.begin().await.map_err(store_sqlx_error)?;
241 tx.execute("SELECT pg_advisory_xact_lock(715421, 907001)")
242 .await
243 .map_err(store_sqlx_error)?;
244 tx.execute(
245 r#"
246 CREATE TABLE IF NOT EXISTS lash_schema_versions (
247 component TEXT PRIMARY KEY,
248 version INTEGER NOT NULL
249 );
250
251 CREATE TABLE IF NOT EXISTS lash_blobs (
252 hash TEXT PRIMARY KEY,
253 content BYTEA NOT NULL
254 );
255
256 CREATE TABLE IF NOT EXISTS lash_sessions (
257 session_id TEXT PRIMARY KEY,
258 head_revision BIGINT NOT NULL DEFAULT 0,
259 head_json TEXT NOT NULL,
260 checkpoint_ref TEXT
261 );
262
263 CREATE TABLE IF NOT EXISTS lash_graph_nodes (
264 session_id TEXT NOT NULL,
265 seq BIGSERIAL,
266 node_id TEXT NOT NULL,
267 node_json TEXT NOT NULL,
268 tombstoned BOOLEAN NOT NULL DEFAULT FALSE,
269 PRIMARY KEY (session_id, node_id)
270 );
271 CREATE INDEX IF NOT EXISTS idx_lash_graph_nodes_seq
272 ON lash_graph_nodes(session_id, seq);
273
274 CREATE TABLE IF NOT EXISTS lash_usage_deltas (
275 seq BIGSERIAL PRIMARY KEY,
276 session_id TEXT NOT NULL,
277 entry_json TEXT NOT NULL
278 );
279
280 CREATE TABLE IF NOT EXISTS lash_session_meta (
281 session_id TEXT PRIMARY KEY,
282 meta_json TEXT NOT NULL
283 );
284
285 CREATE TABLE IF NOT EXISTS lash_runtime_turn_commits (
286 session_id TEXT NOT NULL,
287 turn_id TEXT NOT NULL,
288 turn_commit_hash TEXT NOT NULL,
289 result_json TEXT NOT NULL,
290 committed_at_ms BIGINT NOT NULL,
291 PRIMARY KEY (session_id, turn_id)
292 );
293
294 CREATE TABLE IF NOT EXISTS lash_queued_work_batches (
295 enqueue_seq BIGSERIAL PRIMARY KEY,
296 batch_id TEXT NOT NULL UNIQUE,
297 session_id TEXT NOT NULL,
298 source_key TEXT,
299 delivery_policy TEXT NOT NULL,
300 slot_policy TEXT NOT NULL,
301 merge_key_json TEXT NOT NULL,
302 available_at_ms BIGINT NOT NULL,
303 enqueued_at_ms BIGINT NOT NULL,
304 claim_id TEXT,
305 claim_owner_id TEXT,
306 claim_token TEXT,
307 claim_fencing_token BIGINT NOT NULL DEFAULT 0,
308 claim_claimed_at_ms BIGINT NOT NULL DEFAULT 0,
309 claim_expires_at_ms BIGINT NOT NULL DEFAULT 0,
310 UNIQUE (session_id, source_key)
311 );
312 CREATE INDEX IF NOT EXISTS idx_lash_queued_work_ready
313 ON lash_queued_work_batches(session_id, available_at_ms, enqueue_seq);
314
315 CREATE TABLE IF NOT EXISTS lash_queued_work_items (
316 batch_id TEXT NOT NULL REFERENCES lash_queued_work_batches(batch_id) ON DELETE CASCADE,
317 item_index INTEGER NOT NULL,
318 item_id TEXT NOT NULL,
319 payload_json TEXT NOT NULL,
320 PRIMARY KEY (batch_id, item_index)
321 );
322
323 CREATE TABLE IF NOT EXISTS lash_attachment_manifest (
324 attachment_id TEXT PRIMARY KEY,
325 session_id TEXT NOT NULL,
326 canonical_uri TEXT NOT NULL,
327 intent_at_ms BIGINT NOT NULL,
328 committed_at_ms BIGINT
329 );
330 CREATE INDEX IF NOT EXISTS idx_lash_attachment_manifest_uncommitted
331 ON lash_attachment_manifest(committed_at_ms)
332 WHERE committed_at_ms IS NULL;
333
334 CREATE TABLE IF NOT EXISTS lash_processes (
335 process_id TEXT PRIMARY KEY,
336 registration_hash TEXT NOT NULL,
337 owner_scope_id TEXT NOT NULL,
338 host_profile_id TEXT NOT NULL,
339 created_at_ms BIGINT NOT NULL,
340 updated_at_ms BIGINT NOT NULL,
341 status TEXT NOT NULL,
342 record_json TEXT NOT NULL
343 );
344 CREATE INDEX IF NOT EXISTS idx_lash_processes_status
345 ON lash_processes(status);
346
347 CREATE TABLE IF NOT EXISTS lash_process_events (
348 process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
349 sequence BIGINT NOT NULL,
350 event_type TEXT NOT NULL,
351 payload_hash TEXT NOT NULL,
352 idempotency_key TEXT,
353 occurred_at_ms BIGINT NOT NULL,
354 event_json TEXT NOT NULL,
355 PRIMARY KEY (process_id, sequence)
356 );
357 CREATE UNIQUE INDEX IF NOT EXISTS idx_lash_process_events_key
358 ON lash_process_events(process_id, idempotency_key)
359 WHERE idempotency_key IS NOT NULL;
360
361 CREATE TABLE IF NOT EXISTS lash_process_wake_acks (
362 process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
363 sequence BIGINT NOT NULL,
364 PRIMARY KEY (process_id, sequence)
365 );
366
367 CREATE TABLE IF NOT EXISTS lash_process_handle_grants (
368 session_id TEXT NOT NULL,
369 scope_id TEXT NOT NULL,
370 process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
371 descriptor_json TEXT NOT NULL,
372 PRIMARY KEY (scope_id, process_id)
373 );
374 CREATE INDEX IF NOT EXISTS idx_lash_process_handle_grants_session
375 ON lash_process_handle_grants(session_id);
376 CREATE INDEX IF NOT EXISTS idx_lash_process_handle_grants_process
377 ON lash_process_handle_grants(process_id);
378
379 CREATE TABLE IF NOT EXISTS lash_process_leases (
380 process_id TEXT PRIMARY KEY REFERENCES lash_processes(process_id) ON DELETE CASCADE,
381 lease_owner_id TEXT,
382 lease_token TEXT,
383 lease_fencing_token BIGINT NOT NULL DEFAULT 0,
384 lease_claimed_at_ms BIGINT NOT NULL DEFAULT 0,
385 lease_expires_at_ms BIGINT NOT NULL DEFAULT 0
386 );
387
388 CREATE SEQUENCE IF NOT EXISTS lash_host_event_subscription_seq;
389 CREATE TABLE IF NOT EXISTS lash_host_event_trigger_subscriptions (
390 subscription_id TEXT PRIMARY KEY,
391 session_id TEXT NOT NULL,
392 handle TEXT NOT NULL,
393 source_type TEXT NOT NULL,
394 source_key TEXT NOT NULL,
395 enabled BOOLEAN NOT NULL,
396 created_at_ms BIGINT NOT NULL,
397 updated_at_ms BIGINT NOT NULL,
398 record_json TEXT NOT NULL,
399 UNIQUE(session_id, handle)
400 );
401 CREATE INDEX IF NOT EXISTS idx_lash_host_event_subscriptions_source
402 ON lash_host_event_trigger_subscriptions(source_type, source_key, enabled);
403
404 CREATE TABLE IF NOT EXISTS lash_host_event_occurrences (
405 occurrence_id TEXT PRIMARY KEY,
406 idempotency_key TEXT NOT NULL UNIQUE,
407 request_hash TEXT NOT NULL,
408 source_type TEXT NOT NULL,
409 source_key TEXT NOT NULL,
410 occurred_at_ms BIGINT NOT NULL,
411 record_json TEXT NOT NULL
412 );
413
414 CREATE TABLE IF NOT EXISTS lash_host_event_deliveries (
415 occurrence_id TEXT NOT NULL REFERENCES lash_host_event_occurrences(occurrence_id) ON DELETE CASCADE,
416 subscription_id TEXT NOT NULL REFERENCES lash_host_event_trigger_subscriptions(subscription_id) ON DELETE CASCADE,
417 process_id TEXT NOT NULL,
418 created_at_ms BIGINT NOT NULL,
419 PRIMARY KEY (occurrence_id, subscription_id)
420 );
421
422 CREATE TABLE IF NOT EXISTS lash_lashlang_artifacts (
423 module_ref TEXT PRIMARY KEY,
424 artifact_bytes BYTEA NOT NULL
425 );
426 "#,
427 )
428 .await
429 .map_err(store_sqlx_error)?;
430
431 let existing: Option<i32> =
432 sqlx::query_scalar("SELECT version FROM lash_schema_versions WHERE component = $1")
433 .bind(SCHEMA_COMPONENT)
434 .fetch_optional(&mut *tx)
435 .await
436 .map_err(store_sqlx_error)?;
437 match existing {
438 Some(version) if version == SCHEMA_VERSION => {}
439 Some(version) => {
440 return Err(StoreError::Backend(format!(
441 "Postgres schema component `{SCHEMA_COMPONENT}` has version {version}, expected {SCHEMA_VERSION}"
442 )));
443 }
444 None => {
445 sqlx::query("INSERT INTO lash_schema_versions (component, version) VALUES ($1, $2)")
446 .bind(SCHEMA_COMPONENT)
447 .bind(SCHEMA_VERSION)
448 .execute(&mut *tx)
449 .await
450 .map_err(store_sqlx_error)?;
451 }
452 }
453 tx.commit().await.map_err(store_sqlx_error)
454}
455
456fn current_epoch_ms() -> u64 {
457 SystemTime::now()
458 .duration_since(UNIX_EPOCH)
459 .unwrap_or_default()
460 .as_millis() as u64
461}
462
463fn current_timestamp_string() -> String {
464 let now = SystemTime::now()
465 .duration_since(UNIX_EPOCH)
466 .unwrap_or_default();
467 format!("unix:{}", now.as_secs())
468}
469
470fn store_sqlx_error(err: sqlx::Error) -> StoreError {
471 StoreError::Backend(err.to_string())
472}
473
474fn is_contention_error(err: &sqlx::Error) -> bool {
479 matches!(
480 err.as_database_error().and_then(|db| db.code()).as_deref(),
481 Some("40001" | "40P01" | "55P03")
482 )
483}
484
485fn plugin_sqlx_error(err: sqlx::Error) -> PluginError {
486 PluginError::Session(err.to_string())
487}
488
489fn process_decode_error(err: serde_json::Error) -> PluginError {
490 PluginError::Session(format!("failed to decode process registry row: {err}"))
491}
492
493fn store_decode_json<T: serde::de::DeserializeOwned>(
494 json: &str,
495 what: &str,
496) -> Result<T, StoreError> {
497 serde_json::from_str(json)
498 .map_err(|err| StoreError::Backend(format!("failed to decode {what}: {err}")))
499}
500
501fn encode_json<T: serde::Serialize>(value: &T) -> String {
502 serde_json::to_string(value).expect("persisted state should serialize")
503}
504
505fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
506 let mut buf = Vec::with_capacity(1024);
507 rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
508 buf
509}
510
511fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
512 rmp_serde::from_slice(bytes).ok()
513}
514
515fn block_on_detached<T: Send + 'static>(
516 future: impl std::future::Future<Output = T> + Send + 'static,
517) -> T {
518 std::thread::spawn(move || {
519 tokio::runtime::Builder::new_current_thread()
520 .enable_all()
521 .build()
522 .expect("postgres manifest runtime")
523 .block_on(future)
524 })
525 .join()
526 .expect("postgres manifest thread")
527}
528
529fn merge_token_ledger_entries(entries: Vec<TokenLedgerEntry>) -> Vec<TokenLedgerEntry> {
530 let mut merged = Vec::<TokenLedgerEntry>::new();
531 for entry in entries {
532 if entry.usage.total() == 0 {
533 continue;
534 }
535 if let Some(existing) = merged
536 .iter_mut()
537 .find(|existing| existing.source == entry.source && existing.model == entry.model)
538 {
539 existing.usage.input_tokens += entry.usage.input_tokens;
540 existing.usage.output_tokens += entry.usage.output_tokens;
541 existing.usage.cached_input_tokens += entry.usage.cached_input_tokens;
542 existing.usage.reasoning_tokens += entry.usage.reasoning_tokens;
543 } else {
544 merged.push(entry);
545 }
546 }
547 merged
548}
549
550#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
551struct SessionCheckpointEnvelope {
552 manifest: SessionCheckpoint,
553 tool_state: Option<lash_core::ToolState>,
554 plugin_snapshot: Option<lash_core::PluginSessionSnapshot>,
555 execution_state: Option<Vec<u8>>,
556}
557
558async fn put_blob_tx(
559 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
560 content: &[u8],
561) -> Result<BlobRef, StoreError> {
562 let hash = format!("{:x}", Sha256::digest(content));
563 sqlx::query(
564 "INSERT INTO lash_blobs (hash, content)
565 VALUES ($1, $2)
566 ON CONFLICT (hash) DO NOTHING",
567 )
568 .bind(&hash)
569 .bind(content)
570 .execute(&mut **tx)
571 .await
572 .map_err(store_sqlx_error)?;
573 Ok(BlobRef(hash))
574}
575
576async fn get_blob(pool: &PgPool, blob_ref: &BlobRef) -> Option<Vec<u8>> {
577 sqlx::query_scalar::<_, Vec<u8>>("SELECT content FROM lash_blobs WHERE hash = $1")
578 .bind(blob_ref.as_str())
579 .fetch_optional(pool)
580 .await
581 .ok()
582 .flatten()
583}
584
585async fn put_checkpoint_tx(
586 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
587 checkpoint: &HydratedSessionCheckpoint,
588) -> Result<(BlobRef, SessionCheckpoint), StoreError> {
589 let manifest = SessionCheckpoint {
590 turn_state: checkpoint.turn_state.clone(),
591 tool_state_ref: checkpoint.tool_state_ref.clone(),
592 plugin_snapshot_ref: checkpoint.plugin_snapshot_ref.clone(),
593 plugin_snapshot_revision: checkpoint.plugin_snapshot_revision,
594 execution_state_ref: checkpoint.execution_state_ref.clone(),
595 };
596 let envelope = SessionCheckpointEnvelope {
597 manifest: manifest.clone(),
598 tool_state: checkpoint.tool_state.clone(),
599 plugin_snapshot: checkpoint.plugin_snapshot.clone(),
600 execution_state: checkpoint.execution_state.clone(),
601 };
602 let bytes = encode_msgpack(&envelope);
603 let checkpoint_ref = put_blob_tx(tx, &bytes).await?;
604 Ok((checkpoint_ref, manifest))
605}
606
607async fn get_checkpoint(pool: &PgPool, blob_ref: &BlobRef) -> Option<HydratedSessionCheckpoint> {
608 let bytes = get_blob(pool, blob_ref).await?;
609 let envelope: SessionCheckpointEnvelope = decode_msgpack(&bytes)?;
610 Some(HydratedSessionCheckpoint {
611 turn_state: envelope.manifest.turn_state,
612 tool_state_ref: envelope.manifest.tool_state_ref,
613 tool_state: envelope.tool_state,
614 plugin_snapshot_ref: envelope.manifest.plugin_snapshot_ref,
615 plugin_snapshot: envelope.plugin_snapshot,
616 plugin_snapshot_revision: envelope.manifest.plugin_snapshot_revision,
617 execution_state_ref: envelope.manifest.execution_state_ref,
618 execution_state: envelope.execution_state,
619 })
620}
621
622async fn load_session_head_meta_tx(
623 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
624 session_id: &str,
625 for_update: bool,
626) -> Result<Option<SessionHeadMeta>, StoreError> {
627 let sql = if for_update {
628 "SELECT head_json, head_revision FROM lash_sessions WHERE session_id = $1 FOR UPDATE"
629 } else {
630 "SELECT head_json, head_revision FROM lash_sessions WHERE session_id = $1"
631 };
632 let row = sqlx::query(sql)
633 .bind(session_id)
634 .fetch_optional(&mut **tx)
635 .await
636 .map_err(store_sqlx_error)?;
637 let Some(row) = row else {
638 return Ok(None);
639 };
640 let head_json: String = row.get(0);
641 let head_revision: i64 = row.get(1);
642 let mut meta: SessionHeadMeta = store_decode_json(&head_json, "session head")?;
643 meta.head_revision = head_revision as u64;
644 Ok(Some(meta))
645}
646
647async fn load_usage_deltas(pool: &PgPool, session_id: &str) -> Vec<TokenLedgerEntry> {
648 let rows = sqlx::query(
649 "SELECT entry_json FROM lash_usage_deltas WHERE session_id = $1 ORDER BY seq ASC",
650 )
651 .bind(session_id)
652 .fetch_all(pool)
653 .await
654 .unwrap_or_default();
655 rows.into_iter()
656 .filter_map(|row| {
657 let json: String = row.get(0);
658 serde_json::from_str(&json).ok()
659 })
660 .collect()
661}
662
663async fn load_graph(
664 pool: &PgPool,
665 session_id: &str,
666 leaf_node_id: Option<String>,
667 active_path: bool,
668) -> Result<lash_core::SessionGraph, StoreError> {
669 let rows = sqlx::query(
670 "SELECT node_json FROM lash_graph_nodes
671 WHERE session_id = $1 AND tombstoned = FALSE
672 ORDER BY seq ASC",
673 )
674 .bind(session_id)
675 .fetch_all(pool)
676 .await
677 .map_err(store_sqlx_error)?;
678 let mut nodes = Vec::<SessionNodeRecord>::new();
679 for row in rows {
680 let json: String = row.get(0);
681 nodes.push(store_decode_json(&json, "session graph node")?);
682 }
683 if active_path {
684 if let Some(leaf) = leaf_node_id.clone() {
685 let wanted = active_path_node_ids(&nodes, &leaf);
686 nodes.retain(|node| wanted.contains(&node.node_id));
687 }
688 }
689 Ok(lash_core::SessionGraph::from_nodes(nodes, leaf_node_id))
690}
691
692fn active_path_node_ids(nodes: &[SessionNodeRecord], leaf_node_id: &str) -> HashSet<String> {
693 let mut parent_by_id = std::collections::BTreeMap::new();
694 for node in nodes {
695 parent_by_id.insert(node.node_id.clone(), node.parent_node_id.clone());
696 }
697 let mut wanted = HashSet::new();
698 let mut cursor = Some(leaf_node_id.to_string());
699 while let Some(node_id) = cursor {
700 if !wanted.insert(node_id.clone()) {
701 break;
702 }
703 cursor = parent_by_id.get(&node_id).cloned().flatten();
704 }
705 wanted
706}
707
708async fn commit_attachment_refs_tx(
709 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
710 session_id: &str,
711 attachment_ids: &[AttachmentId],
712) -> Result<(), StoreError> {
713 if attachment_ids.is_empty() {
714 return Ok(());
715 }
716 let now = current_epoch_ms() as i64;
717 for id in attachment_ids {
718 sqlx::query(
719 "UPDATE lash_attachment_manifest
720 SET committed_at_ms = COALESCE(committed_at_ms, $1)
721 WHERE attachment_id = $2 AND session_id = $3",
722 )
723 .bind(now)
724 .bind(id.as_str())
725 .bind(session_id)
726 .execute(&mut **tx)
727 .await
728 .map_err(store_sqlx_error)?;
729 }
730 Ok(())
731}
732
733impl AttachmentManifest for PostgresSessionStore {
734 fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError> {
735 let pool = self.pool.clone();
736 block_on_detached(async move {
737 sqlx::query(
738 "INSERT INTO lash_attachment_manifest (
739 attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
740 )
741 VALUES ($1, $2, $3, $4, NULL)
742 ON CONFLICT (attachment_id) DO UPDATE SET
743 session_id = EXCLUDED.session_id,
744 canonical_uri = EXCLUDED.canonical_uri,
745 intent_at_ms = EXCLUDED.intent_at_ms",
746 )
747 .bind(intent.attachment_id.as_str())
748 .bind(intent.session_id)
749 .bind(intent.canonical_uri)
750 .bind(intent.intent_at_epoch_ms as i64)
751 .execute(&pool)
752 .await
753 .map(|_| ())
754 .map_err(store_sqlx_error)
755 })
756 }
757
758 fn commit_refs(
759 &self,
760 session_id: &str,
761 attachment_ids: &[AttachmentId],
762 ) -> Result<(), StoreError> {
763 let pool = self.pool.clone();
764 let session_id = session_id.to_string();
765 let attachment_ids = attachment_ids.to_vec();
766 block_on_detached(async move {
767 let mut tx = pool.begin().await.map_err(store_sqlx_error)?;
768 commit_attachment_refs_tx(&mut tx, &session_id, &attachment_ids).await?;
769 tx.commit().await.map_err(store_sqlx_error)
770 })
771 }
772
773 fn list_uncommitted(
774 &self,
775 older_than_epoch_ms: u64,
776 ) -> Result<Vec<AttachmentManifestEntry>, StoreError> {
777 let pool = self.pool.clone();
778 block_on_detached(async move {
779 let rows = sqlx::query(
780 "SELECT attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
781 FROM lash_attachment_manifest
782 WHERE committed_at_ms IS NULL AND intent_at_ms <= $1
783 ORDER BY attachment_id ASC",
784 )
785 .bind(older_than_epoch_ms as i64)
786 .fetch_all(&pool)
787 .await
788 .map_err(store_sqlx_error)?;
789 Ok(rows
790 .into_iter()
791 .map(|row| AttachmentManifestEntry {
792 attachment_id: AttachmentId::new(row.get::<String, _>(0)),
793 session_id: row.get(1),
794 canonical_uri: row.get(2),
795 intent_at_epoch_ms: row.get::<i64, _>(3) as u64,
796 committed_at_epoch_ms: row.get::<Option<i64>, _>(4).map(|value| value as u64),
797 })
798 .collect())
799 })
800 }
801
802 fn forget(&self, attachment_id: &AttachmentId) -> Result<(), StoreError> {
803 let pool = self.pool.clone();
804 let attachment_id = attachment_id.to_string();
805 block_on_detached(async move {
806 sqlx::query("DELETE FROM lash_attachment_manifest WHERE attachment_id = $1")
807 .bind(attachment_id)
808 .execute(&pool)
809 .await
810 .map(|_| ())
811 .map_err(store_sqlx_error)
812 })
813 }
814}
815
816#[async_trait::async_trait]
817impl SessionStoreFactory for PostgresSessionStoreFactory {
818 fn durability_tier(&self) -> DurabilityTier {
819 DurabilityTier::Durable
820 }
821
822 async fn create_store(
823 &self,
824 request: &SessionStoreCreateRequest,
825 ) -> Result<Arc<dyn RuntimePersistence>, String> {
826 let store = PostgresSessionStore {
827 pool: self.pool.clone(),
828 session_id: Some(request.session_id.clone()),
829 bound_session: Arc::new(OnceLock::new()),
830 };
831 if store
832 .load_session_meta()
833 .await
834 .map_err(|err| err.to_string())?
835 .is_none()
836 {
837 store
838 .save_session_meta(SessionMeta {
839 session_id: request.session_id.clone(),
840 session_name: request.session_id.clone(),
841 created_at: current_timestamp_string(),
842 model: request.policy.model.id.clone(),
843 cwd: std::env::current_dir()
844 .ok()
845 .and_then(|path| path.to_str().map(str::to_string)),
846 relation: request.relation.clone(),
847 })
848 .await
849 .map_err(|err| err.to_string())?;
850 }
851 Ok(Arc::new(store))
852 }
853
854 async fn delete_session(&self, session_id: &str) -> Result<(), String> {
855 let mut tx = self.pool.begin().await.map_err(|err| err.to_string())?;
856 for sql in [
857 "DELETE FROM lash_queued_work_items WHERE batch_id IN (SELECT batch_id FROM lash_queued_work_batches WHERE session_id = $1)",
858 "DELETE FROM lash_queued_work_batches WHERE session_id = $1",
859 "DELETE FROM lash_usage_deltas WHERE session_id = $1",
860 "DELETE FROM lash_graph_nodes WHERE session_id = $1",
861 "DELETE FROM lash_runtime_turn_commits WHERE session_id = $1",
862 "DELETE FROM lash_session_meta WHERE session_id = $1",
863 "DELETE FROM lash_sessions WHERE session_id = $1",
864 "DELETE FROM lash_attachment_manifest WHERE session_id = $1",
865 ] {
866 sqlx::query(sql)
867 .bind(session_id)
868 .execute(&mut *tx)
869 .await
870 .map_err(|err| err.to_string())?;
871 }
872 tx.commit().await.map_err(|err| err.to_string())
873 }
874}
875
876#[derive(Clone, Debug)]
877struct QueuedBatchRow {
878 enqueue_seq: u64,
879 batch_id: String,
880 session_id: String,
881 source_key: Option<String>,
882 delivery_policy: DeliveryPolicy,
883 slot_policy: SlotPolicy,
884 merge_key: MergeKey,
885 available_at_ms: u64,
886 enqueued_at_ms: u64,
887 claim_fencing_token: u64,
888}
889
890fn queued_batch_row(row: PgRow) -> Result<QueuedBatchRow, StoreError> {
891 let delivery_policy =
892 DeliveryPolicy::from_wire_str(row.get::<String, _>("delivery_policy").as_str())
893 .ok_or_else(|| {
894 StoreError::Backend("invalid queued work delivery policy".to_string())
895 })?;
896 let slot_policy = SlotPolicy::from_wire_str(row.get::<String, _>("slot_policy").as_str())
897 .ok_or_else(|| StoreError::Backend("invalid queued work slot policy".to_string()))?;
898 let merge_json: String = row.get("merge_key_json");
899 Ok(QueuedBatchRow {
900 enqueue_seq: row.get::<i64, _>("enqueue_seq") as u64,
901 batch_id: row.get("batch_id"),
902 session_id: row.get("session_id"),
903 source_key: row.get("source_key"),
904 delivery_policy,
905 slot_policy,
906 merge_key: store_decode_json(&merge_json, "queued work merge key")?,
907 available_at_ms: row.get::<i64, _>("available_at_ms") as u64,
908 enqueued_at_ms: row.get::<i64, _>("enqueued_at_ms") as u64,
909 claim_fencing_token: row.get::<i64, _>("claim_fencing_token") as u64,
910 })
911}
912
913async fn load_queued_batch(
914 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
915 batch_id: &str,
916) -> Result<Option<QueuedWorkBatch>, StoreError> {
917 let row = sqlx::query(
918 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
919 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
920 claim_fencing_token
921 FROM lash_queued_work_batches
922 WHERE batch_id = $1",
923 )
924 .bind(batch_id)
925 .fetch_optional(&mut **tx)
926 .await
927 .map_err(store_sqlx_error)?;
928 let Some(row) = row else {
929 return Ok(None);
930 };
931 let row = queued_batch_row(row)?;
932 queued_work_batch_from_row(tx, row).await.map(Some)
933}
934
935async fn queued_work_batch_from_row(
936 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
937 row: QueuedBatchRow,
938) -> Result<QueuedWorkBatch, StoreError> {
939 let item_rows = sqlx::query(
940 "SELECT item_id, payload_json
941 FROM lash_queued_work_items
942 WHERE batch_id = $1
943 ORDER BY item_index ASC",
944 )
945 .bind(&row.batch_id)
946 .fetch_all(&mut **tx)
947 .await
948 .map_err(store_sqlx_error)?;
949 let mut items = Vec::new();
950 for item in item_rows {
951 let payload_json: String = item.get(1);
952 items.push(QueuedWorkItem {
953 item_id: item.get(0),
954 payload: store_decode_json(&payload_json, "queued work payload")?,
955 });
956 }
957 Ok(QueuedWorkBatch {
958 batch_id: row.batch_id,
959 session_id: row.session_id,
960 enqueue_seq: row.enqueue_seq,
961 source_key: row.source_key,
962 delivery_policy: row.delivery_policy,
963 slot_policy: row.slot_policy,
964 merge_key: row.merge_key,
965 available_at_ms: row.available_at_ms,
966 enqueued_at_ms: row.enqueued_at_ms,
967 items,
968 })
969}
970
971async fn ensure_queued_work_completion_tx(
972 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
973 completed: &QueuedWorkCompletion,
974) -> Result<(), StoreError> {
975 for batch_id in &completed.batch_ids {
976 let exists: Option<i64> = sqlx::query_scalar(
977 "SELECT 1::BIGINT FROM lash_queued_work_batches
978 WHERE session_id = $1
979 AND batch_id = $2
980 AND claim_id = $3
981 AND claim_token = $4
982 LIMIT 1",
983 )
984 .bind(&completed.session_id)
985 .bind(batch_id)
986 .bind(&completed.claim_id)
987 .bind(&completed.lease_token)
988 .fetch_optional(&mut **tx)
989 .await
990 .map_err(store_sqlx_error)?;
991 if exists.is_none() {
992 return Err(StoreError::QueuedWorkClaimExpired {
993 session_id: completed.session_id.clone(),
994 claim_id: completed.claim_id.clone(),
995 });
996 }
997 }
998 Ok(())
999}
1000
1001#[async_trait::async_trait]
1002impl RuntimePersistence for PostgresSessionStore {
1003 fn durability_tier(&self) -> DurabilityTier {
1004 DurabilityTier::Durable
1005 }
1006
1007 async fn load_session(
1008 &self,
1009 scope: SessionReadScope,
1010 ) -> Result<Option<PersistedSessionRead>, StoreError> {
1011 let Some(session_id) = self.selected_session_id().await? else {
1012 return Ok(None);
1013 };
1014 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1015 let Some(meta) = load_session_head_meta_tx(&mut tx, &session_id, false).await? else {
1016 return Ok(None);
1017 };
1018 tx.commit().await.map_err(store_sqlx_error)?;
1019 let leaf_node_id = match &scope {
1020 SessionReadScope::FullGraph => meta.leaf_node_id.clone(),
1021 SessionReadScope::ActivePath { leaf_node_id } => {
1022 leaf_node_id.clone().or_else(|| meta.leaf_node_id.clone())
1023 }
1024 };
1025 let graph = load_graph(
1026 &self.pool,
1027 &session_id,
1028 leaf_node_id.clone(),
1029 matches!(scope, SessionReadScope::ActivePath { .. }),
1030 )
1031 .await?;
1032 let checkpoint = match meta.checkpoint_ref.as_ref() {
1033 Some(blob_ref) => get_checkpoint(&self.pool, blob_ref).await,
1034 None => None,
1035 };
1036 Ok(Some(PersistedSessionRead {
1037 session_id: meta.session_id,
1038 head_revision: meta.head_revision,
1039 config: meta.config,
1040 agent_frames: meta.agent_frames,
1041 current_agent_frame_id: meta.current_agent_frame_id,
1042 graph,
1043 checkpoint_ref: meta.checkpoint_ref,
1044 checkpoint,
1045 token_ledger: merge_token_ledger_entries(
1046 load_usage_deltas(&self.pool, &session_id).await,
1047 ),
1048 }))
1049 }
1050
1051 async fn load_node(&self, node_id: &str) -> Result<Option<SessionNodeRecord>, StoreError> {
1052 let json: Option<String> = if let Some(session_id) = &self.session_id {
1053 sqlx::query_scalar(
1054 "SELECT node_json FROM lash_graph_nodes
1055 WHERE session_id = $1 AND node_id = $2 AND tombstoned = FALSE",
1056 )
1057 .bind(session_id)
1058 .bind(node_id)
1059 .fetch_optional(&self.pool)
1060 .await
1061 .map_err(store_sqlx_error)?
1062 } else {
1063 sqlx::query_scalar(
1064 "SELECT node_json FROM lash_graph_nodes
1065 WHERE node_id = $1 AND tombstoned = FALSE
1066 ORDER BY session_id ASC
1067 LIMIT 1",
1068 )
1069 .bind(node_id)
1070 .fetch_optional(&self.pool)
1071 .await
1072 .map_err(store_sqlx_error)?
1073 };
1074 json.map(|json| store_decode_json(&json, "session graph node"))
1075 .transpose()
1076 }
1077
1078 async fn commit_runtime_state(
1079 &self,
1080 commit: RuntimeCommit,
1081 ) -> Result<RuntimeCommitResult, StoreError> {
1082 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1083 let existing = load_session_head_meta_tx(&mut tx, &commit.session_id, false).await?;
1087 if let Some(bound_session_id) = existing.as_ref().map(|meta| meta.session_id.as_str())
1088 && bound_session_id != commit.session_id
1089 {
1090 return Err(StoreError::SessionBindingMismatch {
1091 bound_session_id: bound_session_id.to_string(),
1092 attempted_session_id: commit.session_id,
1093 });
1094 }
1095 let effective_binding = self
1099 .session_id
1100 .clone()
1101 .or_else(|| self.bound_session.get().cloned());
1102 if let Some(bound_session_id) = &effective_binding
1103 && commit.session_id != *bound_session_id
1104 {
1105 return Err(StoreError::SessionBindingMismatch {
1106 bound_session_id: bound_session_id.clone(),
1107 attempted_session_id: commit.session_id,
1108 });
1109 }
1110 if self.session_id.is_none() {
1111 let _ = self.bound_session.set(commit.session_id.clone());
1112 }
1113 if let Some(completed) = &commit.turn_commit {
1114 if completed.session_id != commit.session_id {
1115 return Err(StoreError::RuntimeTurnCommitConflict {
1116 session_id: completed.session_id.clone(),
1117 turn_id: completed.turn_id.clone(),
1118 });
1119 }
1120 let prior = sqlx::query(
1121 "SELECT turn_commit_hash, result_json
1122 FROM lash_runtime_turn_commits
1123 WHERE session_id = $1 AND turn_id = $2",
1124 )
1125 .bind(&completed.session_id)
1126 .bind(&completed.turn_id)
1127 .fetch_optional(&mut *tx)
1128 .await
1129 .map_err(store_sqlx_error)?;
1130 if let Some(row) = prior {
1131 let hash: String = row.get(0);
1132 let result_json: String = row.get(1);
1133 if hash == completed.turn_commit_hash {
1134 return store_decode_json(&result_json, "runtime turn commit result");
1135 }
1136 return Err(StoreError::RuntimeTurnCommitConflict {
1137 session_id: completed.session_id.clone(),
1138 turn_id: completed.turn_id.clone(),
1139 });
1140 }
1141 }
1142 let actual_revision = existing.as_ref().map_or(0, |meta| meta.head_revision);
1143 if commit.expected_head_revision.is_some()
1144 && commit.expected_head_revision != Some(actual_revision)
1145 {
1146 return Err(StoreError::HeadRevisionConflict {
1147 expected: commit.expected_head_revision,
1148 actual: actual_revision,
1149 });
1150 }
1151 for completed in &commit.completed_queue_claims {
1152 if completed.session_id != commit.session_id {
1153 return Err(StoreError::QueuedWorkClaimExpired {
1154 session_id: completed.session_id.clone(),
1155 claim_id: completed.claim_id.clone(),
1156 });
1157 }
1158 ensure_queued_work_completion_tx(&mut tx, completed).await?;
1159 }
1160 let (checkpoint_ref, manifest) = put_checkpoint_tx(&mut tx, &commit.checkpoint).await?;
1161 for entry in &commit.usage_deltas {
1162 sqlx::query("INSERT INTO lash_usage_deltas (session_id, entry_json) VALUES ($1, $2)")
1163 .bind(&commit.session_id)
1164 .bind(encode_json(entry))
1165 .execute(&mut *tx)
1166 .await
1167 .map_err(store_sqlx_error)?;
1168 }
1169 let leaf_node_id = match &commit.graph {
1170 GraphCommitDelta::Unchanged { leaf_node_id } => leaf_node_id.clone(),
1171 GraphCommitDelta::Append {
1172 nodes,
1173 leaf_node_id,
1174 } => {
1175 for node in nodes {
1176 sqlx::query(
1177 "INSERT INTO lash_graph_nodes (session_id, node_id, node_json)
1178 VALUES ($1, $2, $3)
1179 ON CONFLICT (session_id, node_id) DO UPDATE SET
1180 node_json = EXCLUDED.node_json,
1181 tombstoned = FALSE",
1182 )
1183 .bind(&commit.session_id)
1184 .bind(&node.node_id)
1185 .bind(encode_json(node))
1186 .execute(&mut *tx)
1187 .await
1188 .map_err(store_sqlx_error)?;
1189 }
1190 leaf_node_id.clone()
1191 }
1192 GraphCommitDelta::ReplaceFull(graph) => {
1193 sqlx::query("DELETE FROM lash_graph_nodes WHERE session_id = $1")
1194 .bind(&commit.session_id)
1195 .execute(&mut *tx)
1196 .await
1197 .map_err(store_sqlx_error)?;
1198 for node in &graph.nodes {
1199 sqlx::query(
1200 "INSERT INTO lash_graph_nodes (session_id, node_id, node_json)
1201 VALUES ($1, $2, $3)",
1202 )
1203 .bind(&commit.session_id)
1204 .bind(&node.node_id)
1205 .bind(encode_json(node))
1206 .execute(&mut *tx)
1207 .await
1208 .map_err(store_sqlx_error)?;
1209 }
1210 graph.leaf_node_id.clone()
1211 }
1212 };
1213 let graph_node_count: i64 = sqlx::query_scalar(
1214 "SELECT COUNT(*) FROM lash_graph_nodes WHERE session_id = $1 AND tombstoned = FALSE",
1215 )
1216 .bind(&commit.session_id)
1217 .fetch_one(&mut *tx)
1218 .await
1219 .map_err(store_sqlx_error)?;
1220 let next_revision = actual_revision + 1;
1221 let meta = SessionHeadMeta {
1222 session_id: commit.session_id.clone(),
1223 head_revision: next_revision,
1224 config: commit.config.clone(),
1225 agent_frames: commit.agent_frames.clone(),
1226 current_agent_frame_id: commit.current_agent_frame_id.clone(),
1227 checkpoint_ref: Some(checkpoint_ref.clone()),
1228 leaf_node_id,
1229 graph_node_count: graph_node_count as usize,
1230 token_ledger: Vec::new(),
1231 };
1232 let head_write = sqlx::query(
1237 "INSERT INTO lash_sessions (session_id, head_revision, head_json, checkpoint_ref)
1238 VALUES ($1, $2, $3, $4)
1239 ON CONFLICT (session_id) DO UPDATE SET
1240 head_revision = EXCLUDED.head_revision,
1241 head_json = EXCLUDED.head_json,
1242 checkpoint_ref = EXCLUDED.checkpoint_ref
1243 WHERE lash_sessions.head_revision = $5",
1244 )
1245 .bind(&commit.session_id)
1246 .bind(next_revision as i64)
1247 .bind(encode_json(&meta))
1248 .bind(checkpoint_ref.as_str())
1249 .bind(actual_revision as i64)
1250 .execute(&mut *tx)
1251 .await;
1252 let head_write = match head_write {
1253 Ok(result) => result,
1254 Err(err) if is_contention_error(&err) => {
1255 return Err(StoreError::HeadRevisionConflict {
1260 expected: commit.expected_head_revision.or(Some(actual_revision)),
1261 actual: actual_revision,
1262 });
1263 }
1264 Err(err) => return Err(store_sqlx_error(err)),
1265 };
1266 if head_write.rows_affected() == 0 {
1267 let actual_now = sqlx::query_scalar::<_, i64>(
1272 "SELECT head_revision FROM lash_sessions WHERE session_id = $1",
1273 )
1274 .bind(&commit.session_id)
1275 .fetch_optional(&mut *tx)
1276 .await
1277 .map_err(store_sqlx_error)?
1278 .map_or(actual_revision, |revision| revision as u64);
1279 return Err(StoreError::HeadRevisionConflict {
1280 expected: commit.expected_head_revision.or(Some(actual_revision)),
1281 actual: actual_now,
1282 });
1283 }
1284 for completed in &commit.completed_queue_claims {
1285 for batch_id in &completed.batch_ids {
1286 sqlx::query(
1287 "DELETE FROM lash_queued_work_batches
1288 WHERE session_id = $1 AND batch_id = $2 AND claim_id = $3 AND claim_token = $4",
1289 )
1290 .bind(&completed.session_id)
1291 .bind(batch_id)
1292 .bind(&completed.claim_id)
1293 .bind(&completed.lease_token)
1294 .execute(&mut *tx)
1295 .await
1296 .map_err(store_sqlx_error)?;
1297 }
1298 }
1299 commit_attachment_refs_tx(
1300 &mut tx,
1301 &commit.session_id,
1302 &commit.committed_attachment_ids,
1303 )
1304 .await?;
1305 let result = RuntimeCommitResult {
1306 head_revision: next_revision,
1307 checkpoint_ref,
1308 manifest,
1309 };
1310 if let Some(completed) = &commit.turn_commit {
1311 sqlx::query(
1312 "INSERT INTO lash_runtime_turn_commits (
1313 session_id, turn_id, turn_commit_hash, result_json, committed_at_ms
1314 )
1315 VALUES ($1, $2, $3, $4, $5)",
1316 )
1317 .bind(&completed.session_id)
1318 .bind(&completed.turn_id)
1319 .bind(&completed.turn_commit_hash)
1320 .bind(encode_json(&result))
1321 .bind(current_epoch_ms() as i64)
1322 .execute(&mut *tx)
1323 .await
1324 .map_err(store_sqlx_error)?;
1325 }
1326 tx.commit().await.map_err(store_sqlx_error)?;
1327 Ok(result)
1328 }
1329
1330 async fn enqueue_queued_work(
1331 &self,
1332 batch: QueuedWorkBatchDraft,
1333 ) -> Result<QueuedWorkBatch, StoreError> {
1334 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1335 if let Some(source_key) = batch.source_key.as_deref() {
1336 let existing_id: Option<String> = sqlx::query_scalar(
1337 "SELECT batch_id FROM lash_queued_work_batches
1338 WHERE session_id = $1 AND source_key = $2",
1339 )
1340 .bind(&batch.session_id)
1341 .bind(source_key)
1342 .fetch_optional(&mut *tx)
1343 .await
1344 .map_err(store_sqlx_error)?;
1345 if let Some(batch_id) = existing_id {
1346 let existing = load_queued_batch(&mut tx, &batch_id)
1347 .await?
1348 .ok_or_else(|| {
1349 StoreError::Backend("queued work source row disappeared".to_string())
1350 })?;
1351 tx.commit().await.map_err(store_sqlx_error)?;
1352 return Ok(existing);
1353 }
1354 }
1355 let now = current_epoch_ms();
1356 let batch_id = format!(
1357 "qwb:{:x}",
1358 Sha256::digest(format!("{}:{:?}:{now}", batch.session_id, batch.source_key).as_bytes())
1359 );
1360 let row = sqlx::query_scalar::<_, i64>(
1361 "INSERT INTO lash_queued_work_batches (
1362 batch_id, session_id, source_key, delivery_policy, slot_policy,
1363 merge_key_json, available_at_ms, enqueued_at_ms
1364 )
1365 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
1366 RETURNING enqueue_seq",
1367 )
1368 .bind(&batch_id)
1369 .bind(&batch.session_id)
1370 .bind(&batch.source_key)
1371 .bind(batch.delivery_policy.as_str())
1372 .bind(batch.slot_policy.as_str())
1373 .bind(encode_json(&batch.merge_key))
1374 .bind(batch.available_at_ms as i64)
1375 .bind(now as i64)
1376 .fetch_one(&mut *tx)
1377 .await
1378 .map_err(store_sqlx_error)?;
1379 for (index, payload) in batch.payloads.iter().enumerate() {
1380 let item_id = format!("{batch_id}:item:{index}");
1381 sqlx::query(
1382 "INSERT INTO lash_queued_work_items (batch_id, item_index, item_id, payload_json)
1383 VALUES ($1, $2, $3, $4)",
1384 )
1385 .bind(&batch_id)
1386 .bind(index as i32)
1387 .bind(item_id)
1388 .bind(encode_json(payload))
1389 .execute(&mut *tx)
1390 .await
1391 .map_err(store_sqlx_error)?;
1392 }
1393 let queued = load_queued_batch(&mut tx, &batch_id)
1394 .await?
1395 .ok_or_else(|| StoreError::Backend("queued work insert disappeared".to_string()))?;
1396 debug_assert_eq!(queued.enqueue_seq, row as u64);
1397 tx.commit().await.map_err(store_sqlx_error)?;
1398 Ok(queued)
1399 }
1400
1401 async fn claim_ready_queued_work(
1402 &self,
1403 session_id: &str,
1404 owner_id: &str,
1405 boundary: QueuedWorkClaimBoundary,
1406 lease_ttl_ms: u64,
1407 max_batches: usize,
1408 ) -> Result<Option<QueuedWorkClaim>, StoreError> {
1409 if max_batches == 0 {
1410 return Ok(None);
1411 }
1412 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1413 let now = current_epoch_ms();
1414 let rows = sqlx::query(
1415 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1416 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1417 claim_fencing_token
1418 FROM lash_queued_work_batches
1419 WHERE session_id = $1
1420 AND available_at_ms <= $2
1421 AND (claim_token IS NULL OR claim_expires_at_ms <= $2)
1422 ORDER BY enqueue_seq ASC
1423 LIMIT $3
1424 FOR UPDATE SKIP LOCKED",
1425 )
1426 .bind(session_id)
1427 .bind(now as i64)
1428 .bind((max_batches as i64).saturating_add(32))
1429 .fetch_all(&mut *tx)
1430 .await
1431 .map_err(store_sqlx_error)?;
1432 let mut candidates = Vec::new();
1433 for row in rows {
1434 candidates.push(queued_batch_row(row)?);
1435 }
1436 let Some(first_row) = candidates.first() else {
1437 tx.commit().await.map_err(store_sqlx_error)?;
1438 return Ok(None);
1439 };
1440 if boundary == QueuedWorkClaimBoundary::ActiveTurnCheckpoint
1441 && first_row.delivery_policy != DeliveryPolicy::EarliestSafeBoundary
1442 {
1443 tx.commit().await.map_err(store_sqlx_error)?;
1444 return Ok(None);
1445 }
1446 let first_slot = first_row.slot_policy;
1447 let first_delivery = first_row.delivery_policy;
1448 let first_merge = first_row.merge_key.clone();
1449 let mut selected = Vec::new();
1450 for row in candidates {
1451 if selected.len() >= max_batches {
1452 break;
1453 }
1454 if selected.is_empty() {
1455 selected.push(row);
1456 if first_slot == SlotPolicy::Exclusive {
1457 break;
1458 }
1459 continue;
1460 }
1461 if first_slot != SlotPolicy::Join
1462 || row.slot_policy != SlotPolicy::Join
1463 || row.delivery_policy != first_delivery
1464 || row.merge_key != first_merge
1465 {
1466 break;
1467 }
1468 selected.push(row);
1469 }
1470 let Some(first) = selected.first() else {
1471 tx.commit().await.map_err(store_sqlx_error)?;
1472 return Ok(None);
1473 };
1474 let fencing_token = first.claim_fencing_token.saturating_add(1);
1475 let claim_id = format!("qwc:{}:{fencing_token}", first.enqueue_seq);
1476 let lease_token = format!(
1477 "{:x}",
1478 Sha256::digest(format!("{session_id}:{owner_id}:{claim_id}:{now}").as_bytes())
1479 );
1480 let expires_at = now.saturating_add(lease_ttl_ms);
1481 for row in &selected {
1482 let changed = sqlx::query(
1483 "UPDATE lash_queued_work_batches
1484 SET claim_id = $3,
1485 claim_owner_id = $4,
1486 claim_token = $5,
1487 claim_fencing_token = claim_fencing_token + 1,
1488 claim_claimed_at_ms = $6,
1489 claim_expires_at_ms = $7
1490 WHERE session_id = $1
1491 AND batch_id = $2
1492 AND (claim_token IS NULL OR claim_expires_at_ms <= $6)",
1493 )
1494 .bind(session_id)
1495 .bind(&row.batch_id)
1496 .bind(&claim_id)
1497 .bind(owner_id)
1498 .bind(&lease_token)
1499 .bind(now as i64)
1500 .bind(expires_at as i64)
1501 .execute(&mut *tx)
1502 .await
1503 .map_err(store_sqlx_error)?
1504 .rows_affected();
1505 if changed == 0 {
1506 tx.rollback().await.map_err(store_sqlx_error)?;
1507 return Ok(None);
1508 }
1509 }
1510 let mut batches = Vec::new();
1511 for row in selected {
1512 batches.push(queued_work_batch_from_row(&mut tx, row).await?);
1513 }
1514 tx.commit().await.map_err(store_sqlx_error)?;
1515 Ok(Some(QueuedWorkClaim {
1516 session_id: session_id.to_string(),
1517 claim_id,
1518 owner_id: owner_id.to_string(),
1519 lease_token,
1520 fencing_token,
1521 claimed_at_epoch_ms: now,
1522 expires_at_epoch_ms: expires_at,
1523 batches,
1524 }))
1525 }
1526
1527 async fn renew_queued_work_claim(
1528 &self,
1529 claim: &QueuedWorkClaim,
1530 lease_ttl_ms: u64,
1531 ) -> Result<QueuedWorkClaim, StoreError> {
1532 let expires_at = current_epoch_ms().saturating_add(lease_ttl_ms);
1533 let changed = sqlx::query(
1534 "UPDATE lash_queued_work_batches
1535 SET claim_expires_at_ms = $4
1536 WHERE session_id = $1 AND claim_id = $2 AND claim_token = $3",
1537 )
1538 .bind(&claim.session_id)
1539 .bind(&claim.claim_id)
1540 .bind(&claim.lease_token)
1541 .bind(expires_at as i64)
1542 .execute(&self.pool)
1543 .await
1544 .map_err(store_sqlx_error)?
1545 .rows_affected();
1546 if changed as usize != claim.batches.len() {
1547 return Err(StoreError::QueuedWorkClaimExpired {
1548 session_id: claim.session_id.clone(),
1549 claim_id: claim.claim_id.clone(),
1550 });
1551 }
1552 Ok(QueuedWorkClaim {
1553 expires_at_epoch_ms: expires_at,
1554 ..claim.clone()
1555 })
1556 }
1557
1558 async fn abandon_queued_work_claim(&self, claim: &QueuedWorkClaim) -> Result<(), StoreError> {
1559 sqlx::query(
1560 "UPDATE lash_queued_work_batches
1561 SET claim_id = NULL,
1562 claim_owner_id = NULL,
1563 claim_token = NULL,
1564 claim_claimed_at_ms = 0,
1565 claim_expires_at_ms = 0
1566 WHERE session_id = $1 AND claim_id = $2 AND claim_token = $3",
1567 )
1568 .bind(&claim.session_id)
1569 .bind(&claim.claim_id)
1570 .bind(&claim.lease_token)
1571 .execute(&self.pool)
1572 .await
1573 .map_err(store_sqlx_error)?;
1574 Ok(())
1575 }
1576
1577 async fn cancel_queued_work_batch(
1578 &self,
1579 session_id: &str,
1580 batch_id: &str,
1581 ) -> Result<Option<QueuedWorkBatch>, StoreError> {
1582 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1583 let now = current_epoch_ms();
1584 let row = sqlx::query(
1585 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1586 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1587 claim_fencing_token
1588 FROM lash_queued_work_batches
1589 WHERE session_id = $1
1590 AND batch_id = $2
1591 AND (claim_token IS NULL OR claim_expires_at_ms <= $3)
1592 FOR UPDATE",
1593 )
1594 .bind(session_id)
1595 .bind(batch_id)
1596 .bind(now as i64)
1597 .fetch_optional(&mut *tx)
1598 .await
1599 .map_err(store_sqlx_error)?;
1600 let Some(row) = row else {
1601 tx.commit().await.map_err(store_sqlx_error)?;
1602 return Ok(None);
1603 };
1604 let batch = queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?;
1605 sqlx::query("DELETE FROM lash_queued_work_batches WHERE batch_id = $1")
1606 .bind(batch_id)
1607 .execute(&mut *tx)
1608 .await
1609 .map_err(store_sqlx_error)?;
1610 tx.commit().await.map_err(store_sqlx_error)?;
1611 Ok(Some(batch))
1612 }
1613
1614 async fn list_queued_work(&self, session_id: &str) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1615 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1616 let rows = sqlx::query(
1617 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1618 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1619 claim_fencing_token
1620 FROM lash_queued_work_batches
1621 WHERE session_id = $1
1622 ORDER BY enqueue_seq ASC",
1623 )
1624 .bind(session_id)
1625 .fetch_all(&mut *tx)
1626 .await
1627 .map_err(store_sqlx_error)?;
1628 let mut batches = Vec::new();
1629 for row in rows {
1630 batches.push(queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?);
1631 }
1632 tx.commit().await.map_err(store_sqlx_error)?;
1633 Ok(batches)
1634 }
1635
1636 async fn list_pending_queued_work(
1637 &self,
1638 session_id: &str,
1639 ) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1640 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1641 let now = current_epoch_ms();
1642 let rows = sqlx::query(
1643 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1644 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1645 claim_fencing_token
1646 FROM lash_queued_work_batches
1647 WHERE session_id = $1
1648 AND (claim_token IS NULL OR claim_expires_at_ms <= $2)
1649 ORDER BY enqueue_seq ASC",
1650 )
1651 .bind(session_id)
1652 .bind(now as i64)
1653 .fetch_all(&mut *tx)
1654 .await
1655 .map_err(store_sqlx_error)?;
1656 let mut batches = Vec::new();
1657 for row in rows {
1658 batches.push(queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?);
1659 }
1660 tx.commit().await.map_err(store_sqlx_error)?;
1661 Ok(batches)
1662 }
1663
1664 async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError> {
1665 sqlx::query(
1666 "INSERT INTO lash_session_meta (session_id, meta_json)
1667 VALUES ($1, $2)
1668 ON CONFLICT (session_id) DO UPDATE SET meta_json = EXCLUDED.meta_json",
1669 )
1670 .bind(&meta.session_id)
1671 .bind(encode_json(&meta))
1672 .execute(&self.pool)
1673 .await
1674 .map_err(store_sqlx_error)?;
1675 Ok(())
1676 }
1677
1678 async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError> {
1679 let json: Option<String> = if let Some(session_id) = &self.session_id {
1680 sqlx::query_scalar("SELECT meta_json FROM lash_session_meta WHERE session_id = $1")
1681 .bind(session_id)
1682 .fetch_optional(&self.pool)
1683 .await
1684 .map_err(store_sqlx_error)?
1685 } else {
1686 sqlx::query_scalar(
1687 "SELECT meta_json FROM lash_session_meta ORDER BY session_id ASC LIMIT 1",
1688 )
1689 .fetch_optional(&self.pool)
1690 .await
1691 .map_err(store_sqlx_error)?
1692 };
1693 json.map(|json| store_decode_json(&json, "session meta"))
1694 .transpose()
1695 }
1696
1697 async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError> {
1698 for id in ids {
1699 if let Some(session_id) = &self.session_id {
1700 sqlx::query(
1701 "UPDATE lash_graph_nodes
1702 SET tombstoned = TRUE
1703 WHERE session_id = $1 AND node_id = $2",
1704 )
1705 .bind(session_id)
1706 .bind(id)
1707 .execute(&self.pool)
1708 .await
1709 .map_err(store_sqlx_error)?;
1710 } else {
1711 sqlx::query(
1712 "UPDATE lash_graph_nodes
1713 SET tombstoned = TRUE
1714 WHERE node_id = $1",
1715 )
1716 .bind(id)
1717 .execute(&self.pool)
1718 .await
1719 .map_err(store_sqlx_error)?;
1720 }
1721 }
1722 Ok(())
1723 }
1724
1725 async fn vacuum(&self) -> Result<VacuumReport, StoreError> {
1726 let removed = if let Some(session_id) = &self.session_id {
1727 sqlx::query("DELETE FROM lash_graph_nodes WHERE session_id = $1 AND tombstoned = TRUE")
1728 .bind(session_id)
1729 .execute(&self.pool)
1730 .await
1731 .map_err(store_sqlx_error)?
1732 .rows_affected()
1733 } else {
1734 sqlx::query("DELETE FROM lash_graph_nodes WHERE tombstoned = TRUE")
1735 .execute(&self.pool)
1736 .await
1737 .map_err(store_sqlx_error)?
1738 .rows_affected()
1739 };
1740 Ok(VacuumReport {
1741 removed_node_count: removed as usize,
1742 })
1743 }
1744
1745 async fn gc_unreachable(&self) -> Result<GcReport, StoreError> {
1746 Ok(GcReport::default())
1747 }
1748}
1749
1750fn process_status_label(record: &ProcessRecord) -> &'static str {
1751 record.status.label()
1752}
1753
1754async fn load_process_tx(
1755 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1756 process_id: &str,
1757) -> Result<Option<ProcessRecord>, PluginError> {
1758 let json: Option<String> = sqlx::query_scalar(
1759 "SELECT record_json
1760 FROM lash_processes
1761 WHERE process_id = $1
1762 FOR UPDATE",
1763 )
1764 .bind(process_id)
1765 .fetch_optional(&mut **tx)
1766 .await
1767 .map_err(plugin_sqlx_error)?;
1768 json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
1769 .transpose()
1770}
1771
1772async fn load_process(
1773 pool: &PgPool,
1774 process_id: &str,
1775) -> Result<Option<ProcessRecord>, PluginError> {
1776 let json: Option<String> =
1777 sqlx::query_scalar("SELECT record_json FROM lash_processes WHERE process_id = $1")
1778 .bind(process_id)
1779 .fetch_optional(pool)
1780 .await
1781 .map_err(plugin_sqlx_error)?;
1782 json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
1783 .transpose()
1784}
1785
1786async fn save_process_tx(
1787 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1788 record: &ProcessRecord,
1789) -> Result<(), PluginError> {
1790 sqlx::query(
1791 "UPDATE lash_processes
1792 SET updated_at_ms = $2, status = $3, record_json = $4
1793 WHERE process_id = $1",
1794 )
1795 .bind(&record.id)
1796 .bind(record.updated_at_ms as i64)
1797 .bind(process_status_label(record))
1798 .bind(serde_json::to_string(record).map_err(process_decode_error)?)
1799 .execute(&mut **tx)
1800 .await
1801 .map_err(plugin_sqlx_error)?;
1802 Ok(())
1803}
1804
1805async fn load_event_by_key_tx(
1806 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1807 process_id: &str,
1808 replay_key: &str,
1809) -> Result<Option<(String, ProcessEvent)>, PluginError> {
1810 let row = sqlx::query(
1811 "SELECT payload_hash, event_json
1812 FROM lash_process_events
1813 WHERE process_id = $1 AND idempotency_key = $2",
1814 )
1815 .bind(process_id)
1816 .bind(replay_key)
1817 .fetch_optional(&mut **tx)
1818 .await
1819 .map_err(plugin_sqlx_error)?;
1820 row.map(|row| {
1821 let hash: String = row.get(0);
1822 let json: String = row.get(1);
1823 serde_json::from_str(&json)
1824 .map(|event| (hash, event))
1825 .map_err(process_decode_error)
1826 })
1827 .transpose()
1828}
1829
1830async fn load_process_lease_tx(
1831 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1832 process_id: &str,
1833) -> Result<Option<ProcessLease>, PluginError> {
1834 let row = sqlx::query(
1835 "SELECT lease_owner_id, lease_token, lease_fencing_token,
1836 lease_claimed_at_ms, lease_expires_at_ms
1837 FROM lash_process_leases
1838 WHERE process_id = $1",
1839 )
1840 .bind(process_id)
1841 .fetch_optional(&mut **tx)
1842 .await
1843 .map_err(plugin_sqlx_error)?;
1844 let Some(row) = row else {
1845 return Ok(None);
1846 };
1847 let owner_id: Option<String> = row.get(0);
1848 let lease_token: Option<String> = row.get(1);
1849 let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
1850 return Ok(None);
1851 };
1852 Ok(Some(ProcessLease {
1853 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
1854 process_id: process_id.to_string(),
1855 owner_id,
1856 lease_token,
1857 fencing_token: row.get::<i64, _>(2) as u64,
1858 claimed_at_epoch_ms: row.get::<i64, _>(3) as u64,
1859 expires_at_epoch_ms: row.get::<i64, _>(4) as u64,
1860 }))
1861}
1862
1863fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> PluginError {
1864 PluginError::Session(format!(
1865 "process `{process_id}` is already leased by `{}` until {}",
1866 current.owner_id, current.expires_at_epoch_ms
1867 ))
1868}
1869
1870fn process_lease_expired(process_id: &str) -> PluginError {
1871 PluginError::Session(format!(
1872 "process lease for `{process_id}` is missing or expired"
1873 ))
1874}
1875
1876fn guard_lease(current: Option<&ProcessLease>, lease_token: &str, now: u64) -> bool {
1877 current
1878 .map(|current| current.lease_token == lease_token && current.expires_at_epoch_ms > now)
1879 .unwrap_or(false)
1880}
1881
1882async fn list_grants_for_scope(
1883 pool: &PgPool,
1884 owner_scope: &ProcessScope,
1885 live_only: bool,
1886) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
1887 let status_clause = if live_only {
1888 "AND p.status = 'running'"
1889 } else {
1890 ""
1891 };
1892 let sql = format!(
1893 "SELECT g.process_id, g.descriptor_json, p.record_json
1894 FROM lash_process_handle_grants g
1895 JOIN lash_processes p ON p.process_id = g.process_id
1896 WHERE g.scope_id = $1 {status_clause}
1897 ORDER BY g.process_id ASC"
1898 );
1899 let rows = sqlx::query(&sql)
1900 .bind(owner_scope.id().as_str())
1901 .fetch_all(pool)
1902 .await
1903 .map_err(plugin_sqlx_error)?;
1904 let mut entries = Vec::new();
1905 for row in rows {
1906 let process_id: String = row.get(0);
1907 let descriptor_json: String = row.get(1);
1908 let record_json: String = row.get(2);
1909 let descriptor: ProcessHandleDescriptor =
1910 serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
1911 let record: ProcessRecord =
1912 serde_json::from_str(&record_json).map_err(process_decode_error)?;
1913 entries.push((
1914 ProcessHandleGrant {
1915 session_id: owner_scope.session_id.clone(),
1916 process_id,
1917 descriptor,
1918 },
1919 record,
1920 ));
1921 }
1922 Ok(entries)
1923}
1924
1925#[async_trait::async_trait]
1926impl ProcessRegistry for PostgresProcessRegistry {
1927 fn durability_tier(&self) -> DurabilityTier {
1928 DurabilityTier::Durable
1929 }
1930
1931 async fn register_process(
1932 &self,
1933 registration: ProcessRegistration,
1934 ) -> Result<ProcessRecord, PluginError> {
1935 let (registration, registration_hash) =
1936 lash_core::runtime::prepare_process_registration(registration)?;
1937 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1938 if let Some(existing) = load_process_tx(&mut tx, ®istration.id).await? {
1939 if existing.registration_hash == registration_hash {
1940 tx.commit().await.map_err(plugin_sqlx_error)?;
1941 return Ok(existing);
1942 }
1943 return Err(PluginError::Session(format!(
1944 "process `{}` registration hash conflict: existing {}, new {}",
1945 registration.id, existing.registration_hash, registration_hash
1946 )));
1947 }
1948 let now = current_epoch_ms();
1949 let record =
1950 ProcessRecord::from_prepared_registration(registration, registration_hash, now);
1951 let record_json = serde_json::to_string(&record).map_err(process_decode_error)?;
1952 sqlx::query(
1953 "INSERT INTO lash_processes (
1954 process_id, registration_hash, owner_scope_id, host_profile_id,
1955 created_at_ms, updated_at_ms, status, record_json
1956 )
1957 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
1958 )
1959 .bind(&record.id)
1960 .bind(&record.registration_hash)
1961 .bind(record.owner_scope_id().as_str())
1962 .bind(record.host_profile_id())
1963 .bind(record.created_at_ms as i64)
1964 .bind(record.updated_at_ms as i64)
1965 .bind(process_status_label(&record))
1966 .bind(record_json)
1967 .execute(&mut *tx)
1968 .await
1969 .map_err(plugin_sqlx_error)?;
1970 tx.commit().await.map_err(plugin_sqlx_error)?;
1971 Ok(record)
1972 }
1973
1974 async fn set_external_ref(
1975 &self,
1976 process_id: &str,
1977 external_ref: ProcessExternalRef,
1978 ) -> Result<ProcessRecord, PluginError> {
1979 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1980 let mut record = load_process_tx(&mut tx, process_id)
1981 .await?
1982 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
1983 record.external_ref = Some(external_ref);
1984 record.updated_at_ms = current_epoch_ms();
1985 save_process_tx(&mut tx, &record).await?;
1986 tx.commit().await.map_err(plugin_sqlx_error)?;
1987 Ok(record)
1988 }
1989
1990 async fn grant_handle(
1991 &self,
1992 owner_scope: &ProcessScope,
1993 process_id: &str,
1994 descriptor: ProcessHandleDescriptor,
1995 ) -> Result<ProcessHandleGrant, PluginError> {
1996 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1997 if load_process_tx(&mut tx, process_id).await?.is_none() {
1998 return Err(PluginError::Session(format!(
1999 "unknown process `{process_id}`"
2000 )));
2001 }
2002 sqlx::query(
2003 "INSERT INTO lash_process_handle_grants (session_id, scope_id, process_id, descriptor_json)
2004 VALUES ($1, $2, $3, $4)
2005 ON CONFLICT (scope_id, process_id) DO UPDATE SET
2006 session_id = EXCLUDED.session_id,
2007 descriptor_json = EXCLUDED.descriptor_json",
2008 )
2009 .bind(&owner_scope.session_id)
2010 .bind(owner_scope.id().as_str())
2011 .bind(process_id)
2012 .bind(serde_json::to_string(&descriptor).map_err(process_decode_error)?)
2013 .execute(&mut *tx)
2014 .await
2015 .map_err(plugin_sqlx_error)?;
2016 tx.commit().await.map_err(plugin_sqlx_error)?;
2017 Ok(ProcessHandleGrant {
2018 session_id: owner_scope.session_id.clone(),
2019 process_id: process_id.to_string(),
2020 descriptor,
2021 })
2022 }
2023
2024 async fn revoke_handle(
2025 &self,
2026 owner_scope: &ProcessScope,
2027 process_id: &str,
2028 ) -> Result<(), PluginError> {
2029 sqlx::query(
2030 "DELETE FROM lash_process_handle_grants WHERE scope_id = $1 AND process_id = $2",
2031 )
2032 .bind(owner_scope.id().as_str())
2033 .bind(process_id)
2034 .execute(&self.pool)
2035 .await
2036 .map_err(plugin_sqlx_error)?;
2037 Ok(())
2038 }
2039
2040 async fn transfer_handle_grants(
2041 &self,
2042 from_scope: &ProcessScope,
2043 to_scope: &ProcessScope,
2044 process_ids: &[String],
2045 ) -> Result<(), PluginError> {
2046 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2047 for process_id in process_ids {
2048 let descriptor_json: Option<String> = sqlx::query_scalar(
2049 "SELECT descriptor_json FROM lash_process_handle_grants
2050 WHERE scope_id = $1 AND process_id = $2",
2051 )
2052 .bind(from_scope.id().as_str())
2053 .bind(process_id)
2054 .fetch_optional(&mut *tx)
2055 .await
2056 .map_err(plugin_sqlx_error)?;
2057 let Some(descriptor_json) = descriptor_json else {
2058 return Err(PluginError::Session(format!(
2059 "process handle `{process_id}` is not granted to session `{}`",
2060 from_scope.session_id
2061 )));
2062 };
2063 sqlx::query(
2064 "DELETE FROM lash_process_handle_grants WHERE scope_id = $1 AND process_id = $2",
2065 )
2066 .bind(from_scope.id().as_str())
2067 .bind(process_id)
2068 .execute(&mut *tx)
2069 .await
2070 .map_err(plugin_sqlx_error)?;
2071 sqlx::query(
2072 "INSERT INTO lash_process_handle_grants (session_id, scope_id, process_id, descriptor_json)
2073 VALUES ($1, $2, $3, $4)
2074 ON CONFLICT (scope_id, process_id) DO UPDATE SET
2075 session_id = EXCLUDED.session_id,
2076 descriptor_json = EXCLUDED.descriptor_json",
2077 )
2078 .bind(&to_scope.session_id)
2079 .bind(to_scope.id().as_str())
2080 .bind(process_id)
2081 .bind(descriptor_json)
2082 .execute(&mut *tx)
2083 .await
2084 .map_err(plugin_sqlx_error)?;
2085 }
2086 tx.commit().await.map_err(plugin_sqlx_error)
2087 }
2088
2089 async fn list_handle_grants(
2090 &self,
2091 owner_scope: &ProcessScope,
2092 ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
2093 list_grants_for_scope(&self.pool, owner_scope, false).await
2094 }
2095
2096 async fn list_live_handle_grants(
2097 &self,
2098 owner_scope: &ProcessScope,
2099 ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
2100 list_grants_for_scope(&self.pool, owner_scope, true).await
2101 }
2102
2103 async fn has_handle_grant(
2104 &self,
2105 owner_scope: &ProcessScope,
2106 process_id: &str,
2107 ) -> Result<bool, PluginError> {
2108 let exists: Option<i64> = sqlx::query_scalar(
2109 "SELECT 1::BIGINT FROM lash_process_handle_grants
2110 WHERE scope_id = $1 AND process_id = $2
2111 LIMIT 1",
2112 )
2113 .bind(owner_scope.id().as_str())
2114 .bind(process_id)
2115 .fetch_optional(&self.pool)
2116 .await
2117 .map_err(plugin_sqlx_error)?;
2118 Ok(exists.is_some())
2119 }
2120
2121 async fn handle_grants_for_process(
2122 &self,
2123 process_id: &str,
2124 ) -> Result<Vec<ProcessHandleGrant>, PluginError> {
2125 if load_process(&self.pool, process_id).await?.is_none() {
2126 return Err(PluginError::Session(format!(
2127 "unknown process `{process_id}`"
2128 )));
2129 }
2130 let rows = sqlx::query(
2131 "SELECT session_id, descriptor_json
2132 FROM lash_process_handle_grants
2133 WHERE process_id = $1
2134 ORDER BY session_id ASC, scope_id ASC",
2135 )
2136 .bind(process_id)
2137 .fetch_all(&self.pool)
2138 .await
2139 .map_err(plugin_sqlx_error)?;
2140 let mut grants = Vec::new();
2141 for row in rows {
2142 let descriptor_json: String = row.get(1);
2143 grants.push(ProcessHandleGrant {
2144 session_id: row.get(0),
2145 process_id: process_id.to_string(),
2146 descriptor: serde_json::from_str(&descriptor_json).map_err(process_decode_error)?,
2147 });
2148 }
2149 Ok(grants)
2150 }
2151
2152 async fn delete_session_process_state(
2153 &self,
2154 session_id: &str,
2155 ) -> Result<lash_core::ProcessSessionDeleteReport, PluginError> {
2156 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2157 let rows = sqlx::query(
2158 "SELECT g.process_id, p.record_json
2159 FROM lash_process_handle_grants g
2160 JOIN lash_processes p ON p.process_id = g.process_id
2161 WHERE g.session_id = $1
2162 ORDER BY g.process_id ASC",
2163 )
2164 .bind(session_id)
2165 .fetch_all(&mut *tx)
2166 .await
2167 .map_err(plugin_sqlx_error)?;
2168 let mut removed = Vec::new();
2169 for row in rows {
2170 let process_id: String = row.get(0);
2171 let record_json: String = row.get(1);
2172 let record: ProcessRecord =
2173 serde_json::from_str(&record_json).map_err(process_decode_error)?;
2174 removed.push((process_id, record));
2175 }
2176 let revoked = sqlx::query("DELETE FROM lash_process_handle_grants WHERE session_id = $1")
2177 .bind(session_id)
2178 .execute(&mut *tx)
2179 .await
2180 .map_err(plugin_sqlx_error)?
2181 .rows_affected() as usize;
2182 let mut cancel_process_ids = Vec::new();
2183 let mut preserved_process_ids = Vec::new();
2184 for (process_id, record) in removed {
2185 if record.is_terminal() {
2186 continue;
2187 }
2188 let remaining: i64 = sqlx::query_scalar(
2189 "SELECT COUNT(*) FROM lash_process_handle_grants WHERE process_id = $1",
2190 )
2191 .bind(&process_id)
2192 .fetch_one(&mut *tx)
2193 .await
2194 .map_err(plugin_sqlx_error)?;
2195 if remaining == 0 {
2196 cancel_process_ids.push(process_id);
2197 } else {
2198 preserved_process_ids.push(process_id);
2199 }
2200 }
2201 tx.commit().await.map_err(plugin_sqlx_error)?;
2202 cancel_process_ids.sort();
2203 cancel_process_ids.dedup();
2204 preserved_process_ids.sort();
2205 preserved_process_ids.dedup();
2206 Ok(lash_core::ProcessSessionDeleteReport {
2207 session_id: session_id.to_string(),
2208 revoked_handle_count: revoked,
2209 deleted_wake_count: 0,
2210 cancel_process_ids,
2211 preserved_process_ids,
2212 })
2213 }
2214
2215 async fn append_event(
2216 &self,
2217 process_id: &str,
2218 request: ProcessEventAppendRequest,
2219 ) -> Result<ProcessEventAppendResult, PluginError> {
2220 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2221 let mut record = load_process_tx(&mut tx, process_id)
2222 .await?
2223 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
2224 let replay_lookup =
2225 if let Some(replay_key) = request.replay.as_ref().map(|r| r.key.as_str()) {
2226 load_event_by_key_tx(&mut tx, process_id, replay_key).await?
2227 } else {
2228 None
2229 };
2230 let sequence: i64 = sqlx::query_scalar(
2231 "SELECT COALESCE(MAX(sequence), 0) + 1 FROM lash_process_events WHERE process_id = $1",
2232 )
2233 .bind(process_id)
2234 .fetch_one(&mut *tx)
2235 .await
2236 .map_err(plugin_sqlx_error)?;
2237 let occurred_at_ms = current_epoch_ms();
2238 let prepared = lash_core::runtime::prepare_process_event_append(
2239 &record,
2240 request,
2241 sequence as u64,
2242 replay_lookup,
2243 occurred_at_ms,
2244 )?;
2245 if prepared.replayed {
2246 let repaired = if let Some(status) = prepared.status_update.clone() {
2247 record.status = status;
2248 record.updated_at_ms = prepared.occurred_at_ms;
2249 save_process_tx(&mut tx, &record).await?;
2250 true
2251 } else {
2252 false
2253 };
2254 tx.commit().await.map_err(plugin_sqlx_error)?;
2255 if repaired {
2256 self.notify.notify_waiters();
2257 }
2258 return Ok(ProcessEventAppendResult {
2259 event: prepared.event,
2260 wake_delivery: prepared.wake_delivery,
2261 });
2262 }
2263 let event = prepared.event;
2264 sqlx::query(
2265 "INSERT INTO lash_process_events (
2266 process_id, sequence, event_type, payload_hash, idempotency_key,
2267 occurred_at_ms, event_json
2268 )
2269 VALUES ($1, $2, $3, $4, $5, $6, $7)",
2270 )
2271 .bind(process_id)
2272 .bind(sequence)
2273 .bind(event.event_type.as_str())
2274 .bind(&prepared.payload_hash)
2275 .bind(event.invocation.replay_key())
2276 .bind(prepared.occurred_at_ms as i64)
2277 .bind(serde_json::to_string(&event).map_err(process_decode_error)?)
2278 .execute(&mut *tx)
2279 .await
2280 .map_err(plugin_sqlx_error)?;
2281 if let Some(status) = prepared.status_update.clone() {
2282 record.status = status;
2283 }
2284 record.updated_at_ms = prepared.occurred_at_ms;
2285 save_process_tx(&mut tx, &record).await?;
2286 tx.commit().await.map_err(plugin_sqlx_error)?;
2287 self.notify.notify_waiters();
2288 Ok(ProcessEventAppendResult {
2289 event,
2290 wake_delivery: prepared.wake_delivery,
2291 })
2292 }
2293
2294 async fn events_after(
2295 &self,
2296 process_id: &str,
2297 after_sequence: u64,
2298 ) -> Result<Vec<ProcessEvent>, PluginError> {
2299 if load_process(&self.pool, process_id).await?.is_none() {
2300 return Err(PluginError::Session(format!(
2301 "unknown process `{process_id}`"
2302 )));
2303 }
2304 let rows = sqlx::query(
2305 "SELECT event_json FROM lash_process_events
2306 WHERE process_id = $1 AND sequence > $2
2307 ORDER BY sequence ASC",
2308 )
2309 .bind(process_id)
2310 .bind(after_sequence as i64)
2311 .fetch_all(&self.pool)
2312 .await
2313 .map_err(plugin_sqlx_error)?;
2314 let mut events = Vec::new();
2315 for row in rows {
2316 let json: String = row.get(0);
2317 events.push(serde_json::from_str(&json).map_err(process_decode_error)?);
2318 }
2319 Ok(events)
2320 }
2321
2322 async fn wake_events_after(
2323 &self,
2324 process_id: &str,
2325 after_sequence: u64,
2326 ) -> Result<Vec<ProcessEvent>, PluginError> {
2327 let rows = sqlx::query("SELECT sequence FROM lash_process_wake_acks WHERE process_id = $1")
2328 .bind(process_id)
2329 .fetch_all(&self.pool)
2330 .await
2331 .map_err(plugin_sqlx_error)?;
2332 let acked = rows
2333 .into_iter()
2334 .map(|row| row.get::<i64, _>(0) as u64)
2335 .collect::<HashSet<_>>();
2336 Ok(self
2337 .events_after(process_id, after_sequence)
2338 .await?
2339 .into_iter()
2340 .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
2341 .collect())
2342 }
2343
2344 async fn wait_event_after(
2345 &self,
2346 process_id: &str,
2347 event_type: &str,
2348 after_sequence: u64,
2349 ) -> Result<ProcessEvent, PluginError> {
2350 loop {
2351 if let Some(event) = self
2352 .events_after(process_id, after_sequence)
2353 .await?
2354 .into_iter()
2355 .find(|event| event.event_type == event_type)
2356 {
2357 return Ok(event);
2358 }
2359 tokio::select! {
2360 _ = self.notify.notified() => {}
2361 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
2362 }
2363 }
2364 }
2365
2366 async fn await_process(&self, process_id: &str) -> Result<ProcessAwaitOutput, PluginError> {
2367 loop {
2368 let record = load_process(&self.pool, process_id)
2369 .await?
2370 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
2371 if let Some(await_output) = record.status.await_output() {
2372 return Ok(await_output.clone());
2373 }
2374 tokio::select! {
2375 _ = self.notify.notified() => {}
2376 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
2377 }
2378 }
2379 }
2380
2381 async fn complete_process(
2382 &self,
2383 process_id: &str,
2384 await_output: ProcessAwaitOutput,
2385 ) -> Result<ProcessRecord, PluginError> {
2386 let event_type = match await_output.terminal_state() {
2387 lash_core::ProcessTerminalState::Completed => "process.completed",
2388 lash_core::ProcessTerminalState::Failed => "process.failed",
2389 lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
2390 };
2391 self.append_event(
2392 process_id,
2393 ProcessEventAppendRequest::new(
2394 event_type,
2395 serde_json::json!({ "await_output": await_output }),
2396 )
2397 .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
2398 )
2399 .await?;
2400 load_process(&self.pool, process_id).await?.ok_or_else(|| {
2401 PluginError::Session(format!(
2402 "unknown process `{process_id}` after terminal event"
2403 ))
2404 })
2405 }
2406
2407 async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
2408 load_process(&self.pool, process_id).await.ok().flatten()
2409 }
2410
2411 async fn ack_wake(&self, process_id: &str, sequence: u64) -> Result<(), PluginError> {
2412 if load_process(&self.pool, process_id).await?.is_none() {
2413 return Err(PluginError::Session(format!(
2414 "unknown process `{process_id}`"
2415 )));
2416 }
2417 sqlx::query(
2418 "INSERT INTO lash_process_wake_acks (process_id, sequence)
2419 VALUES ($1, $2)
2420 ON CONFLICT DO NOTHING",
2421 )
2422 .bind(process_id)
2423 .bind(sequence as i64)
2424 .execute(&self.pool)
2425 .await
2426 .map_err(plugin_sqlx_error)?;
2427 Ok(())
2428 }
2429
2430 async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, PluginError> {
2431 let rows = sqlx::query(
2432 "SELECT record_json FROM lash_processes
2433 WHERE status = 'running'
2434 ORDER BY process_id ASC",
2435 )
2436 .fetch_all(&self.pool)
2437 .await
2438 .map_err(plugin_sqlx_error)?;
2439 let mut records = Vec::new();
2440 for row in rows {
2441 let json: String = row.get(0);
2442 records.push(serde_json::from_str(&json).map_err(process_decode_error)?);
2443 }
2444 Ok(records)
2445 }
2446
2447 async fn claim_process_lease(
2448 &self,
2449 process_id: &str,
2450 owner_id: &str,
2451 lease_ttl_ms: u64,
2452 ) -> Result<ProcessLease, PluginError> {
2453 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2454 if load_process_tx(&mut tx, process_id).await?.is_none() {
2455 return Err(PluginError::Session(format!(
2456 "unknown process `{process_id}`"
2457 )));
2458 }
2459 let now = current_epoch_ms();
2460 let current = load_process_lease_tx(&mut tx, process_id).await?;
2461 if let Some(current) = current.as_ref()
2462 && current.expires_at_epoch_ms > now
2463 && current.owner_id != owner_id
2464 {
2465 return Err(process_lease_conflict(process_id, current));
2466 }
2467 let existing_fence: Option<i64> = sqlx::query_scalar(
2468 "SELECT lease_fencing_token FROM lash_process_leases WHERE process_id = $1 FOR UPDATE",
2469 )
2470 .bind(process_id)
2471 .fetch_optional(&mut *tx)
2472 .await
2473 .map_err(plugin_sqlx_error)?;
2474 let fencing_token = existing_fence.unwrap_or(0) as u64 + 1;
2475 let lease = ProcessLease {
2476 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
2477 process_id: process_id.to_string(),
2478 owner_id: owner_id.to_string(),
2479 lease_token: format!(
2480 "{:x}",
2481 Sha256::digest(format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes())
2482 ),
2483 fencing_token,
2484 claimed_at_epoch_ms: now,
2485 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
2486 };
2487 sqlx::query(
2488 "INSERT INTO lash_process_leases (
2489 process_id, lease_owner_id, lease_token, lease_fencing_token,
2490 lease_claimed_at_ms, lease_expires_at_ms
2491 )
2492 VALUES ($1, $2, $3, $4, $5, $6)
2493 ON CONFLICT (process_id) DO UPDATE SET
2494 lease_owner_id = EXCLUDED.lease_owner_id,
2495 lease_token = EXCLUDED.lease_token,
2496 lease_fencing_token = EXCLUDED.lease_fencing_token,
2497 lease_claimed_at_ms = EXCLUDED.lease_claimed_at_ms,
2498 lease_expires_at_ms = EXCLUDED.lease_expires_at_ms",
2499 )
2500 .bind(&lease.process_id)
2501 .bind(&lease.owner_id)
2502 .bind(&lease.lease_token)
2503 .bind(lease.fencing_token as i64)
2504 .bind(lease.claimed_at_epoch_ms as i64)
2505 .bind(lease.expires_at_epoch_ms as i64)
2506 .execute(&mut *tx)
2507 .await
2508 .map_err(plugin_sqlx_error)?;
2509 tx.commit().await.map_err(plugin_sqlx_error)?;
2510 Ok(lease)
2511 }
2512
2513 async fn renew_process_lease(
2514 &self,
2515 lease: &ProcessLease,
2516 lease_ttl_ms: u64,
2517 ) -> Result<ProcessLease, PluginError> {
2518 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2519 let now = current_epoch_ms();
2520 let current = load_process_lease_tx(&mut tx, &lease.process_id).await?;
2521 if !guard_lease(current.as_ref(), &lease.lease_token, now) {
2522 return Err(process_lease_expired(&lease.process_id));
2523 }
2524 let renewed = ProcessLease {
2525 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
2526 ..lease.clone()
2527 };
2528 sqlx::query(
2529 "UPDATE lash_process_leases
2530 SET lease_expires_at_ms = $2
2531 WHERE process_id = $1 AND lease_token = $3",
2532 )
2533 .bind(&renewed.process_id)
2534 .bind(renewed.expires_at_epoch_ms as i64)
2535 .bind(&renewed.lease_token)
2536 .execute(&mut *tx)
2537 .await
2538 .map_err(plugin_sqlx_error)?;
2539 tx.commit().await.map_err(plugin_sqlx_error)?;
2540 Ok(renewed)
2541 }
2542
2543 async fn complete_process_lease(
2544 &self,
2545 completion: &ProcessLeaseCompletion,
2546 ) -> Result<(), PluginError> {
2547 sqlx::query(
2548 "UPDATE lash_process_leases
2549 SET lease_owner_id = NULL,
2550 lease_token = NULL,
2551 lease_claimed_at_ms = 0,
2552 lease_expires_at_ms = 0
2553 WHERE process_id = $1 AND lease_token = $2",
2554 )
2555 .bind(&completion.process_id)
2556 .bind(&completion.lease_token)
2557 .execute(&self.pool)
2558 .await
2559 .map_err(plugin_sqlx_error)?;
2560 Ok(())
2561 }
2562}
2563
2564#[async_trait::async_trait]
2565impl HostEventStore for PostgresHostEventStore {
2566 fn durability_tier(&self) -> DurabilityTier {
2567 DurabilityTier::Durable
2568 }
2569
2570 async fn register_subscription(
2571 &self,
2572 draft: TriggerSubscriptionDraft,
2573 ) -> Result<TriggerSubscriptionRecord, PluginError> {
2574 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2575 let seq: i64 = sqlx::query_scalar("SELECT nextval('lash_host_event_subscription_seq')")
2576 .fetch_one(&mut *tx)
2577 .await
2578 .map_err(plugin_sqlx_error)?;
2579 let handle = format!("trigger:{seq}");
2580 let subscription_id = format!("subscription:{seq}");
2581 let now = current_epoch_ms();
2582 let record = TriggerSubscriptionRecord {
2583 subscription_id: subscription_id.clone(),
2584 session_id: draft.session_id,
2585 handle,
2586 name: draft.name,
2587 source_type: draft.source_type,
2588 source_key: draft.source_key,
2589 source: draft.source,
2590 event_ty: draft.event_ty,
2591 module_ref: draft.module_ref,
2592 required_surface_ref: draft.required_surface_ref,
2593 process_ref: draft.process_ref,
2594 process_name: draft.process_name,
2595 input_template: draft.input_template,
2596 enabled: true,
2597 created_at_ms: now,
2598 updated_at_ms: now,
2599 };
2600 sqlx::query(
2601 "INSERT INTO lash_host_event_trigger_subscriptions (
2602 subscription_id, session_id, handle, source_type, source_key,
2603 enabled, created_at_ms, updated_at_ms, record_json
2604 )
2605 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
2606 )
2607 .bind(&record.subscription_id)
2608 .bind(&record.session_id)
2609 .bind(&record.handle)
2610 .bind(&record.source_type)
2611 .bind(&record.source_key)
2612 .bind(record.enabled)
2613 .bind(record.created_at_ms as i64)
2614 .bind(record.updated_at_ms as i64)
2615 .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2616 .execute(&mut *tx)
2617 .await
2618 .map_err(plugin_sqlx_error)?;
2619 tx.commit().await.map_err(plugin_sqlx_error)?;
2620 Ok(record)
2621 }
2622
2623 async fn list_subscriptions(
2624 &self,
2625 filter: TriggerSubscriptionFilter,
2626 ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError> {
2627 let rows = sqlx::query(
2628 "SELECT record_json FROM lash_host_event_trigger_subscriptions
2629 ORDER BY session_id ASC, handle ASC",
2630 )
2631 .fetch_all(&self.pool)
2632 .await
2633 .map_err(plugin_sqlx_error)?;
2634 let mut records = Vec::new();
2635 for row in rows {
2636 let json: String = row.get(0);
2637 let record: TriggerSubscriptionRecord =
2638 serde_json::from_str(&json).map_err(process_decode_error)?;
2639 if filter.matches(&record) {
2640 records.push(record);
2641 }
2642 }
2643 Ok(records)
2644 }
2645
2646 async fn cancel_subscription(
2647 &self,
2648 session_id: &str,
2649 handle: &str,
2650 ) -> Result<bool, PluginError> {
2651 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2652 let json: Option<String> = sqlx::query_scalar(
2653 "SELECT record_json FROM lash_host_event_trigger_subscriptions
2654 WHERE session_id = $1 AND handle = $2
2655 FOR UPDATE",
2656 )
2657 .bind(session_id)
2658 .bind(handle)
2659 .fetch_optional(&mut *tx)
2660 .await
2661 .map_err(plugin_sqlx_error)?;
2662 let Some(json) = json else {
2663 tx.commit().await.map_err(plugin_sqlx_error)?;
2664 return Ok(false);
2665 };
2666 let mut record: TriggerSubscriptionRecord =
2667 serde_json::from_str(&json).map_err(process_decode_error)?;
2668 let changed = record.enabled;
2669 record.enabled = false;
2670 record.updated_at_ms = current_epoch_ms();
2671 sqlx::query(
2672 "UPDATE lash_host_event_trigger_subscriptions
2673 SET enabled = $3, updated_at_ms = $4, record_json = $5
2674 WHERE session_id = $1 AND handle = $2",
2675 )
2676 .bind(session_id)
2677 .bind(handle)
2678 .bind(record.enabled)
2679 .bind(record.updated_at_ms as i64)
2680 .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2681 .execute(&mut *tx)
2682 .await
2683 .map_err(plugin_sqlx_error)?;
2684 tx.commit().await.map_err(plugin_sqlx_error)?;
2685 Ok(changed)
2686 }
2687
2688 async fn record_occurrence(
2689 &self,
2690 request: HostEventOccurrenceRequest,
2691 ) -> Result<HostEventOccurrenceRecord, PluginError> {
2692 lash_core::validate_host_event_occurrence_request(&request)?;
2693 let request_hash = lash_core::host_event_occurrence_request_hash(&request)?;
2694 let occurrence_id = lash_core::deterministic_occurrence_id(&request)?;
2695 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2696 let existing = sqlx::query(
2697 "SELECT request_hash, record_json
2698 FROM lash_host_event_occurrences
2699 WHERE idempotency_key = $1
2700 FOR UPDATE",
2701 )
2702 .bind(&request.idempotency_key)
2703 .fetch_optional(&mut *tx)
2704 .await
2705 .map_err(plugin_sqlx_error)?;
2706 if let Some(row) = existing {
2707 let existing_hash: String = row.get(0);
2708 let existing_json: String = row.get(1);
2709 if existing_hash != request_hash {
2710 return Err(PluginError::Session(format!(
2711 "host event occurrence idempotency conflict for `{}`",
2712 request.idempotency_key
2713 )));
2714 }
2715 let record = serde_json::from_str(&existing_json).map_err(process_decode_error)?;
2716 tx.commit().await.map_err(plugin_sqlx_error)?;
2717 return Ok(record);
2718 }
2719 let record = HostEventOccurrenceRecord {
2720 occurrence_id: occurrence_id.clone(),
2721 source_type: request.source_type,
2722 source_key: request.source_key,
2723 payload: request.payload,
2724 idempotency_key: request.idempotency_key.clone(),
2725 source: request.source,
2726 occurred_at_ms: current_epoch_ms(),
2727 };
2728 sqlx::query(
2729 "INSERT INTO lash_host_event_occurrences (
2730 occurrence_id, idempotency_key, request_hash, source_type, source_key,
2731 occurred_at_ms, record_json
2732 )
2733 VALUES ($1, $2, $3, $4, $5, $6, $7)",
2734 )
2735 .bind(&record.occurrence_id)
2736 .bind(&record.idempotency_key)
2737 .bind(&request_hash)
2738 .bind(&record.source_type)
2739 .bind(&record.source_key)
2740 .bind(record.occurred_at_ms as i64)
2741 .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2742 .execute(&mut *tx)
2743 .await
2744 .map_err(plugin_sqlx_error)?;
2745 tx.commit().await.map_err(plugin_sqlx_error)?;
2746 Ok(record)
2747 }
2748
2749 async fn reserve_matching_deliveries(
2750 &self,
2751 occurrence_id: &str,
2752 ) -> Result<Vec<TriggerDeliveryReservation>, PluginError> {
2753 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2754 let occurrence_json: Option<String> = sqlx::query_scalar(
2755 "SELECT record_json FROM lash_host_event_occurrences WHERE occurrence_id = $1",
2756 )
2757 .bind(occurrence_id)
2758 .fetch_optional(&mut *tx)
2759 .await
2760 .map_err(plugin_sqlx_error)?;
2761 let Some(occurrence_json) = occurrence_json else {
2762 return Err(PluginError::Session(format!(
2763 "unknown host event occurrence `{occurrence_id}`"
2764 )));
2765 };
2766 let occurrence: HostEventOccurrenceRecord =
2767 serde_json::from_str(&occurrence_json).map_err(process_decode_error)?;
2768 let rows = sqlx::query(
2769 "SELECT record_json FROM lash_host_event_trigger_subscriptions
2770 WHERE enabled = TRUE AND source_type = $1 AND source_key = $2
2771 ORDER BY session_id ASC, handle ASC",
2772 )
2773 .bind(&occurrence.source_type)
2774 .bind(&occurrence.source_key)
2775 .fetch_all(&mut *tx)
2776 .await
2777 .map_err(plugin_sqlx_error)?;
2778 let mut deliveries = Vec::new();
2779 for row in rows {
2780 let json: String = row.get(0);
2781 let subscription: TriggerSubscriptionRecord =
2782 serde_json::from_str(&json).map_err(process_decode_error)?;
2783 let process_id = lash_core::deterministic_delivery_process_id(
2784 &occurrence.occurrence_id,
2785 &subscription.subscription_id,
2786 )?;
2787 let inserted = sqlx::query(
2788 "INSERT INTO lash_host_event_deliveries (
2789 occurrence_id, subscription_id, process_id, created_at_ms
2790 )
2791 VALUES ($1, $2, $3, $4)
2792 ON CONFLICT DO NOTHING",
2793 )
2794 .bind(&occurrence.occurrence_id)
2795 .bind(&subscription.subscription_id)
2796 .bind(&process_id)
2797 .bind(current_epoch_ms() as i64)
2798 .execute(&mut *tx)
2799 .await
2800 .map_err(plugin_sqlx_error)?
2801 .rows_affected();
2802 if inserted == 0 {
2803 continue;
2804 }
2805 deliveries.push(TriggerDeliveryReservation {
2806 occurrence: occurrence.clone(),
2807 subscription,
2808 process_id,
2809 });
2810 }
2811 tx.commit().await.map_err(plugin_sqlx_error)?;
2812 Ok(deliveries)
2813 }
2814}
2815
2816#[async_trait::async_trait]
2817impl lashlang::LashlangArtifactStore for PostgresLashlangArtifactStore {
2818 fn durability_tier(&self) -> DurabilityTier {
2819 DurabilityTier::Durable
2820 }
2821
2822 async fn put_module_artifact(
2823 &self,
2824 artifact: &lashlang::ModuleArtifact,
2825 ) -> Result<(), lashlang::ArtifactStoreError> {
2826 let bytes = artifact
2827 .to_store_bytes()
2828 .map_err(lashlang::ArtifactStoreError::from)?;
2829 sqlx::query(
2830 "INSERT INTO lash_lashlang_artifacts (module_ref, artifact_bytes)
2831 VALUES ($1, $2)
2832 ON CONFLICT (module_ref) DO UPDATE SET artifact_bytes = EXCLUDED.artifact_bytes",
2833 )
2834 .bind(artifact.module_ref.as_str())
2835 .bind(bytes)
2836 .execute(&self.pool)
2837 .await
2838 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
2839 Ok(())
2840 }
2841
2842 async fn get_module_artifact(
2843 &self,
2844 module_ref: &lashlang::ModuleRef,
2845 ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
2846 let bytes: Option<Vec<u8>> = sqlx::query_scalar(
2847 "SELECT artifact_bytes FROM lash_lashlang_artifacts WHERE module_ref = $1",
2848 )
2849 .bind(module_ref.as_str())
2850 .fetch_optional(&self.pool)
2851 .await
2852 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
2853 bytes
2854 .map(|bytes| {
2855 lashlang::ModuleArtifact::from_store_bytes(&bytes)
2856 .map(Arc::new)
2857 .map_err(lashlang::ArtifactStoreError::from)
2858 })
2859 .transpose()
2860 }
2861}