1use super::*;
8
/// SQLite-backed store for host event subscriptions, occurrences and deliveries.
///
/// This type implements `lash_core::HostEventStore`; every query and write
/// transaction it performs goes through the single wrapped connection.
pub struct SqliteHostEventStore {
    /// Connection used for all reads (`call`) and write transactions (`write_flow`).
    conn: SqliteConnection,
}
12
13impl SqliteHostEventStore {
14 pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
15 let conn = SqliteConnection::open(path).await?;
16 ensure_host_event_schema(&conn).await?;
17 apply_pragmas(&conn, StoreBacking::File).await?;
18 Ok(Self { conn })
19 }
20
21 pub async fn memory() -> tokio_rusqlite::Result<Self> {
22 let conn = SqliteConnection::open_in_memory().await?;
23 ensure_host_event_schema(&conn).await?;
24 apply_pragmas(&conn, StoreBacking::Memory).await?;
25 Ok(Self { conn })
26 }
27
28 fn encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
29 serde_json::to_string(value).map_err(|err| {
30 lash_core::PluginError::Session(format!("failed to encode host event row: {err}"))
31 })
32 }
33
34 fn decode_subscription(
35 json: String,
36 ) -> Result<lash_core::TriggerSubscriptionRecord, lash_core::PluginError> {
37 serde_json::from_str(&json).map_err(|err| {
38 lash_core::PluginError::Session(format!(
39 "failed to decode host event subscription row: {err}"
40 ))
41 })
42 }
43
44 fn decode_occurrence(
45 json: String,
46 ) -> Result<lash_core::HostEventOccurrenceRecord, lash_core::PluginError> {
47 serde_json::from_str(&json).map_err(|err| {
48 lash_core::PluginError::Session(format!(
49 "failed to decode host event occurrence row: {err}"
50 ))
51 })
52 }
53}
54
55fn host_event_tx_outcome<T>(
56 result: Result<T, lash_core::PluginError>,
57) -> TxOutcome<Result<T, lash_core::PluginError>> {
58 match result {
59 Ok(value) => TxOutcome::Commit(Ok(value)),
60 Err(err) => TxOutcome::Rollback(Err(err)),
61 }
62}
63
64#[async_trait::async_trait]
65impl lash_core::HostEventStore for SqliteHostEventStore {
66 fn durability_tier(&self) -> DurabilityTier {
67 DurabilityTier::Durable
68 }
69
70 async fn register_subscription(
71 &self,
72 draft: lash_core::TriggerSubscriptionDraft,
73 ) -> Result<lash_core::TriggerSubscriptionRecord, lash_core::PluginError> {
74 self.conn
75 .write_flow(move |tx| {
76 Ok(host_event_tx_outcome((|| {
77 tx.execute("INSERT INTO host_event_subscription_seq DEFAULT VALUES", [])
78 .map_err(process_sqlite_error)?;
79 let seq = tx.last_insert_rowid();
80 let handle = format!("trigger:{seq}");
81 let subscription_id = format!("subscription:{seq}");
82 let now = current_epoch_ms();
83 let record = lash_core::TriggerSubscriptionRecord {
84 subscription_id: subscription_id.clone(),
85 registrant: draft.registrant,
86 env_ref: draft.env_ref,
87 wake_target: draft.wake_target,
88 handle,
89 name: draft.name,
90 source_type: draft.source_type,
91 source_key: draft.source_key,
92 source: draft.source,
93 event_ty: draft.event_ty,
94 module_ref: draft.module_ref,
95 required_surface_ref: draft.required_surface_ref,
96 process_ref: draft.process_ref,
97 process_name: draft.process_name,
98 input_template: draft.input_template,
99 enabled: true,
100 created_at_ms: now,
101 updated_at_ms: now,
102 };
103 tx.execute(
104 "INSERT INTO host_event_trigger_subscriptions (
105 subscription_id, registrant_scope_id, handle, source_type, source_key,
106 enabled, created_at_ms, updated_at_ms, record_json
107 )
108 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
109 params![
110 record.subscription_id.as_str(),
111 record.registrant_scope_id().as_str(),
112 record.handle.as_str(),
113 record.source_type.as_str(),
114 record.source_key.as_str(),
115 i64::from(record.enabled),
116 record.created_at_ms as i64,
117 record.updated_at_ms as i64,
118 Self::encode_json(&record)?,
119 ],
120 )
121 .map_err(process_sqlite_error)?;
122 Ok(record)
123 })()))
124 })
125 .await
126 .map_err(process_sqlite_error)?
127 }
128
129 async fn list_subscriptions(
130 &self,
131 filter: lash_core::TriggerSubscriptionFilter,
132 ) -> Result<Vec<lash_core::TriggerSubscriptionRecord>, lash_core::PluginError> {
133 self.conn
134 .call(move |conn| {
135 Ok((|| {
136 let mut sql =
137 "SELECT record_json FROM host_event_trigger_subscriptions WHERE 1 = 1"
138 .to_string();
139 let mut values = Vec::<rusqlite::types::Value>::new();
140 if let Some(handle) = filter.handle.as_ref() {
141 sql.push_str(" AND handle = ?");
142 values.push(handle.clone().into());
143 }
144 if let Some(name) = filter.name.as_ref() {
145 sql.push_str(" AND json_extract(record_json, '$.name') = ?");
146 values.push(name.clone().into());
147 }
148 if let Some(source_type) = filter.source_type.as_ref() {
149 sql.push_str(" AND source_type = ?");
150 values.push(source_type.clone().into());
151 }
152 if let Some(source_key) = filter.source_key.as_ref() {
153 sql.push_str(" AND source_key = ?");
154 values.push(source_key.clone().into());
155 }
156 if let Some(enabled) = filter.enabled {
157 sql.push_str(" AND enabled = ?");
158 values.push(i64::from(enabled).into());
159 }
160 sql.push_str(" ORDER BY registrant_scope_id ASC, handle ASC");
161 let mut stmt = conn.prepare(&sql).map_err(process_sqlite_error)?;
162 let rows = stmt
163 .query_map(rusqlite::params_from_iter(values.iter()), |row| {
164 row.get::<_, String>(0)
165 })
166 .map_err(process_sqlite_error)?;
167 let mut records = Vec::new();
168 for row in rows {
169 let record = Self::decode_subscription(row.map_err(process_sqlite_error)?)?;
170 if filter.matches(&record) {
171 records.push(record);
172 }
173 }
174 Ok(records)
175 })())
176 })
177 .await
178 .map_err(process_sqlite_error)?
179 }
180
181 async fn cancel_subscription(
182 &self,
183 session_id: &str,
184 handle: &str,
185 ) -> Result<bool, lash_core::PluginError> {
186 let session_id = session_id.to_string();
187 let handle = handle.to_string();
188 self.conn
189 .write_flow(move |tx| {
190 Ok(host_event_tx_outcome((|| {
191 let json = {
192 let mut stmt = tx
193 .prepare(
194 "SELECT record_json
195 FROM host_event_trigger_subscriptions
196 WHERE handle = ?1
197 ORDER BY registrant_scope_id ASC",
198 )
199 .map_err(process_sqlite_error)?;
200 let rows = stmt
201 .query_map(params![handle.as_str()], |row| row.get::<_, String>(0))
202 .map_err(process_sqlite_error)?;
203 let mut matched = None;
204 for row in rows {
205 let json = row.map_err(process_sqlite_error)?;
206 let record = Self::decode_subscription(json.clone())?;
207 if record.registrant_session_id() == Some(session_id.as_str()) {
208 matched = Some(json);
209 break;
210 }
211 }
212 matched
213 };
214 let Some(json) = json else {
215 return Ok(false);
216 };
217 let mut record = Self::decode_subscription(json)?;
218 let changed = record.enabled;
219 record.enabled = false;
220 record.updated_at_ms = current_epoch_ms();
221 tx.execute(
222 "UPDATE host_event_trigger_subscriptions
223 SET enabled = ?3, updated_at_ms = ?4, record_json = ?5
224 WHERE registrant_scope_id = ?1 AND handle = ?2",
225 params![
226 record.registrant_scope_id().as_str(),
227 handle.as_str(),
228 i64::from(record.enabled),
229 record.updated_at_ms as i64,
230 Self::encode_json(&record)?,
231 ],
232 )
233 .map_err(process_sqlite_error)?;
234 Ok(changed)
235 })()))
236 })
237 .await
238 .map_err(process_sqlite_error)?
239 }
240
241 async fn delete_session_subscriptions(
242 &self,
243 session_id: &str,
244 ) -> Result<usize, lash_core::PluginError> {
245 let session_id = session_id.to_string();
246 self.conn
247 .write_flow(move |tx| {
248 Ok(host_event_tx_outcome((|| {
249 let rows = {
250 let mut stmt = tx
251 .prepare(
252 "SELECT subscription_id, record_json
253 FROM host_event_trigger_subscriptions
254 ORDER BY registrant_scope_id ASC, handle ASC",
255 )
256 .map_err(process_sqlite_error)?;
257 let rows = stmt
258 .query_map([], |row| {
259 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
260 })
261 .map_err(process_sqlite_error)?;
262 let mut rows_out = Vec::new();
263 for row in rows {
264 rows_out.push(row.map_err(process_sqlite_error)?);
265 }
266 rows_out
267 };
268 let mut deleted = 0usize;
269 for (subscription_id, json) in rows {
270 let record = Self::decode_subscription(json)?;
271 if record.registrant_session_id() != Some(session_id.as_str()) {
272 continue;
273 }
274 tx.execute(
275 "DELETE FROM host_event_trigger_subscriptions WHERE subscription_id = ?1",
276 params![subscription_id.as_str()],
277 )
278 .map_err(process_sqlite_error)?;
279 deleted = deleted.saturating_add(1);
280 }
281 Ok(deleted)
282 })()))
283 })
284 .await
285 .map_err(process_sqlite_error)?
286 }
287
288 async fn record_occurrence(
289 &self,
290 request: lash_core::HostEventOccurrenceRequest,
291 ) -> Result<lash_core::HostEventOccurrenceRecord, lash_core::PluginError> {
292 lash_core::validate_host_event_occurrence_request(&request)?;
293 let request_hash = lash_core::host_event_occurrence_request_hash(&request)?;
294 let occurrence_id = lash_core::deterministic_occurrence_id(&request)?;
295 self.conn
296 .write_flow(move |tx| {
297 Ok(host_event_tx_outcome((|| {
298 let existing: Option<(String, String)> = tx
299 .query_row(
300 "SELECT request_hash, record_json
301 FROM host_event_occurrences
302 WHERE idempotency_key = ?1",
303 params![request.idempotency_key.as_str()],
304 |row| Ok((row.get(0)?, row.get(1)?)),
305 )
306 .optional()
307 .map_err(process_sqlite_error)?;
308 if let Some((existing_hash, existing_json)) = existing {
309 if existing_hash != request_hash {
310 return Err(lash_core::PluginError::Session(format!(
311 "host event occurrence idempotency conflict for `{}`",
312 request.idempotency_key
313 )));
314 }
315 return Self::decode_occurrence(existing_json);
316 }
317 let record = lash_core::HostEventOccurrenceRecord {
318 occurrence_id: occurrence_id.clone(),
319 source_type: request.source_type,
320 source_key: request.source_key,
321 payload: request.payload,
322 idempotency_key: request.idempotency_key,
323 source: request.source,
324 occurred_at_ms: current_epoch_ms(),
325 };
326 tx.execute(
327 "INSERT INTO host_event_occurrences (
328 occurrence_id, idempotency_key, request_hash, source_type,
329 source_key, occurred_at_ms, record_json
330 )
331 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
332 params![
333 record.occurrence_id.as_str(),
334 record.idempotency_key.as_str(),
335 request_hash.as_str(),
336 record.source_type.as_str(),
337 record.source_key.as_str(),
338 record.occurred_at_ms as i64,
339 Self::encode_json(&record)?,
340 ],
341 )
342 .map_err(process_sqlite_error)?;
343 Ok(record)
344 })()))
345 })
346 .await
347 .map_err(process_sqlite_error)?
348 }
349
350 async fn reserve_matching_deliveries(
351 &self,
352 occurrence_id: &str,
353 ) -> Result<Vec<lash_core::TriggerDeliveryReservation>, lash_core::PluginError> {
354 let occurrence_id = occurrence_id.to_string();
355 self.conn
356 .write_flow(move |tx| {
357 Ok(host_event_tx_outcome((|| {
358 let occurrence_json: Option<String> = tx
359 .query_row(
360 "SELECT record_json
361 FROM host_event_occurrences
362 WHERE occurrence_id = ?1",
363 params![occurrence_id.as_str()],
364 |row| row.get(0),
365 )
366 .optional()
367 .map_err(process_sqlite_error)?;
368 let Some(occurrence_json) = occurrence_json else {
369 return Err(lash_core::PluginError::Session(format!(
370 "unknown host event occurrence `{occurrence_id}`"
371 )));
372 };
373 let occurrence = Self::decode_occurrence(occurrence_json)?;
374 let subscriptions = {
375 let mut stmt = tx
376 .prepare(
377 "SELECT record_json
378 FROM host_event_trigger_subscriptions
379 WHERE enabled = 1 AND source_type = ?1 AND source_key = ?2
380 ORDER BY registrant_scope_id ASC, handle ASC",
381 )
382 .map_err(process_sqlite_error)?;
383 let rows = stmt
384 .query_map(
385 params![
386 occurrence.source_type.as_str(),
387 occurrence.source_key.as_str()
388 ],
389 |row| row.get::<_, String>(0),
390 )
391 .map_err(process_sqlite_error)?;
392 let mut subscriptions = Vec::new();
393 for row in rows {
394 subscriptions.push(Self::decode_subscription(
395 row.map_err(process_sqlite_error)?,
396 )?);
397 }
398 subscriptions
399 };
400 let mut reservations = Vec::new();
401 for subscription in subscriptions {
402 let process_id = lash_core::deterministic_delivery_process_id(
403 &occurrence.occurrence_id,
404 &subscription.subscription_id,
405 )?;
406 let inserted = tx
407 .execute(
408 "INSERT OR IGNORE INTO host_event_deliveries (
409 occurrence_id, subscription_id, process_id, created_at_ms
410 )
411 VALUES (?1, ?2, ?3, ?4)",
412 params![
413 occurrence.occurrence_id.as_str(),
414 subscription.subscription_id.as_str(),
415 process_id.as_str(),
416 current_epoch_ms() as i64,
417 ],
418 )
419 .map_err(process_sqlite_error)?;
420 if inserted == 0 {
421 continue;
422 }
423 reservations.push(lash_core::TriggerDeliveryReservation {
424 occurrence: occurrence.clone(),
425 subscription,
426 process_id,
427 });
428 }
429 Ok(reservations)
430 })()))
431 })
432 .await
433 .map_err(process_sqlite_error)?
434 }
435}