1use super::*;
8
9pub struct SqliteTriggerStore {
10 conn: SqliteConnection,
11}
12
13impl SqliteTriggerStore {
14 pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
15 let conn = SqliteConnection::open(path).await?;
16 ensure_trigger_schema(&conn).await?;
17 apply_pragmas(&conn, StoreBacking::File).await?;
18 Ok(Self { conn })
19 }
20
21 pub async fn memory() -> tokio_rusqlite::Result<Self> {
22 let conn = SqliteConnection::open_in_memory().await?;
23 ensure_trigger_schema(&conn).await?;
24 apply_pragmas(&conn, StoreBacking::Memory).await?;
25 Ok(Self { conn })
26 }
27
28 fn encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
29 serde_json::to_string(value).map_err(|err| {
30 lash_core::PluginError::Session(format!("failed to encode trigger row: {err}"))
31 })
32 }
33
34 fn decode_subscription(
35 json: String,
36 ) -> Result<lash_core::TriggerSubscriptionRecord, lash_core::PluginError> {
37 serde_json::from_str(&json).map_err(|err| {
38 lash_core::PluginError::Session(format!(
39 "failed to decode trigger subscription row: {err}"
40 ))
41 })
42 }
43
44 fn decode_occurrence(
45 json: String,
46 ) -> Result<lash_core::TriggerOccurrenceRecord, lash_core::PluginError> {
47 serde_json::from_str(&json).map_err(|err| {
48 lash_core::PluginError::Session(format!(
49 "failed to decode trigger occurrence row: {err}"
50 ))
51 })
52 }
53}
54
55fn trigger_tx_outcome<T>(
56 result: Result<T, lash_core::PluginError>,
57) -> TxOutcome<Result<T, lash_core::PluginError>> {
58 match result {
59 Ok(value) => TxOutcome::Commit(Ok(value)),
60 Err(err) => TxOutcome::Rollback(Err(err)),
61 }
62}
63
64#[async_trait::async_trait]
65impl lash_core::TriggerStore for SqliteTriggerStore {
66 fn durability_tier(&self) -> DurabilityTier {
67 DurabilityTier::Durable
68 }
69
70 async fn register_subscription(
71 &self,
72 draft: lash_core::TriggerSubscriptionDraft,
73 ) -> Result<lash_core::TriggerSubscriptionRecord, lash_core::PluginError> {
74 self.conn
75 .write_flow(move |tx| {
76 Ok(trigger_tx_outcome((|| {
77 tx.execute("INSERT INTO trigger_subscription_seq DEFAULT VALUES", [])
78 .map_err(process_sqlite_error)?;
79 let seq = tx.last_insert_rowid();
80 let handle = format!("trigger:{seq}");
81 let subscription_id = format!("subscription:{seq}");
82 let now = current_epoch_ms();
83 let record = lash_core::TriggerSubscriptionRecord {
84 subscription_id: subscription_id.clone(),
85 registrant: draft.registrant,
86 env_ref: draft.env_ref,
87 wake_target: draft.wake_target,
88 handle,
89 name: draft.name,
90 source_type: draft.source_type,
91 source_key: draft.source_key,
92 source: draft.source,
93 payload_schema: draft.payload_schema,
94 target: draft.target,
95 target_identity: draft.target_identity,
96 event_types: draft.event_types,
97 input_template: draft.input_template,
98 target_label: draft.target_label,
99 enabled: true,
100 created_at_ms: now,
101 updated_at_ms: now,
102 };
103 tx.execute(
104 "INSERT INTO trigger_subscriptions (
105 subscription_id, registrant_scope_id, handle, source_type, source_key,
106 enabled, created_at_ms, updated_at_ms, record_json
107 )
108 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
109 params![
110 record.subscription_id.as_str(),
111 record.registrant_scope_id().as_str(),
112 record.handle.as_str(),
113 record.source_type.as_str(),
114 record.source_key.as_str(),
115 i64::from(record.enabled),
116 record.created_at_ms as i64,
117 record.updated_at_ms as i64,
118 Self::encode_json(&record)?,
119 ],
120 )
121 .map_err(process_sqlite_error)?;
122 Ok(record)
123 })()))
124 })
125 .await
126 .map_err(process_sqlite_error)?
127 }
128
129 async fn list_subscriptions(
130 &self,
131 filter: lash_core::TriggerSubscriptionFilter,
132 ) -> Result<Vec<lash_core::TriggerSubscriptionRecord>, lash_core::PluginError> {
133 self.conn
134 .call(move |conn| {
135 Ok((|| {
136 let mut sql =
137 "SELECT subscription_id, record_json FROM trigger_subscriptions WHERE 1 = 1"
138 .to_string();
139 let mut values = Vec::<rusqlite::types::Value>::new();
140 if let Some(handle) = filter.handle.as_ref() {
141 sql.push_str(" AND handle = ?");
142 values.push(handle.clone().into());
143 }
144 if let Some(source_type) = filter.source_type.as_ref() {
145 sql.push_str(" AND source_type = ?");
146 values.push(source_type.clone().into());
147 }
148 if let Some(source_key) = filter.source_key.as_ref() {
149 sql.push_str(" AND source_key = ?");
150 values.push(source_key.clone().into());
151 }
152 if let Some(enabled) = filter.enabled {
153 sql.push_str(" AND enabled = ?");
154 values.push(i64::from(enabled).into());
155 }
156 sql.push_str(" ORDER BY registrant_scope_id ASC, handle ASC");
157 let mut stmt = conn.prepare(&sql).map_err(process_sqlite_error)?;
158 let rows = stmt
159 .query_map(rusqlite::params_from_iter(values.iter()), |row| {
160 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
161 })
162 .map_err(process_sqlite_error)?;
163 let mut records = Vec::new();
164 for row in rows {
165 let (subscription_id, json) = row.map_err(process_sqlite_error)?;
166 let record = match Self::decode_subscription(json) {
167 Ok(record) => record,
168 Err(err) => {
169 tracing::warn!(
170 error = %err,
171 subscription_id,
172 "skipping malformed trigger subscription during listing"
173 );
174 continue;
175 }
176 };
177 if filter.matches(&record) {
178 records.push(record);
179 }
180 }
181 Ok(records)
182 })())
183 })
184 .await
185 .map_err(process_sqlite_error)?
186 }
187
188 async fn cancel_subscription(
189 &self,
190 session_id: &str,
191 handle: &str,
192 ) -> Result<bool, lash_core::PluginError> {
193 let session_id = session_id.to_string();
194 let handle = handle.to_string();
195 self.conn
196 .write_flow(move |tx| {
197 Ok(trigger_tx_outcome((|| {
198 let mut stmt = tx
199 .prepare(
200 "SELECT subscription_id, enabled, record_json
201 FROM trigger_subscriptions
202 WHERE handle = ?1",
203 )
204 .map_err(process_sqlite_error)?;
205 let rows = stmt
206 .query_map(params![handle.as_str()], |row| {
207 Ok((
208 row.get::<_, String>(0)?,
209 row.get::<_, i64>(1)?,
210 row.get::<_, String>(2)?,
211 ))
212 })
213 .map_err(process_sqlite_error)?;
214 let mut selected = None;
215 for row in rows {
216 let (subscription_id, enabled, json) = row.map_err(process_sqlite_error)?;
217 let record = match Self::decode_subscription(json.clone()) {
218 Ok(record) => record,
219 Err(err) => {
220 tracing::warn!(
221 error = %err,
222 subscription_id,
223 handle,
224 "skipping malformed trigger subscription during cancel"
225 );
226 continue;
227 }
228 };
229 if record.registrant_session_id() == Some(session_id.as_str()) {
230 selected = Some((subscription_id, enabled, json));
231 break;
232 }
233 }
234 let Some((subscription_id, enabled, json)) = selected else {
235 return Ok(false);
236 };
237 let changed = enabled != 0;
238 let updated_at_ms = current_epoch_ms();
239 match Self::decode_subscription(json) {
240 Ok(mut record) => {
241 record.enabled = false;
242 record.updated_at_ms = updated_at_ms;
243 tx.execute(
244 "UPDATE trigger_subscriptions
245 SET enabled = ?3, updated_at_ms = ?4, record_json = ?5
246 WHERE subscription_id = ?1 AND handle = ?2",
247 params![
248 subscription_id.as_str(),
249 handle.as_str(),
250 i64::from(record.enabled),
251 record.updated_at_ms as i64,
252 Self::encode_json(&record)?,
253 ],
254 )
255 .map_err(process_sqlite_error)?;
256 }
257 Err(err) => {
258 tracing::warn!(
259 error = %err,
260 subscription_id,
261 handle,
262 "disabling malformed trigger subscription without rewriting record JSON"
263 );
264 tx.execute(
265 "UPDATE trigger_subscriptions
266 SET enabled = ?3, updated_at_ms = ?4
267 WHERE subscription_id = ?1 AND handle = ?2",
268 params![
269 subscription_id.as_str(),
270 handle.as_str(),
271 0i64,
272 updated_at_ms as i64,
273 ],
274 )
275 .map_err(process_sqlite_error)?;
276 }
277 }
278 Ok(changed)
279 })()))
280 })
281 .await
282 .map_err(process_sqlite_error)?
283 }
284
285 async fn delete_session_subscriptions(
286 &self,
287 session_id: &str,
288 ) -> Result<usize, lash_core::PluginError> {
289 let session_id = session_id.to_string();
290 self.conn
291 .write_flow(move |tx| {
292 Ok(trigger_tx_outcome((|| {
293 let mut stmt = tx
294 .prepare("SELECT subscription_id, record_json FROM trigger_subscriptions")
295 .map_err(process_sqlite_error)?;
296 let rows = stmt
297 .query_map([], |row| {
298 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
299 })
300 .map_err(process_sqlite_error)?;
301 let mut subscription_ids = Vec::new();
302 for row in rows {
303 let (subscription_id, json) = row.map_err(process_sqlite_error)?;
304 let record = match Self::decode_subscription(json) {
305 Ok(record) => record,
306 Err(err) => {
307 tracing::warn!(
308 error = %err,
309 subscription_id,
310 "skipping malformed trigger subscription during session delete"
311 );
312 continue;
313 }
314 };
315 if record.registrant_session_id() == Some(session_id.as_str()) {
316 subscription_ids.push(subscription_id);
317 }
318 }
319 drop(stmt);
320 let mut deleted = 0usize;
321 for subscription_id in subscription_ids {
322 deleted = deleted.saturating_add(
323 tx.execute(
324 "DELETE FROM trigger_subscriptions WHERE subscription_id = ?1",
325 params![subscription_id.as_str()],
326 )
327 .map_err(process_sqlite_error)?,
328 );
329 }
330 Ok(deleted)
331 })()))
332 })
333 .await
334 .map_err(process_sqlite_error)?
335 }
336
337 async fn record_occurrence(
338 &self,
339 request: lash_core::TriggerOccurrenceRequest,
340 ) -> Result<lash_core::TriggerOccurrenceRecord, lash_core::PluginError> {
341 lash_core::validate_trigger_occurrence_request(&request)?;
342 let request_hash = lash_core::trigger_occurrence_request_hash(&request)?;
343 let occurrence_id = lash_core::deterministic_occurrence_id(&request)?;
344 self.conn
345 .write_flow(move |tx| {
346 Ok(trigger_tx_outcome((|| {
347 let existing: Option<(String, String)> = tx
348 .query_row(
349 "SELECT request_hash, record_json
350 FROM trigger_occurrences
351 WHERE idempotency_key = ?1",
352 params![request.idempotency_key.as_str()],
353 |row| Ok((row.get(0)?, row.get(1)?)),
354 )
355 .optional()
356 .map_err(process_sqlite_error)?;
357 if let Some((existing_hash, existing_json)) = existing {
358 if existing_hash != request_hash {
359 return Err(lash_core::PluginError::Session(format!(
360 "trigger occurrence idempotency conflict for `{}`",
361 request.idempotency_key
362 )));
363 }
364 return Self::decode_occurrence(existing_json);
365 }
366 let record = lash_core::TriggerOccurrenceRecord {
367 occurrence_id: occurrence_id.clone(),
368 source_type: request.source_type,
369 source_key: request.source_key,
370 payload: request.payload,
371 idempotency_key: request.idempotency_key,
372 source: request.source,
373 occurred_at_ms: current_epoch_ms(),
374 };
375 tx.execute(
376 "INSERT INTO trigger_occurrences (
377 occurrence_id, idempotency_key, request_hash, source_type,
378 source_key, occurred_at_ms, record_json
379 )
380 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
381 params![
382 record.occurrence_id.as_str(),
383 record.idempotency_key.as_str(),
384 request_hash.as_str(),
385 record.source_type.as_str(),
386 record.source_key.as_str(),
387 record.occurred_at_ms as i64,
388 Self::encode_json(&record)?,
389 ],
390 )
391 .map_err(process_sqlite_error)?;
392 Ok(record)
393 })()))
394 })
395 .await
396 .map_err(process_sqlite_error)?
397 }
398
399 async fn reserve_matching_deliveries(
400 &self,
401 occurrence_id: &str,
402 ) -> Result<Vec<lash_core::TriggerDeliveryReservation>, lash_core::PluginError> {
403 let occurrence_id = occurrence_id.to_string();
404 self.conn
405 .write_flow(move |tx| {
406 Ok(trigger_tx_outcome((|| {
407 let occurrence_json: Option<String> = tx
408 .query_row(
409 "SELECT record_json
410 FROM trigger_occurrences
411 WHERE occurrence_id = ?1",
412 params![occurrence_id.as_str()],
413 |row| row.get(0),
414 )
415 .optional()
416 .map_err(process_sqlite_error)?;
417 let Some(occurrence_json) = occurrence_json else {
418 return Err(lash_core::PluginError::Session(format!(
419 "unknown trigger occurrence `{occurrence_id}`"
420 )));
421 };
422 let occurrence = Self::decode_occurrence(occurrence_json)?;
423 let subscriptions = {
424 let mut stmt = tx
425 .prepare(
426 "SELECT subscription_id, record_json
427 FROM trigger_subscriptions
428 WHERE enabled = 1 AND source_type = ?1 AND source_key = ?2
429 ORDER BY registrant_scope_id ASC, handle ASC",
430 )
431 .map_err(process_sqlite_error)?;
432 let rows = stmt
433 .query_map(
434 params![
435 occurrence.source_type.as_str(),
436 occurrence.source_key.as_str()
437 ],
438 |row| {
439 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
440 },
441 )
442 .map_err(process_sqlite_error)?;
443 let mut subscriptions = Vec::new();
444 for row in rows {
445 let (subscription_id, json) = row.map_err(process_sqlite_error)?;
446 match Self::decode_subscription(json) {
447 Ok(subscription) => subscriptions.push(subscription),
448 Err(err) => tracing::warn!(
449 error = %err,
450 subscription_id,
451 occurrence_id = %occurrence.occurrence_id,
452 "skipping malformed trigger subscription during delivery reservation"
453 ),
454 }
455 }
456 subscriptions
457 };
458 let mut reservations = Vec::new();
459 for subscription in subscriptions {
460 let process_id = lash_core::deterministic_delivery_process_id(
461 &occurrence.occurrence_id,
462 &subscription.subscription_id,
463 )?;
464 let inserted = tx
465 .execute(
466 "INSERT OR IGNORE INTO trigger_deliveries (
467 occurrence_id, subscription_id, process_id, created_at_ms
468 )
469 VALUES (?1, ?2, ?3, ?4)",
470 params![
471 occurrence.occurrence_id.as_str(),
472 subscription.subscription_id.as_str(),
473 process_id.as_str(),
474 current_epoch_ms() as i64,
475 ],
476 )
477 .map_err(process_sqlite_error)?;
478 if inserted == 0 {
479 continue;
480 }
481 reservations.push(lash_core::TriggerDeliveryReservation {
482 occurrence: occurrence.clone(),
483 subscription,
484 process_id,
485 });
486 }
487 Ok(reservations)
488 })()))
489 })
490 .await
491 .map_err(process_sqlite_error)?
492 }
493}