1use std::collections::HashSet;
9use std::sync::{Arc, OnceLock};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use lash_core::runtime::{
13 ProcessHandleGrantEntry, QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim,
14 QueuedWorkClaimBoundary, QueuedWorkCompletion, QueuedWorkItem,
15};
16use lash_core::store::queued_work::{
17 ClaimCandidate, QueuedWorkClaimLease, claim_scan_limit, derive_batch_id, renewed_claim,
18 select_claim_prefix,
19};
20use lash_core::store::{
21 GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
22 RuntimeCommitResult, SessionCheckpoint, SessionHeadMeta,
23};
24use lash_core::{
25 AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
26 DeliveryPolicy, DurabilityTier, GcReport, MergeKey, ProcessAwaitOutput, ProcessEvent,
27 ProcessEventAppendRequest, ProcessEventAppendResult, ProcessExternalRef,
28 ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease, ProcessLeaseCompletion,
29 ProcessRecord, ProcessRegistration, ProcessRegistry, RuntimePersistence, SessionMeta,
30 SessionNodeRecord, SessionReadScope, SessionScope, SessionStoreCreateRequest,
31 SessionStoreFactory, SlotPolicy, StoreError, TokenLedgerEntry, VacuumReport,
32};
33use lash_core::{
34 PluginError, TriggerDeliveryReservation, TriggerOccurrenceRecord, TriggerOccurrenceRequest,
35 TriggerStore, TriggerSubscriptionDraft, TriggerSubscriptionFilter, TriggerSubscriptionRecord,
36};
37use sha2::{Digest, Sha256};
38use sqlx::postgres::{PgPool, PgPoolOptions, PgRow};
39use sqlx::{Executor, Row};
40
41const SCHEMA_COMPONENT: &str = "lash-postgres-store";
42const SCHEMA_VERSION: i32 = 2;
43const PROCESS_LEASE_SCHEMA_VERSION: u32 = lash_core::PROCESS_LEASE_SCHEMA_VERSION;
44
45#[derive(Clone)]
46pub struct PostgresStorage {
47 pool: PgPool,
48}
49
50#[derive(Clone)]
51pub struct PostgresSessionStoreFactory {
52 pool: PgPool,
53}
54
55#[derive(Clone)]
56pub struct PostgresSessionStore {
57 pool: PgPool,
58 session_id: Option<String>,
60 bound_session: Arc<OnceLock<String>>,
66}
67
68#[derive(Clone)]
69pub struct PostgresProcessRegistry {
70 pool: PgPool,
71 notify: Arc<tokio::sync::Notify>,
72}
73
74#[derive(Clone)]
75pub struct PostgresTriggerStore {
76 pool: PgPool,
77}
78
79#[derive(Clone)]
80pub struct PostgresLashlangArtifactStore {
81 pool: PgPool,
82}
83
84#[derive(Clone, Debug)]
93pub struct PostgresStoreConfig {
94 pub max_connections: u32,
96 pub min_connections: u32,
98 pub acquire_timeout: Duration,
100 pub idle_timeout: Option<Duration>,
102 pub max_lifetime: Option<Duration>,
104 pub lock_timeout: Option<Duration>,
106 pub statement_timeout: Option<Duration>,
109}
110
111impl Default for PostgresStoreConfig {
112 fn default() -> Self {
113 Self {
114 max_connections: 16,
115 min_connections: 0,
116 acquire_timeout: Duration::from_secs(30),
117 idle_timeout: Some(Duration::from_secs(600)),
118 max_lifetime: Some(Duration::from_secs(1800)),
119 lock_timeout: Some(Duration::from_secs(10)),
120 statement_timeout: Some(Duration::from_secs(30)),
121 }
122 }
123}
124
125impl PostgresStorage {
126 pub async fn connect(database_url: &str) -> Result<Self, StoreError> {
128 Self::connect_with(database_url, PostgresStoreConfig::default()).await
129 }
130
131 pub async fn connect_with(
133 database_url: &str,
134 config: PostgresStoreConfig,
135 ) -> Result<Self, StoreError> {
136 let lock_ms = config.lock_timeout.map(|d| d.as_millis().max(1) as u64);
137 let statement_ms = config
138 .statement_timeout
139 .map(|d| d.as_millis().max(1) as u64);
140 let mut options = PgPoolOptions::new()
141 .max_connections(config.max_connections)
142 .min_connections(config.min_connections)
143 .acquire_timeout(config.acquire_timeout);
144 if let Some(timeout) = config.idle_timeout {
145 options = options.idle_timeout(timeout);
146 }
147 if let Some(timeout) = config.max_lifetime {
148 options = options.max_lifetime(timeout);
149 }
150 let pool = options
151 .after_connect(move |conn, _meta| {
152 Box::pin(async move {
153 if let Some(ms) = lock_ms {
154 conn.execute(format!("SET lock_timeout = {ms}").as_str())
155 .await?;
156 }
157 if let Some(ms) = statement_ms {
158 conn.execute(format!("SET statement_timeout = {ms}").as_str())
159 .await?;
160 }
161 Ok(())
162 })
163 })
164 .connect(database_url)
165 .await
166 .map_err(store_sqlx_error)?;
167 ensure_schema(&pool).await?;
168 Ok(Self { pool })
169 }
170
171 pub fn from_pool(pool: PgPool) -> Self {
172 Self { pool }
173 }
174
175 pub fn pool(&self) -> &PgPool {
176 &self.pool
177 }
178
179 pub fn session_store_factory(&self) -> PostgresSessionStoreFactory {
180 PostgresSessionStoreFactory {
181 pool: self.pool.clone(),
182 }
183 }
184
185 pub fn session_store(&self, session_id: impl Into<String>) -> PostgresSessionStore {
186 PostgresSessionStore {
187 pool: self.pool.clone(),
188 session_id: Some(session_id.into()),
189 bound_session: Arc::new(OnceLock::new()),
190 }
191 }
192
193 pub fn unbound_session_store(&self) -> PostgresSessionStore {
194 PostgresSessionStore {
195 pool: self.pool.clone(),
196 session_id: None,
197 bound_session: Arc::new(OnceLock::new()),
198 }
199 }
200
201 pub fn process_registry(&self) -> PostgresProcessRegistry {
202 PostgresProcessRegistry {
203 pool: self.pool.clone(),
204 notify: Arc::new(tokio::sync::Notify::new()),
205 }
206 }
207
208 pub fn trigger_store(&self) -> PostgresTriggerStore {
209 PostgresTriggerStore {
210 pool: self.pool.clone(),
211 }
212 }
213
214 pub fn lashlang_artifact_store(&self) -> PostgresLashlangArtifactStore {
215 PostgresLashlangArtifactStore {
216 pool: self.pool.clone(),
217 }
218 }
219
220 pub fn process_env_store(&self) -> PostgresLashlangArtifactStore {
221 PostgresLashlangArtifactStore {
222 pool: self.pool.clone(),
223 }
224 }
225}
226
227impl PostgresSessionStoreFactory {
228 pub fn new(storage: &PostgresStorage) -> Self {
229 storage.session_store_factory()
230 }
231}
232
233impl PostgresSessionStore {
234 pub fn unbound(storage: &PostgresStorage) -> Self {
235 storage.unbound_session_store()
236 }
237
238 async fn selected_session_id(&self) -> Result<Option<String>, StoreError> {
239 if let Some(session_id) = &self.session_id {
240 return Ok(Some(session_id.clone()));
241 }
242 sqlx::query_scalar("SELECT session_id FROM lash_sessions ORDER BY session_id ASC LIMIT 1")
243 .fetch_optional(&self.pool)
244 .await
245 .map_err(store_sqlx_error)
246 }
247}
248
249include!("postgres/schema.rs");
250include!("postgres/support.rs");
251include!("postgres/attachments.rs");
252include!("postgres/session_factory.rs");
253include!("postgres/runtime_persistence.rs");
254include!("postgres/process_helpers.rs");
255include!("postgres/process_registry.rs");
256include!("postgres/trigger_store.rs");
257include!("postgres/artifact_store.rs");