Skip to main content

lash_sqlite_store/
triggers.rs

//! SQLite-backed runtime trigger store.
//!
//! This is the durable peer of [`SqliteProcessRegistry`]: it stores trigger
//! subscriptions and append-only trigger occurrences at deployment scope,
//! outside any session database.
6
7use super::*;
8
9pub struct SqliteTriggerStore {
10    conn: SqliteConnection,
11}
12
13impl SqliteTriggerStore {
14    pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
15        let conn = SqliteConnection::open(path).await?;
16        ensure_trigger_schema(&conn).await?;
17        apply_pragmas(&conn, StoreBacking::File).await?;
18        Ok(Self { conn })
19    }
20
21    pub async fn memory() -> tokio_rusqlite::Result<Self> {
22        let conn = SqliteConnection::open_in_memory().await?;
23        ensure_trigger_schema(&conn).await?;
24        apply_pragmas(&conn, StoreBacking::Memory).await?;
25        Ok(Self { conn })
26    }
27
28    fn encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
29        serde_json::to_string(value).map_err(|err| {
30            lash_core::PluginError::Session(format!("failed to encode trigger row: {err}"))
31        })
32    }
33
34    fn decode_subscription(
35        json: String,
36    ) -> Result<lash_core::TriggerSubscriptionRecord, lash_core::PluginError> {
37        serde_json::from_str(&json).map_err(|err| {
38            lash_core::PluginError::Session(format!(
39                "failed to decode trigger subscription row: {err}"
40            ))
41        })
42    }
43
44    fn decode_occurrence(
45        json: String,
46    ) -> Result<lash_core::TriggerOccurrenceRecord, lash_core::PluginError> {
47        serde_json::from_str(&json).map_err(|err| {
48            lash_core::PluginError::Session(format!(
49                "failed to decode trigger occurrence row: {err}"
50            ))
51        })
52    }
53}
54
55fn trigger_tx_outcome<T>(
56    result: Result<T, lash_core::PluginError>,
57) -> TxOutcome<Result<T, lash_core::PluginError>> {
58    match result {
59        Ok(value) => TxOutcome::Commit(Ok(value)),
60        Err(err) => TxOutcome::Rollback(Err(err)),
61    }
62}
63
64#[async_trait::async_trait]
65impl lash_core::TriggerStore for SqliteTriggerStore {
66    fn durability_tier(&self) -> DurabilityTier {
67        DurabilityTier::Durable
68    }
69
70    async fn register_subscription(
71        &self,
72        draft: lash_core::TriggerSubscriptionDraft,
73    ) -> Result<lash_core::TriggerSubscriptionRecord, lash_core::PluginError> {
74        draft.validate()?;
75        self.conn
76            .write_flow(move |tx| {
77                Ok(trigger_tx_outcome((|| {
78                    tx.execute("INSERT INTO trigger_subscription_seq DEFAULT VALUES", [])
79                        .map_err(process_sqlite_error)?;
80                    let seq = tx.last_insert_rowid();
81                    let handle = format!("trigger:{seq}");
82                    let subscription_id = format!("subscription:{seq}");
83                    let now = current_epoch_ms();
84                    let record = lash_core::TriggerSubscriptionRecord {
85                        subscription_id: subscription_id.clone(),
86                        registrant: draft.registrant,
87                        env_ref: draft.env_ref,
88                        wake_target: draft.wake_target,
89                        handle,
90                        name: draft.name,
91                        source_type: draft.source_type,
92                        source_key: draft.source_key,
93                        source: draft.source,
94                        payload_schema: draft.payload_schema,
95                        target: draft.target,
96                        target_identity: draft.target_identity,
97                        event_types: draft.event_types,
98                        input_template: draft.input_template,
99                        target_label: draft.target_label,
100                        enabled: true,
101                        created_at_ms: now,
102                        updated_at_ms: now,
103                    };
104                    tx.execute(
105                        "INSERT INTO trigger_subscriptions (
106                            subscription_id, registrant_scope_id, handle, source_type, source_key,
107                            enabled, created_at_ms, updated_at_ms, record_json
108                         )
109                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
110                        params![
111                            record.subscription_id.as_str(),
112                            record.registrant_scope_id().as_str(),
113                            record.handle.as_str(),
114                            record.source_type.as_str(),
115                            record.source_key.as_str(),
116                            i64::from(record.enabled),
117                            record.created_at_ms as i64,
118                            record.updated_at_ms as i64,
119                            Self::encode_json(&record)?,
120                        ],
121                    )
122                    .map_err(process_sqlite_error)?;
123                    Ok(record)
124                })()))
125            })
126            .await
127            .map_err(process_sqlite_error)?
128    }
129
130    async fn list_subscriptions(
131        &self,
132        filter: lash_core::TriggerSubscriptionFilter,
133    ) -> Result<Vec<lash_core::TriggerSubscriptionRecord>, lash_core::PluginError> {
134        self.conn
135            .call(move |conn| {
136                Ok((|| {
137                    let mut sql =
138                        "SELECT subscription_id, record_json FROM trigger_subscriptions WHERE 1 = 1"
139                            .to_string();
140                    let mut values = Vec::<rusqlite::types::Value>::new();
141                    if let Some(handle) = filter.handle.as_ref() {
142                        sql.push_str(" AND handle = ?");
143                        values.push(handle.clone().into());
144                    }
145                    if let Some(source_type) = filter.source_type.as_ref() {
146                        sql.push_str(" AND source_type = ?");
147                        values.push(source_type.clone().into());
148                    }
149                    if let Some(source_key) = filter.source_key.as_ref() {
150                        sql.push_str(" AND source_key = ?");
151                        values.push(source_key.clone().into());
152                    }
153                    if let Some(enabled) = filter.enabled {
154                        sql.push_str(" AND enabled = ?");
155                        values.push(i64::from(enabled).into());
156                    }
157                    sql.push_str(" ORDER BY registrant_scope_id ASC, handle ASC");
158                    let mut stmt = conn.prepare(&sql).map_err(process_sqlite_error)?;
159                    let rows = stmt
160                        .query_map(rusqlite::params_from_iter(values.iter()), |row| {
161                            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
162                        })
163                        .map_err(process_sqlite_error)?;
164                    let mut records = Vec::new();
165                    for row in rows {
166                        let (subscription_id, json) = row.map_err(process_sqlite_error)?;
167                        let record = match Self::decode_subscription(json) {
168                            Ok(record) => record,
169                            Err(err) => {
170                                tracing::warn!(
171                                    error = %err,
172                                    subscription_id,
173                                    "skipping malformed trigger subscription during listing"
174                                );
175                                continue;
176                            }
177                        };
178                        if filter.matches(&record) {
179                            records.push(record);
180                        }
181                    }
182                    Ok(records)
183                })())
184            })
185            .await
186            .map_err(process_sqlite_error)?
187    }
188
189    async fn cancel_subscription(
190        &self,
191        session_id: &str,
192        handle: &str,
193    ) -> Result<bool, lash_core::PluginError> {
194        let session_id = session_id.to_string();
195        let handle = handle.to_string();
196        self.conn
197            .write_flow(move |tx| {
198                Ok(trigger_tx_outcome((|| {
199                    let mut stmt = tx
200                        .prepare(
201                            "SELECT subscription_id, enabled, record_json
202                             FROM trigger_subscriptions
203                             WHERE handle = ?1",
204                        )
205                        .map_err(process_sqlite_error)?;
206                    let rows = stmt
207                        .query_map(params![handle.as_str()], |row| {
208                            Ok((
209                                row.get::<_, String>(0)?,
210                                row.get::<_, i64>(1)?,
211                                row.get::<_, String>(2)?,
212                            ))
213                        })
214                        .map_err(process_sqlite_error)?;
215                    let mut selected = None;
216                    for row in rows {
217                        let (subscription_id, enabled, json) = row.map_err(process_sqlite_error)?;
218                        let record = match Self::decode_subscription(json.clone()) {
219                            Ok(record) => record,
220                            Err(err) => {
221                                tracing::warn!(
222                                    error = %err,
223                                    subscription_id,
224                                    handle,
225                                    "skipping malformed trigger subscription during cancel"
226                                );
227                                continue;
228                            }
229                        };
230                        if record.registrant_session_id() == Some(session_id.as_str()) {
231                            selected = Some((subscription_id, enabled, json));
232                            break;
233                        }
234                    }
235                    let Some((subscription_id, enabled, json)) = selected else {
236                        return Ok(false);
237                    };
238                    let changed = enabled != 0;
239                    let updated_at_ms = current_epoch_ms();
240                    match Self::decode_subscription(json) {
241                        Ok(mut record) => {
242                            record.enabled = false;
243                            record.updated_at_ms = updated_at_ms;
244                            tx.execute(
245                                "UPDATE trigger_subscriptions
246                                 SET enabled = ?3, updated_at_ms = ?4, record_json = ?5
247                                 WHERE subscription_id = ?1 AND handle = ?2",
248                                params![
249                                    subscription_id.as_str(),
250                                    handle.as_str(),
251                                    i64::from(record.enabled),
252                                    record.updated_at_ms as i64,
253                                    Self::encode_json(&record)?,
254                                ],
255                            )
256                            .map_err(process_sqlite_error)?;
257                        }
258                        Err(err) => {
259                            tracing::warn!(
260                                error = %err,
261                                subscription_id,
262                                handle,
263                                "disabling malformed trigger subscription without rewriting record JSON"
264                            );
265                            tx.execute(
266                                "UPDATE trigger_subscriptions
267                                 SET enabled = ?3, updated_at_ms = ?4
268                                 WHERE subscription_id = ?1 AND handle = ?2",
269                                params![
270                                    subscription_id.as_str(),
271                                    handle.as_str(),
272                                    0i64,
273                                    updated_at_ms as i64,
274                                ],
275                            )
276                            .map_err(process_sqlite_error)?;
277                        }
278                    }
279                    Ok(changed)
280                })()))
281            })
282            .await
283            .map_err(process_sqlite_error)?
284    }
285
286    async fn delete_session_subscriptions(
287        &self,
288        session_id: &str,
289    ) -> Result<usize, lash_core::PluginError> {
290        let session_id = session_id.to_string();
291        self.conn
292            .write_flow(move |tx| {
293                Ok(trigger_tx_outcome((|| {
294                    let mut stmt = tx
295                        .prepare("SELECT subscription_id, record_json FROM trigger_subscriptions")
296                        .map_err(process_sqlite_error)?;
297                    let rows = stmt
298                        .query_map([], |row| {
299                            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
300                        })
301                        .map_err(process_sqlite_error)?;
302                    let mut subscription_ids = Vec::new();
303                    for row in rows {
304                        let (subscription_id, json) = row.map_err(process_sqlite_error)?;
305                        let record = match Self::decode_subscription(json) {
306                            Ok(record) => record,
307                            Err(err) => {
308                                tracing::warn!(
309                                    error = %err,
310                                    subscription_id,
311                                    "skipping malformed trigger subscription during session delete"
312                                );
313                                continue;
314                            }
315                        };
316                        if record.registrant_session_id() == Some(session_id.as_str()) {
317                            subscription_ids.push(subscription_id);
318                        }
319                    }
320                    drop(stmt);
321                    let mut deleted = 0usize;
322                    for subscription_id in subscription_ids {
323                        deleted = deleted.saturating_add(
324                            tx.execute(
325                                "DELETE FROM trigger_subscriptions WHERE subscription_id = ?1",
326                                params![subscription_id.as_str()],
327                            )
328                            .map_err(process_sqlite_error)?,
329                        );
330                    }
331                    Ok(deleted)
332                })()))
333            })
334            .await
335            .map_err(process_sqlite_error)?
336    }
337
338    async fn record_occurrence(
339        &self,
340        request: lash_core::TriggerOccurrenceRequest,
341    ) -> Result<lash_core::TriggerOccurrenceRecord, lash_core::PluginError> {
342        lash_core::validate_trigger_occurrence_request(&request)?;
343        let request_hash = lash_core::trigger_occurrence_request_hash(&request)?;
344        let occurrence_id = lash_core::deterministic_occurrence_id(&request)?;
345        self.conn
346            .write_flow(move |tx| {
347                Ok(trigger_tx_outcome((|| {
348                    let existing: Option<(String, String)> = tx
349                        .query_row(
350                            "SELECT request_hash, record_json
351                             FROM trigger_occurrences
352                             WHERE idempotency_key = ?1",
353                            params![request.idempotency_key.as_str()],
354                            |row| Ok((row.get(0)?, row.get(1)?)),
355                        )
356                        .optional()
357                        .map_err(process_sqlite_error)?;
358                    if let Some((existing_hash, existing_json)) = existing {
359                        if existing_hash != request_hash {
360                            return Err(lash_core::PluginError::Session(format!(
361                                "trigger occurrence idempotency conflict for `{}`",
362                                request.idempotency_key
363                            )));
364                        }
365                        return Self::decode_occurrence(existing_json);
366                    }
367                    let record = lash_core::TriggerOccurrenceRecord {
368                        occurrence_id: occurrence_id.clone(),
369                        source_type: request.source_type,
370                        source_key: request.source_key,
371                        payload: request.payload,
372                        idempotency_key: request.idempotency_key,
373                        source: request.source,
374                        occurred_at_ms: current_epoch_ms(),
375                    };
376                    tx.execute(
377                        "INSERT INTO trigger_occurrences (
378                            occurrence_id, idempotency_key, request_hash, source_type,
379                            source_key, occurred_at_ms, record_json
380                         )
381                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
382                        params![
383                            record.occurrence_id.as_str(),
384                            record.idempotency_key.as_str(),
385                            request_hash.as_str(),
386                            record.source_type.as_str(),
387                            record.source_key.as_str(),
388                            record.occurred_at_ms as i64,
389                            Self::encode_json(&record)?,
390                        ],
391                    )
392                    .map_err(process_sqlite_error)?;
393                    Ok(record)
394                })()))
395            })
396            .await
397            .map_err(process_sqlite_error)?
398    }
399
400    async fn reserve_matching_deliveries(
401        &self,
402        occurrence_id: &str,
403    ) -> Result<Vec<lash_core::TriggerDeliveryReservation>, lash_core::PluginError> {
404        let occurrence_id = occurrence_id.to_string();
405        self.conn
406            .write_flow(move |tx| {
407                Ok(trigger_tx_outcome((|| {
408                    let occurrence_json: Option<String> = tx
409                        .query_row(
410                            "SELECT record_json
411                             FROM trigger_occurrences
412                             WHERE occurrence_id = ?1",
413                            params![occurrence_id.as_str()],
414                            |row| row.get(0),
415                        )
416                        .optional()
417                        .map_err(process_sqlite_error)?;
418                    let Some(occurrence_json) = occurrence_json else {
419                        return Err(lash_core::PluginError::Session(format!(
420                            "unknown trigger occurrence `{occurrence_id}`"
421                        )));
422                    };
423                    let occurrence = Self::decode_occurrence(occurrence_json)?;
424                    let subscriptions = {
425                        let mut stmt = tx
426                            .prepare(
427                                "SELECT subscription_id, record_json
428                                 FROM trigger_subscriptions
429                                 WHERE enabled = 1 AND source_type = ?1 AND source_key = ?2
430                                 ORDER BY registrant_scope_id ASC, handle ASC",
431                            )
432                            .map_err(process_sqlite_error)?;
433                        let rows = stmt
434                            .query_map(
435                                params![
436                                    occurrence.source_type.as_str(),
437                                    occurrence.source_key.as_str()
438                                ],
439                                |row| {
440                                    Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
441                                },
442                            )
443                            .map_err(process_sqlite_error)?;
444                        let mut subscriptions = Vec::new();
445                        for row in rows {
446                            let (subscription_id, json) = row.map_err(process_sqlite_error)?;
447                            match Self::decode_subscription(json) {
448                                Ok(subscription) => subscriptions.push(subscription),
449                                Err(err) => tracing::warn!(
450                                    error = %err,
451                                    subscription_id,
452                                    occurrence_id = %occurrence.occurrence_id,
453                                    "skipping malformed trigger subscription during delivery reservation"
454                                ),
455                            }
456                        }
457                        subscriptions
458                    };
459                    let mut reservations = Vec::new();
460                    for subscription in subscriptions {
461                        let process_id = lash_core::deterministic_delivery_process_id(
462                            &occurrence.occurrence_id,
463                            &subscription.subscription_id,
464                        )?;
465                        let inserted = tx
466                            .execute(
467                                "INSERT OR IGNORE INTO trigger_deliveries (
468                                    occurrence_id, subscription_id, process_id, created_at_ms
469                                 )
470                                 VALUES (?1, ?2, ?3, ?4)",
471                                params![
472                                    occurrence.occurrence_id.as_str(),
473                                    subscription.subscription_id.as_str(),
474                                    process_id.as_str(),
475                                    current_epoch_ms() as i64,
476                                ],
477                            )
478                            .map_err(process_sqlite_error)?;
479                        if inserted == 0 {
480                            continue;
481                        }
482                        reservations.push(lash_core::TriggerDeliveryReservation {
483                            occurrence: occurrence.clone(),
484                            subscription,
485                            process_id,
486                        });
487                    }
488                    Ok(reservations)
489                })()))
490            })
491            .await
492            .map_err(process_sqlite_error)?
493    }
494}