Skip to main content

lash_sqlite_store/
triggers.rs

//! SQLite-backed runtime trigger store.
//!
//! This is the durable peer of [`SqliteProcessRegistry`]: it stores trigger
//! subscriptions and append-only trigger occurrences at deployment scope,
//! outside any session database.
6
7use super::*;
8
9pub struct SqliteTriggerStore {
10    conn: SqliteConnection,
11}
12
13impl SqliteTriggerStore {
14    pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
15        let conn = SqliteConnection::open(path).await?;
16        ensure_trigger_schema(&conn).await?;
17        apply_pragmas(&conn, StoreBacking::File).await?;
18        Ok(Self { conn })
19    }
20
21    pub async fn memory() -> tokio_rusqlite::Result<Self> {
22        let conn = SqliteConnection::open_in_memory().await?;
23        ensure_trigger_schema(&conn).await?;
24        apply_pragmas(&conn, StoreBacking::Memory).await?;
25        Ok(Self { conn })
26    }
27
28    fn encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
29        serde_json::to_string(value).map_err(|err| {
30            lash_core::PluginError::Session(format!("failed to encode trigger row: {err}"))
31        })
32    }
33
34    fn decode_subscription(
35        json: String,
36    ) -> Result<lash_core::TriggerSubscriptionRecord, lash_core::PluginError> {
37        serde_json::from_str(&json).map_err(|err| {
38            lash_core::PluginError::Session(format!(
39                "failed to decode trigger subscription row: {err}"
40            ))
41        })
42    }
43
44    fn decode_occurrence(
45        json: String,
46    ) -> Result<lash_core::TriggerOccurrenceRecord, lash_core::PluginError> {
47        serde_json::from_str(&json).map_err(|err| {
48            lash_core::PluginError::Session(format!(
49                "failed to decode trigger occurrence row: {err}"
50            ))
51        })
52    }
53
54    fn session_registrant_scope_id(session_id: &str) -> String {
55        lash_core::ProcessOriginator::session(lash_core::SessionScope::new(session_id)).scope_id()
56    }
57}
58
59fn trigger_tx_outcome<T>(
60    result: Result<T, lash_core::PluginError>,
61) -> TxOutcome<Result<T, lash_core::PluginError>> {
62    match result {
63        Ok(value) => TxOutcome::Commit(Ok(value)),
64        Err(err) => TxOutcome::Rollback(Err(err)),
65    }
66}
67
68#[async_trait::async_trait]
69impl lash_core::TriggerStore for SqliteTriggerStore {
70    fn durability_tier(&self) -> DurabilityTier {
71        DurabilityTier::Durable
72    }
73
74    async fn register_subscription(
75        &self,
76        draft: lash_core::TriggerSubscriptionDraft,
77    ) -> Result<lash_core::TriggerSubscriptionRecord, lash_core::PluginError> {
78        self.conn
79            .write_flow(move |tx| {
80                Ok(trigger_tx_outcome((|| {
81                    tx.execute("INSERT INTO trigger_subscription_seq DEFAULT VALUES", [])
82                        .map_err(process_sqlite_error)?;
83                    let seq = tx.last_insert_rowid();
84                    let handle = format!("trigger:{seq}");
85                    let subscription_id = format!("subscription:{seq}");
86                    let now = current_epoch_ms();
87                    let record = lash_core::TriggerSubscriptionRecord {
88                        subscription_id: subscription_id.clone(),
89                        registrant: draft.registrant,
90                        env_ref: draft.env_ref,
91                        wake_target: draft.wake_target,
92                        handle,
93                        name: draft.name,
94                        source_type: draft.source_type,
95                        source_key: draft.source_key,
96                        source: draft.source,
97                        event_ty: draft.event_ty,
98                        module_ref: draft.module_ref,
99                        host_requirements_ref: draft.host_requirements_ref,
100                        process_ref: draft.process_ref,
101                        process_name: draft.process_name,
102                        input_template: draft.input_template,
103                        enabled: true,
104                        created_at_ms: now,
105                        updated_at_ms: now,
106                    };
107                    tx.execute(
108                        "INSERT INTO trigger_subscriptions (
109                            subscription_id, registrant_scope_id, handle, source_type, source_key,
110                            enabled, created_at_ms, updated_at_ms, record_json
111                         )
112                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
113                        params![
114                            record.subscription_id.as_str(),
115                            record.registrant_scope_id().as_str(),
116                            record.handle.as_str(),
117                            record.source_type.as_str(),
118                            record.source_key.as_str(),
119                            i64::from(record.enabled),
120                            record.created_at_ms as i64,
121                            record.updated_at_ms as i64,
122                            Self::encode_json(&record)?,
123                        ],
124                    )
125                    .map_err(process_sqlite_error)?;
126                    Ok(record)
127                })()))
128            })
129            .await
130            .map_err(process_sqlite_error)?
131    }
132
133    async fn list_subscriptions(
134        &self,
135        filter: lash_core::TriggerSubscriptionFilter,
136    ) -> Result<Vec<lash_core::TriggerSubscriptionRecord>, lash_core::PluginError> {
137        self.conn
138            .call(move |conn| {
139                Ok((|| {
140                    let mut sql =
141                        "SELECT subscription_id, record_json FROM trigger_subscriptions WHERE 1 = 1"
142                            .to_string();
143                    let mut values = Vec::<rusqlite::types::Value>::new();
144                    if let Some(session_id) = filter.session_id.as_ref() {
145                        sql.push_str(" AND registrant_scope_id = ?");
146                        values.push(Self::session_registrant_scope_id(session_id).into());
147                    }
148                    if let Some(handle) = filter.handle.as_ref() {
149                        sql.push_str(" AND handle = ?");
150                        values.push(handle.clone().into());
151                    }
152                    if let Some(source_type) = filter.source_type.as_ref() {
153                        sql.push_str(" AND source_type = ?");
154                        values.push(source_type.clone().into());
155                    }
156                    if let Some(source_key) = filter.source_key.as_ref() {
157                        sql.push_str(" AND source_key = ?");
158                        values.push(source_key.clone().into());
159                    }
160                    if let Some(enabled) = filter.enabled {
161                        sql.push_str(" AND enabled = ?");
162                        values.push(i64::from(enabled).into());
163                    }
164                    sql.push_str(" ORDER BY registrant_scope_id ASC, handle ASC");
165                    let mut stmt = conn.prepare(&sql).map_err(process_sqlite_error)?;
166                    let rows = stmt
167                        .query_map(rusqlite::params_from_iter(values.iter()), |row| {
168                            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
169                        })
170                        .map_err(process_sqlite_error)?;
171                    let mut records = Vec::new();
172                    for row in rows {
173                        let (subscription_id, json) = row.map_err(process_sqlite_error)?;
174                        let record = match Self::decode_subscription(json) {
175                            Ok(record) => record,
176                            Err(err) => {
177                                tracing::warn!(
178                                    error = %err,
179                                    subscription_id,
180                                    "skipping malformed trigger subscription during listing"
181                                );
182                                continue;
183                            }
184                        };
185                        if filter.matches(&record) {
186                            records.push(record);
187                        }
188                    }
189                    Ok(records)
190                })())
191            })
192            .await
193            .map_err(process_sqlite_error)?
194    }
195
196    async fn cancel_subscription(
197        &self,
198        session_id: &str,
199        handle: &str,
200    ) -> Result<bool, lash_core::PluginError> {
201        let session_id = session_id.to_string();
202        let handle = handle.to_string();
203        self.conn
204            .write_flow(move |tx| {
205                Ok(trigger_tx_outcome((|| {
206                    let registrant_scope_id = Self::session_registrant_scope_id(&session_id);
207                    let row: Option<(i64, String)> = tx
208                        .query_row(
209                            "SELECT enabled, record_json
210                             FROM trigger_subscriptions
211                             WHERE registrant_scope_id = ?1 AND handle = ?2",
212                            params![registrant_scope_id.as_str(), handle.as_str()],
213                            |row| Ok((row.get(0)?, row.get(1)?)),
214                        )
215                        .optional()
216                        .map_err(process_sqlite_error)?;
217                    let Some((enabled, json)) = row else {
218                        return Ok(false);
219                    };
220                    let changed = enabled != 0;
221                    let updated_at_ms = current_epoch_ms();
222                    match Self::decode_subscription(json) {
223                        Ok(mut record) => {
224                            record.enabled = false;
225                            record.updated_at_ms = updated_at_ms;
226                            tx.execute(
227                                "UPDATE trigger_subscriptions
228                                 SET enabled = ?3, updated_at_ms = ?4, record_json = ?5
229                                 WHERE registrant_scope_id = ?1 AND handle = ?2",
230                                params![
231                                    registrant_scope_id.as_str(),
232                                    handle.as_str(),
233                                    i64::from(record.enabled),
234                                    record.updated_at_ms as i64,
235                                    Self::encode_json(&record)?,
236                                ],
237                            )
238                            .map_err(process_sqlite_error)?;
239                        }
240                        Err(err) => {
241                            tracing::warn!(
242                                error = %err,
243                                registrant_scope_id,
244                                handle,
245                                "disabling malformed trigger subscription without rewriting record JSON"
246                            );
247                            tx.execute(
248                                "UPDATE trigger_subscriptions
249                                 SET enabled = ?3, updated_at_ms = ?4
250                                 WHERE registrant_scope_id = ?1 AND handle = ?2",
251                                params![
252                                    registrant_scope_id.as_str(),
253                                    handle.as_str(),
254                                    0i64,
255                                    updated_at_ms as i64,
256                                ],
257                            )
258                            .map_err(process_sqlite_error)?;
259                        }
260                    }
261                    Ok(changed)
262                })()))
263            })
264            .await
265            .map_err(process_sqlite_error)?
266    }
267
268    async fn delete_session_subscriptions(
269        &self,
270        session_id: &str,
271    ) -> Result<usize, lash_core::PluginError> {
272        let session_id = session_id.to_string();
273        self.conn
274            .write_flow(move |tx| {
275                Ok(trigger_tx_outcome((|| {
276                    let registrant_scope_id = Self::session_registrant_scope_id(&session_id);
277                    let deleted = tx
278                        .execute(
279                            "DELETE FROM trigger_subscriptions WHERE registrant_scope_id = ?1",
280                            params![registrant_scope_id.as_str()],
281                        )
282                        .map_err(process_sqlite_error)?;
283                    Ok(deleted)
284                })()))
285            })
286            .await
287            .map_err(process_sqlite_error)?
288    }
289
290    async fn record_occurrence(
291        &self,
292        request: lash_core::TriggerOccurrenceRequest,
293    ) -> Result<lash_core::TriggerOccurrenceRecord, lash_core::PluginError> {
294        lash_core::validate_trigger_occurrence_request(&request)?;
295        let request_hash = lash_core::trigger_occurrence_request_hash(&request)?;
296        let occurrence_id = lash_core::deterministic_occurrence_id(&request)?;
297        self.conn
298            .write_flow(move |tx| {
299                Ok(trigger_tx_outcome((|| {
300                    let existing: Option<(String, String)> = tx
301                        .query_row(
302                            "SELECT request_hash, record_json
303                             FROM trigger_occurrences
304                             WHERE idempotency_key = ?1",
305                            params![request.idempotency_key.as_str()],
306                            |row| Ok((row.get(0)?, row.get(1)?)),
307                        )
308                        .optional()
309                        .map_err(process_sqlite_error)?;
310                    if let Some((existing_hash, existing_json)) = existing {
311                        if existing_hash != request_hash {
312                            return Err(lash_core::PluginError::Session(format!(
313                                "trigger occurrence idempotency conflict for `{}`",
314                                request.idempotency_key
315                            )));
316                        }
317                        return Self::decode_occurrence(existing_json);
318                    }
319                    let record = lash_core::TriggerOccurrenceRecord {
320                        occurrence_id: occurrence_id.clone(),
321                        source_type: request.source_type,
322                        source_key: request.source_key,
323                        payload: request.payload,
324                        idempotency_key: request.idempotency_key,
325                        source: request.source,
326                        occurred_at_ms: current_epoch_ms(),
327                    };
328                    tx.execute(
329                        "INSERT INTO trigger_occurrences (
330                            occurrence_id, idempotency_key, request_hash, source_type,
331                            source_key, occurred_at_ms, record_json
332                         )
333                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
334                        params![
335                            record.occurrence_id.as_str(),
336                            record.idempotency_key.as_str(),
337                            request_hash.as_str(),
338                            record.source_type.as_str(),
339                            record.source_key.as_str(),
340                            record.occurred_at_ms as i64,
341                            Self::encode_json(&record)?,
342                        ],
343                    )
344                    .map_err(process_sqlite_error)?;
345                    Ok(record)
346                })()))
347            })
348            .await
349            .map_err(process_sqlite_error)?
350    }
351
352    async fn reserve_matching_deliveries(
353        &self,
354        occurrence_id: &str,
355    ) -> Result<Vec<lash_core::TriggerDeliveryReservation>, lash_core::PluginError> {
356        let occurrence_id = occurrence_id.to_string();
357        self.conn
358            .write_flow(move |tx| {
359                Ok(trigger_tx_outcome((|| {
360                    let occurrence_json: Option<String> = tx
361                        .query_row(
362                            "SELECT record_json
363                             FROM trigger_occurrences
364                             WHERE occurrence_id = ?1",
365                            params![occurrence_id.as_str()],
366                            |row| row.get(0),
367                        )
368                        .optional()
369                        .map_err(process_sqlite_error)?;
370                    let Some(occurrence_json) = occurrence_json else {
371                        return Err(lash_core::PluginError::Session(format!(
372                            "unknown trigger occurrence `{occurrence_id}`"
373                        )));
374                    };
375                    let occurrence = Self::decode_occurrence(occurrence_json)?;
376                    let subscriptions = {
377                        let mut stmt = tx
378                            .prepare(
379                                "SELECT subscription_id, record_json
380                                 FROM trigger_subscriptions
381                                 WHERE enabled = 1 AND source_type = ?1 AND source_key = ?2
382                                 ORDER BY registrant_scope_id ASC, handle ASC",
383                            )
384                            .map_err(process_sqlite_error)?;
385                        let rows = stmt
386                            .query_map(
387                                params![
388                                    occurrence.source_type.as_str(),
389                                    occurrence.source_key.as_str()
390                                ],
391                                |row| {
392                                    Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
393                                },
394                            )
395                            .map_err(process_sqlite_error)?;
396                        let mut subscriptions = Vec::new();
397                        for row in rows {
398                            let (subscription_id, json) = row.map_err(process_sqlite_error)?;
399                            match Self::decode_subscription(json) {
400                                Ok(subscription) => subscriptions.push(subscription),
401                                Err(err) => tracing::warn!(
402                                    error = %err,
403                                    subscription_id,
404                                    occurrence_id = %occurrence.occurrence_id,
405                                    "skipping malformed trigger subscription during delivery reservation"
406                                ),
407                            }
408                        }
409                        subscriptions
410                    };
411                    let mut reservations = Vec::new();
412                    for subscription in subscriptions {
413                        let process_id = lash_core::deterministic_delivery_process_id(
414                            &occurrence.occurrence_id,
415                            &subscription.subscription_id,
416                        )?;
417                        let inserted = tx
418                            .execute(
419                                "INSERT OR IGNORE INTO trigger_deliveries (
420                                    occurrence_id, subscription_id, process_id, created_at_ms
421                                 )
422                                 VALUES (?1, ?2, ?3, ?4)",
423                                params![
424                                    occurrence.occurrence_id.as_str(),
425                                    subscription.subscription_id.as_str(),
426                                    process_id.as_str(),
427                                    current_epoch_ms() as i64,
428                                ],
429                            )
430                            .map_err(process_sqlite_error)?;
431                        if inserted == 0 {
432                            continue;
433                        }
434                        reservations.push(lash_core::TriggerDeliveryReservation {
435                            occurrence: occurrence.clone(),
436                            subscription,
437                            process_id,
438                        });
439                    }
440                    Ok(reservations)
441                })()))
442            })
443            .await
444            .map_err(process_sqlite_error)?
445    }
446}