// lash_postgres_store/lib.rs

//! PostgreSQL durable storage for Lash.
//!
//! One [`PostgresStorage`] owns a shared [`sqlx::PgPool`] and creates durable
//! implementations for the runtime session store, process registry, trigger
//! store, Lashlang artifact store, and attachment manifest.
6
7use std::collections::HashSet;
8use std::sync::{Arc, OnceLock};
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use lash_core::runtime::{
12    ProcessHandleGrantEntry, QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim,
13    QueuedWorkClaimBoundary, QueuedWorkCompletion, QueuedWorkItem,
14};
15use lash_core::store::queued_work::{
16    ClaimCandidate, QueuedWorkClaimLease, claim_scan_limit, derive_batch_id, renewed_claim,
17    select_claim_prefix,
18};
19use lash_core::store::{
20    GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
21    RuntimeCommitResult, SessionCheckpoint, SessionHeadMeta,
22};
23use lash_core::{
24    AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
25    DeliveryPolicy, DurabilityTier, GcReport, MergeKey, ProcessAwaitOutput, ProcessEvent,
26    ProcessEventAppendRequest, ProcessEventAppendResult, ProcessExternalRef,
27    ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease, ProcessLeaseCompletion,
28    ProcessRecord, ProcessRegistration, ProcessRegistry, RuntimePersistence, SessionMeta,
29    SessionNodeRecord, SessionReadScope, SessionScope, SessionStoreCreateRequest,
30    SessionStoreFactory, SlotPolicy, StoreError, TokenLedgerEntry, VacuumReport,
31};
32use lash_core::{
33    PluginError, TriggerDeliveryReservation, TriggerOccurrenceRecord, TriggerOccurrenceRequest,
34    TriggerStore, TriggerSubscriptionDraft, TriggerSubscriptionFilter, TriggerSubscriptionRecord,
35};
36use sha2::{Digest, Sha256};
37use sqlx::postgres::{PgPool, PgPoolOptions, PgRow};
38use sqlx::{Executor, Row};
39
40const SCHEMA_COMPONENT: &str = "lash-postgres-store";
41const SCHEMA_VERSION: i32 = 1;
42const PROCESS_LEASE_SCHEMA_VERSION: u32 = lash_core::PROCESS_LEASE_SCHEMA_VERSION;
43
44#[derive(Clone)]
45pub struct PostgresStorage {
46    pool: PgPool,
47}
48
49#[derive(Clone)]
50pub struct PostgresSessionStoreFactory {
51    pool: PgPool,
52}
53
54#[derive(Clone)]
55pub struct PostgresSessionStore {
56    pool: PgPool,
57    /// Explicit session binding for handles created via the factory.
58    session_id: Option<String>,
59    /// In-memory bind-on-first-commit for an *unbound* handle. A session-store
60    /// handle commits to exactly one session; an unbound handle latches the first
61    /// session it commits and rejects others (Postgres is multi-session per
62    /// database, so this can't be inferred from a singleton head row the way the
63    /// single-file SQLite store does). Shared across clones via `Arc`.
64    bound_session: Arc<OnceLock<String>>,
65}
66
67#[derive(Clone)]
68pub struct PostgresProcessRegistry {
69    pool: PgPool,
70    notify: Arc<tokio::sync::Notify>,
71}
72
73#[derive(Clone)]
74pub struct PostgresTriggerStore {
75    pool: PgPool,
76}
77
78#[derive(Clone)]
79pub struct PostgresLashlangArtifactStore {
80    pool: PgPool,
81}
82
/// Pool sizing and per-connection timeout settings for [`PostgresStorage`].
///
/// Session commits rely on **optimistic CAS** against the head row (`UPDATE …
/// WHERE head_revision = expected`) rather than holding a `SELECT … FOR
/// UPDATE`, so a writer that is blocked on a lock never keeps a pool
/// connection pinned. `lock_timeout` is defense in depth on top of that: it
/// bounds how long the single CAS write may wait for the head row's lock before
/// it errors (surfaced as a retryable conflict), which keeps a pathological
/// burst from starving the pool.
#[derive(Clone, Debug)]
pub struct PostgresStoreConfig {
    /// Upper bound on pooled connections. Default 16.
    pub max_connections: u32,
    /// Number of idle connections kept warm. Default 0.
    pub min_connections: u32,
    /// How long `acquire` waits for a free connection before it errors.
    /// Default 30s.
    pub acquire_timeout: Duration,
    /// Idle period after which a connection is closed. Default 10m.
    pub idle_timeout: Option<Duration>,
    /// Lifetime after which a connection is recycled. Default 30m.
    pub max_lifetime: Option<Duration>,
    /// Postgres `lock_timeout` set on every connection. Default 10s.
    pub lock_timeout: Option<Duration>,
    /// Postgres `statement_timeout` set on every connection. Default 30s — a
    /// backstop so that a wedged query cannot hold a connection forever.
    pub statement_timeout: Option<Duration>,
}

