1use std::collections::HashSet;
8use std::sync::{Arc, OnceLock};
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use lash_core::runtime::{
12 ProcessHandleGrantEntry, QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim,
13 QueuedWorkClaimBoundary, QueuedWorkCompletion, QueuedWorkItem,
14};
15use lash_core::store::queued_work::{
16 ClaimCandidate, QueuedWorkClaimLease, claim_scan_limit, derive_batch_id, renewed_claim,
17 select_claim_prefix,
18};
19use lash_core::store::{
20 GraphCommitDelta, HydratedSessionCheckpoint, PersistedSessionRead, RuntimeCommit,
21 RuntimeCommitResult, SessionCheckpoint, SessionHeadMeta,
22};
23use lash_core::{
24 AttachmentId, AttachmentIntent, AttachmentManifest, AttachmentManifestEntry, BlobRef,
25 DeliveryPolicy, DurabilityTier, GcReport, MergeKey, ProcessAwaitOutput, ProcessEvent,
26 ProcessEventAppendRequest, ProcessEventAppendResult, ProcessExternalRef,
27 ProcessHandleDescriptor, ProcessHandleGrant, ProcessLease, ProcessLeaseCompletion,
28 ProcessRecord, ProcessRegistration, ProcessRegistry, RuntimePersistence, SessionMeta,
29 SessionNodeRecord, SessionReadScope, SessionScope, SessionStoreCreateRequest,
30 SessionStoreFactory, SlotPolicy, StoreError, TokenLedgerEntry, VacuumReport,
31};
32use lash_core::{
33 PluginError, TriggerDeliveryReservation, TriggerOccurrenceRecord, TriggerOccurrenceRequest,
34 TriggerStore, TriggerSubscriptionDraft, TriggerSubscriptionFilter, TriggerSubscriptionRecord,
35};
36use sha2::{Digest, Sha256};
37use sqlx::postgres::{PgPool, PgPoolOptions, PgRow};
38use sqlx::{Executor, Row};
39
// Component name for this store's schema.
// NOTE(review): not referenced in this part of the file — presumably consumed
// by the schema code pulled in from `postgres/schema.rs`; confirm there.
const SCHEMA_COMPONENT: &str = "lash-postgres-store";
// Schema version number paired with `SCHEMA_COMPONENT`.
// NOTE(review): how a version mismatch is handled is not visible here — confirm
// against `ensure_schema`.
const SCHEMA_VERSION: i32 = 1;
// Local alias for the process-lease schema version published by `lash_core`.
const PROCESS_LEASE_SCHEMA_VERSION: u32 = lash_core::PROCESS_LEASE_SCHEMA_VERSION;
43
/// Root handle for the Postgres-backed stores.
///
/// Owns the connection pool and hands a clone of it to each specialised store
/// handle created through its methods (`session_store`, `process_registry`,
/// `trigger_store`, ...).
#[derive(Clone)]
pub struct PostgresStorage {
    /// Pool handed (cloned) to every store handle created from this storage.
    pool: PgPool,
}
48
/// Factory-side handle that carries a clone of the storage's connection pool.
///
/// Obtained from `PostgresStorage::session_store_factory` or
/// `PostgresSessionStoreFactory::new`.
// NOTE(review): the trait implementation that actually creates session stores
// is not in this part of the file (presumably `postgres/session_factory.rs`).
#[derive(Clone)]
pub struct PostgresSessionStoreFactory {
    /// Pool used for the factory's queries.
    pool: PgPool,
}
53
/// Store handle for a single session, or an "unbound" handle with no explicit
/// session.
#[derive(Clone)]
pub struct PostgresSessionStore {
    /// Pool used for this store's queries.
    pool: PgPool,
    /// `Some` when the handle was created by `PostgresStorage::session_store`;
    /// `None` for handles from `PostgresStorage::unbound_session_store`, in
    /// which case `selected_session_id` falls back to the first `session_id`
    /// in `lash_sessions`.
    session_id: Option<String>,
    /// Set-once cell shared by all clones of this handle; always created empty
    /// by the constructors in this file.
    // NOTE(review): nothing visible here reads or writes this cell — presumably
    // it latches the session an unbound store ends up using; confirm in the
    // included `postgres/*.rs` files.
    bound_session: Arc<OnceLock<String>>,
}
66
/// Process-registry handle backed by the shared connection pool.
#[derive(Clone)]
pub struct PostgresProcessRegistry {
    /// Pool used for the registry's queries.
    pool: PgPool,
    /// Notifier shared by clones of this handle (the `Arc` is cloned, not the
    /// `Notify`). `PostgresStorage::process_registry` creates a fresh one on
    /// every call, so handles from separate calls do not share it.
    // NOTE(review): who waits on / signals this notifier is not visible in this
    // part of the file — confirm in `postgres/process_registry.rs`.
    notify: Arc<tokio::sync::Notify>,
}
72
/// Trigger-store handle backed by the shared connection pool.
///
/// Obtained from `PostgresStorage::trigger_store`.
#[derive(Clone)]
pub struct PostgresTriggerStore {
    /// Pool used for the trigger store's queries.
    pool: PgPool,
}
77
/// Lashlang artifact-store handle backed by the shared connection pool.
///
/// Obtained from `PostgresStorage::lashlang_artifact_store`.
#[derive(Clone)]
pub struct PostgresLashlangArtifactStore {
    /// Pool used for the artifact store's queries.
    pool: PgPool,
}
82
/// Pool sizing and timeout settings consumed by `PostgresStorage::connect_with`.
///
/// The `Option` fields are only applied when they are `Some`; with `None` the
/// corresponding pool option or session setting is left untouched.
#[derive(Clone, Debug)]
pub struct PostgresStoreConfig {
    /// Upper bound on pooled connections (`PgPoolOptions::max_connections`).
    pub max_connections: u32,
    /// Lower bound on pooled connections (`PgPoolOptions::min_connections`).
    pub min_connections: u32,
    /// Passed to `PgPoolOptions::acquire_timeout`.
    pub acquire_timeout: Duration,
    /// Passed to `PgPoolOptions::idle_timeout` when set.
    pub idle_timeout: Option<Duration>,
    /// Passed to `PgPoolOptions::max_lifetime` when set.
    pub max_lifetime: Option<Duration>,
    /// When set, every new connection runs `SET lock_timeout` with this value
    /// in milliseconds (never less than 1 ms).
    pub lock_timeout: Option<Duration>,
    /// When set, every new connection runs `SET statement_timeout` with this
    /// value in milliseconds (never less than 1 ms).
    pub statement_timeout: Option<Duration>,
}

