Skip to main content

lash_postgres_store/
lib.rs

1//! PostgreSQL durable storage for Lash.
2//!
3//! One [`PostgresStorage`] owns a shared [`sqlx::PgPool`] and creates durable
4//! implementations for the runtime session store, process registry, trigger
5//! store, Lashlang artifact store, process execution environment store, and
6//! attachment manifest.
7
8use std::collections::HashSet;
9use std::sync::{Arc, OnceLock};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use lash_core::runtime::{
13    ProcessHandleGrantEntry, QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim,
14    QueuedWorkClaimBoundary, QueuedWorkCompletion, QueuedWorkItem,
15};
16use lash_core::store::queued_work::{
17    ClaimCandidate, QueuedWorkClaimLease, claim_scan_limit, derive_batch_id, renewed_claim,
18    select_claim_prefix,
19};
20use lash_core::store::{
21    GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
22    RuntimeCommitResult, SessionCheckpoint, SessionHeadMeta,
23};
24use lash_core::{
25    AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
26    DeliveryPolicy, DurabilityTier, GcReport, MergeKey, ProcessAwaitOutput, ProcessEvent,
27    ProcessEventAppendRequest, ProcessEventAppendResult, ProcessExternalRef,
28    ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease, ProcessLeaseCompletion,
29    ProcessRecord, ProcessRegistration, ProcessRegistry, RuntimePersistence, SessionMeta,
30    SessionNodeRecord, SessionReadScope, SessionScope, SessionStoreCreateRequest,
31    SessionStoreFactory, SlotPolicy, StoreError, TokenLedgerEntry, VacuumReport,
32};
33use lash_core::{
34    PluginError, TriggerDeliveryReservation, TriggerOccurrenceRecord, TriggerOccurrenceRequest,
35    TriggerStore, TriggerSubscriptionDraft, TriggerSubscriptionFilter, TriggerSubscriptionRecord,
36};
37use sha2::{Digest, Sha256};
38use sqlx::postgres::{PgPool, PgPoolOptions, PgRow};
39use sqlx::{Executor, Row};
40
/// Identifier for this store's schema.
/// NOTE(review): presumably the key under which `SCHEMA_VERSION` is recorded by
/// `ensure_schema` (`postgres/schema.rs`) — confirm.
const SCHEMA_COMPONENT: &str = "lash-postgres-store";
/// Version number of this store's schema.
const SCHEMA_VERSION: i32 = 2;
/// Local alias for `lash_core::PROCESS_LEASE_SCHEMA_VERSION`, so the value always
/// matches the core crate's definition.
const PROCESS_LEASE_SCHEMA_VERSION: u32 = lash_core::PROCESS_LEASE_SCHEMA_VERSION;
44
/// Entry point for PostgreSQL-backed Lash storage.
///
/// Holds a single [`PgPool`]. Every store handle created from it (session stores,
/// process registry, trigger store, artifact / process-env store) receives a clone
/// of this pool, so all of them draw from the same connections.
#[derive(Clone)]
pub struct PostgresStorage {
    pool: PgPool,
}
49
/// Factory for [`PostgresSessionStore`] handles that share one [`PgPool`].
/// NOTE(review): presumably implements `SessionStoreFactory` in
/// `postgres/session_factory.rs` — confirm.
#[derive(Clone)]
pub struct PostgresSessionStoreFactory {
    pool: PgPool,
}
54
/// Session-store handle backed by the shared [`PgPool`].
///
/// A handle is either bound to one session up front (see
/// [`PostgresStorage::session_store`]) or unbound (see
/// [`PostgresStorage::unbound_session_store`]).
#[derive(Clone)]
pub struct PostgresSessionStore {
    pool: PgPool,
    /// Explicit session binding for handles created via the factory or
    /// [`PostgresStorage::session_store`]; `None` for unbound handles.
    session_id: Option<String>,
    /// In-memory bind-on-first-commit latch for an *unbound* handle.
    ///
    /// A session-store handle commits to exactly one session. An unbound handle
    /// latches the first session it commits and rejects all others. Postgres holds
    /// many sessions per database, so (unlike the single-file SQLite store) the
    /// binding cannot be inferred from a singleton head row. The latch is shared
    /// across clones via `Arc`.
    bound_session: Arc<OnceLock<String>>,
}
67
/// Process registry backed by the shared [`PgPool`].
#[derive(Clone)]
pub struct PostgresProcessRegistry {
    pool: PgPool,
    /// In-process wakeup signal. It is shared across clones of this handle (via
    /// `Arc`), but every `PostgresStorage::process_registry` call creates a new
    /// one, and it never reaches other processes using the same database.
    notify: Arc<tokio::sync::Notify>,
}
73
/// Trigger store backed by the shared [`PgPool`].
#[derive(Clone)]
pub struct PostgresTriggerStore {
    pool: PgPool,
}
78
/// Lashlang artifact store backed by the shared [`PgPool`].
///
/// The same type is also returned by `PostgresStorage::process_env_store`, so it
/// serves as the process execution environment store as well.
#[derive(Clone)]
pub struct PostgresLashlangArtifactStore {
    pool: PgPool,
}
83
/// Pool sizing and per-connection timeout settings for [`PostgresStorage`].
///
/// Session commits are guarded by an optimistic compare-and-swap on the head row
/// (`UPDATE … WHERE head_revision = expected`) instead of a long-held
/// `SELECT … FOR UPDATE`. Concurrent writers therefore never park a pooled
/// connection while waiting on a lock. `lock_timeout` is an extra safeguard: it
/// bounds how long that one CAS write may wait for the head row's lock before
/// failing (reported as a retryable conflict), so a burst of contention cannot
/// exhaust the pool.
#[derive(Clone, Debug)]
pub struct PostgresStoreConfig {
    /// Upper bound on pooled connections (default: 16).
    pub max_connections: u32,
    /// Connections kept open even when idle (default: 0).
    pub min_connections: u32,
    /// Maximum wait for a free connection in `acquire` (default: 30 s).
    pub acquire_timeout: Duration,
    /// Idle period after which a connection is closed (default: 10 min);
    /// `None` disables idle reaping.
    pub idle_timeout: Option<Duration>,
    /// Age after which a connection is recycled (default: 30 min);
    /// `None` disables recycling.
    pub max_lifetime: Option<Duration>,
    /// Postgres `lock_timeout` set on every new connection (default: 10 s);
    /// `None` leaves the server's setting untouched.
    pub lock_timeout: Option<Duration>,
    /// Postgres `statement_timeout` set on every new connection (default: 30 s).
    /// Acts as a backstop so a stuck query cannot hold a connection forever;
    /// `None` leaves the server's setting untouched.
    pub statement_timeout: Option<Duration>,
}

