Skip to main content

lash_sqlite_store/
triggers.rs

1//! SQLite-backed runtime trigger store.
2//!
3//! This is the durable peer of [`SqliteProcessRegistry`]: it stores trigger
4//! subscriptions and append-only trigger occurrences at deployment scope,
5//! outside any session database.
6
7use super::*;
8
9pub struct SqliteTriggerStore {
10    conn: SqliteConnection,
11}
12
13impl SqliteTriggerStore {
14    pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
15        let conn = SqliteConnection::open(path).await?;
16        ensure_trigger_schema(&conn).await?;
17        apply_pragmas(&conn, StoreBacking::File).await?;
18        Ok(Self { conn })
19    }
20
21    pub async fn memory() -> tokio_rusqlite::Result<Self> {
22        let conn = SqliteConnection::open_in_memory().await?;
23        ensure_trigger_schema(&conn).await?;
24        apply_pragmas(&conn, StoreBacking::Memory).await?;
25        Ok(Self { conn })
26    }
27
28    fn encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
29        serde_json::to_string(value).map_err(|err| {
30            lash_core::PluginError::Session(format!("failed to encode trigger row: {err}"))
31        })
32    }
33
34    fn decode_subscription(
35        json: String,
36    ) -> Result<lash_core::TriggerSubscriptionRecord, lash_core::PluginError> {
37        serde_json::from_str(&json).map_err(|err| {
38            lash_core::PluginError::Session(format!(
39                "failed to decode trigger subscription row: {err}"
40            ))
41        })
42    }
43
44    fn decode_occurrence(
45        json: String,
46    ) -> Result<lash_core::TriggerOccurrenceRecord, lash_core::PluginError> {
47        serde_json::from_str(&json).map_err(|err| {
48            lash_core::PluginError::Session(format!(
49                "failed to decode trigger occurrence row: {err}"
50            ))
51        })
52    }
53}
54
55fn trigger_tx_outcome<T>(
56    result: Result<T, lash_core::PluginError>,
57) -> TxOutcome<Result<T, lash_core::PluginError>> {
58    match result {
59        Ok(value) => TxOutcome::Commit(Ok(value)),
60        Err(err) => TxOutcome::Rollback(Err(err)),
61    }
62}
63
64#[async_trait::async_trait]
65impl lash_core::TriggerStore for SqliteTriggerStore {
66    fn durability_tier(&self) -> DurabilityTier {
67        DurabilityTier::Durable
68    }
69
70    async fn register_subscription(
71        &self,
72        draft: lash_core::TriggerSubscriptionDraft,
73    ) -> Result<lash_core::TriggerSubscriptionRecord, lash_core::PluginError> {
74        self.conn
75            .write_flow(move |tx| {
76                Ok(trigger_tx_outcome((|| {
77                    tx.execute("INSERT INTO trigger_subscription_seq DEFAULT VALUES", [])
78                        .map_err(process_sqlite_error)?;
79                    let seq = tx.last_insert_rowid();
80                    let handle = format!("trigger:{seq}");
81                    let subscription_id = format!("subscription:{seq}");
82                    let now = current_epoch_ms();
83                    let record = lash_core::TriggerSubscriptionRecord {
84                        subscription_id: subscription_id.clone(),
85                        registrant: draft.registrant,
86                        env_ref: draft.env_ref,
87                        wake_target: draft.wake_target,
88                        handle,
89                        name: draft.name,
90                        source_type: draft.source_type,
91                        source_key: draft.source_key,
92                        source: draft.source,
93                        payload_schema: draft.payload_schema,
94                        target: draft.target,
95                        target_identity: draft.target_identity,
96                        event_types: draft.event_types,
97                        input_template: draft.input_template,
98                        target_label: draft.target_label,
99                        enabled: true,
100                        created_at_ms: now,
101                        updated_at_ms: now,
102                    };
103                    tx.execute(
104                        "INSERT INTO trigger_subscriptions (
105                            subscription_id, registrant_scope_id, handle, source_type, source_key,
106                            enabled, created_at_ms, updated_at_ms, record_json
107                         )
108                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
109                        params![
110                            record.subscription_id.as_str(),
111                            record.registrant_scope_id().as_str(),
112                            record.handle.as_str(),
113                            record.source_type.as_str(),
114                            record.source_key.as_str(),
115                            i64::from(record.enabled),
116                            record.created_at_ms as i64,
117                            record.updated_at_ms as i64,
118                            Self::encode_json(&record)?,
119                        ],
120                    )
121                    .map_err(process_sqlite_error)?;
122                    Ok(record)
123                })()))
124            })
125            .await
126            .map_err(process_sqlite_error)?
127    }
128
129    async fn list_subscriptions(
130        &self,
131        filter: lash_core::TriggerSubscriptionFilter,
132    ) -> Result<Vec<lash_core::TriggerSubscriptionRecord>, lash_core::PluginError> {
133        self.conn
134            .call(move |conn| {
135                Ok((|| {
136                    let mut sql =
137                        "SELECT subscription_id, record_json FROM trigger_subscriptions WHERE 1 = 1"
138                            .to_string();
139                    let mut values = Vec::<rusqlite::types::Value>::new();
140                    if let Some(handle) = filter.handle.as_ref() {
141                        sql.push_str(" AND handle = ?");
142                        values.push(handle.clone().into());
143                    }
144                    if let Some(source_type) = filter.source_type.as_ref() {
145                        sql.push_str(" AND source_type = ?");
146                        values.push(source_type.clone().into());
147                    }
148                    if let Some(source_key) = filter.source_key.as_ref() {
149                        sql.push_str(" AND source_key = ?");
150                        values.push(source_key.clone().into());
151                    }
152                    if let Some(enabled) = filter.enabled {
153                        sql.push_str(" AND enabled = ?");
154                        values.push(i64::from(enabled).into());
155                    }
156                    sql.push_str(" ORDER BY registrant_scope_id ASC, handle ASC");
157                    let mut stmt = conn.prepare(&sql).map_err(process_sqlite_error)?;
158                    let rows = stmt
159                        .query_map(rusqlite::params_from_iter(values.iter()), |row| {
160                            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
161                        })
162                        .map_err(process_sqlite_error)?;
163                    let mut records = Vec::new();
164                    for row in rows {
165                        let (subscription_id, json) = row.map_err(process_sqlite_error)?;
166                        let record = match Self::decode_subscription(json) {
167                            Ok(record) => record,
168                            Err(err) => {
169                                tracing::warn!(
170                                    error = %err,
171                                    subscription_id,
172                                    "skipping malformed trigger subscription during listing"
173                                );
174                                continue;
175                            }
176                        };
177                        if filter.matches(&record) {
178                            records.push(record);
179                        }
180                    }
181                    Ok(records)
182                })())
183            })
184            .await
185            .map_err(process_sqlite_error)?
186    }
187
188    async fn cancel_subscription(
189        &self,
190        session_id: &str,
191        handle: &str,
192    ) -> Result<bool, lash_core::PluginError> {
193        let session_id = session_id.to_string();
194        let handle = handle.to_string();
195        self.conn
196            .write_flow(move |tx| {
197                Ok(trigger_tx_outcome((|| {
198                    let mut stmt = tx
199                        .prepare(
200                            "SELECT subscription_id, enabled, record_json
201                             FROM trigger_subscriptions
202                             WHERE handle = ?1",
203                        )
204                        .map_err(process_sqlite_error)?;
205                    let rows = stmt
206                        .query_map(params![handle.as_str()], |row| {
207                            Ok((
208                                row.get::<_, String>(0)?,
209                                row.get::<_, i64>(1)?,
210                                row.get::<_, String>(2)?,
211                            ))
212                        })
213                        .map_err(process_sqlite_error)?;
214                    let mut selected = None;
215                    for row in rows {
216                        let (subscription_id, enabled, json) = row.map_err(process_sqlite_error)?;
217                        let record = match Self::decode_subscription(json.clone()) {
218                            Ok(record) => record,
219                            Err(err) => {
220                                tracing::warn!(
221                                    error = %err,
222                                    subscription_id,
223                                    handle,
224                                    "skipping malformed trigger subscription during cancel"
225                                );
226                                continue;
227                            }
228                        };
229                        if record.registrant_session_id() == Some(session_id.as_str()) {
230                            selected = Some((subscription_id, enabled, json));
231                            break;
232                        }
233                    }
234                    let Some((subscription_id, enabled, json)) = selected else {
235                        return Ok(false);
236                    };
237                    let changed = enabled != 0;
238                    let updated_at_ms = current_epoch_ms();
239                    match Self::decode_subscription(json) {
240                        Ok(mut record) => {
241                            record.enabled = false;
242                            record.updated_at_ms = updated_at_ms;
243                            tx.execute(
244                                "UPDATE trigger_subscriptions
245                                 SET enabled = ?3, updated_at_ms = ?4, record_json = ?5
246                                 WHERE subscription_id = ?1 AND handle = ?2",
247                                params![
248                                    subscription_id.as_str(),
249                                    handle.as_str(),
250                                    i64::from(record.enabled),
251                                    record.updated_at_ms as i64,
252                                    Self::encode_json(&record)?,
253                                ],
254                            )
255                            .map_err(process_sqlite_error)?;
256                        }
257                        Err(err) => {
258                            tracing::warn!(
259                                error = %err,
260                                subscription_id,
261                                handle,
262                                "disabling malformed trigger subscription without rewriting record JSON"
263                            );
264                            tx.execute(
265                                "UPDATE trigger_subscriptions
266                                 SET enabled = ?3, updated_at_ms = ?4
267                                 WHERE subscription_id = ?1 AND handle = ?2",
268                                params![
269                                    subscription_id.as_str(),
270                                    handle.as_str(),
271                                    0i64,
272                                    updated_at_ms as i64,
273                                ],
274                            )
275                            .map_err(process_sqlite_error)?;
276                        }
277                    }
278                    Ok(changed)
279                })()))
280            })
281            .await
282            .map_err(process_sqlite_error)?
283    }
284
285    async fn delete_session_subscriptions(
286        &self,
287        session_id: &str,
288    ) -> Result<usize, lash_core::PluginError> {
289        let session_id = session_id.to_string();
290        self.conn
291            .write_flow(move |tx| {
292                Ok(trigger_tx_outcome((|| {
293                    let mut stmt = tx
294                        .prepare("SELECT subscription_id, record_json FROM trigger_subscriptions")
295                        .map_err(process_sqlite_error)?;
296                    let rows = stmt
297                        .query_map([], |row| {
298                            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
299                        })
300                        .map_err(process_sqlite_error)?;
301                    let mut subscription_ids = Vec::new();
302                    for row in rows {
303                        let (subscription_id, json) = row.map_err(process_sqlite_error)?;
304                        let record = match Self::decode_subscription(json) {
305                            Ok(record) => record,
306                            Err(err) => {
307                                tracing::warn!(
308                                    error = %err,
309                                    subscription_id,
310                                    "skipping malformed trigger subscription during session delete"
311                                );
312                                continue;
313                            }
314                        };
315                        if record.registrant_session_id() == Some(session_id.as_str()) {
316                            subscription_ids.push(subscription_id);
317                        }
318                    }
319                    drop(stmt);
320                    let mut deleted = 0usize;
321                    for subscription_id in subscription_ids {
322                        deleted = deleted.saturating_add(
323                            tx.execute(
324                                "DELETE FROM trigger_subscriptions WHERE subscription_id = ?1",
325                                params![subscription_id.as_str()],
326                            )
327                            .map_err(process_sqlite_error)?,
328                        );
329                    }
330                    Ok(deleted)
331                })()))
332            })
333            .await
334            .map_err(process_sqlite_error)?
335    }
336
337    async fn record_occurrence(
338        &self,
339        request: lash_core::TriggerOccurrenceRequest,
340    ) -> Result<lash_core::TriggerOccurrenceRecord, lash_core::PluginError> {
341        lash_core::validate_trigger_occurrence_request(&request)?;
342        let request_hash = lash_core::trigger_occurrence_request_hash(&request)?;
343        let occurrence_id = lash_core::deterministic_occurrence_id(&request)?;
344        self.conn
345            .write_flow(move |tx| {
346                Ok(trigger_tx_outcome((|| {
347                    let existing: Option<(String, String)> = tx
348                        .query_row(
349                            "SELECT request_hash, record_json
350                             FROM trigger_occurrences
351                             WHERE idempotency_key = ?1",
352                            params![request.idempotency_key.as_str()],
353                            |row| Ok((row.get(0)?, row.get(1)?)),
354                        )
355                        .optional()
356                        .map_err(process_sqlite_error)?;
357                    if let Some((existing_hash, existing_json)) = existing {
358                        if existing_hash != request_hash {
359                            return Err(lash_core::PluginError::Session(format!(
360                                "trigger occurrence idempotency conflict for `{}`",
361                                request.idempotency_key
362                            )));
363                        }
364                        return Self::decode_occurrence(existing_json);
365                    }
366                    let record = lash_core::TriggerOccurrenceRecord {
367                        occurrence_id: occurrence_id.clone(),
368                        source_type: request.source_type,
369                        source_key: request.source_key,
370                        payload: request.payload,
371                        idempotency_key: request.idempotency_key,
372                        source: request.source,
373                        occurred_at_ms: current_epoch_ms(),
374                    };
375                    tx.execute(
376                        "INSERT INTO trigger_occurrences (
377                            occurrence_id, idempotency_key, request_hash, source_type,
378                            source_key, occurred_at_ms, record_json
379                         )
380                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
381                        params![
382                            record.occurrence_id.as_str(),
383                            record.idempotency_key.as_str(),
384                            request_hash.as_str(),
385                            record.source_type.as_str(),
386                            record.source_key.as_str(),
387                            record.occurred_at_ms as i64,
388                            Self::encode_json(&record)?,
389                        ],
390                    )
391                    .map_err(process_sqlite_error)?;
392                    Ok(record)
393                })()))
394            })
395            .await
396            .map_err(process_sqlite_error)?
397    }
398
399    async fn reserve_matching_deliveries(
400        &self,
401        occurrence_id: &str,
402    ) -> Result<Vec<lash_core::TriggerDeliveryReservation>, lash_core::PluginError> {
403        let occurrence_id = occurrence_id.to_string();
404        self.conn
405            .write_flow(move |tx| {
406                Ok(trigger_tx_outcome((|| {
407                    let occurrence_json: Option<String> = tx
408                        .query_row(
409                            "SELECT record_json
410                             FROM trigger_occurrences
411                             WHERE occurrence_id = ?1",
412                            params![occurrence_id.as_str()],
413                            |row| row.get(0),
414                        )
415                        .optional()
416                        .map_err(process_sqlite_error)?;
417                    let Some(occurrence_json) = occurrence_json else {
418                        return Err(lash_core::PluginError::Session(format!(
419                            "unknown trigger occurrence `{occurrence_id}`"
420                        )));
421                    };
422                    let occurrence = Self::decode_occurrence(occurrence_json)?;
423                    let subscriptions = {
424                        let mut stmt = tx
425                            .prepare(
426                                "SELECT subscription_id, record_json
427                                 FROM trigger_subscriptions
428                                 WHERE enabled = 1 AND source_type = ?1 AND source_key = ?2
429                                 ORDER BY registrant_scope_id ASC, handle ASC",
430                            )
431                            .map_err(process_sqlite_error)?;
432                        let rows = stmt
433                            .query_map(
434                                params![
435                                    occurrence.source_type.as_str(),
436                                    occurrence.source_key.as_str()
437                                ],
438                                |row| {
439                                    Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
440                                },
441                            )
442                            .map_err(process_sqlite_error)?;
443                        let mut subscriptions = Vec::new();
444                        for row in rows {
445                            let (subscription_id, json) = row.map_err(process_sqlite_error)?;
446                            match Self::decode_subscription(json) {
447                                Ok(subscription) => subscriptions.push(subscription),
448                                Err(err) => tracing::warn!(
449                                    error = %err,
450                                    subscription_id,
451                                    occurrence_id = %occurrence.occurrence_id,
452                                    "skipping malformed trigger subscription during delivery reservation"
453                                ),
454                            }
455                        }
456                        subscriptions
457                    };
458                    let mut reservations = Vec::new();
459                    for subscription in subscriptions {
460                        let process_id = lash_core::deterministic_delivery_process_id(
461                            &occurrence.occurrence_id,
462                            &subscription.subscription_id,
463                        )?;
464                        let inserted = tx
465                            .execute(
466                                "INSERT OR IGNORE INTO trigger_deliveries (
467                                    occurrence_id, subscription_id, process_id, created_at_ms
468                                 )
469                                 VALUES (?1, ?2, ?3, ?4)",
470                                params![
471                                    occurrence.occurrence_id.as_str(),
472                                    subscription.subscription_id.as_str(),
473                                    process_id.as_str(),
474                                    current_epoch_ms() as i64,
475                                ],
476                            )
477                            .map_err(process_sqlite_error)?;
478                        if inserted == 0 {
479                            continue;
480                        }
481                        reservations.push(lash_core::TriggerDeliveryReservation {
482                            occurrence: occurrence.clone(),
483                            subscription,
484                            process_id,
485                        });
486                    }
487                    Ok(reservations)
488                })()))
489            })
490            .await
491            .map_err(process_sqlite_error)?
492    }
493}