Skip to main content

lash_sqlite_store/
host_events.rs

1//! SQLite-backed runtime host-event store.
2//!
3//! This is the durable peer of [`SqliteProcessRegistry`]: it stores trigger
4//! subscriptions and append-only host-event occurrences at deployment scope,
5//! outside any session database.
6
7use super::*;
8
9pub struct SqliteHostEventStore {
10    conn: SqliteConnection,
11}
12
13impl SqliteHostEventStore {
14    pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
15        let conn = SqliteConnection::open(path).await?;
16        ensure_host_event_schema(&conn).await?;
17        apply_pragmas(&conn, StoreBacking::File).await?;
18        Ok(Self { conn })
19    }
20
21    pub async fn memory() -> tokio_rusqlite::Result<Self> {
22        let conn = SqliteConnection::open_in_memory().await?;
23        ensure_host_event_schema(&conn).await?;
24        apply_pragmas(&conn, StoreBacking::Memory).await?;
25        Ok(Self { conn })
26    }
27
28    fn encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
29        serde_json::to_string(value).map_err(|err| {
30            lash_core::PluginError::Session(format!("failed to encode host event row: {err}"))
31        })
32    }
33
34    fn decode_subscription(
35        json: String,
36    ) -> Result<lash_core::TriggerSubscriptionRecord, lash_core::PluginError> {
37        serde_json::from_str(&json).map_err(|err| {
38            lash_core::PluginError::Session(format!(
39                "failed to decode host event subscription row: {err}"
40            ))
41        })
42    }
43
44    fn decode_occurrence(
45        json: String,
46    ) -> Result<lash_core::HostEventOccurrenceRecord, lash_core::PluginError> {
47        serde_json::from_str(&json).map_err(|err| {
48            lash_core::PluginError::Session(format!(
49                "failed to decode host event occurrence row: {err}"
50            ))
51        })
52    }
53}
54
55fn host_event_tx_outcome<T>(
56    result: Result<T, lash_core::PluginError>,
57) -> TxOutcome<Result<T, lash_core::PluginError>> {
58    match result {
59        Ok(value) => TxOutcome::Commit(Ok(value)),
60        Err(err) => TxOutcome::Rollback(Err(err)),
61    }
62}
63
64#[async_trait::async_trait]
65impl lash_core::HostEventStore for SqliteHostEventStore {
66    fn durability_tier(&self) -> DurabilityTier {
67        DurabilityTier::Durable
68    }
69
70    async fn register_subscription(
71        &self,
72        draft: lash_core::TriggerSubscriptionDraft,
73    ) -> Result<lash_core::TriggerSubscriptionRecord, lash_core::PluginError> {
74        self.conn
75            .write_flow(move |tx| {
76                Ok(host_event_tx_outcome((|| {
77                    tx.execute("INSERT INTO host_event_subscription_seq DEFAULT VALUES", [])
78                        .map_err(process_sqlite_error)?;
79                    let seq = tx.last_insert_rowid();
80                    let handle = format!("trigger:{seq}");
81                    let subscription_id = format!("subscription:{seq}");
82                    let now = current_epoch_ms();
83                    let record = lash_core::TriggerSubscriptionRecord {
84                        subscription_id: subscription_id.clone(),
85                        session_id: draft.session_id,
86                        handle,
87                        name: draft.name,
88                        source_type: draft.source_type,
89                        source_key: draft.source_key,
90                        source: draft.source,
91                        event_ty: draft.event_ty,
92                        module_ref: draft.module_ref,
93                        required_surface_ref: draft.required_surface_ref,
94                        process_ref: draft.process_ref,
95                        process_name: draft.process_name,
96                        input_template: draft.input_template,
97                        enabled: true,
98                        created_at_ms: now,
99                        updated_at_ms: now,
100                    };
101                    tx.execute(
102                        "INSERT INTO host_event_trigger_subscriptions (
103                            subscription_id, session_id, handle, source_type, source_key,
104                            enabled, created_at_ms, updated_at_ms, record_json
105                         )
106                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
107                        params![
108                            record.subscription_id.as_str(),
109                            record.session_id.as_str(),
110                            record.handle.as_str(),
111                            record.source_type.as_str(),
112                            record.source_key.as_str(),
113                            i64::from(record.enabled),
114                            record.created_at_ms as i64,
115                            record.updated_at_ms as i64,
116                            Self::encode_json(&record)?,
117                        ],
118                    )
119                    .map_err(process_sqlite_error)?;
120                    Ok(record)
121                })()))
122            })
123            .await
124            .map_err(process_sqlite_error)?
125    }
126
127    async fn list_subscriptions(
128        &self,
129        filter: lash_core::TriggerSubscriptionFilter,
130    ) -> Result<Vec<lash_core::TriggerSubscriptionRecord>, lash_core::PluginError> {
131        self.conn
132            .call(move |conn| {
133                Ok((|| {
134                    let mut sql =
135                        "SELECT record_json FROM host_event_trigger_subscriptions WHERE 1 = 1"
136                            .to_string();
137                    let mut values = Vec::<rusqlite::types::Value>::new();
138                    if let Some(session_id) = filter.session_id.as_ref() {
139                        sql.push_str(" AND session_id = ?");
140                        values.push(session_id.clone().into());
141                    }
142                    if let Some(handle) = filter.handle.as_ref() {
143                        sql.push_str(" AND handle = ?");
144                        values.push(handle.clone().into());
145                    }
146                    if let Some(name) = filter.name.as_ref() {
147                        sql.push_str(" AND json_extract(record_json, '$.name') = ?");
148                        values.push(name.clone().into());
149                    }
150                    if let Some(source_type) = filter.source_type.as_ref() {
151                        sql.push_str(" AND source_type = ?");
152                        values.push(source_type.clone().into());
153                    }
154                    if let Some(source_key) = filter.source_key.as_ref() {
155                        sql.push_str(" AND source_key = ?");
156                        values.push(source_key.clone().into());
157                    }
158                    if let Some(enabled) = filter.enabled {
159                        sql.push_str(" AND enabled = ?");
160                        values.push(i64::from(enabled).into());
161                    }
162                    sql.push_str(" ORDER BY session_id ASC, handle ASC");
163                    let mut stmt = conn.prepare(&sql).map_err(process_sqlite_error)?;
164                    let rows = stmt
165                        .query_map(rusqlite::params_from_iter(values.iter()), |row| {
166                            row.get::<_, String>(0)
167                        })
168                        .map_err(process_sqlite_error)?;
169                    let mut records = Vec::new();
170                    for row in rows {
171                        let record = Self::decode_subscription(row.map_err(process_sqlite_error)?)?;
172                        if filter.matches(&record) {
173                            records.push(record);
174                        }
175                    }
176                    Ok(records)
177                })())
178            })
179            .await
180            .map_err(process_sqlite_error)?
181    }
182
183    async fn cancel_subscription(
184        &self,
185        session_id: &str,
186        handle: &str,
187    ) -> Result<bool, lash_core::PluginError> {
188        let session_id = session_id.to_string();
189        let handle = handle.to_string();
190        self.conn
191            .write_flow(move |tx| {
192                Ok(host_event_tx_outcome((|| {
193                    let json: Option<String> = tx
194                        .query_row(
195                            "SELECT record_json
196                             FROM host_event_trigger_subscriptions
197                             WHERE session_id = ?1 AND handle = ?2",
198                            params![session_id.as_str(), handle.as_str()],
199                            |row| row.get(0),
200                        )
201                        .optional()
202                        .map_err(process_sqlite_error)?;
203                    let Some(json) = json else {
204                        return Ok(false);
205                    };
206                    let mut record = Self::decode_subscription(json)?;
207                    let changed = record.enabled;
208                    record.enabled = false;
209                    record.updated_at_ms = current_epoch_ms();
210                    tx.execute(
211                        "UPDATE host_event_trigger_subscriptions
212                         SET enabled = ?3, updated_at_ms = ?4, record_json = ?5
213                         WHERE session_id = ?1 AND handle = ?2",
214                        params![
215                            session_id.as_str(),
216                            handle.as_str(),
217                            i64::from(record.enabled),
218                            record.updated_at_ms as i64,
219                            Self::encode_json(&record)?,
220                        ],
221                    )
222                    .map_err(process_sqlite_error)?;
223                    Ok(changed)
224                })()))
225            })
226            .await
227            .map_err(process_sqlite_error)?
228    }
229
230    async fn record_occurrence(
231        &self,
232        request: lash_core::HostEventOccurrenceRequest,
233    ) -> Result<lash_core::HostEventOccurrenceRecord, lash_core::PluginError> {
234        lash_core::validate_host_event_occurrence_request(&request)?;
235        let request_hash = lash_core::host_event_occurrence_request_hash(&request)?;
236        let occurrence_id = lash_core::deterministic_occurrence_id(&request)?;
237        self.conn
238            .write_flow(move |tx| {
239                Ok(host_event_tx_outcome((|| {
240                    let existing: Option<(String, String)> = tx
241                        .query_row(
242                            "SELECT request_hash, record_json
243                             FROM host_event_occurrences
244                             WHERE idempotency_key = ?1",
245                            params![request.idempotency_key.as_str()],
246                            |row| Ok((row.get(0)?, row.get(1)?)),
247                        )
248                        .optional()
249                        .map_err(process_sqlite_error)?;
250                    if let Some((existing_hash, existing_json)) = existing {
251                        if existing_hash != request_hash {
252                            return Err(lash_core::PluginError::Session(format!(
253                                "host event occurrence idempotency conflict for `{}`",
254                                request.idempotency_key
255                            )));
256                        }
257                        return Self::decode_occurrence(existing_json);
258                    }
259                    let record = lash_core::HostEventOccurrenceRecord {
260                        occurrence_id: occurrence_id.clone(),
261                        source_type: request.source_type,
262                        source_key: request.source_key,
263                        payload: request.payload,
264                        idempotency_key: request.idempotency_key,
265                        source: request.source,
266                        occurred_at_ms: current_epoch_ms(),
267                    };
268                    tx.execute(
269                        "INSERT INTO host_event_occurrences (
270                            occurrence_id, idempotency_key, request_hash, source_type,
271                            source_key, occurred_at_ms, record_json
272                         )
273                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
274                        params![
275                            record.occurrence_id.as_str(),
276                            record.idempotency_key.as_str(),
277                            request_hash.as_str(),
278                            record.source_type.as_str(),
279                            record.source_key.as_str(),
280                            record.occurred_at_ms as i64,
281                            Self::encode_json(&record)?,
282                        ],
283                    )
284                    .map_err(process_sqlite_error)?;
285                    Ok(record)
286                })()))
287            })
288            .await
289            .map_err(process_sqlite_error)?
290    }
291
292    async fn reserve_matching_deliveries(
293        &self,
294        occurrence_id: &str,
295    ) -> Result<Vec<lash_core::TriggerDeliveryReservation>, lash_core::PluginError> {
296        let occurrence_id = occurrence_id.to_string();
297        self.conn
298            .write_flow(move |tx| {
299                Ok(host_event_tx_outcome((|| {
300                    let occurrence_json: Option<String> = tx
301                        .query_row(
302                            "SELECT record_json
303                             FROM host_event_occurrences
304                             WHERE occurrence_id = ?1",
305                            params![occurrence_id.as_str()],
306                            |row| row.get(0),
307                        )
308                        .optional()
309                        .map_err(process_sqlite_error)?;
310                    let Some(occurrence_json) = occurrence_json else {
311                        return Err(lash_core::PluginError::Session(format!(
312                            "unknown host event occurrence `{occurrence_id}`"
313                        )));
314                    };
315                    let occurrence = Self::decode_occurrence(occurrence_json)?;
316                    let subscriptions = {
317                        let mut stmt = tx
318                            .prepare(
319                                "SELECT record_json
320                                 FROM host_event_trigger_subscriptions
321                                 WHERE enabled = 1 AND source_type = ?1 AND source_key = ?2
322                                 ORDER BY session_id ASC, handle ASC",
323                            )
324                            .map_err(process_sqlite_error)?;
325                        let rows = stmt
326                            .query_map(
327                                params![
328                                    occurrence.source_type.as_str(),
329                                    occurrence.source_key.as_str()
330                                ],
331                                |row| row.get::<_, String>(0),
332                            )
333                            .map_err(process_sqlite_error)?;
334                        let mut subscriptions = Vec::new();
335                        for row in rows {
336                            subscriptions.push(Self::decode_subscription(
337                                row.map_err(process_sqlite_error)?,
338                            )?);
339                        }
340                        subscriptions
341                    };
342                    let mut reservations = Vec::new();
343                    for subscription in subscriptions {
344                        let process_id = lash_core::deterministic_delivery_process_id(
345                            &occurrence.occurrence_id,
346                            &subscription.subscription_id,
347                        )?;
348                        let inserted = tx
349                            .execute(
350                                "INSERT OR IGNORE INTO host_event_deliveries (
351                                    occurrence_id, subscription_id, process_id, created_at_ms
352                                 )
353                                 VALUES (?1, ?2, ?3, ?4)",
354                                params![
355                                    occurrence.occurrence_id.as_str(),
356                                    subscription.subscription_id.as_str(),
357                                    process_id.as_str(),
358                                    current_epoch_ms() as i64,
359                                ],
360                            )
361                            .map_err(process_sqlite_error)?;
362                        if inserted == 0 {
363                            continue;
364                        }
365                        reservations.push(lash_core::TriggerDeliveryReservation {
366                            occurrence: occurrence.clone(),
367                            subscription,
368                            process_id,
369                        });
370                    }
371                    Ok(reservations)
372                })()))
373            })
374            .await
375            .map_err(process_sqlite_error)?
376    }
377}