Skip to main content

lash_sqlite_store/
host_events.rs

1//! SQLite-backed runtime host-event store.
2//!
3//! This is the durable peer of [`SqliteProcessRegistry`]: it stores trigger
4//! subscriptions and append-only host-event occurrences at deployment scope,
5//! outside any session database.
6
7use super::*;
8
9pub struct SqliteHostEventStore {
10    conn: SqliteConnection,
11}
12
13impl SqliteHostEventStore {
14    pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
15        let conn = SqliteConnection::open(path).await?;
16        ensure_host_event_schema(&conn).await?;
17        apply_pragmas(&conn, StoreBacking::File).await?;
18        Ok(Self { conn })
19    }
20
21    pub async fn memory() -> tokio_rusqlite::Result<Self> {
22        let conn = SqliteConnection::open_in_memory().await?;
23        ensure_host_event_schema(&conn).await?;
24        apply_pragmas(&conn, StoreBacking::Memory).await?;
25        Ok(Self { conn })
26    }
27
28    fn encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
29        serde_json::to_string(value).map_err(|err| {
30            lash_core::PluginError::Session(format!("failed to encode host event row: {err}"))
31        })
32    }
33
34    fn decode_subscription(
35        json: String,
36    ) -> Result<lash_core::TriggerSubscriptionRecord, lash_core::PluginError> {
37        serde_json::from_str(&json).map_err(|err| {
38            lash_core::PluginError::Session(format!(
39                "failed to decode host event subscription row: {err}"
40            ))
41        })
42    }
43
44    fn decode_occurrence(
45        json: String,
46    ) -> Result<lash_core::HostEventOccurrenceRecord, lash_core::PluginError> {
47        serde_json::from_str(&json).map_err(|err| {
48            lash_core::PluginError::Session(format!(
49                "failed to decode host event occurrence row: {err}"
50            ))
51        })
52    }
53}
54
55fn host_event_tx_outcome<T>(
56    result: Result<T, lash_core::PluginError>,
57) -> TxOutcome<Result<T, lash_core::PluginError>> {
58    match result {
59        Ok(value) => TxOutcome::Commit(Ok(value)),
60        Err(err) => TxOutcome::Rollback(Err(err)),
61    }
62}
63
64#[async_trait::async_trait]
65impl lash_core::HostEventStore for SqliteHostEventStore {
66    fn durability_tier(&self) -> DurabilityTier {
67        DurabilityTier::Durable
68    }
69
70    async fn register_subscription(
71        &self,
72        draft: lash_core::TriggerSubscriptionDraft,
73    ) -> Result<lash_core::TriggerSubscriptionRecord, lash_core::PluginError> {
74        self.conn
75            .write_flow(move |tx| {
76                Ok(host_event_tx_outcome((|| {
77                    tx.execute("INSERT INTO host_event_subscription_seq DEFAULT VALUES", [])
78                        .map_err(process_sqlite_error)?;
79                    let seq = tx.last_insert_rowid();
80                    let handle = format!("trigger:{seq}");
81                    let subscription_id = format!("subscription:{seq}");
82                    let now = current_epoch_ms();
83                    let record = lash_core::TriggerSubscriptionRecord {
84                        subscription_id: subscription_id.clone(),
85                        registrant: draft.registrant,
86                        env_ref: draft.env_ref,
87                        wake_target: draft.wake_target,
88                        handle,
89                        name: draft.name,
90                        source_type: draft.source_type,
91                        source_key: draft.source_key,
92                        source: draft.source,
93                        event_ty: draft.event_ty,
94                        module_ref: draft.module_ref,
95                        required_surface_ref: draft.required_surface_ref,
96                        process_ref: draft.process_ref,
97                        process_name: draft.process_name,
98                        input_template: draft.input_template,
99                        enabled: true,
100                        created_at_ms: now,
101                        updated_at_ms: now,
102                    };
103                    tx.execute(
104                        "INSERT INTO host_event_trigger_subscriptions (
105                            subscription_id, registrant_scope_id, handle, source_type, source_key,
106                            enabled, created_at_ms, updated_at_ms, record_json
107                         )
108                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
109                        params![
110                            record.subscription_id.as_str(),
111                            record.registrant_scope_id().as_str(),
112                            record.handle.as_str(),
113                            record.source_type.as_str(),
114                            record.source_key.as_str(),
115                            i64::from(record.enabled),
116                            record.created_at_ms as i64,
117                            record.updated_at_ms as i64,
118                            Self::encode_json(&record)?,
119                        ],
120                    )
121                    .map_err(process_sqlite_error)?;
122                    Ok(record)
123                })()))
124            })
125            .await
126            .map_err(process_sqlite_error)?
127    }
128
129    async fn list_subscriptions(
130        &self,
131        filter: lash_core::TriggerSubscriptionFilter,
132    ) -> Result<Vec<lash_core::TriggerSubscriptionRecord>, lash_core::PluginError> {
133        self.conn
134            .call(move |conn| {
135                Ok((|| {
136                    let mut sql =
137                        "SELECT record_json FROM host_event_trigger_subscriptions WHERE 1 = 1"
138                            .to_string();
139                    let mut values = Vec::<rusqlite::types::Value>::new();
140                    if let Some(handle) = filter.handle.as_ref() {
141                        sql.push_str(" AND handle = ?");
142                        values.push(handle.clone().into());
143                    }
144                    if let Some(name) = filter.name.as_ref() {
145                        sql.push_str(" AND json_extract(record_json, '$.name') = ?");
146                        values.push(name.clone().into());
147                    }
148                    if let Some(source_type) = filter.source_type.as_ref() {
149                        sql.push_str(" AND source_type = ?");
150                        values.push(source_type.clone().into());
151                    }
152                    if let Some(source_key) = filter.source_key.as_ref() {
153                        sql.push_str(" AND source_key = ?");
154                        values.push(source_key.clone().into());
155                    }
156                    if let Some(enabled) = filter.enabled {
157                        sql.push_str(" AND enabled = ?");
158                        values.push(i64::from(enabled).into());
159                    }
160                    sql.push_str(" ORDER BY registrant_scope_id ASC, handle ASC");
161                    let mut stmt = conn.prepare(&sql).map_err(process_sqlite_error)?;
162                    let rows = stmt
163                        .query_map(rusqlite::params_from_iter(values.iter()), |row| {
164                            row.get::<_, String>(0)
165                        })
166                        .map_err(process_sqlite_error)?;
167                    let mut records = Vec::new();
168                    for row in rows {
169                        let record = Self::decode_subscription(row.map_err(process_sqlite_error)?)?;
170                        if filter.matches(&record) {
171                            records.push(record);
172                        }
173                    }
174                    Ok(records)
175                })())
176            })
177            .await
178            .map_err(process_sqlite_error)?
179    }
180
181    async fn cancel_subscription(
182        &self,
183        session_id: &str,
184        handle: &str,
185    ) -> Result<bool, lash_core::PluginError> {
186        let session_id = session_id.to_string();
187        let handle = handle.to_string();
188        self.conn
189            .write_flow(move |tx| {
190                Ok(host_event_tx_outcome((|| {
191                    let json = {
192                        let mut stmt = tx
193                            .prepare(
194                                "SELECT record_json
195                                 FROM host_event_trigger_subscriptions
196                                 WHERE handle = ?1
197                                 ORDER BY registrant_scope_id ASC",
198                            )
199                            .map_err(process_sqlite_error)?;
200                        let rows = stmt
201                            .query_map(params![handle.as_str()], |row| row.get::<_, String>(0))
202                            .map_err(process_sqlite_error)?;
203                        let mut matched = None;
204                        for row in rows {
205                            let json = row.map_err(process_sqlite_error)?;
206                            let record = Self::decode_subscription(json.clone())?;
207                            if record.registrant_session_id() == Some(session_id.as_str()) {
208                                matched = Some(json);
209                                break;
210                            }
211                        }
212                        matched
213                    };
214                    let Some(json) = json else {
215                        return Ok(false);
216                    };
217                    let mut record = Self::decode_subscription(json)?;
218                    let changed = record.enabled;
219                    record.enabled = false;
220                    record.updated_at_ms = current_epoch_ms();
221                    tx.execute(
222                        "UPDATE host_event_trigger_subscriptions
223                         SET enabled = ?3, updated_at_ms = ?4, record_json = ?5
224                         WHERE registrant_scope_id = ?1 AND handle = ?2",
225                        params![
226                            record.registrant_scope_id().as_str(),
227                            handle.as_str(),
228                            i64::from(record.enabled),
229                            record.updated_at_ms as i64,
230                            Self::encode_json(&record)?,
231                        ],
232                    )
233                    .map_err(process_sqlite_error)?;
234                    Ok(changed)
235                })()))
236            })
237            .await
238            .map_err(process_sqlite_error)?
239    }
240
241    async fn delete_session_subscriptions(
242        &self,
243        session_id: &str,
244    ) -> Result<usize, lash_core::PluginError> {
245        let session_id = session_id.to_string();
246        self.conn
247            .write_flow(move |tx| {
248                Ok(host_event_tx_outcome((|| {
249                    let rows = {
250                        let mut stmt = tx
251                            .prepare(
252                                "SELECT subscription_id, record_json
253                                 FROM host_event_trigger_subscriptions
254                                 ORDER BY registrant_scope_id ASC, handle ASC",
255                            )
256                            .map_err(process_sqlite_error)?;
257                        let rows = stmt
258                            .query_map([], |row| {
259                                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
260                            })
261                            .map_err(process_sqlite_error)?;
262                        let mut rows_out = Vec::new();
263                        for row in rows {
264                            rows_out.push(row.map_err(process_sqlite_error)?);
265                        }
266                        rows_out
267                    };
268                    let mut deleted = 0usize;
269                    for (subscription_id, json) in rows {
270                        let record = Self::decode_subscription(json)?;
271                        if record.registrant_session_id() != Some(session_id.as_str()) {
272                            continue;
273                        }
274                        tx.execute(
275                            "DELETE FROM host_event_trigger_subscriptions WHERE subscription_id = ?1",
276                            params![subscription_id.as_str()],
277                        )
278                        .map_err(process_sqlite_error)?;
279                        deleted = deleted.saturating_add(1);
280                    }
281                    Ok(deleted)
282                })()))
283            })
284            .await
285            .map_err(process_sqlite_error)?
286    }
287
288    async fn record_occurrence(
289        &self,
290        request: lash_core::HostEventOccurrenceRequest,
291    ) -> Result<lash_core::HostEventOccurrenceRecord, lash_core::PluginError> {
292        lash_core::validate_host_event_occurrence_request(&request)?;
293        let request_hash = lash_core::host_event_occurrence_request_hash(&request)?;
294        let occurrence_id = lash_core::deterministic_occurrence_id(&request)?;
295        self.conn
296            .write_flow(move |tx| {
297                Ok(host_event_tx_outcome((|| {
298                    let existing: Option<(String, String)> = tx
299                        .query_row(
300                            "SELECT request_hash, record_json
301                             FROM host_event_occurrences
302                             WHERE idempotency_key = ?1",
303                            params![request.idempotency_key.as_str()],
304                            |row| Ok((row.get(0)?, row.get(1)?)),
305                        )
306                        .optional()
307                        .map_err(process_sqlite_error)?;
308                    if let Some((existing_hash, existing_json)) = existing {
309                        if existing_hash != request_hash {
310                            return Err(lash_core::PluginError::Session(format!(
311                                "host event occurrence idempotency conflict for `{}`",
312                                request.idempotency_key
313                            )));
314                        }
315                        return Self::decode_occurrence(existing_json);
316                    }
317                    let record = lash_core::HostEventOccurrenceRecord {
318                        occurrence_id: occurrence_id.clone(),
319                        source_type: request.source_type,
320                        source_key: request.source_key,
321                        payload: request.payload,
322                        idempotency_key: request.idempotency_key,
323                        source: request.source,
324                        occurred_at_ms: current_epoch_ms(),
325                    };
326                    tx.execute(
327                        "INSERT INTO host_event_occurrences (
328                            occurrence_id, idempotency_key, request_hash, source_type,
329                            source_key, occurred_at_ms, record_json
330                         )
331                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
332                        params![
333                            record.occurrence_id.as_str(),
334                            record.idempotency_key.as_str(),
335                            request_hash.as_str(),
336                            record.source_type.as_str(),
337                            record.source_key.as_str(),
338                            record.occurred_at_ms as i64,
339                            Self::encode_json(&record)?,
340                        ],
341                    )
342                    .map_err(process_sqlite_error)?;
343                    Ok(record)
344                })()))
345            })
346            .await
347            .map_err(process_sqlite_error)?
348    }
349
350    async fn reserve_matching_deliveries(
351        &self,
352        occurrence_id: &str,
353    ) -> Result<Vec<lash_core::TriggerDeliveryReservation>, lash_core::PluginError> {
354        let occurrence_id = occurrence_id.to_string();
355        self.conn
356            .write_flow(move |tx| {
357                Ok(host_event_tx_outcome((|| {
358                    let occurrence_json: Option<String> = tx
359                        .query_row(
360                            "SELECT record_json
361                             FROM host_event_occurrences
362                             WHERE occurrence_id = ?1",
363                            params![occurrence_id.as_str()],
364                            |row| row.get(0),
365                        )
366                        .optional()
367                        .map_err(process_sqlite_error)?;
368                    let Some(occurrence_json) = occurrence_json else {
369                        return Err(lash_core::PluginError::Session(format!(
370                            "unknown host event occurrence `{occurrence_id}`"
371                        )));
372                    };
373                    let occurrence = Self::decode_occurrence(occurrence_json)?;
374                    let subscriptions = {
375                        let mut stmt = tx
376                            .prepare(
377                                "SELECT record_json
378                                 FROM host_event_trigger_subscriptions
379                                 WHERE enabled = 1 AND source_type = ?1 AND source_key = ?2
380                                 ORDER BY registrant_scope_id ASC, handle ASC",
381                            )
382                            .map_err(process_sqlite_error)?;
383                        let rows = stmt
384                            .query_map(
385                                params![
386                                    occurrence.source_type.as_str(),
387                                    occurrence.source_key.as_str()
388                                ],
389                                |row| row.get::<_, String>(0),
390                            )
391                            .map_err(process_sqlite_error)?;
392                        let mut subscriptions = Vec::new();
393                        for row in rows {
394                            subscriptions.push(Self::decode_subscription(
395                                row.map_err(process_sqlite_error)?,
396                            )?);
397                        }
398                        subscriptions
399                    };
400                    let mut reservations = Vec::new();
401                    for subscription in subscriptions {
402                        let process_id = lash_core::deterministic_delivery_process_id(
403                            &occurrence.occurrence_id,
404                            &subscription.subscription_id,
405                        )?;
406                        let inserted = tx
407                            .execute(
408                                "INSERT OR IGNORE INTO host_event_deliveries (
409                                    occurrence_id, subscription_id, process_id, created_at_ms
410                                 )
411                                 VALUES (?1, ?2, ?3, ?4)",
412                                params![
413                                    occurrence.occurrence_id.as_str(),
414                                    subscription.subscription_id.as_str(),
415                                    process_id.as_str(),
416                                    current_epoch_ms() as i64,
417                                ],
418                            )
419                            .map_err(process_sqlite_error)?;
420                        if inserted == 0 {
421                            continue;
422                        }
423                        reservations.push(lash_core::TriggerDeliveryReservation {
424                            occurrence: occurrence.clone(),
425                            subscription,
426                            process_id,
427                        });
428                    }
429                    Ok(reservations)
430                })()))
431            })
432            .await
433            .map_err(process_sqlite_error)?
434    }
435}