1use std::sync::Arc;
4
5use async_trait::async_trait;
6use uuid::Uuid;
7
8use khive_storage::error::StorageError;
9use khive_storage::event::{Event, EventFilter};
10use khive_storage::types::{BatchWriteSummary, Page, PageRequest};
11use khive_storage::EventStore;
12use khive_storage::StorageCapability;
13use khive_types::{EventOutcome, SubstrateKind};
14
15use crate::error::SqliteError;
16use crate::pool::ConnectionPool;
17
18fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
19 StorageError::driver(StorageCapability::Event, op, e)
20}
21
22fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
23 StorageError::driver(StorageCapability::Event, op, e)
24}
25
26pub struct SqlEventStore {
28 pool: Arc<ConnectionPool>,
29 is_file_backed: bool,
30 namespace: String,
31}
32
33impl SqlEventStore {
34 pub fn new_scoped(
36 pool: Arc<ConnectionPool>,
37 is_file_backed: bool,
38 namespace: impl Into<String>,
39 ) -> Self {
40 Self {
41 pool,
42 is_file_backed,
43 namespace: namespace.into(),
44 }
45 }
46
47 fn open_standalone_writer(&self) -> Result<rusqlite::Connection, StorageError> {
48 let config = self.pool.config();
49 let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
50 operation: "event_writer".into(),
51 message: "in-memory databases do not support standalone connections".into(),
52 })?;
53
54 let conn = rusqlite::Connection::open_with_flags(
55 path,
56 rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
57 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
58 | rusqlite::OpenFlags::SQLITE_OPEN_URI,
59 )
60 .map_err(|e| map_err(e, "open_event_writer"))?;
61
62 conn.busy_timeout(config.busy_timeout)
63 .map_err(|e| map_err(e, "open_event_writer"))?;
64 conn.pragma_update(None, "foreign_keys", "ON")
65 .map_err(|e| map_err(e, "open_event_writer"))?;
66 conn.pragma_update(None, "synchronous", "NORMAL")
67 .map_err(|e| map_err(e, "open_event_writer"))?;
68
69 Ok(conn)
70 }
71
72 fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
73 let config = self.pool.config();
74 let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
75 operation: "event_reader".into(),
76 message: "in-memory databases do not support standalone connections".into(),
77 })?;
78
79 let conn = rusqlite::Connection::open_with_flags(
80 path,
81 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
82 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
83 | rusqlite::OpenFlags::SQLITE_OPEN_URI,
84 )
85 .map_err(|e| map_err(e, "open_event_reader"))?;
86
87 conn.busy_timeout(config.busy_timeout)
88 .map_err(|e| map_err(e, "open_event_reader"))?;
89 conn.pragma_update(None, "foreign_keys", "ON")
90 .map_err(|e| map_err(e, "open_event_reader"))?;
91 conn.pragma_update(None, "synchronous", "NORMAL")
92 .map_err(|e| map_err(e, "open_event_reader"))?;
93
94 Ok(conn)
95 }
96
97 async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
98 where
99 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
100 R: Send + 'static,
101 {
102 if self.is_file_backed {
103 let conn = self.open_standalone_writer()?;
104 tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
105 .await
106 .map_err(|e| StorageError::driver(StorageCapability::Event, op, e))?
107 } else {
108 let pool = Arc::clone(&self.pool);
109 tokio::task::spawn_blocking(move || {
110 let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
111 f(guard.conn()).map_err(|e| map_err(e, op))
112 })
113 .await
114 .map_err(|e| StorageError::driver(StorageCapability::Event, op, e))?
115 }
116 }
117
118 async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
119 where
120 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
121 R: Send + 'static,
122 {
123 if self.is_file_backed {
124 let conn = self.open_standalone_reader()?;
125 tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
126 .await
127 .map_err(|e| StorageError::driver(StorageCapability::Event, op, e))?
128 } else {
129 let pool = Arc::clone(&self.pool);
130 tokio::task::spawn_blocking(move || {
131 let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
132 f(guard.conn()).map_err(|e| map_err(e, op))
133 })
134 .await
135 .map_err(|e| StorageError::driver(StorageCapability::Event, op, e))?
136 }
137 }
138}
139
140fn substrate_from_str(s: &str) -> Result<SubstrateKind, rusqlite::Error> {
145 s.parse::<SubstrateKind>().map_err(|_| {
146 rusqlite::Error::FromSqlConversionFailure(
147 0,
148 rusqlite::types::Type::Text,
149 format!("unknown SubstrateKind: {s}").into(),
150 )
151 })
152}
153
154fn outcome_from_str(s: &str) -> Result<EventOutcome, rusqlite::Error> {
155 match s {
156 "success" => Ok(EventOutcome::Success),
157 "denied" => Ok(EventOutcome::Denied),
158 "error" => Ok(EventOutcome::Error),
159 other => Err(rusqlite::Error::FromSqlConversionFailure(
160 0,
161 rusqlite::types::Type::Text,
162 format!("unknown EventOutcome: {other}").into(),
163 )),
164 }
165}
166
167fn parse_uuid(s: &str) -> Result<Uuid, rusqlite::Error> {
168 Uuid::parse_str(s).map_err(|e| {
169 rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
170 })
171}
172
173fn read_event(row: &rusqlite::Row<'_>) -> Result<Event, rusqlite::Error> {
176 let id_str: String = row.get(0)?;
177 let namespace: String = row.get(1)?;
178 let verb: String = row.get(2)?;
179 let substrate_str: String = row.get(3)?;
180 let actor: String = row.get(4)?;
181 let outcome_str: String = row.get(5)?;
182 let data_str: Option<String> = row.get(6)?;
183 let duration_us: i64 = row.get(7)?;
184 let target_str: Option<String> = row.get(8)?;
185 let created_at: i64 = row.get(9)?;
186
187 let id = parse_uuid(&id_str)?;
188 let substrate = substrate_from_str(&substrate_str)?;
189 let outcome = outcome_from_str(&outcome_str)?;
190 let data = data_str
191 .as_deref()
192 .map(serde_json::from_str)
193 .transpose()
194 .map_err(|e| {
195 rusqlite::Error::FromSqlConversionFailure(6, rusqlite::types::Type::Text, Box::new(e))
196 })?;
197 let target_id = target_str.as_deref().map(parse_uuid).transpose()?;
198
199 Ok(Event {
200 id,
201 namespace,
202 verb,
203 substrate,
204 actor,
205 outcome,
206 data,
207 duration_us,
208 target_id,
209 created_at,
210 })
211}
212
213fn build_event_filter_sql(
214 default_namespace: &str,
215 filter: &EventFilter,
216) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
217 let mut conditions: Vec<String> = Vec::new();
218 let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
219
220 if filter.namespaces.is_empty() {
222 params.push(Box::new(default_namespace.to_string()));
223 conditions.push(format!("namespace = ?{}", params.len()));
224 } else if filter.namespaces.len() == 1 {
225 params.push(Box::new(filter.namespaces[0].clone()));
226 conditions.push(format!("namespace = ?{}", params.len()));
227 } else {
228 let placeholders: Vec<String> = filter
229 .namespaces
230 .iter()
231 .map(|ns| {
232 params.push(Box::new(ns.clone()));
233 format!("?{}", params.len())
234 })
235 .collect();
236 conditions.push(format!("namespace IN ({})", placeholders.join(",")));
237 }
238
239 if !filter.ids.is_empty() {
240 let placeholders: Vec<String> = filter
241 .ids
242 .iter()
243 .map(|id| {
244 params.push(Box::new(id.to_string()));
245 format!("?{}", params.len())
246 })
247 .collect();
248 conditions.push(format!("id IN ({})", placeholders.join(",")));
249 }
250
251 if !filter.verbs.is_empty() {
252 let placeholders: Vec<String> = filter
253 .verbs
254 .iter()
255 .map(|v| {
256 params.push(Box::new(v.clone()));
257 format!("?{}", params.len())
258 })
259 .collect();
260 conditions.push(format!("verb IN ({})", placeholders.join(",")));
261 }
262
263 if !filter.substrates.is_empty() {
264 let placeholders: Vec<String> = filter
265 .substrates
266 .iter()
267 .map(|s| {
268 params.push(Box::new(s.name().to_string()));
269 format!("?{}", params.len())
270 })
271 .collect();
272 conditions.push(format!("substrate IN ({})", placeholders.join(",")));
273 }
274
275 if !filter.actors.is_empty() {
276 let placeholders: Vec<String> = filter
277 .actors
278 .iter()
279 .map(|a| {
280 params.push(Box::new(a.clone()));
281 format!("?{}", params.len())
282 })
283 .collect();
284 conditions.push(format!("actor IN ({})", placeholders.join(",")));
285 }
286
287 if let Some(after) = filter.after {
288 params.push(Box::new(after));
289 conditions.push(format!("created_at > ?{}", params.len()));
290 }
291
292 if let Some(before) = filter.before {
293 params.push(Box::new(before));
294 conditions.push(format!("created_at < ?{}", params.len()));
295 }
296
297 let clause = format!(" WHERE {}", conditions.join(" AND "));
298 (clause, params)
299}
300
301#[async_trait]
306impl EventStore for SqlEventStore {
307 async fn append_event(&self, event: Event) -> Result<(), StorageError> {
308 let id_str = event.id.to_string();
309 let substrate_str = event.substrate.name().to_string();
310 let outcome_str = event.outcome.name().to_string();
311 let data_str = event.data.as_ref().map(|v| v.to_string());
312 let target_str = event.target_id.map(|u| u.to_string());
313 let ns = event.namespace.clone();
314 let verb = event.verb.clone();
315 let actor = event.actor.clone();
316 let duration_us = event.duration_us;
317 let created_at = event.created_at;
318
319 self.with_writer("append_event", move |conn| {
320 conn.execute(
321 "INSERT INTO events \
322 (id, namespace, verb, substrate, actor, outcome, data, duration_us, target_id, created_at) \
323 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
324 rusqlite::params![
325 id_str,
326 ns,
327 verb,
328 substrate_str,
329 actor,
330 outcome_str,
331 data_str,
332 duration_us,
333 target_str,
334 created_at,
335 ],
336 )?;
337 Ok(())
338 })
339 .await
340 }
341
342 async fn append_events(&self, events: Vec<Event>) -> Result<BatchWriteSummary, StorageError> {
343 let attempted = events.len() as u64;
344
345 self.with_writer("append_events", move |conn| {
346 conn.execute_batch("BEGIN IMMEDIATE")?;
347 let mut affected = 0u64;
348 let mut failed = 0u64;
349 let mut first_error = String::new();
350
351 for event in &events {
352 let id_str = event.id.to_string();
353 let substrate_str = event.substrate.name().to_string();
354 let outcome_str = event.outcome.name().to_string();
355 let data_str = event.data.as_ref().map(|v| v.to_string());
356 let target_str = event.target_id.map(|u| u.to_string());
357
358 match conn.execute(
359 "INSERT INTO events \
360 (id, namespace, verb, substrate, actor, outcome, data, duration_us, target_id, created_at) \
361 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
362 rusqlite::params![
363 id_str,
364 &event.namespace,
365 &event.verb,
366 substrate_str,
367 &event.actor,
368 outcome_str,
369 data_str,
370 event.duration_us,
371 target_str,
372 event.created_at,
373 ],
374 ) {
375 Ok(_) => affected += 1,
376 Err(e) => {
377 if first_error.is_empty() {
378 first_error = e.to_string();
379 }
380 failed += 1;
381 }
382 }
383 }
384
385 if let Err(e) = conn.execute_batch("COMMIT") {
386 let _ = conn.execute_batch("ROLLBACK");
387 return Err(e);
388 }
389 Ok(BatchWriteSummary {
390 attempted,
391 affected,
392 failed,
393 first_error,
394 })
395 })
396 .await
397 }
398
399 async fn get_event(&self, id: Uuid) -> Result<Option<Event>, StorageError> {
400 let namespace = self.namespace.clone();
401 let id_str = id.to_string();
402
403 self.with_reader("get_event", move |conn| {
404 let mut stmt = conn.prepare(
405 "SELECT id, namespace, verb, substrate, actor, outcome, data, duration_us, target_id, created_at \
406 FROM events WHERE namespace = ?1 AND id = ?2",
407 )?;
408 let mut rows = stmt.query(rusqlite::params![namespace, id_str])?;
409 match rows.next()? {
410 Some(row) => Ok(Some(read_event(row)?)),
411 None => Ok(None),
412 }
413 })
414 .await
415 }
416
417 async fn query_events(
418 &self,
419 filter: EventFilter,
420 page: PageRequest,
421 ) -> Result<Page<Event>, StorageError> {
422 let namespace = self.namespace.clone();
423
424 self.with_reader("query_events", move |conn| {
425 let (where_clause, filter_params) = build_event_filter_sql(&namespace, &filter);
426
427 let count_sql = format!("SELECT COUNT(*) FROM events{}", where_clause);
428 let total: i64 = {
429 let mut stmt = conn.prepare(&count_sql)?;
430 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
431 filter_params.iter().map(|p| p.as_ref()).collect();
432 stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
433 };
434
435 let (_, data_filter_params) = build_event_filter_sql(&namespace, &filter);
436 let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = data_filter_params;
437 all_params.push(Box::new(page.limit as i64));
438 all_params.push(Box::new(page.offset as i64));
439
440 let limit_idx = all_params.len() - 1;
441 let offset_idx = all_params.len();
442
443 let data_sql = format!(
444 "SELECT id, namespace, verb, substrate, actor, outcome, data, duration_us, target_id, created_at \
445 FROM events{} ORDER BY created_at DESC LIMIT ?{} OFFSET ?{}",
446 where_clause, limit_idx, offset_idx,
447 );
448
449 let mut stmt = conn.prepare(&data_sql)?;
450 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
451 all_params.iter().map(|p| p.as_ref()).collect();
452 let rows = stmt.query_map(param_refs.as_slice(), read_event)?;
453
454 let mut items = Vec::new();
455 for row in rows {
456 items.push(row?);
457 }
458
459 Ok(Page {
460 items,
461 total: Some(total as u64),
462 })
463 })
464 .await
465 }
466
467 async fn count_events(&self, filter: EventFilter) -> Result<u64, StorageError> {
468 let namespace = self.namespace.clone();
469
470 self.with_reader("count_events", move |conn| {
471 let (where_clause, params) = build_event_filter_sql(&namespace, &filter);
472 let sql = format!("SELECT COUNT(*) FROM events{}", where_clause);
473 let mut stmt = conn.prepare(&sql)?;
474 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
475 params.iter().map(|p| p.as_ref()).collect();
476 let count: i64 = stmt.query_row(param_refs.as_slice(), |row| row.get(0))?;
477 Ok(count as u64)
478 })
479 .await
480 }
481}
482
483const EVENTS_DDL: &str = "\
488 CREATE TABLE IF NOT EXISTS events (\
489 id TEXT PRIMARY KEY,\
490 namespace TEXT NOT NULL,\
491 verb TEXT NOT NULL,\
492 substrate TEXT NOT NULL,\
493 actor TEXT NOT NULL,\
494 outcome TEXT NOT NULL,\
495 data TEXT,\
496 duration_us INTEGER NOT NULL DEFAULT 0,\
497 target_id TEXT,\
498 created_at INTEGER NOT NULL\
499 );\
500 CREATE INDEX IF NOT EXISTS idx_events_namespace ON events(namespace);\
501 CREATE INDEX IF NOT EXISTS idx_events_verb ON events(verb);\
502 CREATE INDEX IF NOT EXISTS idx_events_substrate ON events(substrate);\
503 CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at DESC);\
504";
505
506pub(crate) fn ensure_events_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
507 conn.execute_batch(EVENTS_DDL)
508}
509
510#[cfg(test)]
511mod tests {
512 use super::*;
513 use crate::pool::PoolConfig;
514
515 fn setup_memory_store() -> SqlEventStore {
516 let config = PoolConfig {
517 path: None,
518 ..PoolConfig::default()
519 };
520 let pool = Arc::new(ConnectionPool::new(config).unwrap());
521
522 {
523 let writer = pool.writer().unwrap();
524 writer.conn().execute_batch(EVENTS_DDL).unwrap();
525 }
526
527 SqlEventStore::new_scoped(pool, false, "default")
528 }
529
530 fn make_event(namespace: &str) -> Event {
531 Event::new(namespace, "search", SubstrateKind::Note, "agent:test")
532 }
533
534 #[tokio::test]
535 async fn test_append_and_get_event() {
536 let store = setup_memory_store();
537
538 let event = make_event("default");
539 let id = event.id;
540
541 store.append_event(event).await.unwrap();
542
543 let fetched = store.get_event(id).await.unwrap();
544 assert!(fetched.is_some());
545 let fetched = fetched.unwrap();
546 assert_eq!(fetched.id, id);
547 assert_eq!(fetched.verb, "search");
548 assert_eq!(fetched.substrate, SubstrateKind::Note);
549 assert_eq!(fetched.actor, "agent:test");
550 assert_eq!(fetched.outcome, EventOutcome::Success);
551 }
552
553 #[tokio::test]
554 async fn test_append_events_batch() {
555 let store = setup_memory_store();
556
557 let events: Vec<Event> = (0..3).map(|_| make_event("default")).collect();
558 let summary = store.append_events(events).await.unwrap();
559 assert_eq!(summary.attempted, 3);
560 assert_eq!(summary.affected, 3);
561 assert_eq!(summary.failed, 0);
562 }
563
564 #[tokio::test]
565 async fn test_count_events() {
566 let store = setup_memory_store();
567
568 for _ in 0..3 {
569 store.append_event(make_event("default")).await.unwrap();
570 }
571
572 let count = store.count_events(EventFilter::default()).await.unwrap();
573 assert_eq!(count, 3);
574 }
575
576 #[tokio::test]
577 async fn test_query_events_filter_by_verb() {
578 let store = setup_memory_store();
579
580 store.append_event(make_event("default")).await.unwrap();
581
582 let mut create_event = make_event("default");
583 create_event.verb = "create".to_string();
584 store.append_event(create_event).await.unwrap();
585
586 let filter = EventFilter {
587 verbs: vec!["search".to_string()],
588 ..EventFilter::default()
589 };
590 let page = store
591 .query_events(
592 filter,
593 PageRequest {
594 limit: 10,
595 offset: 0,
596 },
597 )
598 .await
599 .unwrap();
600 assert_eq!(page.items.len(), 1);
601 assert_eq!(page.items[0].verb, "search");
602 }
603
604 #[tokio::test]
605 async fn test_query_events_filter_by_substrate() {
606 let store = setup_memory_store();
607
608 store.append_event(make_event("default")).await.unwrap();
609
610 let mut entity_event = make_event("default");
611 entity_event.substrate = SubstrateKind::Entity;
612 store.append_event(entity_event).await.unwrap();
613
614 let filter = EventFilter {
615 substrates: vec![SubstrateKind::Entity],
616 ..EventFilter::default()
617 };
618 let page = store
619 .query_events(
620 filter,
621 PageRequest {
622 limit: 10,
623 offset: 0,
624 },
625 )
626 .await
627 .unwrap();
628 assert_eq!(page.items.len(), 1);
629 assert_eq!(page.items[0].substrate, SubstrateKind::Entity);
630 }
631
632 #[tokio::test]
633 async fn test_outcome_roundtrip() {
634 let store = setup_memory_store();
635
636 let mut denied = make_event("default");
637 denied.outcome = EventOutcome::Denied;
638 let denied_id = denied.id;
639 store.append_event(denied).await.unwrap();
640
641 let fetched = store.get_event(denied_id).await.unwrap().unwrap();
642 assert_eq!(fetched.outcome, EventOutcome::Denied);
643 }
644}