impl Default for PostgresStoreConfig {
    /// Returns the documented defaults: 16/0 connections, a 30 s acquire wait,
    /// 10 min idle / 30 min lifetime, and 10 s lock / 30 s statement timeouts.
    fn default() -> Self {
        let seconds = Duration::from_secs;
        let minutes = |count: u64| Duration::from_secs(count * 60);
        Self {
            max_connections: 16,
            min_connections: 0,
            acquire_timeout: seconds(30),
            idle_timeout: Some(minutes(10)),
            max_lifetime: Some(minutes(30)),
            lock_timeout: Some(seconds(10)),
            statement_timeout: Some(seconds(30)),
        }
    }
}
124
125impl PostgresStorage {
126    /// Connect with [`PostgresStoreConfig::default`] pool/timeout settings.
127    pub async fn connect(database_url: &str) -> Result<Self, StoreError> {
128        Self::connect_with(database_url, PostgresStoreConfig::default()).await
129    }
130
131    /// Connect with explicit pool sizing and per-connection timeouts.
132    pub async fn connect_with(
133        database_url: &str,
134        config: PostgresStoreConfig,
135    ) -> Result<Self, StoreError> {
136        let lock_ms = config.lock_timeout.map(|d| d.as_millis().max(1) as u64);
137        let statement_ms = config
138            .statement_timeout
139            .map(|d| d.as_millis().max(1) as u64);
140        let mut options = PgPoolOptions::new()
141            .max_connections(config.max_connections)
142            .min_connections(config.min_connections)
143            .acquire_timeout(config.acquire_timeout);
144        if let Some(timeout) = config.idle_timeout {
145            options = options.idle_timeout(timeout);
146        }
147        if let Some(timeout) = config.max_lifetime {
148            options = options.max_lifetime(timeout);
149        }
150        let pool = options
151            .after_connect(move |conn, _meta| {
152                Box::pin(async move {
153                    if let Some(ms) = lock_ms {
154                        conn.execute(format!("SET lock_timeout = {ms}").as_str())
155                            .await?;
156                    }
157                    if let Some(ms) = statement_ms {
158                        conn.execute(format!("SET statement_timeout = {ms}").as_str())
159                            .await?;
160                    }
161                    Ok(())
162                })
163            })
164            .connect(database_url)
165            .await
166            .map_err(store_sqlx_error)?;
167        ensure_schema(&pool).await?;
168        Ok(Self { pool })
169    }
170
171    pub fn from_pool(pool: PgPool) -> Self {
172        Self { pool }
173    }
174
175    pub fn pool(&self) -> &PgPool {
176        &self.pool
177    }
178
179    pub fn session_store_factory(&self) -> PostgresSessionStoreFactory {
180        PostgresSessionStoreFactory {
181            pool: self.pool.clone(),
182        }
183    }
184
185    pub fn session_store(&self, session_id: impl Into<String>) -> PostgresSessionStore {
186        PostgresSessionStore {
187            pool: self.pool.clone(),
188            session_id: Some(session_id.into()),
189            bound_session: Arc::new(OnceLock::new()),
190        }
191    }
192
193    pub fn unbound_session_store(&self) -> PostgresSessionStore {
194        PostgresSessionStore {
195            pool: self.pool.clone(),
196            session_id: None,
197            bound_session: Arc::new(OnceLock::new()),
198        }
199    }
200
201    pub fn process_registry(&self) -> PostgresProcessRegistry {
202        PostgresProcessRegistry {
203            pool: self.pool.clone(),
204            notify: Arc::new(tokio::sync::Notify::new()),
205        }
206    }
207
208    pub fn trigger_store(&self) -> PostgresTriggerStore {
209        PostgresTriggerStore {
210            pool: self.pool.clone(),
211        }
212    }
213
214    pub fn lashlang_artifact_store(&self) -> PostgresLashlangArtifactStore {
215        PostgresLashlangArtifactStore {
216            pool: self.pool.clone(),
217        }
218    }
219
220    pub fn process_env_store(&self) -> PostgresLashlangArtifactStore {
221        PostgresLashlangArtifactStore {
222            pool: self.pool.clone(),
223        }
224    }
225}
226
227impl PostgresSessionStoreFactory {
228    pub fn new(storage: &PostgresStorage) -> Self {
229        storage.session_store_factory()
230    }
231}
232
233impl PostgresSessionStore {
234    pub fn unbound(storage: &PostgresStorage) -> Self {
235        storage.unbound_session_store()
236    }
237
238    async fn selected_session_id(&self) -> Result<Option<String>, StoreError> {
239        if let Some(session_id) = &self.session_id {
240            return Ok(Some(session_id.clone()));
241        }
242        sqlx::query_scalar("SELECT session_id FROM lash_sessions ORDER BY session_id ASC LIMIT 1")
243            .fetch_optional(&self.pool)
244            .await
245            .map_err(store_sqlx_error)
246    }
247}
248
// The rest of the implementation lives in these files. `include!` pastes them
// textually into this module, so they share its imports, constants, and private
// items.
include!("postgres/schema.rs");
include!("postgres/support.rs");
include!("postgres/attachments.rs");
include!("postgres/session_factory.rs");
include!("postgres/runtime_persistence.rs");
include!("postgres/process_helpers.rs");
include!("postgres/process_registry.rs");
include!("postgres/trigger_store.rs");
include!("postgres/artifact_store.rs");