1use super::*;
8
9pub struct SqliteTriggerStore {
10 conn: SqliteConnection,
11}
12
13impl SqliteTriggerStore {
14 pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
15 let conn = SqliteConnection::open(path).await?;
16 ensure_trigger_schema(&conn).await?;
17 apply_pragmas(&conn, StoreBacking::File).await?;
18 Ok(Self { conn })
19 }
20
21 pub async fn memory() -> tokio_rusqlite::Result<Self> {
22 let conn = SqliteConnection::open_in_memory().await?;
23 ensure_trigger_schema(&conn).await?;
24 apply_pragmas(&conn, StoreBacking::Memory).await?;
25 Ok(Self { conn })
26 }
27
28 fn encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
29 serde_json::to_string(value).map_err(|err| {
30 lash_core::PluginError::Session(format!("failed to encode trigger row: {err}"))
31 })
32 }
33
34 fn decode_subscription(
35 json: String,
36 ) -> Result<lash_core::TriggerSubscriptionRecord, lash_core::PluginError> {
37 serde_json::from_str(&json).map_err(|err| {
38 lash_core::PluginError::Session(format!(
39 "failed to decode trigger subscription row: {err}"
40 ))
41 })
42 }
43
44 fn decode_occurrence(
45 json: String,
46 ) -> Result<lash_core::TriggerOccurrenceRecord, lash_core::PluginError> {
47 serde_json::from_str(&json).map_err(|err| {
48 lash_core::PluginError::Session(format!(
49 "failed to decode trigger occurrence row: {err}"
50 ))
51 })
52 }
53}
54
55fn trigger_tx_outcome<T>(
56 result: Result<T, lash_core::PluginError>,
57) -> TxOutcome<Result<T, lash_core::PluginError>> {
58 match result {
59 Ok(value) => TxOutcome::Commit(Ok(value)),
60 Err(err) => TxOutcome::Rollback(Err(err)),
61 }
62}
63
64#[async_trait::async_trait]
65impl lash_core::TriggerStore for SqliteTriggerStore {
66 fn durability_tier(&self) -> DurabilityTier {
67 DurabilityTier::Durable
68 }
69
70 async fn register_subscription(
71 &self,
72 draft: lash_core::TriggerSubscriptionDraft,
73 ) -> Result<lash_core::TriggerSubscriptionRecord, lash_core::PluginError> {
74 draft.validate()?;
75 self.conn
76 .write_flow(move |tx| {
77 Ok(trigger_tx_outcome((|| {
78 tx.execute("INSERT INTO trigger_subscription_seq DEFAULT VALUES", [])
79 .map_err(process_sqlite_error)?;
80 let seq = tx.last_insert_rowid();
81 let handle = format!("trigger:{seq}");
82 let subscription_id = format!("subscription:{seq}");
83 let now = current_epoch_ms();
84 let record = lash_core::TriggerSubscriptionRecord {
85 subscription_id: subscription_id.clone(),
86 registrant: draft.registrant,
87 env_ref: draft.env_ref,
88 wake_target: draft.wake_target,
89 handle,
90 name: draft.name,
91 source_type: draft.source_type,
92 source_key: draft.source_key,
93 source: draft.source,
94 payload_schema: draft.payload_schema,
95 target: draft.target,
96 target_identity: draft.target_identity,
97 event_types: draft.event_types,
98 input_template: draft.input_template,
99 target_label: draft.target_label,
100 enabled: true,
101 created_at_ms: now,
102 updated_at_ms: now,
103 };
104 tx.execute(
105 "INSERT INTO trigger_subscriptions (
106 subscription_id, registrant_scope_id, handle, source_type, source_key,
107 enabled, created_at_ms, updated_at_ms, record_json
108 )
109 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
110 params![
111 record.subscription_id.as_str(),
112 record.registrant_scope_id().as_str(),
113 record.handle.as_str(),
114 record.source_type.as_str(),
115 record.source_key.as_str(),
116 i64::from(record.enabled),
117 record.created_at_ms as i64,
118 record.updated_at_ms as i64,
119 Self::encode_json(&record)?,
120 ],
121 )
122 .map_err(process_sqlite_error)?;
123 Ok(record)
124 })()))
125 })
126 .await
127 .map_err(process_sqlite_error)?
128 }
129
130 async fn list_subscriptions(
131 &self,
132 filter: lash_core::TriggerSubscriptionFilter,
133 ) -> Result<Vec<lash_core::TriggerSubscriptionRecord>, lash_core::PluginError> {
134 self.conn
135 .call(move |conn| {
136 Ok((|| {
137 let mut sql =
138 "SELECT subscription_id, record_json FROM trigger_subscriptions WHERE 1 = 1"
139 .to_string();
140 let mut values = Vec::<rusqlite::types::Value>::new();
141 if let Some(handle) = filter.handle.as_ref() {
142 sql.push_str(" AND handle = ?");
143 values.push(handle.clone().into());
144 }
145 if let Some(source_type) = filter.source_type.as_ref() {
146 sql.push_str(" AND source_type = ?");
147 values.push(source_type.clone().into());
148 }
149 if let Some(source_key) = filter.source_key.as_ref() {
150 sql.push_str(" AND source_key = ?");
151 values.push(source_key.clone().into());
152 }
153 if let Some(enabled) = filter.enabled {
154 sql.push_str(" AND enabled = ?");
155 values.push(i64::from(enabled).into());
156 }
157 sql.push_str(" ORDER BY registrant_scope_id ASC, handle ASC");
158 let mut stmt = conn.prepare(&sql).map_err(process_sqlite_error)?;
159 let rows = stmt
160 .query_map(rusqlite::params_from_iter(values.iter()), |row| {
161 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
162 })
163 .map_err(process_sqlite_error)?;
164 let mut records = Vec::new();
165 for row in rows {
166 let (subscription_id, json) = row.map_err(process_sqlite_error)?;
167 let record = match Self::decode_subscription(json) {
168 Ok(record) => record,
169 Err(err) => {
170 tracing::warn!(
171 error = %err,
172 subscription_id,
173 "skipping malformed trigger subscription during listing"
174 );
175 continue;
176 }
177 };
178 if filter.matches(&record) {
179 records.push(record);
180 }
181 }
182 Ok(records)
183 })())
184 })
185 .await
186 .map_err(process_sqlite_error)?
187 }
188
189 async fn cancel_subscription(
190 &self,
191 session_id: &str,
192 handle: &str,
193 ) -> Result<bool, lash_core::PluginError> {
194 let session_id = session_id.to_string();
195 let handle = handle.to_string();
196 self.conn
197 .write_flow(move |tx| {
198 Ok(trigger_tx_outcome((|| {
199 let mut stmt = tx
200 .prepare(
201 "SELECT subscription_id, enabled, record_json
202 FROM trigger_subscriptions
203 WHERE handle = ?1",
204 )
205 .map_err(process_sqlite_error)?;
206 let rows = stmt
207 .query_map(params![handle.as_str()], |row| {
208 Ok((
209 row.get::<_, String>(0)?,
210 row.get::<_, i64>(1)?,
211 row.get::<_, String>(2)?,
212 ))
213 })
214 .map_err(process_sqlite_error)?;
215 let mut selected = None;
216 for row in rows {
217 let (subscription_id, enabled, json) = row.map_err(process_sqlite_error)?;
218 let record = match Self::decode_subscription(json.clone()) {
219 Ok(record) => record,
220 Err(err) => {
221 tracing::warn!(
222 error = %err,
223 subscription_id,
224 handle,
225 "skipping malformed trigger subscription during cancel"
226 );
227 continue;
228 }
229 };
230 if record.registrant_session_id() == Some(session_id.as_str()) {
231 selected = Some((subscription_id, enabled, json));
232 break;
233 }
234 }
235 let Some((subscription_id, enabled, json)) = selected else {
236 return Ok(false);
237 };
238 let changed = enabled != 0;
239 let updated_at_ms = current_epoch_ms();
240 match Self::decode_subscription(json) {
241 Ok(mut record) => {
242 record.enabled = false;
243 record.updated_at_ms = updated_at_ms;
244 tx.execute(
245 "UPDATE trigger_subscriptions
246 SET enabled = ?3, updated_at_ms = ?4, record_json = ?5
247 WHERE subscription_id = ?1 AND handle = ?2",
248 params![
249 subscription_id.as_str(),
250 handle.as_str(),
251 i64::from(record.enabled),
252 record.updated_at_ms as i64,
253 Self::encode_json(&record)?,
254 ],
255 )
256 .map_err(process_sqlite_error)?;
257 }
258 Err(err) => {
259 tracing::warn!(
260 error = %err,
261 subscription_id,
262 handle,
263 "disabling malformed trigger subscription without rewriting record JSON"
264 );
265 tx.execute(
266 "UPDATE trigger_subscriptions
267 SET enabled = ?3, updated_at_ms = ?4
268 WHERE subscription_id = ?1 AND handle = ?2",
269 params![
270 subscription_id.as_str(),
271 handle.as_str(),
272 0i64,
273 updated_at_ms as i64,
274 ],
275 )
276 .map_err(process_sqlite_error)?;
277 }
278 }
279 Ok(changed)
280 })()))
281 })
282 .await
283 .map_err(process_sqlite_error)?
284 }
285
286 async fn delete_session_subscriptions(
287 &self,
288 session_id: &str,
289 ) -> Result<usize, lash_core::PluginError> {
290 let session_id = session_id.to_string();
291 self.conn
292 .write_flow(move |tx| {
293 Ok(trigger_tx_outcome((|| {
294 let mut stmt = tx
295 .prepare("SELECT subscription_id, record_json FROM trigger_subscriptions")
296 .map_err(process_sqlite_error)?;
297 let rows = stmt
298 .query_map([], |row| {
299 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
300 })
301 .map_err(process_sqlite_error)?;
302 let mut subscription_ids = Vec::new();
303 for row in rows {
304 let (subscription_id, json) = row.map_err(process_sqlite_error)?;
305 let record = match Self::decode_subscription(json) {
306 Ok(record) => record,
307 Err(err) => {
308 tracing::warn!(
309 error = %err,
310 subscription_id,
311 "skipping malformed trigger subscription during session delete"
312 );
313 continue;
314 }
315 };
316 if record.registrant_session_id() == Some(session_id.as_str()) {
317 subscription_ids.push(subscription_id);
318 }
319 }
320 drop(stmt);
321 let mut deleted = 0usize;
322 for subscription_id in subscription_ids {
323 deleted = deleted.saturating_add(
324 tx.execute(
325 "DELETE FROM trigger_subscriptions WHERE subscription_id = ?1",
326 params![subscription_id.as_str()],
327 )
328 .map_err(process_sqlite_error)?,
329 );
330 }
331 Ok(deleted)
332 })()))
333 })
334 .await
335 .map_err(process_sqlite_error)?
336 }
337
338 async fn record_occurrence(
339 &self,
340 request: lash_core::TriggerOccurrenceRequest,
341 ) -> Result<lash_core::TriggerOccurrenceRecord, lash_core::PluginError> {
342 lash_core::validate_trigger_occurrence_request(&request)?;
343 let request_hash = lash_core::trigger_occurrence_request_hash(&request)?;
344 let occurrence_id = lash_core::deterministic_occurrence_id(&request)?;
345 self.conn
346 .write_flow(move |tx| {
347 Ok(trigger_tx_outcome((|| {
348 let existing: Option<(String, String)> = tx
349 .query_row(
350 "SELECT request_hash, record_json
351 FROM trigger_occurrences
352 WHERE idempotency_key = ?1",
353 params![request.idempotency_key.as_str()],
354 |row| Ok((row.get(0)?, row.get(1)?)),
355 )
356 .optional()
357 .map_err(process_sqlite_error)?;
358 if let Some((existing_hash, existing_json)) = existing {
359 if existing_hash != request_hash {
360 return Err(lash_core::PluginError::Session(format!(
361 "trigger occurrence idempotency conflict for `{}`",
362 request.idempotency_key
363 )));
364 }
365 return Self::decode_occurrence(existing_json);
366 }
367 let record = lash_core::TriggerOccurrenceRecord {
368 occurrence_id: occurrence_id.clone(),
369 source_type: request.source_type,
370 source_key: request.source_key,
371 payload: request.payload,
372 idempotency_key: request.idempotency_key,
373 source: request.source,
374 occurred_at_ms: current_epoch_ms(),
375 };
376 tx.execute(
377 "INSERT INTO trigger_occurrences (
378 occurrence_id, idempotency_key, request_hash, source_type,
379 source_key, occurred_at_ms, record_json
380 )
381 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
382 params![
383 record.occurrence_id.as_str(),
384 record.idempotency_key.as_str(),
385 request_hash.as_str(),
386 record.source_type.as_str(),
387 record.source_key.as_str(),
388 record.occurred_at_ms as i64,
389 Self::encode_json(&record)?,
390 ],
391 )
392 .map_err(process_sqlite_error)?;
393 Ok(record)
394 })()))
395 })
396 .await
397 .map_err(process_sqlite_error)?
398 }
399
400 async fn reserve_matching_deliveries(
401 &self,
402 occurrence_id: &str,
403 ) -> Result<Vec<lash_core::TriggerDeliveryReservation>, lash_core::PluginError> {
404 let occurrence_id = occurrence_id.to_string();
405 self.conn
406 .write_flow(move |tx| {
407 Ok(trigger_tx_outcome((|| {
408 let occurrence_json: Option<String> = tx
409 .query_row(
410 "SELECT record_json
411 FROM trigger_occurrences
412 WHERE occurrence_id = ?1",
413 params![occurrence_id.as_str()],
414 |row| row.get(0),
415 )
416 .optional()
417 .map_err(process_sqlite_error)?;
418 let Some(occurrence_json) = occurrence_json else {
419 return Err(lash_core::PluginError::Session(format!(
420 "unknown trigger occurrence `{occurrence_id}`"
421 )));
422 };
423 let occurrence = Self::decode_occurrence(occurrence_json)?;
424 let subscriptions = {
425 let mut stmt = tx
426 .prepare(
427 "SELECT subscription_id, record_json
428 FROM trigger_subscriptions
429 WHERE enabled = 1 AND source_type = ?1 AND source_key = ?2
430 ORDER BY registrant_scope_id ASC, handle ASC",
431 )
432 .map_err(process_sqlite_error)?;
433 let rows = stmt
434 .query_map(
435 params![
436 occurrence.source_type.as_str(),
437 occurrence.source_key.as_str()
438 ],
439 |row| {
440 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
441 },
442 )
443 .map_err(process_sqlite_error)?;
444 let mut subscriptions = Vec::new();
445 for row in rows {
446 let (subscription_id, json) = row.map_err(process_sqlite_error)?;
447 match Self::decode_subscription(json) {
448 Ok(subscription) => subscriptions.push(subscription),
449 Err(err) => tracing::warn!(
450 error = %err,
451 subscription_id,
452 occurrence_id = %occurrence.occurrence_id,
453 "skipping malformed trigger subscription during delivery reservation"
454 ),
455 }
456 }
457 subscriptions
458 };
459 let mut reservations = Vec::new();
460 for subscription in subscriptions {
461 let process_id = lash_core::deterministic_delivery_process_id(
462 &occurrence.occurrence_id,
463 &subscription.subscription_id,
464 )?;
465 let inserted = tx
466 .execute(
467 "INSERT OR IGNORE INTO trigger_deliveries (
468 occurrence_id, subscription_id, process_id, created_at_ms
469 )
470 VALUES (?1, ?2, ?3, ?4)",
471 params![
472 occurrence.occurrence_id.as_str(),
473 subscription.subscription_id.as_str(),
474 process_id.as_str(),
475 current_epoch_ms() as i64,
476 ],
477 )
478 .map_err(process_sqlite_error)?;
479 if inserted == 0 {
480 continue;
481 }
482 reservations.push(lash_core::TriggerDeliveryReservation {
483 occurrence: occurrence.clone(),
484 subscription,
485 process_id,
486 });
487 }
488 Ok(reservations)
489 })()))
490 })
491 .await
492 .map_err(process_sqlite_error)?
493 }
494}