1use super::*;
8
9pub struct SqliteHostEventStore {
10 conn: SqliteConnection,
11}
12
13impl SqliteHostEventStore {
14 pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
15 let conn = SqliteConnection::open(path).await?;
16 ensure_host_event_schema(&conn).await?;
17 apply_pragmas(&conn, StoreBacking::File).await?;
18 Ok(Self { conn })
19 }
20
21 pub async fn memory() -> tokio_rusqlite::Result<Self> {
22 let conn = SqliteConnection::open_in_memory().await?;
23 ensure_host_event_schema(&conn).await?;
24 apply_pragmas(&conn, StoreBacking::Memory).await?;
25 Ok(Self { conn })
26 }
27
28 fn encode_json<T: serde::Serialize>(value: &T) -> Result<String, lash_core::PluginError> {
29 serde_json::to_string(value).map_err(|err| {
30 lash_core::PluginError::Session(format!("failed to encode host event row: {err}"))
31 })
32 }
33
34 fn decode_subscription(
35 json: String,
36 ) -> Result<lash_core::TriggerSubscriptionRecord, lash_core::PluginError> {
37 serde_json::from_str(&json).map_err(|err| {
38 lash_core::PluginError::Session(format!(
39 "failed to decode host event subscription row: {err}"
40 ))
41 })
42 }
43
44 fn decode_occurrence(
45 json: String,
46 ) -> Result<lash_core::HostEventOccurrenceRecord, lash_core::PluginError> {
47 serde_json::from_str(&json).map_err(|err| {
48 lash_core::PluginError::Session(format!(
49 "failed to decode host event occurrence row: {err}"
50 ))
51 })
52 }
53}
54
55fn host_event_tx_outcome<T>(
56 result: Result<T, lash_core::PluginError>,
57) -> TxOutcome<Result<T, lash_core::PluginError>> {
58 match result {
59 Ok(value) => TxOutcome::Commit(Ok(value)),
60 Err(err) => TxOutcome::Rollback(Err(err)),
61 }
62}
63
64#[async_trait::async_trait]
65impl lash_core::HostEventStore for SqliteHostEventStore {
66 fn durability_tier(&self) -> DurabilityTier {
67 DurabilityTier::Durable
68 }
69
70 async fn register_subscription(
71 &self,
72 draft: lash_core::TriggerSubscriptionDraft,
73 ) -> Result<lash_core::TriggerSubscriptionRecord, lash_core::PluginError> {
74 self.conn
75 .write_flow(move |tx| {
76 Ok(host_event_tx_outcome((|| {
77 tx.execute("INSERT INTO host_event_subscription_seq DEFAULT VALUES", [])
78 .map_err(process_sqlite_error)?;
79 let seq = tx.last_insert_rowid();
80 let handle = format!("trigger:{seq}");
81 let subscription_id = format!("subscription:{seq}");
82 let now = current_epoch_ms();
83 let record = lash_core::TriggerSubscriptionRecord {
84 subscription_id: subscription_id.clone(),
85 session_id: draft.session_id,
86 handle,
87 name: draft.name,
88 source_type: draft.source_type,
89 source_key: draft.source_key,
90 source: draft.source,
91 event_ty: draft.event_ty,
92 module_ref: draft.module_ref,
93 required_surface_ref: draft.required_surface_ref,
94 process_ref: draft.process_ref,
95 process_name: draft.process_name,
96 input_template: draft.input_template,
97 enabled: true,
98 created_at_ms: now,
99 updated_at_ms: now,
100 };
101 tx.execute(
102 "INSERT INTO host_event_trigger_subscriptions (
103 subscription_id, session_id, handle, source_type, source_key,
104 enabled, created_at_ms, updated_at_ms, record_json
105 )
106 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
107 params![
108 record.subscription_id.as_str(),
109 record.session_id.as_str(),
110 record.handle.as_str(),
111 record.source_type.as_str(),
112 record.source_key.as_str(),
113 i64::from(record.enabled),
114 record.created_at_ms as i64,
115 record.updated_at_ms as i64,
116 Self::encode_json(&record)?,
117 ],
118 )
119 .map_err(process_sqlite_error)?;
120 Ok(record)
121 })()))
122 })
123 .await
124 .map_err(process_sqlite_error)?
125 }
126
127 async fn list_subscriptions(
128 &self,
129 filter: lash_core::TriggerSubscriptionFilter,
130 ) -> Result<Vec<lash_core::TriggerSubscriptionRecord>, lash_core::PluginError> {
131 self.conn
132 .call(move |conn| {
133 Ok((|| {
134 let mut sql =
135 "SELECT record_json FROM host_event_trigger_subscriptions WHERE 1 = 1"
136 .to_string();
137 let mut values = Vec::<rusqlite::types::Value>::new();
138 if let Some(session_id) = filter.session_id.as_ref() {
139 sql.push_str(" AND session_id = ?");
140 values.push(session_id.clone().into());
141 }
142 if let Some(handle) = filter.handle.as_ref() {
143 sql.push_str(" AND handle = ?");
144 values.push(handle.clone().into());
145 }
146 if let Some(name) = filter.name.as_ref() {
147 sql.push_str(" AND json_extract(record_json, '$.name') = ?");
148 values.push(name.clone().into());
149 }
150 if let Some(source_type) = filter.source_type.as_ref() {
151 sql.push_str(" AND source_type = ?");
152 values.push(source_type.clone().into());
153 }
154 if let Some(source_key) = filter.source_key.as_ref() {
155 sql.push_str(" AND source_key = ?");
156 values.push(source_key.clone().into());
157 }
158 if let Some(enabled) = filter.enabled {
159 sql.push_str(" AND enabled = ?");
160 values.push(i64::from(enabled).into());
161 }
162 sql.push_str(" ORDER BY session_id ASC, handle ASC");
163 let mut stmt = conn.prepare(&sql).map_err(process_sqlite_error)?;
164 let rows = stmt
165 .query_map(rusqlite::params_from_iter(values.iter()), |row| {
166 row.get::<_, String>(0)
167 })
168 .map_err(process_sqlite_error)?;
169 let mut records = Vec::new();
170 for row in rows {
171 let record = Self::decode_subscription(row.map_err(process_sqlite_error)?)?;
172 if filter.matches(&record) {
173 records.push(record);
174 }
175 }
176 Ok(records)
177 })())
178 })
179 .await
180 .map_err(process_sqlite_error)?
181 }
182
183 async fn cancel_subscription(
184 &self,
185 session_id: &str,
186 handle: &str,
187 ) -> Result<bool, lash_core::PluginError> {
188 let session_id = session_id.to_string();
189 let handle = handle.to_string();
190 self.conn
191 .write_flow(move |tx| {
192 Ok(host_event_tx_outcome((|| {
193 let json: Option<String> = tx
194 .query_row(
195 "SELECT record_json
196 FROM host_event_trigger_subscriptions
197 WHERE session_id = ?1 AND handle = ?2",
198 params![session_id.as_str(), handle.as_str()],
199 |row| row.get(0),
200 )
201 .optional()
202 .map_err(process_sqlite_error)?;
203 let Some(json) = json else {
204 return Ok(false);
205 };
206 let mut record = Self::decode_subscription(json)?;
207 let changed = record.enabled;
208 record.enabled = false;
209 record.updated_at_ms = current_epoch_ms();
210 tx.execute(
211 "UPDATE host_event_trigger_subscriptions
212 SET enabled = ?3, updated_at_ms = ?4, record_json = ?5
213 WHERE session_id = ?1 AND handle = ?2",
214 params![
215 session_id.as_str(),
216 handle.as_str(),
217 i64::from(record.enabled),
218 record.updated_at_ms as i64,
219 Self::encode_json(&record)?,
220 ],
221 )
222 .map_err(process_sqlite_error)?;
223 Ok(changed)
224 })()))
225 })
226 .await
227 .map_err(process_sqlite_error)?
228 }
229
230 async fn record_occurrence(
231 &self,
232 request: lash_core::HostEventOccurrenceRequest,
233 ) -> Result<lash_core::HostEventOccurrenceRecord, lash_core::PluginError> {
234 lash_core::validate_host_event_occurrence_request(&request)?;
235 let request_hash = lash_core::host_event_occurrence_request_hash(&request)?;
236 let occurrence_id = lash_core::deterministic_occurrence_id(&request)?;
237 self.conn
238 .write_flow(move |tx| {
239 Ok(host_event_tx_outcome((|| {
240 let existing: Option<(String, String)> = tx
241 .query_row(
242 "SELECT request_hash, record_json
243 FROM host_event_occurrences
244 WHERE idempotency_key = ?1",
245 params![request.idempotency_key.as_str()],
246 |row| Ok((row.get(0)?, row.get(1)?)),
247 )
248 .optional()
249 .map_err(process_sqlite_error)?;
250 if let Some((existing_hash, existing_json)) = existing {
251 if existing_hash != request_hash {
252 return Err(lash_core::PluginError::Session(format!(
253 "host event occurrence idempotency conflict for `{}`",
254 request.idempotency_key
255 )));
256 }
257 return Self::decode_occurrence(existing_json);
258 }
259 let record = lash_core::HostEventOccurrenceRecord {
260 occurrence_id: occurrence_id.clone(),
261 source_type: request.source_type,
262 source_key: request.source_key,
263 payload: request.payload,
264 idempotency_key: request.idempotency_key,
265 source: request.source,
266 occurred_at_ms: current_epoch_ms(),
267 };
268 tx.execute(
269 "INSERT INTO host_event_occurrences (
270 occurrence_id, idempotency_key, request_hash, source_type,
271 source_key, occurred_at_ms, record_json
272 )
273 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
274 params![
275 record.occurrence_id.as_str(),
276 record.idempotency_key.as_str(),
277 request_hash.as_str(),
278 record.source_type.as_str(),
279 record.source_key.as_str(),
280 record.occurred_at_ms as i64,
281 Self::encode_json(&record)?,
282 ],
283 )
284 .map_err(process_sqlite_error)?;
285 Ok(record)
286 })()))
287 })
288 .await
289 .map_err(process_sqlite_error)?
290 }
291
292 async fn reserve_matching_deliveries(
293 &self,
294 occurrence_id: &str,
295 ) -> Result<Vec<lash_core::TriggerDeliveryReservation>, lash_core::PluginError> {
296 let occurrence_id = occurrence_id.to_string();
297 self.conn
298 .write_flow(move |tx| {
299 Ok(host_event_tx_outcome((|| {
300 let occurrence_json: Option<String> = tx
301 .query_row(
302 "SELECT record_json
303 FROM host_event_occurrences
304 WHERE occurrence_id = ?1",
305 params![occurrence_id.as_str()],
306 |row| row.get(0),
307 )
308 .optional()
309 .map_err(process_sqlite_error)?;
310 let Some(occurrence_json) = occurrence_json else {
311 return Err(lash_core::PluginError::Session(format!(
312 "unknown host event occurrence `{occurrence_id}`"
313 )));
314 };
315 let occurrence = Self::decode_occurrence(occurrence_json)?;
316 let subscriptions = {
317 let mut stmt = tx
318 .prepare(
319 "SELECT record_json
320 FROM host_event_trigger_subscriptions
321 WHERE enabled = 1 AND source_type = ?1 AND source_key = ?2
322 ORDER BY session_id ASC, handle ASC",
323 )
324 .map_err(process_sqlite_error)?;
325 let rows = stmt
326 .query_map(
327 params![
328 occurrence.source_type.as_str(),
329 occurrence.source_key.as_str()
330 ],
331 |row| row.get::<_, String>(0),
332 )
333 .map_err(process_sqlite_error)?;
334 let mut subscriptions = Vec::new();
335 for row in rows {
336 subscriptions.push(Self::decode_subscription(
337 row.map_err(process_sqlite_error)?,
338 )?);
339 }
340 subscriptions
341 };
342 let mut reservations = Vec::new();
343 for subscription in subscriptions {
344 let process_id = lash_core::deterministic_delivery_process_id(
345 &occurrence.occurrence_id,
346 &subscription.subscription_id,
347 )?;
348 let inserted = tx
349 .execute(
350 "INSERT OR IGNORE INTO host_event_deliveries (
351 occurrence_id, subscription_id, process_id, created_at_ms
352 )
353 VALUES (?1, ?2, ?3, ?4)",
354 params![
355 occurrence.occurrence_id.as_str(),
356 subscription.subscription_id.as_str(),
357 process_id.as_str(),
358 current_epoch_ms() as i64,
359 ],
360 )
361 .map_err(process_sqlite_error)?;
362 if inserted == 0 {
363 continue;
364 }
365 reservations.push(lash_core::TriggerDeliveryReservation {
366 occurrence: occurrence.clone(),
367 subscription,
368 process_id,
369 });
370 }
371 Ok(reservations)
372 })()))
373 })
374 .await
375 .map_err(process_sqlite_error)?
376 }
377}