1use std::collections::HashSet;
8use std::sync::{Arc, OnceLock};
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use lash_core::runtime::{
12 ProcessHandleGrantEntry, QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim,
13 QueuedWorkClaimBoundary, QueuedWorkCompletion, QueuedWorkItem,
14};
15use lash_core::store::queued_work::{
16 ClaimCandidate, QueuedWorkClaimLease, claim_scan_limit, derive_batch_id, renewed_claim,
17 select_claim_prefix,
18};
19use lash_core::store::{
20 GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
21 RuntimeCommitResult, SessionCheckpoint, SessionHeadMeta,
22};
23use lash_core::{
24 AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
25 DeliveryPolicy, DurabilityTier, GcReport, MergeKey, ProcessAwaitOutput, ProcessEvent,
26 ProcessEventAppendRequest, ProcessEventAppendResult, ProcessExternalRef,
27 ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease, ProcessLeaseCompletion,
28 ProcessRecord, ProcessRegistration, ProcessRegistry, RuntimePersistence, SessionMeta,
29 SessionNodeRecord, SessionReadScope, SessionScope, SessionStoreCreateRequest,
30 SessionStoreFactory, SlotPolicy, StoreError, TokenLedgerEntry, VacuumReport,
31};
32use lash_core::{
33 HostEventOccurrenceRecord, HostEventOccurrenceRequest, HostEventStore, PluginError,
34 TriggerDeliveryReservation, TriggerSubscriptionDraft, TriggerSubscriptionFilter,
35 TriggerSubscriptionRecord,
36};
37use sha2::{Digest, Sha256};
38use sqlx::postgres::{PgPool, PgPoolOptions, PgRow};
39use sqlx::{Executor, Row};
40
/// Component key under which this store records its version in `lash_schema_versions`.
const SCHEMA_COMPONENT: &str = "lash-postgres-store";
/// Schema version `ensure_schema` expects; any other recorded version is rejected.
const SCHEMA_VERSION: i32 = 1;
/// Local alias for the process-lease schema version defined by `lash_core`.
const PROCESS_LEASE_SCHEMA_VERSION: u32 = lash_core::PROCESS_LEASE_SCHEMA_VERSION;
44
/// Entry point to the Postgres-backed stores.
///
/// Owns the connection pool and hands out the specialised store handles
/// (session stores, process registry, host-event store, artifact store),
/// each of which shares a clone of the same pool.
#[derive(Clone)]
pub struct PostgresStorage {
    /// Shared sqlx connection pool.
    pool: PgPool,
}
49
/// Factory that creates, opens and deletes per-session stores on a shared pool.
#[derive(Clone)]
pub struct PostgresSessionStoreFactory {
    /// Shared sqlx connection pool.
    pool: PgPool,
}
54
/// Session-level persistence handle backed by Postgres.
#[derive(Clone)]
pub struct PostgresSessionStore {
    /// Shared sqlx connection pool.
    pool: PgPool,
    /// Session this store is pinned to. When `None`, `selected_session_id`
    /// falls back to the first row of `lash_sessions` ordered by id.
    session_id: Option<String>,
    /// Write-once session id shared between clones.
    /// NOTE(review): its use is not visible in this part of the file; presumably
    /// it records the session an unbound store ends up bound to — confirm.
    bound_session: Arc<OnceLock<String>>,
}
67
/// Process registry handle backed by Postgres.
#[derive(Clone)]
pub struct PostgresProcessRegistry {
    /// Shared sqlx connection pool.
    pool: PgPool,
    /// In-process notifier shared between clones of this registry.
    /// NOTE(review): presumably used to wake local waiters on new process
    /// events; the usage is outside this part of the file — confirm.
    notify: Arc<tokio::sync::Notify>,
}
73
/// Host-event store handle backed by Postgres.
#[derive(Clone)]
pub struct PostgresHostEventStore {
    /// Shared sqlx connection pool.
    pool: PgPool,
}
78
/// Lashlang artifact store handle backed by Postgres.
#[derive(Clone)]
pub struct PostgresLashlangArtifactStore {
    /// Shared sqlx connection pool.
    pool: PgPool,
}
83
/// Connection-pool and per-connection settings used by
/// `PostgresStorage::connect_with`.
#[derive(Clone, Debug)]
pub struct PostgresStoreConfig {
    /// Maximum number of pooled connections.
    pub max_connections: u32,
    /// Minimum number of connections the pool keeps open.
    pub min_connections: u32,
    /// How long to wait for a connection from the pool.
    pub acquire_timeout: Duration,
    /// Idle time after which a pooled connection is closed; `None` leaves the
    /// pool's own default in place.
    pub idle_timeout: Option<Duration>,
    /// Maximum lifetime of a pooled connection; `None` leaves the pool's own
    /// default in place.
    pub max_lifetime: Option<Duration>,
    /// Value applied with `SET lock_timeout` on every new connection; `None`
    /// skips the statement.
    pub lock_timeout: Option<Duration>,
    /// Value applied with `SET statement_timeout` on every new connection;
    /// `None` skips the statement.
    pub statement_timeout: Option<Duration>,
}
110
111impl Default for PostgresStoreConfig {
112 fn default() -> Self {
113 Self {
114 max_connections: 16,
115 min_connections: 0,
116 acquire_timeout: Duration::from_secs(30),
117 idle_timeout: Some(Duration::from_secs(600)),
118 max_lifetime: Some(Duration::from_secs(1800)),
119 lock_timeout: Some(Duration::from_secs(10)),
120 statement_timeout: Some(Duration::from_secs(30)),
121 }
122 }
123}
124
125impl PostgresStorage {
126 pub async fn connect(database_url: &str) -> Result<Self, StoreError> {
128 Self::connect_with(database_url, PostgresStoreConfig::default()).await
129 }
130
131 pub async fn connect_with(
133 database_url: &str,
134 config: PostgresStoreConfig,
135 ) -> Result<Self, StoreError> {
136 let lock_ms = config.lock_timeout.map(|d| d.as_millis().max(1) as u64);
137 let statement_ms = config
138 .statement_timeout
139 .map(|d| d.as_millis().max(1) as u64);
140 let mut options = PgPoolOptions::new()
141 .max_connections(config.max_connections)
142 .min_connections(config.min_connections)
143 .acquire_timeout(config.acquire_timeout);
144 if let Some(timeout) = config.idle_timeout {
145 options = options.idle_timeout(timeout);
146 }
147 if let Some(timeout) = config.max_lifetime {
148 options = options.max_lifetime(timeout);
149 }
150 let pool = options
151 .after_connect(move |conn, _meta| {
152 Box::pin(async move {
153 if let Some(ms) = lock_ms {
154 conn.execute(format!("SET lock_timeout = {ms}").as_str())
155 .await?;
156 }
157 if let Some(ms) = statement_ms {
158 conn.execute(format!("SET statement_timeout = {ms}").as_str())
159 .await?;
160 }
161 Ok(())
162 })
163 })
164 .connect(database_url)
165 .await
166 .map_err(store_sqlx_error)?;
167 ensure_schema(&pool).await?;
168 Ok(Self { pool })
169 }
170
171 pub fn from_pool(pool: PgPool) -> Self {
172 Self { pool }
173 }
174
175 pub fn pool(&self) -> &PgPool {
176 &self.pool
177 }
178
179 pub fn session_store_factory(&self) -> PostgresSessionStoreFactory {
180 PostgresSessionStoreFactory {
181 pool: self.pool.clone(),
182 }
183 }
184
185 pub fn session_store(&self, session_id: impl Into<String>) -> PostgresSessionStore {
186 PostgresSessionStore {
187 pool: self.pool.clone(),
188 session_id: Some(session_id.into()),
189 bound_session: Arc::new(OnceLock::new()),
190 }
191 }
192
193 pub fn unbound_session_store(&self) -> PostgresSessionStore {
194 PostgresSessionStore {
195 pool: self.pool.clone(),
196 session_id: None,
197 bound_session: Arc::new(OnceLock::new()),
198 }
199 }
200
201 pub fn process_registry(&self) -> PostgresProcessRegistry {
202 PostgresProcessRegistry {
203 pool: self.pool.clone(),
204 notify: Arc::new(tokio::sync::Notify::new()),
205 }
206 }
207
208 pub fn host_event_store(&self) -> PostgresHostEventStore {
209 PostgresHostEventStore {
210 pool: self.pool.clone(),
211 }
212 }
213
214 pub fn lashlang_artifact_store(&self) -> PostgresLashlangArtifactStore {
215 PostgresLashlangArtifactStore {
216 pool: self.pool.clone(),
217 }
218 }
219}
220
impl PostgresSessionStoreFactory {
    /// Builds a factory that shares `storage`'s connection pool by delegating
    /// to `PostgresStorage::session_store_factory`.
    pub fn new(storage: &PostgresStorage) -> Self {
        storage.session_store_factory()
    }
}
226
227impl PostgresSessionStore {
228 pub fn unbound(storage: &PostgresStorage) -> Self {
229 storage.unbound_session_store()
230 }
231
232 async fn selected_session_id(&self) -> Result<Option<String>, StoreError> {
233 if let Some(session_id) = &self.session_id {
234 return Ok(Some(session_id.clone()));
235 }
236 sqlx::query_scalar("SELECT session_id FROM lash_sessions ORDER BY session_id ASC LIMIT 1")
237 .fetch_optional(&self.pool)
238 .await
239 .map_err(store_sqlx_error)
240 }
241}
242
/// Creates every table, index and sequence this store uses (all with
/// `IF NOT EXISTS`) and checks the recorded schema version.
///
/// All work runs in one transaction. If no version row exists for
/// `SCHEMA_COMPONENT`, one is inserted with `SCHEMA_VERSION`.
///
/// # Errors
/// Returns `StoreError::Backend` if any statement fails or if the recorded
/// version differs from `SCHEMA_VERSION`; in that case the transaction is
/// dropped without commit.
async fn ensure_schema(pool: &PgPool) -> Result<(), StoreError> {
    let mut tx = pool.begin().await.map_err(store_sqlx_error)?;
    // Transaction-scoped advisory lock, released at commit/rollback.
    // NOTE(review): presumably serialises concurrent initialisers racing on the DDL below.
    tx.execute("SELECT pg_advisory_xact_lock(715421, 907001)")
        .await
        .map_err(store_sqlx_error)?;
    tx.execute(
        r#"
        CREATE TABLE IF NOT EXISTS lash_schema_versions (
            component TEXT PRIMARY KEY,
            version INTEGER NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_blobs (
            hash TEXT PRIMARY KEY,
            content BYTEA NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_sessions (
            session_id TEXT PRIMARY KEY,
            head_revision BIGINT NOT NULL DEFAULT 0,
            head_json TEXT NOT NULL,
            checkpoint_ref TEXT
        );

        CREATE TABLE IF NOT EXISTS lash_graph_nodes (
            session_id TEXT NOT NULL,
            seq BIGSERIAL,
            node_id TEXT NOT NULL,
            node_json TEXT NOT NULL,
            tombstoned BOOLEAN NOT NULL DEFAULT FALSE,
            PRIMARY KEY (session_id, node_id)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_graph_nodes_seq
            ON lash_graph_nodes(session_id, seq);

        CREATE TABLE IF NOT EXISTS lash_usage_deltas (
            seq BIGSERIAL PRIMARY KEY,
            session_id TEXT NOT NULL,
            entry_json TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_session_meta (
            session_id TEXT PRIMARY KEY,
            meta_json TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_runtime_turn_commits (
            session_id TEXT NOT NULL,
            turn_id TEXT NOT NULL,
            turn_commit_hash TEXT NOT NULL,
            result_json TEXT NOT NULL,
            committed_at_ms BIGINT NOT NULL,
            PRIMARY KEY (session_id, turn_id)
        );

        CREATE TABLE IF NOT EXISTS lash_queued_work_batches (
            enqueue_seq BIGSERIAL PRIMARY KEY,
            batch_id TEXT NOT NULL UNIQUE,
            session_id TEXT NOT NULL,
            source_key TEXT,
            delivery_policy TEXT NOT NULL,
            slot_policy TEXT NOT NULL,
            merge_key_json TEXT NOT NULL,
            available_at_ms BIGINT NOT NULL,
            enqueued_at_ms BIGINT NOT NULL,
            claim_id TEXT,
            claim_owner_id TEXT,
            claim_token TEXT,
            claim_fencing_token BIGINT NOT NULL DEFAULT 0,
            claim_claimed_at_ms BIGINT NOT NULL DEFAULT 0,
            claim_expires_at_ms BIGINT NOT NULL DEFAULT 0,
            UNIQUE (session_id, source_key)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_queued_work_ready
            ON lash_queued_work_batches(session_id, available_at_ms, enqueue_seq);

        CREATE TABLE IF NOT EXISTS lash_queued_work_items (
            batch_id TEXT NOT NULL REFERENCES lash_queued_work_batches(batch_id) ON DELETE CASCADE,
            item_index INTEGER NOT NULL,
            item_id TEXT NOT NULL,
            payload_json TEXT NOT NULL,
            PRIMARY KEY (batch_id, item_index)
        );

        CREATE TABLE IF NOT EXISTS lash_attachment_manifest (
            attachment_id TEXT PRIMARY KEY,
            session_id TEXT NOT NULL,
            canonical_uri TEXT NOT NULL,
            intent_at_ms BIGINT NOT NULL,
            committed_at_ms BIGINT
        );
        CREATE INDEX IF NOT EXISTS idx_lash_attachment_manifest_uncommitted
            ON lash_attachment_manifest(committed_at_ms)
            WHERE committed_at_ms IS NULL;

        CREATE TABLE IF NOT EXISTS lash_processes (
            process_id TEXT PRIMARY KEY,
            registration_hash TEXT NOT NULL,
            owner_scope_id TEXT NOT NULL,
            host_profile_id TEXT NOT NULL,
            created_at_ms BIGINT NOT NULL,
            updated_at_ms BIGINT NOT NULL,
            status TEXT NOT NULL,
            record_json TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_lash_processes_status
            ON lash_processes(status);

        CREATE TABLE IF NOT EXISTS lash_process_events (
            process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            sequence BIGINT NOT NULL,
            event_type TEXT NOT NULL,
            payload_hash TEXT NOT NULL,
            idempotency_key TEXT,
            occurred_at_ms BIGINT NOT NULL,
            event_json TEXT NOT NULL,
            PRIMARY KEY (process_id, sequence)
        );
        CREATE UNIQUE INDEX IF NOT EXISTS idx_lash_process_events_key
            ON lash_process_events(process_id, idempotency_key)
            WHERE idempotency_key IS NOT NULL;

        CREATE TABLE IF NOT EXISTS lash_process_wake_acks (
            process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            sequence BIGINT NOT NULL,
            PRIMARY KEY (process_id, sequence)
        );

        CREATE TABLE IF NOT EXISTS lash_process_handle_grants (
            session_id TEXT NOT NULL,
            scope_id TEXT NOT NULL,
            process_id TEXT NOT NULL REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            descriptor_json TEXT NOT NULL,
            PRIMARY KEY (scope_id, process_id)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_process_handle_grants_session
            ON lash_process_handle_grants(session_id);
        CREATE INDEX IF NOT EXISTS idx_lash_process_handle_grants_process
            ON lash_process_handle_grants(process_id);

        CREATE TABLE IF NOT EXISTS lash_process_leases (
            process_id TEXT PRIMARY KEY REFERENCES lash_processes(process_id) ON DELETE CASCADE,
            lease_owner_id TEXT,
            lease_token TEXT,
            lease_fencing_token BIGINT NOT NULL DEFAULT 0,
            lease_claimed_at_ms BIGINT NOT NULL DEFAULT 0,
            lease_expires_at_ms BIGINT NOT NULL DEFAULT 0
        );

        CREATE SEQUENCE IF NOT EXISTS lash_host_event_subscription_seq;
        CREATE TABLE IF NOT EXISTS lash_host_event_trigger_subscriptions (
            subscription_id TEXT PRIMARY KEY,
            registrant_scope_id TEXT NOT NULL,
            handle TEXT NOT NULL,
            source_type TEXT NOT NULL,
            source_key TEXT NOT NULL,
            enabled BOOLEAN NOT NULL,
            created_at_ms BIGINT NOT NULL,
            updated_at_ms BIGINT NOT NULL,
            record_json TEXT NOT NULL,
            UNIQUE(registrant_scope_id, handle)
        );
        CREATE INDEX IF NOT EXISTS idx_lash_host_event_subscriptions_registrant
            ON lash_host_event_trigger_subscriptions(registrant_scope_id, handle);
        CREATE INDEX IF NOT EXISTS idx_lash_host_event_subscriptions_source
            ON lash_host_event_trigger_subscriptions(source_type, source_key, enabled);

        CREATE TABLE IF NOT EXISTS lash_host_event_occurrences (
            occurrence_id TEXT PRIMARY KEY,
            idempotency_key TEXT NOT NULL UNIQUE,
            request_hash TEXT NOT NULL,
            source_type TEXT NOT NULL,
            source_key TEXT NOT NULL,
            occurred_at_ms BIGINT NOT NULL,
            record_json TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS lash_host_event_deliveries (
            occurrence_id TEXT NOT NULL REFERENCES lash_host_event_occurrences(occurrence_id) ON DELETE CASCADE,
            subscription_id TEXT NOT NULL REFERENCES lash_host_event_trigger_subscriptions(subscription_id) ON DELETE CASCADE,
            process_id TEXT NOT NULL,
            created_at_ms BIGINT NOT NULL,
            PRIMARY KEY (occurrence_id, subscription_id)
        );

        CREATE TABLE IF NOT EXISTS lash_lashlang_artifacts (
            module_ref TEXT PRIMARY KEY,
            artifact_bytes BYTEA NOT NULL
        );
        "#,
    )
    .await
    .map_err(store_sqlx_error)?;

    // Compare the recorded version with the one this build expects.
    let existing: Option<i32> =
        sqlx::query_scalar("SELECT version FROM lash_schema_versions WHERE component = $1")
            .bind(SCHEMA_COMPONENT)
            .fetch_optional(&mut *tx)
            .await
            .map_err(store_sqlx_error)?;
    match existing {
        Some(version) if version == SCHEMA_VERSION => {}
        Some(version) => {
            // No migration path is implemented here: a mismatch is a hard error.
            return Err(StoreError::Backend(format!(
                "Postgres schema component `{SCHEMA_COMPONENT}` has version {version}, expected {SCHEMA_VERSION}"
            )));
        }
        None => {
            // First initialisation: record the version this build created.
            sqlx::query("INSERT INTO lash_schema_versions (component, version) VALUES ($1, $2)")
                .bind(SCHEMA_COMPONENT)
                .bind(SCHEMA_VERSION)
                .execute(&mut *tx)
                .await
                .map_err(store_sqlx_error)?;
        }
    }
    tx.commit().await.map_err(store_sqlx_error)
}
461
/// Returns the wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn current_epoch_ms() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    since_epoch.as_millis() as u64
}
468
/// Formats the current time as `unix:<seconds since the Unix epoch>`.
///
/// Yields `unix:0` if the system clock reports a time before the epoch.
fn current_timestamp_string() -> String {
    let seconds = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0);
    format!("unix:{seconds}")
}
475
476fn store_sqlx_error(err: sqlx::Error) -> StoreError {
477 StoreError::Backend(err.to_string())
478}
479
480fn is_contention_error(err: &sqlx::Error) -> bool {
485 matches!(
486 err.as_database_error().and_then(|db| db.code()).as_deref(),
487 Some("40001" | "40P01" | "55P03")
488 )
489}
490
491fn plugin_sqlx_error(err: sqlx::Error) -> PluginError {
492 PluginError::Session(err.to_string())
493}
494
495fn process_decode_error(err: serde_json::Error) -> PluginError {
496 PluginError::Session(format!("failed to decode process registry row: {err}"))
497}
498
499fn store_decode_json<T: serde::de::DeserializeOwned>(
500 json: &str,
501 what: &str,
502) -> Result<T, StoreError> {
503 serde_json::from_str(json)
504 .map_err(|err| StoreError::Backend(format!("failed to decode {what}: {err}")))
505}
506
507fn encode_json<T: serde::Serialize>(value: &T) -> String {
508 serde_json::to_string(value).expect("persisted state should serialize")
509}
510
511fn encode_msgpack<T: serde::Serialize>(value: &T) -> Vec<u8> {
512 let mut buf = Vec::with_capacity(1024);
513 rmp_serde::encode::write_named(&mut buf, value).expect("value should serialize");
514 buf
515}
516
517fn decode_msgpack<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Option<T> {
518 rmp_serde::from_slice(bytes).ok()
519}
520
521fn block_on_detached<T: Send + 'static>(
522 future: impl std::future::Future<Output = T> + Send + 'static,
523) -> T {
524 std::thread::spawn(move || {
525 tokio::runtime::Builder::new_current_thread()
526 .enable_all()
527 .build()
528 .expect("postgres manifest runtime")
529 .block_on(future)
530 })
531 .join()
532 .expect("postgres manifest thread")
533}
534
535fn merge_token_ledger_entries(entries: Vec<TokenLedgerEntry>) -> Vec<TokenLedgerEntry> {
536 let mut merged = Vec::<TokenLedgerEntry>::new();
537 for entry in entries {
538 if entry.usage.total() == 0 {
539 continue;
540 }
541 if let Some(existing) = merged
542 .iter_mut()
543 .find(|existing| existing.source == entry.source && existing.model == entry.model)
544 {
545 existing.usage.input_tokens += entry.usage.input_tokens;
546 existing.usage.output_tokens += entry.usage.output_tokens;
547 existing.usage.cached_input_tokens += entry.usage.cached_input_tokens;
548 existing.usage.reasoning_tokens += entry.usage.reasoning_tokens;
549 } else {
550 merged.push(entry);
551 }
552 }
553 merged
554}
555
/// Serialised form of a hydrated checkpoint as stored in `lash_blobs`.
///
/// Holds the reference-only manifest together with the hydrated payloads, so
/// `get_checkpoint` can rebuild a `HydratedSessionCheckpoint` from one blob.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct SessionCheckpointEnvelope {
    /// Checkpoint manifest (turn state plus blob references).
    manifest: SessionCheckpoint,
    /// Hydrated tool state, if the checkpoint carried one.
    tool_state: Option<lash_core::ToolState>,
    /// Hydrated plugin snapshot, if the checkpoint carried one.
    plugin_snapshot: Option<lash_core::PluginSessionSnapshot>,
    /// Raw execution-state bytes, if the checkpoint carried them.
    execution_state: Option<Vec<u8>>,
}
563
564async fn put_blob_tx(
565 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
566 content: &[u8],
567) -> Result<BlobRef, StoreError> {
568 let hash = format!("{:x}", Sha256::digest(content));
569 sqlx::query(
570 "INSERT INTO lash_blobs (hash, content)
571 VALUES ($1, $2)
572 ON CONFLICT (hash) DO NOTHING",
573 )
574 .bind(&hash)
575 .bind(content)
576 .execute(&mut **tx)
577 .await
578 .map_err(store_sqlx_error)?;
579 Ok(BlobRef(hash))
580}
581
582async fn get_blob(pool: &PgPool, blob_ref: &BlobRef) -> Option<Vec<u8>> {
583 sqlx::query_scalar::<_, Vec<u8>>("SELECT content FROM lash_blobs WHERE hash = $1")
584 .bind(blob_ref.as_str())
585 .fetch_optional(pool)
586 .await
587 .ok()
588 .flatten()
589}
590
591async fn put_checkpoint_tx(
592 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
593 checkpoint: &HydratedSessionCheckpoint,
594) -> Result<(BlobRef, SessionCheckpoint), StoreError> {
595 let manifest = SessionCheckpoint {
596 turn_state: checkpoint.turn_state.clone(),
597 tool_state_ref: checkpoint.tool_state_ref.clone(),
598 plugin_snapshot_ref: checkpoint.plugin_snapshot_ref.clone(),
599 plugin_snapshot_revision: checkpoint.plugin_snapshot_revision,
600 execution_state_ref: checkpoint.execution_state_ref.clone(),
601 };
602 let envelope = SessionCheckpointEnvelope {
603 manifest: manifest.clone(),
604 tool_state: checkpoint.tool_state.clone(),
605 plugin_snapshot: checkpoint.plugin_snapshot.clone(),
606 execution_state: checkpoint.execution_state.clone(),
607 };
608 let bytes = encode_msgpack(&envelope);
609 let checkpoint_ref = put_blob_tx(tx, &bytes).await?;
610 Ok((checkpoint_ref, manifest))
611}
612
613async fn get_checkpoint(pool: &PgPool, blob_ref: &BlobRef) -> Option<HydratedSessionCheckpoint> {
614 let bytes = get_blob(pool, blob_ref).await?;
615 let envelope: SessionCheckpointEnvelope = decode_msgpack(&bytes)?;
616 Some(HydratedSessionCheckpoint {
617 turn_state: envelope.manifest.turn_state,
618 tool_state_ref: envelope.manifest.tool_state_ref,
619 tool_state: envelope.tool_state,
620 plugin_snapshot_ref: envelope.manifest.plugin_snapshot_ref,
621 plugin_snapshot: envelope.plugin_snapshot,
622 plugin_snapshot_revision: envelope.manifest.plugin_snapshot_revision,
623 execution_state_ref: envelope.manifest.execution_state_ref,
624 execution_state: envelope.execution_state,
625 })
626}
627
628async fn load_session_head_meta_tx(
629 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
630 session_id: &str,
631 for_update: bool,
632) -> Result<Option<SessionHeadMeta>, StoreError> {
633 let sql = if for_update {
634 "SELECT head_json, head_revision FROM lash_sessions WHERE session_id = $1 FOR UPDATE"
635 } else {
636 "SELECT head_json, head_revision FROM lash_sessions WHERE session_id = $1"
637 };
638 let row = sqlx::query(sql)
639 .bind(session_id)
640 .fetch_optional(&mut **tx)
641 .await
642 .map_err(store_sqlx_error)?;
643 let Some(row) = row else {
644 return Ok(None);
645 };
646 let head_json: String = row.get(0);
647 let head_revision: i64 = row.get(1);
648 let mut meta: SessionHeadMeta = store_decode_json(&head_json, "session head")?;
649 meta.head_revision = head_revision as u64;
650 Ok(Some(meta))
651}
652
653async fn load_usage_deltas(pool: &PgPool, session_id: &str) -> Vec<TokenLedgerEntry> {
654 let rows = sqlx::query(
655 "SELECT entry_json FROM lash_usage_deltas WHERE session_id = $1 ORDER BY seq ASC",
656 )
657 .bind(session_id)
658 .fetch_all(pool)
659 .await
660 .unwrap_or_default();
661 rows.into_iter()
662 .filter_map(|row| {
663 let json: String = row.get(0);
664 serde_json::from_str(&json).ok()
665 })
666 .collect()
667}
668
669async fn load_graph(
670 pool: &PgPool,
671 session_id: &str,
672 leaf_node_id: Option<String>,
673 active_path: bool,
674) -> Result<lash_core::SessionGraph, StoreError> {
675 let rows = sqlx::query(
676 "SELECT node_json FROM lash_graph_nodes
677 WHERE session_id = $1 AND tombstoned = FALSE
678 ORDER BY seq ASC",
679 )
680 .bind(session_id)
681 .fetch_all(pool)
682 .await
683 .map_err(store_sqlx_error)?;
684 let mut nodes = Vec::<SessionNodeRecord>::new();
685 for row in rows {
686 let json: String = row.get(0);
687 nodes.push(store_decode_json(&json, "session graph node")?);
688 }
689 if active_path && let Some(leaf) = leaf_node_id.clone() {
690 let wanted = active_path_node_ids(&nodes, &leaf);
691 nodes.retain(|node| wanted.contains(&node.node_id));
692 }
693 Ok(lash_core::SessionGraph::from_nodes(nodes, leaf_node_id))
694}
695
696fn active_path_node_ids(nodes: &[SessionNodeRecord], leaf_node_id: &str) -> HashSet<String> {
697 let mut parent_by_id = std::collections::BTreeMap::new();
698 for node in nodes {
699 parent_by_id.insert(node.node_id.clone(), node.parent_node_id.clone());
700 }
701 let mut wanted = HashSet::new();
702 let mut cursor = Some(leaf_node_id.to_string());
703 while let Some(node_id) = cursor {
704 if !wanted.insert(node_id.clone()) {
705 break;
706 }
707 cursor = parent_by_id.get(&node_id).cloned().flatten();
708 }
709 wanted
710}
711
712async fn commit_attachment_refs_tx(
713 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
714 session_id: &str,
715 attachment_ids: &[AttachmentId],
716) -> Result<(), StoreError> {
717 if attachment_ids.is_empty() {
718 return Ok(());
719 }
720 let now = current_epoch_ms() as i64;
721 for id in attachment_ids {
722 sqlx::query(
723 "UPDATE lash_attachment_manifest
724 SET committed_at_ms = COALESCE(committed_at_ms, $1)
725 WHERE attachment_id = $2 AND session_id = $3",
726 )
727 .bind(now)
728 .bind(id.as_str())
729 .bind(session_id)
730 .execute(&mut **tx)
731 .await
732 .map_err(store_sqlx_error)?;
733 }
734 Ok(())
735}
736
737impl AttachmentManifest for PostgresSessionStore {
738 fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError> {
739 let pool = self.pool.clone();
740 block_on_detached(async move {
741 sqlx::query(
742 "INSERT INTO lash_attachment_manifest (
743 attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
744 )
745 VALUES ($1, $2, $3, $4, NULL)
746 ON CONFLICT (attachment_id) DO UPDATE SET
747 session_id = EXCLUDED.session_id,
748 canonical_uri = EXCLUDED.canonical_uri,
749 intent_at_ms = EXCLUDED.intent_at_ms",
750 )
751 .bind(intent.attachment_id.as_str())
752 .bind(intent.session_id)
753 .bind(intent.canonical_uri)
754 .bind(intent.intent_at_epoch_ms as i64)
755 .execute(&pool)
756 .await
757 .map(|_| ())
758 .map_err(store_sqlx_error)
759 })
760 }
761
762 fn commit_refs(
763 &self,
764 session_id: &str,
765 attachment_ids: &[AttachmentId],
766 ) -> Result<(), StoreError> {
767 let pool = self.pool.clone();
768 let session_id = session_id.to_string();
769 let attachment_ids = attachment_ids.to_vec();
770 block_on_detached(async move {
771 let mut tx = pool.begin().await.map_err(store_sqlx_error)?;
772 commit_attachment_refs_tx(&mut tx, &session_id, &attachment_ids).await?;
773 tx.commit().await.map_err(store_sqlx_error)
774 })
775 }
776
777 fn list_uncommitted(
778 &self,
779 older_than_epoch_ms: u64,
780 ) -> Result<Vec<AttachmentManifestEntry>, StoreError> {
781 let pool = self.pool.clone();
782 block_on_detached(async move {
783 let rows = sqlx::query(
784 "SELECT attachment_id, session_id, canonical_uri, intent_at_ms, committed_at_ms
785 FROM lash_attachment_manifest
786 WHERE committed_at_ms IS NULL AND intent_at_ms <= $1
787 ORDER BY attachment_id ASC",
788 )
789 .bind(older_than_epoch_ms as i64)
790 .fetch_all(&pool)
791 .await
792 .map_err(store_sqlx_error)?;
793 Ok(rows
794 .into_iter()
795 .map(|row| AttachmentManifestEntry {
796 attachment_id: AttachmentId::new(row.get::<String, _>(0)),
797 session_id: row.get(1),
798 canonical_uri: row.get(2),
799 intent_at_epoch_ms: row.get::<i64, _>(3) as u64,
800 committed_at_epoch_ms: row.get::<Option<i64>, _>(4).map(|value| value as u64),
801 })
802 .collect())
803 })
804 }
805
806 fn forget(&self, attachment_id: &AttachmentId) -> Result<(), StoreError> {
807 let pool = self.pool.clone();
808 let attachment_id = attachment_id.to_string();
809 block_on_detached(async move {
810 sqlx::query("DELETE FROM lash_attachment_manifest WHERE attachment_id = $1")
811 .bind(attachment_id)
812 .execute(&pool)
813 .await
814 .map(|_| ())
815 .map_err(store_sqlx_error)
816 })
817 }
818}
819
820#[async_trait::async_trait]
821impl SessionStoreFactory for PostgresSessionStoreFactory {
822 fn durability_tier(&self) -> DurabilityTier {
823 DurabilityTier::Durable
824 }
825
826 async fn create_store(
827 &self,
828 request: &SessionStoreCreateRequest,
829 ) -> Result<Arc<dyn RuntimePersistence>, String> {
830 let store = PostgresSessionStore {
831 pool: self.pool.clone(),
832 session_id: Some(request.session_id.clone()),
833 bound_session: Arc::new(OnceLock::new()),
834 };
835 if store
836 .load_session_meta()
837 .await
838 .map_err(|err| err.to_string())?
839 .is_none()
840 {
841 store
842 .save_session_meta(SessionMeta {
843 session_id: request.session_id.clone(),
844 session_name: request.session_id.clone(),
845 created_at: current_timestamp_string(),
846 model: request.policy.model.id.clone(),
847 cwd: std::env::current_dir()
848 .ok()
849 .and_then(|path| path.to_str().map(str::to_string)),
850 relation: request.relation.clone(),
851 })
852 .await
853 .map_err(|err| err.to_string())?;
854 }
855 Ok(Arc::new(store))
856 }
857
858 async fn open_existing_store(
859 &self,
860 request: &SessionStoreCreateRequest,
861 ) -> Result<Option<Arc<dyn RuntimePersistence>>, String> {
862 let store = PostgresSessionStore {
863 pool: self.pool.clone(),
864 session_id: Some(request.session_id.clone()),
865 bound_session: Arc::new(OnceLock::new()),
866 };
867 if store
868 .load_session_meta()
869 .await
870 .map_err(|err| err.to_string())?
871 .is_some()
872 {
873 Ok(Some(Arc::new(store)))
874 } else {
875 Ok(None)
876 }
877 }
878
879 async fn delete_session(&self, session_id: &str) -> Result<(), String> {
880 let mut tx = self.pool.begin().await.map_err(|err| err.to_string())?;
881 for sql in [
882 "DELETE FROM lash_queued_work_items WHERE batch_id IN (SELECT batch_id FROM lash_queued_work_batches WHERE session_id = $1)",
883 "DELETE FROM lash_queued_work_batches WHERE session_id = $1",
884 "DELETE FROM lash_usage_deltas WHERE session_id = $1",
885 "DELETE FROM lash_graph_nodes WHERE session_id = $1",
886 "DELETE FROM lash_runtime_turn_commits WHERE session_id = $1",
887 "DELETE FROM lash_session_meta WHERE session_id = $1",
888 "DELETE FROM lash_sessions WHERE session_id = $1",
889 "DELETE FROM lash_attachment_manifest WHERE session_id = $1",
890 ] {
891 sqlx::query(sql)
892 .bind(session_id)
893 .execute(&mut *tx)
894 .await
895 .map_err(|err| err.to_string())?;
896 }
897 tx.commit().await.map_err(|err| err.to_string())
898 }
899}
900
/// Decoded row of `lash_queued_work_batches`, without the batch's items.
#[derive(Clone, Debug)]
struct QueuedBatchRow {
    /// Serial enqueue sequence number (`enqueue_seq`).
    enqueue_seq: u64,
    /// Unique batch identifier.
    batch_id: String,
    /// Session the batch belongs to.
    session_id: String,
    /// Optional source key; unique per session in the schema.
    source_key: Option<String>,
    /// Delivery policy parsed from its wire string.
    delivery_policy: DeliveryPolicy,
    /// Slot policy parsed from its wire string.
    slot_policy: SlotPolicy,
    /// Merge key decoded from `merge_key_json`.
    merge_key: MergeKey,
    /// Value of the `available_at_ms` column.
    available_at_ms: u64,
    /// Value of the `enqueued_at_ms` column.
    enqueued_at_ms: u64,
    /// Value of the `claim_fencing_token` column (defaults to 0 in the schema).
    claim_fencing_token: u64,
}
914
915fn queued_batch_row(row: PgRow) -> Result<QueuedBatchRow, StoreError> {
916 let delivery_policy =
917 DeliveryPolicy::from_wire_str(row.get::<String, _>("delivery_policy").as_str())
918 .ok_or_else(|| {
919 StoreError::Backend("invalid queued work delivery policy".to_string())
920 })?;
921 let slot_policy = SlotPolicy::from_wire_str(row.get::<String, _>("slot_policy").as_str())
922 .ok_or_else(|| StoreError::Backend("invalid queued work slot policy".to_string()))?;
923 let merge_json: String = row.get("merge_key_json");
924 Ok(QueuedBatchRow {
925 enqueue_seq: row.get::<i64, _>("enqueue_seq") as u64,
926 batch_id: row.get("batch_id"),
927 session_id: row.get("session_id"),
928 source_key: row.get("source_key"),
929 delivery_policy,
930 slot_policy,
931 merge_key: store_decode_json(&merge_json, "queued work merge key")?,
932 available_at_ms: row.get::<i64, _>("available_at_ms") as u64,
933 enqueued_at_ms: row.get::<i64, _>("enqueued_at_ms") as u64,
934 claim_fencing_token: row.get::<i64, _>("claim_fencing_token") as u64,
935 })
936}
937
938async fn load_queued_batch(
939 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
940 batch_id: &str,
941) -> Result<Option<QueuedWorkBatch>, StoreError> {
942 let row = sqlx::query(
943 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
944 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
945 claim_fencing_token
946 FROM lash_queued_work_batches
947 WHERE batch_id = $1",
948 )
949 .bind(batch_id)
950 .fetch_optional(&mut **tx)
951 .await
952 .map_err(store_sqlx_error)?;
953 let Some(row) = row else {
954 return Ok(None);
955 };
956 let row = queued_batch_row(row)?;
957 queued_work_batch_from_row(tx, row).await.map(Some)
958}
959
960async fn queued_work_batch_from_row(
961 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
962 row: QueuedBatchRow,
963) -> Result<QueuedWorkBatch, StoreError> {
964 let item_rows = sqlx::query(
965 "SELECT item_id, payload_json
966 FROM lash_queued_work_items
967 WHERE batch_id = $1
968 ORDER BY item_index ASC",
969 )
970 .bind(&row.batch_id)
971 .fetch_all(&mut **tx)
972 .await
973 .map_err(store_sqlx_error)?;
974 let mut items = Vec::new();
975 for item in item_rows {
976 let payload_json: String = item.get(1);
977 items.push(QueuedWorkItem {
978 item_id: item.get(0),
979 payload: store_decode_json(&payload_json, "queued work payload")?,
980 });
981 }
982 Ok(QueuedWorkBatch {
983 batch_id: row.batch_id,
984 session_id: row.session_id,
985 enqueue_seq: row.enqueue_seq,
986 source_key: row.source_key,
987 delivery_policy: row.delivery_policy,
988 slot_policy: row.slot_policy,
989 merge_key: row.merge_key,
990 available_at_ms: row.available_at_ms,
991 enqueued_at_ms: row.enqueued_at_ms,
992 items,
993 })
994}
995
996async fn ensure_queued_work_completion_tx(
997 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
998 completed: &QueuedWorkCompletion,
999) -> Result<(), StoreError> {
1000 for batch_id in &completed.batch_ids {
1001 let exists: Option<i64> = sqlx::query_scalar(
1002 "SELECT 1::BIGINT FROM lash_queued_work_batches
1003 WHERE session_id = $1
1004 AND batch_id = $2
1005 AND claim_id = $3
1006 AND claim_token = $4
1007 LIMIT 1",
1008 )
1009 .bind(&completed.session_id)
1010 .bind(batch_id)
1011 .bind(&completed.claim_id)
1012 .bind(&completed.lease_token)
1013 .fetch_optional(&mut **tx)
1014 .await
1015 .map_err(store_sqlx_error)?;
1016 if exists.is_none() {
1017 return Err(StoreError::QueuedWorkClaimExpired {
1018 session_id: completed.session_id.clone(),
1019 claim_id: completed.claim_id.clone(),
1020 });
1021 }
1022 }
1023 Ok(())
1024}
1025
1026#[async_trait::async_trait]
1027impl RuntimePersistence for PostgresSessionStore {
/// Reports this store as `DurabilityTier::Durable`.
fn durability_tier(&self) -> DurabilityTier {
    DurabilityTier::Durable
}
1031
1032 async fn load_session(
1033 &self,
1034 scope: SessionReadScope,
1035 ) -> Result<Option<PersistedSessionRead>, StoreError> {
1036 let Some(session_id) = self.selected_session_id().await? else {
1037 return Ok(None);
1038 };
1039 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1040 let Some(meta) = load_session_head_meta_tx(&mut tx, &session_id, false).await? else {
1041 return Ok(None);
1042 };
1043 tx.commit().await.map_err(store_sqlx_error)?;
1044 let leaf_node_id = match &scope {
1045 SessionReadScope::FullGraph => meta.leaf_node_id.clone(),
1046 SessionReadScope::ActivePath { leaf_node_id } => {
1047 leaf_node_id.clone().or_else(|| meta.leaf_node_id.clone())
1048 }
1049 };
1050 let graph = load_graph(
1051 &self.pool,
1052 &session_id,
1053 leaf_node_id.clone(),
1054 matches!(scope, SessionReadScope::ActivePath { .. }),
1055 )
1056 .await?;
1057 let checkpoint = match meta.checkpoint_ref.as_ref() {
1058 Some(blob_ref) => get_checkpoint(&self.pool, blob_ref).await,
1059 None => None,
1060 };
1061 Ok(Some(PersistedSessionRead {
1062 session_id: meta.session_id,
1063 head_revision: meta.head_revision,
1064 config: meta.config,
1065 agent_frames: meta.agent_frames,
1066 current_agent_frame_id: meta.current_agent_frame_id,
1067 graph,
1068 checkpoint_ref: meta.checkpoint_ref,
1069 checkpoint,
1070 token_ledger: merge_token_ledger_entries(
1071 load_usage_deltas(&self.pool, &session_id).await,
1072 ),
1073 }))
1074 }
1075
1076 async fn load_node(&self, node_id: &str) -> Result<Option<SessionNodeRecord>, StoreError> {
1077 let json: Option<String> = if let Some(session_id) = &self.session_id {
1078 sqlx::query_scalar(
1079 "SELECT node_json FROM lash_graph_nodes
1080 WHERE session_id = $1 AND node_id = $2 AND tombstoned = FALSE",
1081 )
1082 .bind(session_id)
1083 .bind(node_id)
1084 .fetch_optional(&self.pool)
1085 .await
1086 .map_err(store_sqlx_error)?
1087 } else {
1088 sqlx::query_scalar(
1089 "SELECT node_json FROM lash_graph_nodes
1090 WHERE node_id = $1 AND tombstoned = FALSE
1091 ORDER BY session_id ASC
1092 LIMIT 1",
1093 )
1094 .bind(node_id)
1095 .fetch_optional(&self.pool)
1096 .await
1097 .map_err(store_sqlx_error)?
1098 };
1099 json.map(|json| store_decode_json(&json, "session graph node"))
1100 .transpose()
1101 }
1102
    /// Applies one [`RuntimeCommit`] to its session inside a single database
    /// transaction and returns the resulting head revision, checkpoint ref
    /// and manifest.
    ///
    /// Order of operations:
    /// 1. Load the current head metadata and validate the session binding.
    /// 2. If a turn commit is attached, replay the stored result when the same
    ///    turn/hash was already committed, or reject a differing hash.
    /// 3. Enforce `expected_head_revision` when the caller supplied one.
    /// 4. Verify every completed queue claim is still owned.
    /// 5. Write the checkpoint, usage deltas and graph delta.
    /// 6. Upsert the session head, guarded by the revision that was read.
    /// 7. Delete the completed queue batches, commit attachment refs, record
    ///    the turn commit, and commit the transaction.
    ///
    /// Every early `return` leaves `tx` uncommitted, so none of the writes
    /// made up to that point are committed.
    ///
    /// # Errors
    /// - `StoreError::SessionBindingMismatch` when the commit targets a session
    ///   other than the one this store is bound to.
    /// - `StoreError::RuntimeTurnCommitConflict` when the turn commit names a
    ///   different session, or the turn id was already committed with a
    ///   different hash.
    /// - `StoreError::HeadRevisionConflict` when the expected revision does not
    ///   match, the head write hits a contention error, or the guarded upsert
    ///   touches no row.
    /// - `StoreError::QueuedWorkClaimExpired` when a completed claim names a
    ///   different session or is no longer owned.
    /// - Errors from the helpers and from `store_sqlx_error`.
    async fn commit_runtime_state(
        &self,
        commit: RuntimeCommit,
    ) -> Result<RuntimeCommitResult, StoreError> {
        let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
        // NOTE(review): the meaning of the trailing `false` is defined by
        // `load_session_head_meta_tx`, which is outside this view.
        let existing = load_session_head_meta_tx(&mut tx, &commit.session_id, false).await?;
        // The stored head must carry the same session id the commit targets.
        if let Some(bound_session_id) = existing.as_ref().map(|meta| meta.session_id.as_str())
            && bound_session_id != commit.session_id
        {
            return Err(StoreError::SessionBindingMismatch {
                bound_session_id: bound_session_id.to_string(),
                attempted_session_id: commit.session_id,
            });
        }
        // An explicit `session_id` takes precedence over a binding recorded by
        // an earlier commit through this store.
        let effective_binding = self
            .session_id
            .clone()
            .or_else(|| self.bound_session.get().cloned());
        if let Some(bound_session_id) = &effective_binding
            && commit.session_id != *bound_session_id
        {
            return Err(StoreError::SessionBindingMismatch {
                bound_session_id: bound_session_id.clone(),
                attempted_session_id: commit.session_id,
            });
        }
        // A store without an explicit session records the first session it
        // commits to. The result of `set` is deliberately ignored; note this
        // happens before the transaction is known to succeed.
        if self.session_id.is_none() {
            let _ = self.bound_session.set(commit.session_id.clone());
        }
        if let Some(completed) = &commit.turn_commit {
            if completed.session_id != commit.session_id {
                return Err(StoreError::RuntimeTurnCommitConflict {
                    session_id: completed.session_id.clone(),
                    turn_id: completed.turn_id.clone(),
                });
            }
            let prior = sqlx::query(
                "SELECT turn_commit_hash, result_json
                 FROM lash_runtime_turn_commits
                 WHERE session_id = $1 AND turn_id = $2",
            )
            .bind(&completed.session_id)
            .bind(&completed.turn_id)
            .fetch_optional(&mut *tx)
            .await
            .map_err(store_sqlx_error)?;
            if let Some(row) = prior {
                let hash: String = row.get(0);
                let result_json: String = row.get(1);
                // Same turn, same hash: return the stored result instead of
                // applying the commit a second time.
                if hash == completed.turn_commit_hash {
                    return store_decode_json(&result_json, "runtime turn commit result");
                }
                return Err(StoreError::RuntimeTurnCommitConflict {
                    session_id: completed.session_id.clone(),
                    turn_id: completed.turn_id.clone(),
                });
            }
        }
        // A session with no head row yet is treated as revision 0.
        let actual_revision = existing.as_ref().map_or(0, |meta| meta.head_revision);
        if commit.expected_head_revision.is_some()
            && commit.expected_head_revision != Some(actual_revision)
        {
            return Err(StoreError::HeadRevisionConflict {
                expected: commit.expected_head_revision,
                actual: actual_revision,
            });
        }
        // Ownership of completed claims is checked up front; the batches are
        // only deleted after the head write succeeds.
        for completed in &commit.completed_queue_claims {
            if completed.session_id != commit.session_id {
                return Err(StoreError::QueuedWorkClaimExpired {
                    session_id: completed.session_id.clone(),
                    claim_id: completed.claim_id.clone(),
                });
            }
            ensure_queued_work_completion_tx(&mut tx, completed).await?;
        }
        let (checkpoint_ref, manifest) = put_checkpoint_tx(&mut tx, &commit.checkpoint).await?;
        for entry in &commit.usage_deltas {
            sqlx::query("INSERT INTO lash_usage_deltas (session_id, entry_json) VALUES ($1, $2)")
                .bind(&commit.session_id)
                .bind(encode_json(entry))
                .execute(&mut *tx)
                .await
                .map_err(store_sqlx_error)?;
        }
        let leaf_node_id = match &commit.graph {
            GraphCommitDelta::Unchanged { leaf_node_id } => leaf_node_id.clone(),
            GraphCommitDelta::Append {
                nodes,
                leaf_node_id,
            } => {
                // Appending is an upsert: an existing node is overwritten and
                // its tombstone flag is cleared.
                for node in nodes {
                    sqlx::query(
                        "INSERT INTO lash_graph_nodes (session_id, node_id, node_json)
                         VALUES ($1, $2, $3)
                         ON CONFLICT (session_id, node_id) DO UPDATE SET
                            node_json = EXCLUDED.node_json,
                            tombstoned = FALSE",
                    )
                    .bind(&commit.session_id)
                    .bind(&node.node_id)
                    .bind(encode_json(node))
                    .execute(&mut *tx)
                    .await
                    .map_err(store_sqlx_error)?;
                }
                leaf_node_id.clone()
            }
            GraphCommitDelta::ReplaceFull(graph) => {
                // Full replacement drops every stored node of the session
                // before inserting the supplied graph.
                sqlx::query("DELETE FROM lash_graph_nodes WHERE session_id = $1")
                    .bind(&commit.session_id)
                    .execute(&mut *tx)
                    .await
                    .map_err(store_sqlx_error)?;
                for node in &graph.nodes {
                    sqlx::query(
                        "INSERT INTO lash_graph_nodes (session_id, node_id, node_json)
                         VALUES ($1, $2, $3)",
                    )
                    .bind(&commit.session_id)
                    .bind(&node.node_id)
                    .bind(encode_json(node))
                    .execute(&mut *tx)
                    .await
                    .map_err(store_sqlx_error)?;
                }
                graph.leaf_node_id.clone()
            }
        };
        // Count the live nodes after the delta so the head records the
        // post-commit graph size.
        let graph_node_count: i64 = sqlx::query_scalar(
            "SELECT COUNT(*) FROM lash_graph_nodes WHERE session_id = $1 AND tombstoned = FALSE",
        )
        .bind(&commit.session_id)
        .fetch_one(&mut *tx)
        .await
        .map_err(store_sqlx_error)?;
        let next_revision = actual_revision + 1;
        let meta = SessionHeadMeta {
            session_id: commit.session_id.clone(),
            head_revision: next_revision,
            config: commit.config.clone(),
            agent_frames: commit.agent_frames.clone(),
            current_agent_frame_id: commit.current_agent_frame_id.clone(),
            checkpoint_ref: Some(checkpoint_ref.clone()),
            leaf_node_id,
            graph_node_count: graph_node_count as usize,
            // The head is written with an empty ledger; usage is stored in
            // `lash_usage_deltas` above.
            token_ledger: Vec::new(),
        };
        // The upsert only updates an existing row when its revision still
        // equals the one read at the top ($5); otherwise no row is affected.
        let head_write = sqlx::query(
            "INSERT INTO lash_sessions (session_id, head_revision, head_json, checkpoint_ref)
             VALUES ($1, $2, $3, $4)
             ON CONFLICT (session_id) DO UPDATE SET
                head_revision = EXCLUDED.head_revision,
                head_json = EXCLUDED.head_json,
                checkpoint_ref = EXCLUDED.checkpoint_ref
             WHERE lash_sessions.head_revision = $5",
        )
        .bind(&commit.session_id)
        .bind(next_revision as i64)
        .bind(encode_json(&meta))
        .bind(checkpoint_ref.as_str())
        .bind(actual_revision as i64)
        .execute(&mut *tx)
        .await;
        let head_write = match head_write {
            Ok(result) => result,
            // Errors classified by `is_contention_error` are reported as a
            // revision conflict rather than a backend failure.
            Err(err) if is_contention_error(&err) => {
                return Err(StoreError::HeadRevisionConflict {
                    expected: commit.expected_head_revision.or(Some(actual_revision)),
                    actual: actual_revision,
                });
            }
            Err(err) => return Err(store_sqlx_error(err)),
        };
        if head_write.rows_affected() == 0 {
            // The guarded update matched nothing: re-read the stored revision
            // so the conflict error reports the current value.
            let actual_now = sqlx::query_scalar::<_, i64>(
                "SELECT head_revision FROM lash_sessions WHERE session_id = $1",
            )
            .bind(&commit.session_id)
            .fetch_optional(&mut *tx)
            .await
            .map_err(store_sqlx_error)?
            .map_or(actual_revision, |revision| revision as u64);
            return Err(StoreError::HeadRevisionConflict {
                expected: commit.expected_head_revision.or(Some(actual_revision)),
                actual: actual_now,
            });
        }
        // Remove the batches of every completed claim. The delete is guarded
        // by claim id and token; its affected-row count is not inspected.
        for completed in &commit.completed_queue_claims {
            for batch_id in &completed.batch_ids {
                sqlx::query(
                    "DELETE FROM lash_queued_work_batches
                     WHERE session_id = $1 AND batch_id = $2 AND claim_id = $3 AND claim_token = $4",
                )
                .bind(&completed.session_id)
                .bind(batch_id)
                .bind(&completed.claim_id)
                .bind(&completed.lease_token)
                .execute(&mut *tx)
                .await
                .map_err(store_sqlx_error)?;
            }
        }
        commit_attachment_refs_tx(
            &mut tx,
            &commit.session_id,
            &commit.committed_attachment_ids,
        )
        .await?;
        let result = RuntimeCommitResult {
            head_revision: next_revision,
            checkpoint_ref,
            manifest,
        };
        // Persist the result alongside the turn hash so a retry of the same
        // turn can be answered from the stored row (see the lookup above).
        if let Some(completed) = &commit.turn_commit {
            sqlx::query(
                "INSERT INTO lash_runtime_turn_commits (
                    session_id, turn_id, turn_commit_hash, result_json, committed_at_ms
                 )
                 VALUES ($1, $2, $3, $4, $5)",
            )
            .bind(&completed.session_id)
            .bind(&completed.turn_id)
            .bind(&completed.turn_commit_hash)
            .bind(encode_json(&result))
            .bind(current_epoch_ms() as i64)
            .execute(&mut *tx)
            .await
            .map_err(store_sqlx_error)?;
        }
        tx.commit().await.map_err(store_sqlx_error)?;
        Ok(result)
    }
1354
1355 async fn enqueue_queued_work(
1356 &self,
1357 batch: QueuedWorkBatchDraft,
1358 ) -> Result<QueuedWorkBatch, StoreError> {
1359 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1360 if let Some(source_key) = batch.source_key.as_deref() {
1361 let existing_id: Option<String> = sqlx::query_scalar(
1362 "SELECT batch_id FROM lash_queued_work_batches
1363 WHERE session_id = $1 AND source_key = $2",
1364 )
1365 .bind(&batch.session_id)
1366 .bind(source_key)
1367 .fetch_optional(&mut *tx)
1368 .await
1369 .map_err(store_sqlx_error)?;
1370 if let Some(batch_id) = existing_id {
1371 let existing = load_queued_batch(&mut tx, &batch_id)
1372 .await?
1373 .ok_or_else(|| {
1374 StoreError::Backend("queued work source row disappeared".to_string())
1375 })?;
1376 tx.commit().await.map_err(store_sqlx_error)?;
1377 return Ok(existing);
1378 }
1379 }
1380 let now = current_epoch_ms();
1381 let batch_id = derive_batch_id(&batch.session_id, batch.source_key.as_deref(), now, None);
1382 let row = sqlx::query_scalar::<_, i64>(
1383 "INSERT INTO lash_queued_work_batches (
1384 batch_id, session_id, source_key, delivery_policy, slot_policy,
1385 merge_key_json, available_at_ms, enqueued_at_ms
1386 )
1387 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
1388 RETURNING enqueue_seq",
1389 )
1390 .bind(&batch_id)
1391 .bind(&batch.session_id)
1392 .bind(&batch.source_key)
1393 .bind(batch.delivery_policy.as_str())
1394 .bind(batch.slot_policy.as_str())
1395 .bind(encode_json(&batch.merge_key))
1396 .bind(batch.available_at_ms as i64)
1397 .bind(now as i64)
1398 .fetch_one(&mut *tx)
1399 .await
1400 .map_err(store_sqlx_error)?;
1401 for (index, payload) in batch.payloads.iter().enumerate() {
1402 let item_id = format!("{batch_id}:item:{index}");
1403 sqlx::query(
1404 "INSERT INTO lash_queued_work_items (batch_id, item_index, item_id, payload_json)
1405 VALUES ($1, $2, $3, $4)",
1406 )
1407 .bind(&batch_id)
1408 .bind(index as i32)
1409 .bind(item_id)
1410 .bind(encode_json(payload))
1411 .execute(&mut *tx)
1412 .await
1413 .map_err(store_sqlx_error)?;
1414 }
1415 let queued = load_queued_batch(&mut tx, &batch_id)
1416 .await?
1417 .ok_or_else(|| StoreError::Backend("queued work insert disappeared".to_string()))?;
1418 debug_assert_eq!(queued.enqueue_seq, row as u64);
1419 tx.commit().await.map_err(store_sqlx_error)?;
1420 Ok(queued)
1421 }
1422
1423 async fn claim_ready_queued_work(
1424 &self,
1425 session_id: &str,
1426 owner_id: &str,
1427 boundary: QueuedWorkClaimBoundary,
1428 lease_ttl_ms: u64,
1429 max_batches: usize,
1430 ) -> Result<Option<QueuedWorkClaim>, StoreError> {
1431 if max_batches == 0 {
1432 return Ok(None);
1433 }
1434 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1435 let now = current_epoch_ms();
1436 let rows = sqlx::query(
1437 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1438 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1439 claim_fencing_token
1440 FROM lash_queued_work_batches
1441 WHERE session_id = $1
1442 AND available_at_ms <= $2
1443 AND (claim_token IS NULL OR claim_expires_at_ms <= $2)
1444 ORDER BY enqueue_seq ASC
1445 LIMIT $3
1446 FOR UPDATE SKIP LOCKED",
1447 )
1448 .bind(session_id)
1449 .bind(now as i64)
1450 .bind(claim_scan_limit(max_batches))
1451 .fetch_all(&mut *tx)
1452 .await
1453 .map_err(store_sqlx_error)?;
1454 let mut selected = Vec::new();
1455 for row in rows {
1456 selected.push(queued_batch_row(row)?);
1457 }
1458 let candidates = selected
1459 .iter()
1460 .map(|row| ClaimCandidate {
1461 enqueue_seq: row.enqueue_seq,
1462 claim_fencing_token: row.claim_fencing_token,
1463 delivery_policy: row.delivery_policy,
1464 slot_policy: row.slot_policy,
1465 merge_key: row.merge_key.clone(),
1466 })
1467 .collect::<Vec<_>>();
1468 let selected_len = select_claim_prefix(&candidates, boundary, max_batches);
1469 if selected_len == 0 {
1470 tx.commit().await.map_err(store_sqlx_error)?;
1471 return Ok(None);
1472 }
1473 selected.truncate(selected_len);
1474 let lease =
1475 QueuedWorkClaimLease::derive(&candidates[0], session_id, owner_id, now, lease_ttl_ms);
1476 for row in &selected {
1477 let changed = sqlx::query(
1478 "UPDATE lash_queued_work_batches
1479 SET claim_id = $3,
1480 claim_owner_id = $4,
1481 claim_token = $5,
1482 claim_fencing_token = claim_fencing_token + 1,
1483 claim_claimed_at_ms = $6,
1484 claim_expires_at_ms = $7
1485 WHERE session_id = $1
1486 AND batch_id = $2
1487 AND (claim_token IS NULL OR claim_expires_at_ms <= $6)",
1488 )
1489 .bind(session_id)
1490 .bind(&row.batch_id)
1491 .bind(&lease.claim_id)
1492 .bind(owner_id)
1493 .bind(&lease.lease_token)
1494 .bind(now as i64)
1495 .bind(lease.expires_at_epoch_ms as i64)
1496 .execute(&mut *tx)
1497 .await
1498 .map_err(store_sqlx_error)?
1499 .rows_affected();
1500 if changed == 0 {
1501 tx.rollback().await.map_err(store_sqlx_error)?;
1502 return Ok(None);
1503 }
1504 }
1505 let mut batches = Vec::new();
1506 for row in selected {
1507 batches.push(queued_work_batch_from_row(&mut tx, row).await?);
1508 }
1509 tx.commit().await.map_err(store_sqlx_error)?;
1510 Ok(Some(QueuedWorkClaim {
1511 session_id: session_id.to_string(),
1512 claim_id: lease.claim_id,
1513 owner_id: owner_id.to_string(),
1514 lease_token: lease.lease_token,
1515 fencing_token: lease.fencing_token,
1516 claimed_at_epoch_ms: lease.claimed_at_epoch_ms,
1517 expires_at_epoch_ms: lease.expires_at_epoch_ms,
1518 batches,
1519 }))
1520 }
1521
1522 async fn renew_queued_work_claim(
1523 &self,
1524 claim: &QueuedWorkClaim,
1525 lease_ttl_ms: u64,
1526 ) -> Result<QueuedWorkClaim, StoreError> {
1527 let expires_at = current_epoch_ms().saturating_add(lease_ttl_ms);
1528 let changed = sqlx::query(
1529 "UPDATE lash_queued_work_batches
1530 SET claim_expires_at_ms = $4
1531 WHERE session_id = $1 AND claim_id = $2 AND claim_token = $3",
1532 )
1533 .bind(&claim.session_id)
1534 .bind(&claim.claim_id)
1535 .bind(&claim.lease_token)
1536 .bind(expires_at as i64)
1537 .execute(&self.pool)
1538 .await
1539 .map_err(store_sqlx_error)?
1540 .rows_affected();
1541 renewed_claim(claim, changed as usize, expires_at)
1542 }
1543
1544 async fn abandon_queued_work_claim(&self, claim: &QueuedWorkClaim) -> Result<(), StoreError> {
1545 sqlx::query(
1546 "UPDATE lash_queued_work_batches
1547 SET claim_id = NULL,
1548 claim_owner_id = NULL,
1549 claim_token = NULL,
1550 claim_claimed_at_ms = 0,
1551 claim_expires_at_ms = 0
1552 WHERE session_id = $1 AND claim_id = $2 AND claim_token = $3",
1553 )
1554 .bind(&claim.session_id)
1555 .bind(&claim.claim_id)
1556 .bind(&claim.lease_token)
1557 .execute(&self.pool)
1558 .await
1559 .map_err(store_sqlx_error)?;
1560 Ok(())
1561 }
1562
1563 async fn cancel_queued_work_batch(
1564 &self,
1565 session_id: &str,
1566 batch_id: &str,
1567 ) -> Result<Option<QueuedWorkBatch>, StoreError> {
1568 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1569 let now = current_epoch_ms();
1570 let row = sqlx::query(
1571 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1572 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1573 claim_fencing_token
1574 FROM lash_queued_work_batches
1575 WHERE session_id = $1
1576 AND batch_id = $2
1577 AND (claim_token IS NULL OR claim_expires_at_ms <= $3)
1578 FOR UPDATE",
1579 )
1580 .bind(session_id)
1581 .bind(batch_id)
1582 .bind(now as i64)
1583 .fetch_optional(&mut *tx)
1584 .await
1585 .map_err(store_sqlx_error)?;
1586 let Some(row) = row else {
1587 tx.commit().await.map_err(store_sqlx_error)?;
1588 return Ok(None);
1589 };
1590 let batch = queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?;
1591 sqlx::query("DELETE FROM lash_queued_work_batches WHERE batch_id = $1")
1592 .bind(batch_id)
1593 .execute(&mut *tx)
1594 .await
1595 .map_err(store_sqlx_error)?;
1596 tx.commit().await.map_err(store_sqlx_error)?;
1597 Ok(Some(batch))
1598 }
1599
1600 async fn list_queued_work(&self, session_id: &str) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1601 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1602 let rows = sqlx::query(
1603 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1604 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1605 claim_fencing_token
1606 FROM lash_queued_work_batches
1607 WHERE session_id = $1
1608 ORDER BY enqueue_seq ASC",
1609 )
1610 .bind(session_id)
1611 .fetch_all(&mut *tx)
1612 .await
1613 .map_err(store_sqlx_error)?;
1614 let mut batches = Vec::new();
1615 for row in rows {
1616 batches.push(queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?);
1617 }
1618 tx.commit().await.map_err(store_sqlx_error)?;
1619 Ok(batches)
1620 }
1621
1622 async fn list_pending_queued_work(
1623 &self,
1624 session_id: &str,
1625 ) -> Result<Vec<QueuedWorkBatch>, StoreError> {
1626 let mut tx = self.pool.begin().await.map_err(store_sqlx_error)?;
1627 let now = current_epoch_ms();
1628 let rows = sqlx::query(
1629 "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
1630 slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
1631 claim_fencing_token
1632 FROM lash_queued_work_batches
1633 WHERE session_id = $1
1634 AND (claim_token IS NULL OR claim_expires_at_ms <= $2)
1635 ORDER BY enqueue_seq ASC",
1636 )
1637 .bind(session_id)
1638 .bind(now as i64)
1639 .fetch_all(&mut *tx)
1640 .await
1641 .map_err(store_sqlx_error)?;
1642 let mut batches = Vec::new();
1643 for row in rows {
1644 batches.push(queued_work_batch_from_row(&mut tx, queued_batch_row(row)?).await?);
1645 }
1646 tx.commit().await.map_err(store_sqlx_error)?;
1647 Ok(batches)
1648 }
1649
1650 async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError> {
1651 sqlx::query(
1652 "INSERT INTO lash_session_meta (session_id, meta_json)
1653 VALUES ($1, $2)
1654 ON CONFLICT (session_id) DO UPDATE SET meta_json = EXCLUDED.meta_json",
1655 )
1656 .bind(&meta.session_id)
1657 .bind(encode_json(&meta))
1658 .execute(&self.pool)
1659 .await
1660 .map_err(store_sqlx_error)?;
1661 Ok(())
1662 }
1663
1664 async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError> {
1665 let json: Option<String> = if let Some(session_id) = &self.session_id {
1666 sqlx::query_scalar("SELECT meta_json FROM lash_session_meta WHERE session_id = $1")
1667 .bind(session_id)
1668 .fetch_optional(&self.pool)
1669 .await
1670 .map_err(store_sqlx_error)?
1671 } else {
1672 sqlx::query_scalar(
1673 "SELECT meta_json FROM lash_session_meta ORDER BY session_id ASC LIMIT 1",
1674 )
1675 .fetch_optional(&self.pool)
1676 .await
1677 .map_err(store_sqlx_error)?
1678 };
1679 json.map(|json| store_decode_json(&json, "session meta"))
1680 .transpose()
1681 }
1682
1683 async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError> {
1684 for id in ids {
1685 if let Some(session_id) = &self.session_id {
1686 sqlx::query(
1687 "UPDATE lash_graph_nodes
1688 SET tombstoned = TRUE
1689 WHERE session_id = $1 AND node_id = $2",
1690 )
1691 .bind(session_id)
1692 .bind(id)
1693 .execute(&self.pool)
1694 .await
1695 .map_err(store_sqlx_error)?;
1696 } else {
1697 sqlx::query(
1698 "UPDATE lash_graph_nodes
1699 SET tombstoned = TRUE
1700 WHERE node_id = $1",
1701 )
1702 .bind(id)
1703 .execute(&self.pool)
1704 .await
1705 .map_err(store_sqlx_error)?;
1706 }
1707 }
1708 Ok(())
1709 }
1710
1711 async fn vacuum(&self) -> Result<VacuumReport, StoreError> {
1712 let removed = if let Some(session_id) = &self.session_id {
1713 sqlx::query("DELETE FROM lash_graph_nodes WHERE session_id = $1 AND tombstoned = TRUE")
1714 .bind(session_id)
1715 .execute(&self.pool)
1716 .await
1717 .map_err(store_sqlx_error)?
1718 .rows_affected()
1719 } else {
1720 sqlx::query("DELETE FROM lash_graph_nodes WHERE tombstoned = TRUE")
1721 .execute(&self.pool)
1722 .await
1723 .map_err(store_sqlx_error)?
1724 .rows_affected()
1725 };
1726 Ok(VacuumReport {
1727 removed_node_count: removed as usize,
1728 })
1729 }
1730
1731 async fn gc_unreachable(&self) -> Result<GcReport, StoreError> {
1732 Ok(GcReport::default())
1733 }
1734}
1735
1736fn process_status_label(record: &ProcessRecord) -> &'static str {
1737 record.status.label()
1738}
1739
1740async fn load_process_tx(
1741 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1742 process_id: &str,
1743) -> Result<Option<ProcessRecord>, PluginError> {
1744 let json: Option<String> = sqlx::query_scalar(
1745 "SELECT record_json
1746 FROM lash_processes
1747 WHERE process_id = $1
1748 FOR UPDATE",
1749 )
1750 .bind(process_id)
1751 .fetch_optional(&mut **tx)
1752 .await
1753 .map_err(plugin_sqlx_error)?;
1754 json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
1755 .transpose()
1756}
1757
1758async fn load_process(
1759 pool: &PgPool,
1760 process_id: &str,
1761) -> Result<Option<ProcessRecord>, PluginError> {
1762 let json: Option<String> =
1763 sqlx::query_scalar("SELECT record_json FROM lash_processes WHERE process_id = $1")
1764 .bind(process_id)
1765 .fetch_optional(pool)
1766 .await
1767 .map_err(plugin_sqlx_error)?;
1768 json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
1769 .transpose()
1770}
1771
1772async fn save_process_tx(
1773 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1774 record: &ProcessRecord,
1775) -> Result<(), PluginError> {
1776 sqlx::query(
1777 "UPDATE lash_processes
1778 SET updated_at_ms = $2, status = $3, record_json = $4
1779 WHERE process_id = $1",
1780 )
1781 .bind(&record.id)
1782 .bind(record.updated_at_ms as i64)
1783 .bind(process_status_label(record))
1784 .bind(serde_json::to_string(record).map_err(process_decode_error)?)
1785 .execute(&mut **tx)
1786 .await
1787 .map_err(plugin_sqlx_error)?;
1788 Ok(())
1789}
1790
1791async fn load_event_by_key_tx(
1792 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1793 process_id: &str,
1794 replay_key: &str,
1795) -> Result<Option<(String, ProcessEvent)>, PluginError> {
1796 let row = sqlx::query(
1797 "SELECT payload_hash, event_json
1798 FROM lash_process_events
1799 WHERE process_id = $1 AND idempotency_key = $2",
1800 )
1801 .bind(process_id)
1802 .bind(replay_key)
1803 .fetch_optional(&mut **tx)
1804 .await
1805 .map_err(plugin_sqlx_error)?;
1806 row.map(|row| {
1807 let hash: String = row.get(0);
1808 let json: String = row.get(1);
1809 serde_json::from_str(&json)
1810 .map(|event| (hash, event))
1811 .map_err(process_decode_error)
1812 })
1813 .transpose()
1814}
1815
1816async fn load_process_lease_tx(
1817 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
1818 process_id: &str,
1819) -> Result<Option<ProcessLease>, PluginError> {
1820 let row = sqlx::query(
1821 "SELECT lease_owner_id, lease_token, lease_fencing_token,
1822 lease_claimed_at_ms, lease_expires_at_ms
1823 FROM lash_process_leases
1824 WHERE process_id = $1",
1825 )
1826 .bind(process_id)
1827 .fetch_optional(&mut **tx)
1828 .await
1829 .map_err(plugin_sqlx_error)?;
1830 let Some(row) = row else {
1831 return Ok(None);
1832 };
1833 let owner_id: Option<String> = row.get(0);
1834 let lease_token: Option<String> = row.get(1);
1835 let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
1836 return Ok(None);
1837 };
1838 Ok(Some(ProcessLease {
1839 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
1840 process_id: process_id.to_string(),
1841 owner_id,
1842 lease_token,
1843 fencing_token: row.get::<i64, _>(2) as u64,
1844 claimed_at_epoch_ms: row.get::<i64, _>(3) as u64,
1845 expires_at_epoch_ms: row.get::<i64, _>(4) as u64,
1846 }))
1847}
1848
1849fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> PluginError {
1850 PluginError::Session(format!(
1851 "process `{process_id}` is already leased by `{}` until {}",
1852 current.owner_id, current.expires_at_epoch_ms
1853 ))
1854}
1855
1856fn process_lease_expired(process_id: &str) -> PluginError {
1857 PluginError::Session(format!(
1858 "process lease for `{process_id}` is missing or expired"
1859 ))
1860}
1861
1862fn guard_lease(current: Option<&ProcessLease>, lease_token: &str, now: u64) -> bool {
1863 current
1864 .map(|current| current.lease_token == lease_token && current.expires_at_epoch_ms > now)
1865 .unwrap_or(false)
1866}
1867
1868async fn list_grants_for_scope(
1869 pool: &PgPool,
1870 session_scope: &SessionScope,
1871 live_only: bool,
1872) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
1873 let status_clause = if live_only {
1874 "AND p.status = 'running'"
1875 } else {
1876 ""
1877 };
1878 let sql = format!(
1879 "SELECT g.process_id, g.descriptor_json, p.record_json
1880 FROM lash_process_handle_grants g
1881 JOIN lash_processes p ON p.process_id = g.process_id
1882 WHERE g.scope_id = $1 {status_clause}
1883 ORDER BY g.process_id ASC"
1884 );
1885 let rows = sqlx::query(&sql)
1886 .bind(session_scope.id().as_str())
1887 .fetch_all(pool)
1888 .await
1889 .map_err(plugin_sqlx_error)?;
1890 let mut entries = Vec::new();
1891 for row in rows {
1892 let process_id: String = row.get(0);
1893 let descriptor_json: String = row.get(1);
1894 let record_json: String = row.get(2);
1895 let descriptor: ProcessHandleDescriptor =
1896 serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
1897 let record: ProcessRecord =
1898 serde_json::from_str(&record_json).map_err(process_decode_error)?;
1899 entries.push((
1900 ProcessHandleGrant {
1901 session_id: session_scope.session_id.clone(),
1902 process_id,
1903 descriptor,
1904 },
1905 record,
1906 ));
1907 }
1908 Ok(entries)
1909}
1910
1911#[async_trait::async_trait]
1912impl ProcessRegistry for PostgresProcessRegistry {
1913 fn durability_tier(&self) -> DurabilityTier {
1914 DurabilityTier::Durable
1915 }
1916
1917 async fn register_process(
1918 &self,
1919 registration: ProcessRegistration,
1920 ) -> Result<ProcessRecord, PluginError> {
1921 let (registration, registration_hash) =
1922 lash_core::runtime::prepare_process_registration(registration)?;
1923 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1924 if let Some(existing) = load_process_tx(&mut tx, ®istration.id).await? {
1925 if existing.registration_hash == registration_hash {
1926 tx.commit().await.map_err(plugin_sqlx_error)?;
1927 return Ok(existing);
1928 }
1929 return Err(PluginError::Session(format!(
1930 "process `{}` registration hash conflict: existing {}, new {}",
1931 registration.id, existing.registration_hash, registration_hash
1932 )));
1933 }
1934 let now = current_epoch_ms();
1935 let record =
1936 ProcessRecord::from_prepared_registration(registration, registration_hash, now);
1937 let record_json = serde_json::to_string(&record).map_err(process_decode_error)?;
1938 sqlx::query(
1939 "INSERT INTO lash_processes (
1940 process_id, registration_hash, owner_scope_id, host_profile_id,
1941 created_at_ms, updated_at_ms, status, record_json
1942 )
1943 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
1944 )
1945 .bind(&record.id)
1946 .bind(&record.registration_hash)
1947 .bind(record.originator_scope_id().as_str())
1948 .bind(record.host_profile_id())
1949 .bind(record.created_at_ms as i64)
1950 .bind(record.updated_at_ms as i64)
1951 .bind(process_status_label(&record))
1952 .bind(record_json)
1953 .execute(&mut *tx)
1954 .await
1955 .map_err(plugin_sqlx_error)?;
1956 tx.commit().await.map_err(plugin_sqlx_error)?;
1957 Ok(record)
1958 }
1959
1960 async fn set_external_ref(
1961 &self,
1962 process_id: &str,
1963 external_ref: ProcessExternalRef,
1964 ) -> Result<ProcessRecord, PluginError> {
1965 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1966 let mut record = load_process_tx(&mut tx, process_id)
1967 .await?
1968 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
1969 record.external_ref = Some(external_ref);
1970 record.updated_at_ms = current_epoch_ms();
1971 save_process_tx(&mut tx, &record).await?;
1972 tx.commit().await.map_err(plugin_sqlx_error)?;
1973 Ok(record)
1974 }
1975
1976 async fn grant_handle(
1977 &self,
1978 session_scope: &SessionScope,
1979 process_id: &str,
1980 descriptor: ProcessHandleDescriptor,
1981 ) -> Result<ProcessHandleGrant, PluginError> {
1982 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
1983 if load_process_tx(&mut tx, process_id).await?.is_none() {
1984 return Err(PluginError::Session(format!(
1985 "unknown process `{process_id}`"
1986 )));
1987 }
1988 sqlx::query(
1989 "INSERT INTO lash_process_handle_grants (session_id, scope_id, process_id, descriptor_json)
1990 VALUES ($1, $2, $3, $4)
1991 ON CONFLICT (scope_id, process_id) DO UPDATE SET
1992 session_id = EXCLUDED.session_id,
1993 descriptor_json = EXCLUDED.descriptor_json",
1994 )
1995 .bind(&session_scope.session_id)
1996 .bind(session_scope.id().as_str())
1997 .bind(process_id)
1998 .bind(serde_json::to_string(&descriptor).map_err(process_decode_error)?)
1999 .execute(&mut *tx)
2000 .await
2001 .map_err(plugin_sqlx_error)?;
2002 tx.commit().await.map_err(plugin_sqlx_error)?;
2003 Ok(ProcessHandleGrant {
2004 session_id: session_scope.session_id.clone(),
2005 process_id: process_id.to_string(),
2006 descriptor,
2007 })
2008 }
2009
2010 async fn revoke_handle(
2011 &self,
2012 session_scope: &SessionScope,
2013 process_id: &str,
2014 ) -> Result<(), PluginError> {
2015 sqlx::query(
2016 "DELETE FROM lash_process_handle_grants WHERE scope_id = $1 AND process_id = $2",
2017 )
2018 .bind(session_scope.id().as_str())
2019 .bind(process_id)
2020 .execute(&self.pool)
2021 .await
2022 .map_err(plugin_sqlx_error)?;
2023 Ok(())
2024 }
2025
2026 async fn transfer_handle_grants(
2027 &self,
2028 from_scope: &SessionScope,
2029 to_scope: &SessionScope,
2030 process_ids: &[String],
2031 ) -> Result<(), PluginError> {
2032 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2033 for process_id in process_ids {
2034 let descriptor_json: Option<String> = sqlx::query_scalar(
2035 "SELECT descriptor_json FROM lash_process_handle_grants
2036 WHERE scope_id = $1 AND process_id = $2",
2037 )
2038 .bind(from_scope.id().as_str())
2039 .bind(process_id)
2040 .fetch_optional(&mut *tx)
2041 .await
2042 .map_err(plugin_sqlx_error)?;
2043 let Some(descriptor_json) = descriptor_json else {
2044 return Err(PluginError::Session(format!(
2045 "process handle `{process_id}` is not granted to session `{}`",
2046 from_scope.session_id
2047 )));
2048 };
2049 sqlx::query(
2050 "DELETE FROM lash_process_handle_grants WHERE scope_id = $1 AND process_id = $2",
2051 )
2052 .bind(from_scope.id().as_str())
2053 .bind(process_id)
2054 .execute(&mut *tx)
2055 .await
2056 .map_err(plugin_sqlx_error)?;
2057 sqlx::query(
2058 "INSERT INTO lash_process_handle_grants (session_id, scope_id, process_id, descriptor_json)
2059 VALUES ($1, $2, $3, $4)
2060 ON CONFLICT (scope_id, process_id) DO UPDATE SET
2061 session_id = EXCLUDED.session_id,
2062 descriptor_json = EXCLUDED.descriptor_json",
2063 )
2064 .bind(&to_scope.session_id)
2065 .bind(to_scope.id().as_str())
2066 .bind(process_id)
2067 .bind(descriptor_json)
2068 .execute(&mut *tx)
2069 .await
2070 .map_err(plugin_sqlx_error)?;
2071 }
2072 tx.commit().await.map_err(plugin_sqlx_error)
2073 }
2074
2075 async fn list_handle_grants(
2076 &self,
2077 session_scope: &SessionScope,
2078 ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
2079 list_grants_for_scope(&self.pool, session_scope, false).await
2080 }
2081
2082 async fn list_live_handle_grants(
2083 &self,
2084 session_scope: &SessionScope,
2085 ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
2086 list_grants_for_scope(&self.pool, session_scope, true).await
2087 }
2088
2089 async fn has_handle_grant(
2090 &self,
2091 session_scope: &SessionScope,
2092 process_id: &str,
2093 ) -> Result<bool, PluginError> {
2094 let exists: Option<i64> = sqlx::query_scalar(
2095 "SELECT 1::BIGINT FROM lash_process_handle_grants
2096 WHERE scope_id = $1 AND process_id = $2
2097 LIMIT 1",
2098 )
2099 .bind(session_scope.id().as_str())
2100 .bind(process_id)
2101 .fetch_optional(&self.pool)
2102 .await
2103 .map_err(plugin_sqlx_error)?;
2104 Ok(exists.is_some())
2105 }
2106
2107 async fn handle_grants_for_process(
2108 &self,
2109 process_id: &str,
2110 ) -> Result<Vec<ProcessHandleGrant>, PluginError> {
2111 if load_process(&self.pool, process_id).await?.is_none() {
2112 return Err(PluginError::Session(format!(
2113 "unknown process `{process_id}`"
2114 )));
2115 }
2116 let rows = sqlx::query(
2117 "SELECT session_id, descriptor_json
2118 FROM lash_process_handle_grants
2119 WHERE process_id = $1
2120 ORDER BY session_id ASC, scope_id ASC",
2121 )
2122 .bind(process_id)
2123 .fetch_all(&self.pool)
2124 .await
2125 .map_err(plugin_sqlx_error)?;
2126 let mut grants = Vec::new();
2127 for row in rows {
2128 let descriptor_json: String = row.get(1);
2129 grants.push(ProcessHandleGrant {
2130 session_id: row.get(0),
2131 process_id: process_id.to_string(),
2132 descriptor: serde_json::from_str(&descriptor_json).map_err(process_decode_error)?,
2133 });
2134 }
2135 Ok(grants)
2136 }
2137
2138 async fn delete_session_process_state(
2139 &self,
2140 session_id: &str,
2141 ) -> Result<lash_core::ProcessSessionDeleteReport, PluginError> {
2142 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2143 let rows = sqlx::query(
2144 "SELECT g.process_id, p.record_json
2145 FROM lash_process_handle_grants g
2146 JOIN lash_processes p ON p.process_id = g.process_id
2147 WHERE g.session_id = $1
2148 ORDER BY g.process_id ASC",
2149 )
2150 .bind(session_id)
2151 .fetch_all(&mut *tx)
2152 .await
2153 .map_err(plugin_sqlx_error)?;
2154 let mut removed = Vec::new();
2155 for row in rows {
2156 let process_id: String = row.get(0);
2157 let record_json: String = row.get(1);
2158 let record: ProcessRecord =
2159 serde_json::from_str(&record_json).map_err(process_decode_error)?;
2160 removed.push((process_id, record));
2161 }
2162 let deleted_wake_count = 0;
2167 let revoked = sqlx::query("DELETE FROM lash_process_handle_grants WHERE session_id = $1")
2168 .bind(session_id)
2169 .execute(&mut *tx)
2170 .await
2171 .map_err(plugin_sqlx_error)?
2172 .rows_affected() as usize;
2173 let mut orphaned_process_ids = Vec::new();
2174 let mut preserved_process_ids = Vec::new();
2175 for (process_id, record) in removed {
2176 if record.is_terminal() {
2177 continue;
2178 }
2179 let remaining: i64 = sqlx::query_scalar(
2180 "SELECT COUNT(*) FROM lash_process_handle_grants WHERE process_id = $1",
2181 )
2182 .bind(&process_id)
2183 .fetch_one(&mut *tx)
2184 .await
2185 .map_err(plugin_sqlx_error)?;
2186 if remaining == 0 {
2187 orphaned_process_ids.push(process_id);
2188 } else {
2189 preserved_process_ids.push(process_id);
2190 }
2191 }
2192 tx.commit().await.map_err(plugin_sqlx_error)?;
2193 orphaned_process_ids.sort();
2194 orphaned_process_ids.dedup();
2195 preserved_process_ids.sort();
2196 preserved_process_ids.dedup();
2197 Ok(lash_core::ProcessSessionDeleteReport {
2198 session_id: session_id.to_string(),
2199 revoked_handle_count: revoked,
2200 deleted_wake_count,
2201 orphaned_process_ids,
2202 preserved_process_ids,
2203 })
2204 }
2205
    /// Appends an event to the log of `process_id` and applies any status
    /// change the event implies to the stored process record.
    ///
    /// All reads and writes happen in one transaction. Waiters on
    /// `self.notify` are woken after a commit that changed stored state.
    ///
    /// # Errors
    ///
    /// Returns `PluginError::Session` for an unknown process, propagates
    /// whatever `prepare_process_event_append` rejects, and maps database
    /// and JSON failures through `plugin_sqlx_error` / `process_decode_error`.
    async fn append_event(
        &self,
        process_id: &str,
        request: ProcessEventAppendRequest,
    ) -> Result<ProcessEventAppendResult, PluginError> {
        let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
        let mut record = load_process_tx(&mut tx, process_id)
            .await?
            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
        // Only requests that carry a replay key are looked up for a
        // previously stored event.
        let replay_lookup =
            if let Some(replay_key) = request.replay.as_ref().map(|r| r.key.as_str()) {
                load_event_by_key_tx(&mut tx, process_id, replay_key).await?
            } else {
                None
            };
        // Candidate sequence: one past the highest stored sequence, starting at 1.
        // NOTE(review): MAX(sequence) + 1 is only race-free if `load_process_tx`
        // locks the process row or the table enforces uniqueness — confirm.
        let sequence: i64 = sqlx::query_scalar(
            "SELECT COALESCE(MAX(sequence), 0) + 1 FROM lash_process_events WHERE process_id = $1",
        )
        .bind(process_id)
        .fetch_one(&mut *tx)
        .await
        .map_err(plugin_sqlx_error)?;
        let occurred_at_ms = current_epoch_ms();
        let prepared = lash_core::runtime::prepare_process_event_append(
            &record,
            request,
            sequence as u64,
            replay_lookup,
            occurred_at_ms,
        )?;
        if prepared.replayed {
            // Replay path: no event row is inserted. The record is rewritten
            // only when the prepared result still carries a status update.
            let repaired = if let Some(status) = prepared.status_update.clone() {
                record.status = status;
                if record.status.is_terminal() {
                    // A terminal process no longer waits on anything.
                    record.wait = None;
                }
                record.updated_at_ms = prepared.occurred_at_ms;
                save_process_tx(&mut tx, &record).await?;
                true
            } else {
                false
            };
            tx.commit().await.map_err(plugin_sqlx_error)?;
            if repaired {
                self.notify.notify_waiters();
            }
            return Ok(ProcessEventAppendResult {
                event: prepared.event,
                wake_delivery: prepared.wake_delivery,
            });
        }
        // Fresh append: persist the event row, then the updated record.
        let event = prepared.event;
        sqlx::query(
            "INSERT INTO lash_process_events (
                process_id, sequence, event_type, payload_hash, idempotency_key,
                occurred_at_ms, event_json
             )
             VALUES ($1, $2, $3, $4, $5, $6, $7)",
        )
        .bind(process_id)
        .bind(sequence)
        .bind(event.event_type.as_str())
        .bind(&prepared.payload_hash)
        .bind(event.invocation.replay_key())
        .bind(prepared.occurred_at_ms as i64)
        .bind(serde_json::to_string(&event).map_err(process_decode_error)?)
        .execute(&mut *tx)
        .await
        .map_err(plugin_sqlx_error)?;
        if let Some(status) = prepared.status_update.clone() {
            record.status = status;
            if record.status.is_terminal() {
                record.wait = None;
            }
        }
        // The record's timestamp is bumped on every fresh append, with or
        // without a status change.
        record.updated_at_ms = prepared.occurred_at_ms;
        save_process_tx(&mut tx, &record).await?;
        tx.commit().await.map_err(plugin_sqlx_error)?;
        self.notify.notify_waiters();
        Ok(ProcessEventAppendResult {
            event,
            wake_delivery: prepared.wake_delivery,
        })
    }
