1use super::*;
8
9pub struct SqliteTriggerStore {
10 conn: SqliteConnection,
11}
12
13impl SqliteTriggerStore {
14 pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
15 let conn = SqliteConnection::open(path).await?;
16 ensure_trigger_schema(&conn).await?;
17 apply_pragmas(&conn, StoreBacking::File).await?;
18 Ok(Self { conn })
19 }
20
21 pub async fn memory() -> tokio_rusqlite::Result<Self> {
22 let conn = SqliteConnection::open_in_memory().await?;
23 ensure_trigger_schema(&conn).await?;
24 apply_pragmas(&conn, StoreBacking::Memory).await?;
25 Ok(Self { conn })
26 }
27
28 fn encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
29 serde_json::to_string(value).map_err(|err| {
30 lash_core::PluginError::Session(format!("failed to encode trigger row: {err}"))
31 })
32 }
33
34 fn decode_subscription(
35 json: String,
36 ) -> Result<lash_core::TriggerSubscriptionRecord, lash_core::PluginError> {
37 serde_json::from_str(&json).map_err(|err| {
38 lash_core::PluginError::Session(format!(
39 "failed to decode trigger subscription row: {err}"
40 ))
41 })
42 }
43
44 fn decode_occurrence(
45 json: String,
46 ) -> Result<lash_core::TriggerOccurrenceRecord, lash_core::PluginError> {
47 serde_json::from_str(&json).map_err(|err| {
48 lash_core::PluginError::Session(format!(
49 "failed to decode trigger occurrence row: {err}"
50 ))
51 })
52 }
53
54 fn session_registrant_scope_id(session_id: &str) -> String {
55 lash_core::ProcessOriginator::session(lash_core::SessionScope::new(session_id)).scope_id()
56 }
57}
58
59fn trigger_tx_outcome<T>(
60 result: Result<T, lash_core::PluginError>,
61) -> TxOutcome<Result<T, lash_core::PluginError>> {
62 match result {
63 Ok(value) => TxOutcome::Commit(Ok(value)),
64 Err(err) => TxOutcome::Rollback(Err(err)),
65 }
66}
67
68#[async_trait::async_trait]
69impl lash_core::TriggerStore for SqliteTriggerStore {
70 fn durability_tier(&self) -> DurabilityTier {
71 DurabilityTier::Durable
72 }
73
74 async fn register_subscription(
75 &self,
76 draft: lash_core::TriggerSubscriptionDraft,
77 ) -> Result<lash_core::TriggerSubscriptionRecord, lash_core::PluginError> {
78 self.conn
79 .write_flow(move |tx| {
80 Ok(trigger_tx_outcome((|| {
81 tx.execute("INSERT INTO trigger_subscription_seq DEFAULT VALUES", [])
82 .map_err(process_sqlite_error)?;
83 let seq = tx.last_insert_rowid();
84 let handle = format!("trigger:{seq}");
85 let subscription_id = format!("subscription:{seq}");
86 let now = current_epoch_ms();
87 let record = lash_core::TriggerSubscriptionRecord {
88 subscription_id: subscription_id.clone(),
89 registrant: draft.registrant,
90 env_ref: draft.env_ref,
91 wake_target: draft.wake_target,
92 handle,
93 name: draft.name,
94 source_type: draft.source_type,
95 source_key: draft.source_key,
96 source: draft.source,
97 event_ty: draft.event_ty,
98 module_ref: draft.module_ref,
99 host_requirements_ref: draft.host_requirements_ref,
100 process_ref: draft.process_ref,
101 process_name: draft.process_name,
102 input_template: draft.input_template,
103 enabled: true,
104 created_at_ms: now,
105 updated_at_ms: now,
106 };
107 tx.execute(
108 "INSERT INTO trigger_subscriptions (
109 subscription_id, registrant_scope_id, handle, source_type, source_key,
110 enabled, created_at_ms, updated_at_ms, record_json
111 )
112 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
113 params![
114 record.subscription_id.as_str(),
115 record.registrant_scope_id().as_str(),
116 record.handle.as_str(),
117 record.source_type.as_str(),
118 record.source_key.as_str(),
119 i64::from(record.enabled),
120 record.created_at_ms as i64,
121 record.updated_at_ms as i64,
122 Self::encode_json(&record)?,
123 ],
124 )
125 .map_err(process_sqlite_error)?;
126 Ok(record)
127 })()))
128 })
129 .await
130 .map_err(process_sqlite_error)?
131 }
132
133 async fn list_subscriptions(
134 &self,
135 filter: lash_core::TriggerSubscriptionFilter,
136 ) -> Result<Vec<lash_core::TriggerSubscriptionRecord>, lash_core::PluginError> {
137 self.conn
138 .call(move |conn| {
139 Ok((|| {
140 let mut sql =
141 "SELECT subscription_id, record_json FROM trigger_subscriptions WHERE 1 = 1"
142 .to_string();
143 let mut values = Vec::<rusqlite::types::Value>::new();
144 if let Some(session_id) = filter.session_id.as_ref() {
145 sql.push_str(" AND registrant_scope_id = ?");
146 values.push(Self::session_registrant_scope_id(session_id).into());
147 }
148 if let Some(handle) = filter.handle.as_ref() {
149 sql.push_str(" AND handle = ?");
150 values.push(handle.clone().into());
151 }
152 if let Some(source_type) = filter.source_type.as_ref() {
153 sql.push_str(" AND source_type = ?");
154 values.push(source_type.clone().into());
155 }
156 if let Some(source_key) = filter.source_key.as_ref() {
157 sql.push_str(" AND source_key = ?");
158 values.push(source_key.clone().into());
159 }
160 if let Some(enabled) = filter.enabled {
161 sql.push_str(" AND enabled = ?");
162 values.push(i64::from(enabled).into());
163 }
164 sql.push_str(" ORDER BY registrant_scope_id ASC, handle ASC");
165 let mut stmt = conn.prepare(&sql).map_err(process_sqlite_error)?;
166 let rows = stmt
167 .query_map(rusqlite::params_from_iter(values.iter()), |row| {
168 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
169 })
170 .map_err(process_sqlite_error)?;
171 let mut records = Vec::new();
172 for row in rows {
173 let (subscription_id, json) = row.map_err(process_sqlite_error)?;
174 let record = match Self::decode_subscription(json) {
175 Ok(record) => record,
176 Err(err) => {
177 tracing::warn!(
178 error = %err,
179 subscription_id,
180 "skipping malformed trigger subscription during listing"
181 );
182 continue;
183 }
184 };
185 if filter.matches(&record) {
186 records.push(record);
187 }
188 }
189 Ok(records)
190 })())
191 })
192 .await
193 .map_err(process_sqlite_error)?
194 }
195
196 async fn cancel_subscription(
197 &self,
198 session_id: &str,
199 handle: &str,
200 ) -> Result<bool, lash_core::PluginError> {
201 let session_id = session_id.to_string();
202 let handle = handle.to_string();
203 self.conn
204 .write_flow(move |tx| {
205 Ok(trigger_tx_outcome((|| {
206 let registrant_scope_id = Self::session_registrant_scope_id(&session_id);
207 let row: Option<(i64, String)> = tx
208 .query_row(
209 "SELECT enabled, record_json
210 FROM trigger_subscriptions
211 WHERE registrant_scope_id = ?1 AND handle = ?2",
212 params![registrant_scope_id.as_str(), handle.as_str()],
213 |row| Ok((row.get(0)?, row.get(1)?)),
214 )
215 .optional()
216 .map_err(process_sqlite_error)?;
217 let Some((enabled, json)) = row else {
218 return Ok(false);
219 };
220 let changed = enabled != 0;
221 let updated_at_ms = current_epoch_ms();
222 match Self::decode_subscription(json) {
223 Ok(mut record) => {
224 record.enabled = false;
225 record.updated_at_ms = updated_at_ms;
226 tx.execute(
227 "UPDATE trigger_subscriptions
228 SET enabled = ?3, updated_at_ms = ?4, record_json = ?5
229 WHERE registrant_scope_id = ?1 AND handle = ?2",
230 params![
231 registrant_scope_id.as_str(),
232 handle.as_str(),
233 i64::from(record.enabled),
234 record.updated_at_ms as i64,
235 Self::encode_json(&record)?,
236 ],
237 )
238 .map_err(process_sqlite_error)?;
239 }
240 Err(err) => {
241 tracing::warn!(
242 error = %err,
243 registrant_scope_id,
244 handle,
245 "disabling malformed trigger subscription without rewriting record JSON"
246 );
247 tx.execute(
248 "UPDATE trigger_subscriptions
249 SET enabled = ?3, updated_at_ms = ?4
250 WHERE registrant_scope_id = ?1 AND handle = ?2",
251 params![
252 registrant_scope_id.as_str(),
253 handle.as_str(),
254 0i64,
255 updated_at_ms as i64,
256 ],
257 )
258 .map_err(process_sqlite_error)?;
259 }
260 }
261 Ok(changed)
262 })()))
263 })
264 .await
265 .map_err(process_sqlite_error)?
266 }
267
268 async fn delete_session_subscriptions(
269 &self,
270 session_id: &str,
271 ) -> Result<usize, lash_core::PluginError> {
272 let session_id = session_id.to_string();
273 self.conn
274 .write_flow(move |tx| {
275 Ok(trigger_tx_outcome((|| {
276 let registrant_scope_id = Self::session_registrant_scope_id(&session_id);
277 let deleted = tx
278 .execute(
279 "DELETE FROM trigger_subscriptions WHERE registrant_scope_id = ?1",
280 params![registrant_scope_id.as_str()],
281 )
282 .map_err(process_sqlite_error)?;
283 Ok(deleted)
284 })()))
285 })
286 .await
287 .map_err(process_sqlite_error)?
288 }
289
290 async fn record_occurrence(
291 &self,
292 request: lash_core::TriggerOccurrenceRequest,
293 ) -> Result<lash_core::TriggerOccurrenceRecord, lash_core::PluginError> {
294 lash_core::validate_trigger_occurrence_request(&request)?;
295 let request_hash = lash_core::trigger_occurrence_request_hash(&request)?;
296 let occurrence_id = lash_core::deterministic_occurrence_id(&request)?;
297 self.conn
298 .write_flow(move |tx| {
299 Ok(trigger_tx_outcome((|| {
300 let existing: Option<(String, String)> = tx
301 .query_row(
302 "SELECT request_hash, record_json
303 FROM trigger_occurrences
304 WHERE idempotency_key = ?1",
305 params![request.idempotency_key.as_str()],
306 |row| Ok((row.get(0)?, row.get(1)?)),
307 )
308 .optional()
309 .map_err(process_sqlite_error)?;
310 if let Some((existing_hash, existing_json)) = existing {
311 if existing_hash != request_hash {
312 return Err(lash_core::PluginError::Session(format!(
313 "trigger occurrence idempotency conflict for `{}`",
314 request.idempotency_key
315 )));
316 }
317 return Self::decode_occurrence(existing_json);
318 }
319 let record = lash_core::TriggerOccurrenceRecord {
320 occurrence_id: occurrence_id.clone(),
321 source_type: request.source_type,
322 source_key: request.source_key,
323 payload: request.payload,
324 idempotency_key: request.idempotency_key,
325 source: request.source,
326 occurred_at_ms: current_epoch_ms(),
327 };
328 tx.execute(
329 "INSERT INTO trigger_occurrences (
330 occurrence_id, idempotency_key, request_hash, source_type,
331 source_key, occurred_at_ms, record_json
332 )
333 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
334 params![
335 record.occurrence_id.as_str(),
336 record.idempotency_key.as_str(),
337 request_hash.as_str(),
338 record.source_type.as_str(),
339 record.source_key.as_str(),
340 record.occurred_at_ms as i64,
341 Self::encode_json(&record)?,
342 ],
343 )
344 .map_err(process_sqlite_error)?;
345 Ok(record)
346 })()))
347 })
348 .await
349 .map_err(process_sqlite_error)?
350 }
351
352 async fn reserve_matching_deliveries(
353 &self,
354 occurrence_id: &str,
355 ) -> Result<Vec<lash_core::TriggerDeliveryReservation>, lash_core::PluginError> {
356 let occurrence_id = occurrence_id.to_string();
357 self.conn
358 .write_flow(move |tx| {
359 Ok(trigger_tx_outcome((|| {
360 let occurrence_json: Option<String> = tx
361 .query_row(
362 "SELECT record_json
363 FROM trigger_occurrences
364 WHERE occurrence_id = ?1",
365 params![occurrence_id.as_str()],
366 |row| row.get(0),
367 )
368 .optional()
369 .map_err(process_sqlite_error)?;
370 let Some(occurrence_json) = occurrence_json else {
371 return Err(lash_core::PluginError::Session(format!(
372 "unknown trigger occurrence `{occurrence_id}`"
373 )));
374 };
375 let occurrence = Self::decode_occurrence(occurrence_json)?;
376 let subscriptions = {
377 let mut stmt = tx
378 .prepare(
379 "SELECT subscription_id, record_json
380 FROM trigger_subscriptions
381 WHERE enabled = 1 AND source_type = ?1 AND source_key = ?2
382 ORDER BY registrant_scope_id ASC, handle ASC",
383 )
384 .map_err(process_sqlite_error)?;
385 let rows = stmt
386 .query_map(
387 params![
388 occurrence.source_type.as_str(),
389 occurrence.source_key.as_str()
390 ],
391 |row| {
392 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
393 },
394 )
395 .map_err(process_sqlite_error)?;
396 let mut subscriptions = Vec::new();
397 for row in rows {
398 let (subscription_id, json) = row.map_err(process_sqlite_error)?;
399 match Self::decode_subscription(json) {
400 Ok(subscription) => subscriptions.push(subscription),
401 Err(err) => tracing::warn!(
402 error = %err,
403 subscription_id,
404 occurrence_id = %occurrence.occurrence_id,
405 "skipping malformed trigger subscription during delivery reservation"
406 ),
407 }
408 }
409 subscriptions
410 };
411 let mut reservations = Vec::new();
412 for subscription in subscriptions {
413 let process_id = lash_core::deterministic_delivery_process_id(
414 &occurrence.occurrence_id,
415 &subscription.subscription_id,
416 )?;
417 let inserted = tx
418 .execute(
419 "INSERT OR IGNORE INTO trigger_deliveries (
420 occurrence_id, subscription_id, process_id, created_at_ms
421 )
422 VALUES (?1, ?2, ?3, ?4)",
423 params![
424 occurrence.occurrence_id.as_str(),
425 subscription.subscription_id.as_str(),
426 process_id.as_str(),
427 current_epoch_ms() as i64,
428 ],
429 )
430 .map_err(process_sqlite_error)?;
431 if inserted == 0 {
432 continue;
433 }
434 reservations.push(lash_core::TriggerDeliveryReservation {
435 occurrence: occurrence.clone(),
436 subscription,
437 process_id,
438 });
439 }
440 Ok(reservations)
441 })()))
442 })
443 .await
444 .map_err(process_sqlite_error)?
445 }
446}