impl Default for PostgresStoreConfig {
    /// Defaults: up to 16 connections and no minimum, 30 s acquire timeout,
    /// 10 min idle timeout, 30 min connection lifetime, 10 s lock timeout and
    /// 30 s statement timeout.
    fn default() -> Self {
        let secs = Duration::from_secs;
        Self {
            max_connections: 16,
            min_connections: 0,
            acquire_timeout: secs(30),
            idle_timeout: Some(secs(10 * 60)),
            max_lifetime: Some(secs(30 * 60)),
            lock_timeout: Some(secs(10)),
            statement_timeout: Some(secs(30)),
        }
    }
}
123
124impl PostgresStorage {
125 pub async fn connect(database_url: &str) -> Result<Self, StoreError> {
127 Self::connect_with(database_url, PostgresStoreConfig::default()).await
128 }
129
130 pub async fn connect_with(
132 database_url: &str,
133 config: PostgresStoreConfig,
134 ) -> Result<Self, StoreError> {
135 let lock_ms = config.lock_timeout.map(|d| d.as_millis().max(1) as u64);
136 let statement_ms = config
137 .statement_timeout
138 .map(|d| d.as_millis().max(1) as u64);
139 let mut options = PgPoolOptions::new()
140 .max_connections(config.max_connections)
141 .min_connections(config.min_connections)
142 .acquire_timeout(config.acquire_timeout);
143 if let Some(timeout) = config.idle_timeout {
144 options = options.idle_timeout(timeout);
145 }
146 if let Some(timeout) = config.max_lifetime {
147 options = options.max_lifetime(timeout);
148 }
149 let pool = options
150 .after_connect(move |conn, _meta| {
151 Box::pin(async move {
152 if let Some(ms) = lock_ms {
153 conn.execute(format!("SET lock_timeout = {ms}").as_str())
154 .await?;
155 }
156 if let Some(ms) = statement_ms {
157 conn.execute(format!("SET statement_timeout = {ms}").as_str())
158 .await?;
159 }
160 Ok(())
161 })
162 })
163 .connect(database_url)
164 .await
165 .map_err(store_sqlx_error)?;
166 ensure_schema(&pool).await?;
167 Ok(Self { pool })
168 }
169
170 pub fn from_pool(pool: PgPool) -> Self {
171 Self { pool }
172 }
173
174 pub fn pool(&self) -> &PgPool {
175 &self.pool
176 }
177
178 pub fn session_store_factory(&self) -> PostgresSessionStoreFactory {
179 PostgresSessionStoreFactory {
180 pool: self.pool.clone(),
181 }
182 }
183
184 pub fn session_store(&self, session_id: impl Into<String>) -> PostgresSessionStore {
185 PostgresSessionStore {
186 pool: self.pool.clone(),
187 session_id: Some(session_id.into()),
188 bound_session: Arc::new(OnceLock::new()),
189 }
190 }
191
192 pub fn unbound_session_store(&self) -> PostgresSessionStore {
193 PostgresSessionStore {
194 pool: self.pool.clone(),
195 session_id: None,
196 bound_session: Arc::new(OnceLock::new()),
197 }
198 }
199
200 pub fn process_registry(&self) -> PostgresProcessRegistry {
201 PostgresProcessRegistry {
202 pool: self.pool.clone(),
203 notify: Arc::new(tokio::sync::Notify::new()),
204 }
205 }
206
207 pub fn trigger_store(&self) -> PostgresTriggerStore {
208 PostgresTriggerStore {
209 pool: self.pool.clone(),
210 }
211 }
212
213 pub fn lashlang_artifact_store(&self) -> PostgresLashlangArtifactStore {
214 PostgresLashlangArtifactStore {
215 pool: self.pool.clone(),
216 }
217 }
218}
219
220impl PostgresSessionStoreFactory {
221 pub fn new(storage: &PostgresStorage) -> Self {
222 storage.session_store_factory()
223 }
224}
225
226impl PostgresSessionStore {
227 pub fn unbound(storage: &PostgresStorage) -> Self {
228 storage.unbound_session_store()
229 }
230
231 async fn selected_session_id(&self) -> Result<Option<String>, StoreError> {
232 if let Some(session_id) = &self.session_id {
233 return Ok(Some(session_id.clone()));
234 }
235 sqlx::query_scalar("SELECT session_id FROM lash_sessions ORDER BY session_id ASC LIMIT 1")
236 .fetch_optional(&self.pool)
237 .await
238 .map_err(store_sqlx_error)
239 }
240}
241
// The rest of the implementation lives in sibling files that are textually
// included here, so they are compiled as part of this module and share the
// `use` declarations, constants and types defined above.
include!("postgres/schema.rs");
include!("postgres/support.rs");
include!("postgres/attachments.rs");
include!("postgres/session_factory.rs");
include!("postgres/runtime_persistence.rs");
include!("postgres/process_helpers.rs");
include!("postgres/process_registry.rs");
include!("postgres/trigger_store.rs");
include!("postgres/artifact_store.rs");