2290
2291 async fn events_after(
2292 &self,
2293 process_id: &str,
2294 after_sequence: u64,
2295 ) -> Result<Vec<ProcessEvent>, PluginError> {
2296 if load_process(&self.pool, process_id).await?.is_none() {
2297 return Err(PluginError::Session(format!(
2298 "unknown process `{process_id}`"
2299 )));
2300 }
2301 let rows = sqlx::query(
2302 "SELECT event_json FROM lash_process_events
2303 WHERE process_id = $1 AND sequence > $2
2304 ORDER BY sequence ASC",
2305 )
2306 .bind(process_id)
2307 .bind(after_sequence as i64)
2308 .fetch_all(&self.pool)
2309 .await
2310 .map_err(plugin_sqlx_error)?;
2311 let mut events = Vec::new();
2312 for row in rows {
2313 let json: String = row.get(0);
2314 events.push(serde_json::from_str(&json).map_err(process_decode_error)?);
2315 }
2316 Ok(events)
2317 }
2318
2319 async fn count_events_through(
2320 &self,
2321 process_id: &str,
2322 event_type: &str,
2323 up_to_sequence: u64,
2324 ) -> Result<u64, PluginError> {
2325 if load_process(&self.pool, process_id).await?.is_none() {
2326 return Err(PluginError::Session(format!(
2327 "unknown process `{process_id}`"
2328 )));
2329 }
2330 let row = sqlx::query(
2331 "SELECT COUNT(*) FROM lash_process_events
2332 WHERE process_id = $1 AND event_type = $2 AND sequence <= $3",
2333 )
2334 .bind(process_id)
2335 .bind(event_type)
2336 .bind(up_to_sequence as i64)
2337 .fetch_one(&self.pool)
2338 .await
2339 .map_err(plugin_sqlx_error)?;
2340 let count: i64 = row.get(0);
2341 Ok(count as u64)
2342 }
2343
2344 async fn recent_events(
2345 &self,
2346 process_id: &str,
2347 limit: usize,
2348 ) -> Result<Vec<ProcessEvent>, PluginError> {
2349 if load_process(&self.pool, process_id).await?.is_none() {
2350 return Err(PluginError::Session(format!(
2351 "unknown process `{process_id}`"
2352 )));
2353 }
2354 let rows = sqlx::query(
2355 "SELECT event_json FROM lash_process_events
2356 WHERE process_id = $1
2357 ORDER BY sequence DESC
2358 LIMIT $2",
2359 )
2360 .bind(process_id)
2361 .bind(limit as i64)
2362 .fetch_all(&self.pool)
2363 .await
2364 .map_err(plugin_sqlx_error)?;
2365 let mut events = Vec::new();
2366 for row in rows {
2367 let json: String = row.get(0);
2368 events.push(serde_json::from_str(&json).map_err(process_decode_error)?);
2369 }
2370 events.reverse();
2371 Ok(events)
2372 }
2373
2374 async fn wake_events_after(
2375 &self,
2376 process_id: &str,
2377 after_sequence: u64,
2378 ) -> Result<Vec<ProcessEvent>, PluginError> {
2379 let rows = sqlx::query("SELECT sequence FROM lash_process_wake_acks WHERE process_id = $1")
2380 .bind(process_id)
2381 .fetch_all(&self.pool)
2382 .await
2383 .map_err(plugin_sqlx_error)?;
2384 let acked = rows
2385 .into_iter()
2386 .map(|row| row.get::<i64, _>(0) as u64)
2387 .collect::<HashSet<_>>();
2388 Ok(self
2389 .events_after(process_id, after_sequence)
2390 .await?
2391 .into_iter()
2392 .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
2393 .collect())
2394 }
2395
2396 async fn wait_event_after(
2397 &self,
2398 process_id: &str,
2399 event_type: &str,
2400 after_sequence: u64,
2401 ) -> Result<ProcessEvent, PluginError> {
2402 loop {
2403 if let Some(event) = self
2404 .events_after(process_id, after_sequence)
2405 .await?
2406 .into_iter()
2407 .find(|event| event.event_type == event_type)
2408 {
2409 return Ok(event);
2410 }
2411 tokio::select! {
2412 _ = self.notify.notified() => {}
2413 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
2414 }
2415 }
2416 }
2417
2418 async fn await_process(&self, process_id: &str) -> Result<ProcessAwaitOutput, PluginError> {
2419 loop {
2420 let record = load_process(&self.pool, process_id)
2421 .await?
2422 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
2423 if let Some(await_output) = record.status.await_output() {
2424 return Ok(await_output.clone());
2425 }
2426 tokio::select! {
2427 _ = self.notify.notified() => {}
2428 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
2429 }
2430 }
2431 }
2432
2433 async fn complete_process(
2434 &self,
2435 process_id: &str,
2436 await_output: ProcessAwaitOutput,
2437 ) -> Result<ProcessRecord, PluginError> {
2438 let event_type = match await_output.terminal_state() {
2439 lash_core::ProcessTerminalState::Completed => "process.completed",
2440 lash_core::ProcessTerminalState::Failed => "process.failed",
2441 lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
2442 };
2443 self.append_event(
2444 process_id,
2445 ProcessEventAppendRequest::new(
2446 event_type,
2447 serde_json::json!({ "await_output": await_output }),
2448 )
2449 .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
2450 )
2451 .await?;
2452 load_process(&self.pool, process_id).await?.ok_or_else(|| {
2453 PluginError::Session(format!(
2454 "unknown process `{process_id}` after terminal event"
2455 ))
2456 })
2457 }
2458
2459 async fn set_process_wait(
2460 &self,
2461 process_id: &str,
2462 wait: lash_core::WaitState,
2463 ) -> Result<ProcessRecord, PluginError> {
2464 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2465 let mut record = load_process_tx(&mut tx, process_id)
2466 .await?
2467 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
2468 if record.is_terminal() {
2469 return Err(PluginError::Session(format!(
2470 "terminal process `{process_id}` cannot enter a wait state"
2471 )));
2472 }
2473 record.wait = Some(wait);
2474 record.updated_at_ms = current_epoch_ms();
2475 save_process_tx(&mut tx, &record).await?;
2476 tx.commit().await.map_err(plugin_sqlx_error)?;
2477 self.notify.notify_waiters();
2478 Ok(record)
2479 }
2480
2481 async fn clear_process_wait(&self, process_id: &str) -> Result<ProcessRecord, PluginError> {
2482 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2483 let mut record = load_process_tx(&mut tx, process_id)
2484 .await?
2485 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
2486 record.wait = None;
2487 record.updated_at_ms = current_epoch_ms();
2488 save_process_tx(&mut tx, &record).await?;
2489 tx.commit().await.map_err(plugin_sqlx_error)?;
2490 self.notify.notify_waiters();
2491 Ok(record)
2492 }
2493
2494 async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
2495 load_process(&self.pool, process_id).await.ok().flatten()
2496 }
2497
2498 async fn list_processes(
2499 &self,
2500 filter: &lash_core::ProcessListFilter,
2501 ) -> Result<Vec<ProcessRecord>, PluginError> {
2502 let rows = sqlx::query(
2503 "SELECT record_json FROM lash_processes
2504 ORDER BY process_id ASC",
2505 )
2506 .fetch_all(&self.pool)
2507 .await
2508 .map_err(plugin_sqlx_error)?;
2509 let mut records = Vec::new();
2510 for row in rows {
2511 let json: String = row.get(0);
2512 let record: ProcessRecord =
2513 serde_json::from_str(&json).map_err(process_decode_error)?;
2514 if filter.matches_record(&record) {
2515 records.push(record);
2516 }
2517 }
2518 Ok(records)
2519 }
2520
2521 async fn ack_wake(&self, process_id: &str, sequence: u64) -> Result<(), PluginError> {
2522 if load_process(&self.pool, process_id).await?.is_none() {
2523 return Err(PluginError::Session(format!(
2524 "unknown process `{process_id}`"
2525 )));
2526 }
2527 sqlx::query(
2528 "INSERT INTO lash_process_wake_acks (process_id, sequence)
2529 VALUES ($1, $2)
2530 ON CONFLICT DO NOTHING",
2531 )
2532 .bind(process_id)
2533 .bind(sequence as i64)
2534 .execute(&self.pool)
2535 .await
2536 .map_err(plugin_sqlx_error)?;
2537 Ok(())
2538 }
2539
2540 async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, PluginError> {
2541 let rows = sqlx::query(
2542 "SELECT record_json FROM lash_processes
2543 WHERE status = 'running'
2544 ORDER BY process_id ASC",
2545 )
2546 .fetch_all(&self.pool)
2547 .await
2548 .map_err(plugin_sqlx_error)?;
2549 let mut records = Vec::new();
2550 for row in rows {
2551 let json: String = row.get(0);
2552 records.push(serde_json::from_str(&json).map_err(process_decode_error)?);
2553 }
2554 Ok(records)
2555 }
2556
2557 async fn claim_process_lease(
2558 &self,
2559 process_id: &str,
2560 owner_id: &str,
2561 lease_ttl_ms: u64,
2562 ) -> Result<ProcessLease, PluginError> {
2563 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2564 if load_process_tx(&mut tx, process_id).await?.is_none() {
2565 return Err(PluginError::Session(format!(
2566 "unknown process `{process_id}`"
2567 )));
2568 }
2569 let now = current_epoch_ms();
2570 let current = load_process_lease_tx(&mut tx, process_id).await?;
2571 if let Some(current) = current.as_ref()
2572 && current.expires_at_epoch_ms > now
2573 && current.owner_id != owner_id
2574 {
2575 return Err(process_lease_conflict(process_id, current));
2576 }
2577 let existing_fence: Option<i64> = sqlx::query_scalar(
2578 "SELECT lease_fencing_token FROM lash_process_leases WHERE process_id = $1 FOR UPDATE",
2579 )
2580 .bind(process_id)
2581 .fetch_optional(&mut *tx)
2582 .await
2583 .map_err(plugin_sqlx_error)?;
2584 let fencing_token = existing_fence.unwrap_or(0) as u64 + 1;
2585 let lease = ProcessLease {
2586 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
2587 process_id: process_id.to_string(),
2588 owner_id: owner_id.to_string(),
2589 lease_token: format!(
2590 "{:x}",
2591 Sha256::digest(format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes())
2592 ),
2593 fencing_token,
2594 claimed_at_epoch_ms: now,
2595 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
2596 };
2597 sqlx::query(
2598 "INSERT INTO lash_process_leases (
2599 process_id, lease_owner_id, lease_token, lease_fencing_token,
2600 lease_claimed_at_ms, lease_expires_at_ms
2601 )
2602 VALUES ($1, $2, $3, $4, $5, $6)
2603 ON CONFLICT (process_id) DO UPDATE SET
2604 lease_owner_id = EXCLUDED.lease_owner_id,
2605 lease_token = EXCLUDED.lease_token,
2606 lease_fencing_token = EXCLUDED.lease_fencing_token,
2607 lease_claimed_at_ms = EXCLUDED.lease_claimed_at_ms,
2608 lease_expires_at_ms = EXCLUDED.lease_expires_at_ms",
2609 )
2610 .bind(&lease.process_id)
2611 .bind(&lease.owner_id)
2612 .bind(&lease.lease_token)
2613 .bind(lease.fencing_token as i64)
2614 .bind(lease.claimed_at_epoch_ms as i64)
2615 .bind(lease.expires_at_epoch_ms as i64)
2616 .execute(&mut *tx)
2617 .await
2618 .map_err(plugin_sqlx_error)?;
2619 tx.commit().await.map_err(plugin_sqlx_error)?;
2620 Ok(lease)
2621 }
2622
2623 async fn renew_process_lease(
2624 &self,
2625 lease: &ProcessLease,
2626 lease_ttl_ms: u64,
2627 ) -> Result<ProcessLease, PluginError> {
2628 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2629 let now = current_epoch_ms();
2630 let current = load_process_lease_tx(&mut tx, &lease.process_id).await?;
2631 if !guard_lease(current.as_ref(), &lease.lease_token, now) {
2632 return Err(process_lease_expired(&lease.process_id));
2633 }
2634 let renewed = ProcessLease {
2635 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
2636 ..lease.clone()
2637 };
2638 sqlx::query(
2639 "UPDATE lash_process_leases
2640 SET lease_expires_at_ms = $2
2641 WHERE process_id = $1 AND lease_token = $3",
2642 )
2643 .bind(&renewed.process_id)
2644 .bind(renewed.expires_at_epoch_ms as i64)
2645 .bind(&renewed.lease_token)
2646 .execute(&mut *tx)
2647 .await
2648 .map_err(plugin_sqlx_error)?;
2649 tx.commit().await.map_err(plugin_sqlx_error)?;
2650 Ok(renewed)
2651 }
2652
2653 async fn complete_process_lease(
2654 &self,
2655 completion: &ProcessLeaseCompletion,
2656 ) -> Result<(), PluginError> {
2657 sqlx::query(
2658 "UPDATE lash_process_leases
2659 SET lease_owner_id = NULL,
2660 lease_token = NULL,
2661 lease_claimed_at_ms = 0,
2662 lease_expires_at_ms = 0
2663 WHERE process_id = $1 AND lease_token = $2",
2664 )
2665 .bind(&completion.process_id)
2666 .bind(&completion.lease_token)
2667 .execute(&self.pool)
2668 .await
2669 .map_err(plugin_sqlx_error)?;
2670 Ok(())
2671 }
2672}
2673
2674#[async_trait::async_trait]
2675impl HostEventStore for PostgresHostEventStore {
2676 fn durability_tier(&self) -> DurabilityTier {
2677 DurabilityTier::Durable
2678 }
2679
2680 async fn register_subscription(
2681 &self,
2682 draft: TriggerSubscriptionDraft,
2683 ) -> Result<TriggerSubscriptionRecord, PluginError> {
2684 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2685 let seq: i64 = sqlx::query_scalar("SELECT nextval('lash_host_event_subscription_seq')")
2686 .fetch_one(&mut *tx)
2687 .await
2688 .map_err(plugin_sqlx_error)?;
2689 let handle = format!("trigger:{seq}");
2690 let subscription_id = format!("subscription:{seq}");
2691 let now = current_epoch_ms();
2692 let record = TriggerSubscriptionRecord {
2693 subscription_id: subscription_id.clone(),
2694 registrant: draft.registrant,
2695 env_ref: draft.env_ref,
2696 wake_target: draft.wake_target,
2697 handle,
2698 name: draft.name,
2699 source_type: draft.source_type,
2700 source_key: draft.source_key,
2701 source: draft.source,
2702 event_ty: draft.event_ty,
2703 module_ref: draft.module_ref,
2704 required_surface_ref: draft.required_surface_ref,
2705 process_ref: draft.process_ref,
2706 process_name: draft.process_name,
2707 input_template: draft.input_template,
2708 enabled: true,
2709 created_at_ms: now,
2710 updated_at_ms: now,
2711 };
2712 sqlx::query(
2713 "INSERT INTO lash_host_event_trigger_subscriptions (
2714 subscription_id, registrant_scope_id, handle, source_type, source_key,
2715 enabled, created_at_ms, updated_at_ms, record_json
2716 )
2717 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
2718 )
2719 .bind(&record.subscription_id)
2720 .bind(record.registrant_scope_id())
2721 .bind(&record.handle)
2722 .bind(&record.source_type)
2723 .bind(&record.source_key)
2724 .bind(record.enabled)
2725 .bind(record.created_at_ms as i64)
2726 .bind(record.updated_at_ms as i64)
2727 .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2728 .execute(&mut *tx)
2729 .await
2730 .map_err(plugin_sqlx_error)?;
2731 tx.commit().await.map_err(plugin_sqlx_error)?;
2732 Ok(record)
2733 }
2734
2735 async fn list_subscriptions(
2736 &self,
2737 filter: TriggerSubscriptionFilter,
2738 ) -> Result<Vec<TriggerSubscriptionRecord>, PluginError> {
2739 let rows = sqlx::query(
2740 "SELECT record_json FROM lash_host_event_trigger_subscriptions
2741 ORDER BY registrant_scope_id ASC, handle ASC",
2742 )
2743 .fetch_all(&self.pool)
2744 .await
2745 .map_err(plugin_sqlx_error)?;
2746 let mut records = Vec::new();
2747 for row in rows {
2748 let json: String = row.get(0);
2749 let record: TriggerSubscriptionRecord =
2750 serde_json::from_str(&json).map_err(process_decode_error)?;
2751 if filter.matches(&record) {
2752 records.push(record);
2753 }
2754 }
2755 Ok(records)
2756 }
2757
2758 async fn cancel_subscription(
2759 &self,
2760 session_id: &str,
2761 handle: &str,
2762 ) -> Result<bool, PluginError> {
2763 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2764 let rows = sqlx::query(
2765 "SELECT record_json FROM lash_host_event_trigger_subscriptions
2766 WHERE handle = $1
2767 ORDER BY registrant_scope_id ASC
2768 FOR UPDATE",
2769 )
2770 .bind(handle)
2771 .fetch_all(&mut *tx)
2772 .await
2773 .map_err(plugin_sqlx_error)?;
2774 let mut json = None;
2775 for row in rows {
2776 let candidate: String = row.get(0);
2777 let record: TriggerSubscriptionRecord =
2778 serde_json::from_str(&candidate).map_err(process_decode_error)?;
2779 if record.registrant_session_id() == Some(session_id) {
2780 json = Some(candidate);
2781 break;
2782 }
2783 }
2784 let Some(json) = json else {
2785 tx.commit().await.map_err(plugin_sqlx_error)?;
2786 return Ok(false);
2787 };
2788 let mut record: TriggerSubscriptionRecord =
2789 serde_json::from_str(&json).map_err(process_decode_error)?;
2790 let changed = record.enabled;
2791 record.enabled = false;
2792 record.updated_at_ms = current_epoch_ms();
2793 sqlx::query(
2794 "UPDATE lash_host_event_trigger_subscriptions
2795 SET enabled = $3, updated_at_ms = $4, record_json = $5
2796 WHERE registrant_scope_id = $1 AND handle = $2",
2797 )
2798 .bind(record.registrant_scope_id())
2799 .bind(handle)
2800 .bind(record.enabled)
2801 .bind(record.updated_at_ms as i64)
2802 .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2803 .execute(&mut *tx)
2804 .await
2805 .map_err(plugin_sqlx_error)?;
2806 tx.commit().await.map_err(plugin_sqlx_error)?;
2807 Ok(changed)
2808 }
2809
2810 async fn delete_session_subscriptions(&self, session_id: &str) -> Result<usize, PluginError> {
2811 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2812 let rows = sqlx::query(
2813 "SELECT subscription_id, record_json
2814 FROM lash_host_event_trigger_subscriptions
2815 ORDER BY registrant_scope_id ASC, handle ASC
2816 FOR UPDATE",
2817 )
2818 .fetch_all(&mut *tx)
2819 .await
2820 .map_err(plugin_sqlx_error)?;
2821 let mut deleted = 0usize;
2822 for row in rows {
2823 let subscription_id: String = row.get(0);
2824 let json: String = row.get(1);
2825 let record: TriggerSubscriptionRecord =
2826 serde_json::from_str(&json).map_err(process_decode_error)?;
2827 if record.registrant_session_id() != Some(session_id) {
2828 continue;
2829 }
2830 sqlx::query(
2831 "DELETE FROM lash_host_event_trigger_subscriptions WHERE subscription_id = $1",
2832 )
2833 .bind(&subscription_id)
2834 .execute(&mut *tx)
2835 .await
2836 .map_err(plugin_sqlx_error)?;
2837 deleted = deleted.saturating_add(1);
2838 }
2839 tx.commit().await.map_err(plugin_sqlx_error)?;
2840 Ok(deleted)
2841 }
2842
2843 async fn record_occurrence(
2844 &self,
2845 request: HostEventOccurrenceRequest,
2846 ) -> Result<HostEventOccurrenceRecord, PluginError> {
2847 lash_core::validate_host_event_occurrence_request(&request)?;
2848 let request_hash = lash_core::host_event_occurrence_request_hash(&request)?;
2849 let occurrence_id = lash_core::deterministic_occurrence_id(&request)?;
2850 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2851 let existing = sqlx::query(
2852 "SELECT request_hash, record_json
2853 FROM lash_host_event_occurrences
2854 WHERE idempotency_key = $1
2855 FOR UPDATE",
2856 )
2857 .bind(&request.idempotency_key)
2858 .fetch_optional(&mut *tx)
2859 .await
2860 .map_err(plugin_sqlx_error)?;
2861 if let Some(row) = existing {
2862 let existing_hash: String = row.get(0);
2863 let existing_json: String = row.get(1);
2864 if existing_hash != request_hash {
2865 return Err(PluginError::Session(format!(
2866 "host event occurrence idempotency conflict for `{}`",
2867 request.idempotency_key
2868 )));
2869 }
2870 let record = serde_json::from_str(&existing_json).map_err(process_decode_error)?;
2871 tx.commit().await.map_err(plugin_sqlx_error)?;
2872 return Ok(record);
2873 }
2874 let record = HostEventOccurrenceRecord {
2875 occurrence_id: occurrence_id.clone(),
2876 source_type: request.source_type,
2877 source_key: request.source_key,
2878 payload: request.payload,
2879 idempotency_key: request.idempotency_key.clone(),
2880 source: request.source,
2881 occurred_at_ms: current_epoch_ms(),
2882 };
2883 sqlx::query(
2884 "INSERT INTO lash_host_event_occurrences (
2885 occurrence_id, idempotency_key, request_hash, source_type, source_key,
2886 occurred_at_ms, record_json
2887 )
2888 VALUES ($1, $2, $3, $4, $5, $6, $7)",
2889 )
2890 .bind(&record.occurrence_id)
2891 .bind(&record.idempotency_key)
2892 .bind(&request_hash)
2893 .bind(&record.source_type)
2894 .bind(&record.source_key)
2895 .bind(record.occurred_at_ms as i64)
2896 .bind(serde_json::to_string(&record).map_err(process_decode_error)?)
2897 .execute(&mut *tx)
2898 .await
2899 .map_err(plugin_sqlx_error)?;
2900 tx.commit().await.map_err(plugin_sqlx_error)?;
2901 Ok(record)
2902 }
2903
2904 async fn reserve_matching_deliveries(
2905 &self,
2906 occurrence_id: &str,
2907 ) -> Result<Vec<TriggerDeliveryReservation>, PluginError> {
2908 let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
2909 let occurrence_json: Option<String> = sqlx::query_scalar(
2910 "SELECT record_json FROM lash_host_event_occurrences WHERE occurrence_id = $1",
2911 )
2912 .bind(occurrence_id)
2913 .fetch_optional(&mut *tx)
2914 .await
2915 .map_err(plugin_sqlx_error)?;
2916 let Some(occurrence_json) = occurrence_json else {
2917 return Err(PluginError::Session(format!(
2918 "unknown host event occurrence `{occurrence_id}`"
2919 )));
2920 };
2921 let occurrence: HostEventOccurrenceRecord =
2922 serde_json::from_str(&occurrence_json).map_err(process_decode_error)?;
2923 let rows = sqlx::query(
2924 "SELECT record_json FROM lash_host_event_trigger_subscriptions
2925 WHERE enabled = TRUE AND source_type = $1 AND source_key = $2
2926 ORDER BY registrant_scope_id ASC, handle ASC",
2927 )
2928 .bind(&occurrence.source_type)
2929 .bind(&occurrence.source_key)
2930 .fetch_all(&mut *tx)
2931 .await
2932 .map_err(plugin_sqlx_error)?;
2933 let mut deliveries = Vec::new();
2934 for row in rows {
2935 let json: String = row.get(0);
2936 let subscription: TriggerSubscriptionRecord =
2937 serde_json::from_str(&json).map_err(process_decode_error)?;
2938 let process_id = lash_core::deterministic_delivery_process_id(
2939 &occurrence.occurrence_id,
2940 &subscription.subscription_id,
2941 )?;
2942 let inserted = sqlx::query(
2943 "INSERT INTO lash_host_event_deliveries (
2944 occurrence_id, subscription_id, process_id, created_at_ms
2945 )
2946 VALUES ($1, $2, $3, $4)
2947 ON CONFLICT DO NOTHING",
2948 )
2949 .bind(&occurrence.occurrence_id)
2950 .bind(&subscription.subscription_id)
2951 .bind(&process_id)
2952 .bind(current_epoch_ms() as i64)
2953 .execute(&mut *tx)
2954 .await
2955 .map_err(plugin_sqlx_error)?
2956 .rows_affected();
2957 if inserted == 0 {
2958 continue;
2959 }
2960 deliveries.push(TriggerDeliveryReservation {
2961 occurrence: occurrence.clone(),
2962 subscription,
2963 process_id,
2964 });
2965 }
2966 tx.commit().await.map_err(plugin_sqlx_error)?;
2967 Ok(deliveries)
2968 }
2969}
2970
2971#[async_trait::async_trait]
2972impl lashlang::LashlangArtifactStore for PostgresLashlangArtifactStore {
2973 fn durability_tier(&self) -> DurabilityTier {
2974 DurabilityTier::Durable
2975 }
2976
2977 async fn put_module_artifact(
2978 &self,
2979 artifact: &lashlang::ModuleArtifact,
2980 ) -> Result<(), lashlang::ArtifactStoreError> {
2981 let bytes = artifact
2982 .to_store_bytes()
2983 .map_err(lashlang::ArtifactStoreError::from)?;
2984 sqlx::query(
2985 "INSERT INTO lash_lashlang_artifacts (module_ref, artifact_bytes)
2986 VALUES ($1, $2)
2987 ON CONFLICT (module_ref) DO UPDATE SET artifact_bytes = EXCLUDED.artifact_bytes",
2988 )
2989 .bind(artifact.module_ref.as_str())
2990 .bind(bytes)
2991 .execute(&self.pool)
2992 .await
2993 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
2994 Ok(())
2995 }
2996
2997 async fn get_module_artifact(
2998 &self,
2999 module_ref: &lashlang::ModuleRef,
3000 ) -> Result<Option<Arc<lashlang::ModuleArtifact>>, lashlang::ArtifactStoreError> {
3001 let bytes: Option<Vec<u8>> = sqlx::query_scalar(
3002 "SELECT artifact_bytes FROM lash_lashlang_artifacts WHERE module_ref = $1",
3003 )
3004 .bind(module_ref.as_str())
3005 .fetch_optional(&self.pool)
3006 .await
3007 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
3008 bytes
3009 .map(|bytes| {
3010 lashlang::ModuleArtifact::from_store_bytes(&bytes)
3011 .map(Arc::new)
3012 .map_err(lashlang::ArtifactStoreError::from)
3013 })
3014 .transpose()
3015 }
3016
3017 async fn put_artifact_bytes(
3018 &self,
3019 artifact_ref: &str,
3020 _descriptor: &str,
3021 bytes: &[u8],
3022 ) -> Result<(), lashlang::ArtifactStoreError> {
3023 sqlx::query(
3024 "INSERT INTO lash_lashlang_artifacts (module_ref, artifact_bytes)
3025 VALUES ($1, $2)
3026 ON CONFLICT (module_ref) DO UPDATE SET artifact_bytes = EXCLUDED.artifact_bytes",
3027 )
3028 .bind(artifact_ref)
3029 .bind(bytes)
3030 .execute(&self.pool)
3031 .await
3032 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))?;
3033 Ok(())
3034 }
3035
3036 async fn get_artifact_bytes(
3037 &self,
3038 artifact_ref: &str,
3039 ) -> Result<Option<Vec<u8>>, lashlang::ArtifactStoreError> {
3040 sqlx::query_scalar(
3041 "SELECT artifact_bytes FROM lash_lashlang_artifacts WHERE module_ref = $1",
3042 )
3043 .bind(artifact_ref)
3044 .fetch_optional(&self.pool)
3045 .await
3046 .map_err(|err| lashlang::ArtifactStoreError::Backend(err.to_string()))
3047 }
3048}