impl Default for PostgresStoreConfig {
    /// Returns the defaults documented on each field.
    fn default() -> Self {
        let secs = Duration::from_secs;
        Self {
            max_connections: 16,
            min_connections: 0,
            acquire_timeout: secs(30),
            // 10 minutes.
            idle_timeout: Some(secs(10 * 60)),
            // 30 minutes.
            max_lifetime: Some(secs(30 * 60)),
            lock_timeout: Some(secs(10)),
            statement_timeout: Some(secs(30)),
        }
    }
}
123
124impl PostgresStorage {
125    /// Connect with [`PostgresStoreConfig::default`] pool/timeout settings.
126    pub async fn connect(database_url: &str) -> Result<Self, StoreError> {
127        Self::connect_with(database_url, PostgresStoreConfig::default()).await
128    }
129
130    /// Connect with explicit pool sizing and per-connection timeouts.
131    pub async fn connect_with(
132        database_url: &str,
133        config: PostgresStoreConfig,
134    ) -> Result<Self, StoreError> {
135        let lock_ms = config.lock_timeout.map(|d| d.as_millis().max(1) as u64);
136        let statement_ms = config
137            .statement_timeout
138            .map(|d| d.as_millis().max(1) as u64);
139        let mut options = PgPoolOptions::new()
140            .max_connections(config.max_connections)
141            .min_connections(config.min_connections)
142            .acquire_timeout(config.acquire_timeout);
143        if let Some(timeout) = config.idle_timeout {
144            options = options.idle_timeout(timeout);
145        }
146        if let Some(timeout) = config.max_lifetime {
147            options = options.max_lifetime(timeout);
148        }
149        let pool = options
150            .after_connect(move |conn, _meta| {
151                Box::pin(async move {
152                    if let Some(ms) = lock_ms {
153                        conn.execute(format!("SET lock_timeout = {ms}").as_str())
154                            .await?;
155                    }
156                    if let Some(ms) = statement_ms {
157                        conn.execute(format!("SET statement_timeout = {ms}").as_str())
158                            .await?;
159                    }
160                    Ok(())
161                })
162            })
163            .connect(database_url)
164            .await
165            .map_err(store_sqlx_error)?;
166        ensure_schema(&pool).await?;
167        Ok(Self { pool })
168    }
169
170    pub fn from_pool(pool: PgPool) -> Self {
171        Self { pool }
172    }
173
174    pub fn pool(&self) -> &PgPool {
175        &self.pool
176    }
177
178    pub fn session_store_factory(&self) -> PostgresSessionStoreFactory {
179        PostgresSessionStoreFactory {
180            pool: self.pool.clone(),
181        }
182    }
183
184    pub fn session_store(&self, session_id: impl Into<String>) -> PostgresSessionStore {
185        PostgresSessionStore {
186            pool: self.pool.clone(),
187            session_id: Some(session_id.into()),
188            bound_session: Arc::new(OnceLock::new()),
189        }
190    }
191
192    pub fn unbound_session_store(&self) -> PostgresSessionStore {
193        PostgresSessionStore {
194            pool: self.pool.clone(),
195            session_id: None,
196            bound_session: Arc::new(OnceLock::new()),
197        }
198    }
199
200    pub fn process_registry(&self) -> PostgresProcessRegistry {
201        PostgresProcessRegistry {
202            pool: self.pool.clone(),
203            notify: Arc::new(tokio::sync::Notify::new()),
204        }
205    }
206
207    pub fn trigger_store(&self) -> PostgresTriggerStore {
208        PostgresTriggerStore {
209            pool: self.pool.clone(),
210        }
211    }
212
213    pub fn lashlang_artifact_store(&self) -> PostgresLashlangArtifactStore {
214        PostgresLashlangArtifactStore {
215            pool: self.pool.clone(),
216        }
217    }
218}
219
220impl PostgresSessionStoreFactory {
221    pub fn new(storage: &PostgresStorage) -> Self {
222        storage.session_store_factory()
223    }
224}
225
226impl PostgresSessionStore {
227    pub fn unbound(storage: &PostgresStorage) -> Self {
228        storage.unbound_session_store()
229    }
230
231    async fn selected_session_id(&self) -> Result<Option<String>, StoreError> {
232        if let Some(session_id) = &self.session_id {
233            return Ok(Some(session_id.clone()));
234        }
235        sqlx::query_scalar("SELECT session_id FROM lash_sessions ORDER BY session_id ASC LIMIT 1")
236            .fetch_optional(&self.pool)
237            .await
238            .map_err(store_sqlx_error)
239    }
240}
241
242include!("postgres/schema.rs");
243include!("postgres/support.rs");
244include!("postgres/attachments.rs");
245include!("postgres/session_factory.rs");
246include!("postgres/runtime_persistence.rs");
247include!("postgres/process_helpers.rs");
248include!("postgres/process_registry.rs");
249include!("postgres/trigger_store.rs");
250include!("postgres/artifact_store.rs");