1use std::sync::Arc;
4
5use async_trait::async_trait;
6use uuid::Uuid;
7
8use khive_storage::error::StorageError;
9use khive_storage::event::{Event, EventFilter};
10use khive_storage::types::{BatchWriteSummary, Page, PageRequest};
11use khive_storage::EventStore;
12use khive_storage::StorageCapability;
13use khive_types::{EventOutcome, SubstrateKind};
14
15use crate::error::SqliteError;
16use crate::pool::ConnectionPool;
17
18fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
19 StorageError::driver(StorageCapability::Event, op, e)
20}
21
22fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
23 StorageError::driver(StorageCapability::Event, op, e)
24}
25
/// SQLite-backed implementation of [`EventStore`].
pub struct SqlEventStore {
    /// Shared connection pool; its configuration also supplies the database
    /// path and busy timeout used when opening standalone connections.
    pool: Arc<ConnectionPool>,
    /// When `true`, each operation opens its own standalone connection instead
    /// of borrowing a connection from `pool`.
    is_file_backed: bool,
    /// Namespace used by `get_event` and applied to queries whose filter names
    /// no namespaces.
    namespace: String,
}
32
33impl SqlEventStore {
34 pub fn new_scoped(
36 pool: Arc<ConnectionPool>,
37 is_file_backed: bool,
38 namespace: impl Into<String>,
39 ) -> Self {
40 Self {
41 pool,
42 is_file_backed,
43 namespace: namespace.into(),
44 }
45 }
46
47 fn open_standalone_writer(&self) -> Result<rusqlite::Connection, StorageError> {
48 let config = self.pool.config();
49 let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
50 operation: "event_writer".into(),
51 message: "in-memory databases do not support standalone connections".into(),
52 })?;
53
54 let conn = rusqlite::Connection::open_with_flags(
55 path,
56 rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
57 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
58 | rusqlite::OpenFlags::SQLITE_OPEN_URI,
59 )
60 .map_err(|e| map_err(e, "open_event_writer"))?;
61
62 conn.busy_timeout(config.busy_timeout)
63 .map_err(|e| map_err(e, "open_event_writer"))?;
64 conn.pragma_update(None, "foreign_keys", "ON")
65 .map_err(|e| map_err(e, "open_event_writer"))?;
66 conn.pragma_update(None, "synchronous", "NORMAL")
67 .map_err(|e| map_err(e, "open_event_writer"))?;
68
69 Ok(conn)
70 }
71
72 fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
73 let config = self.pool.config();
74 let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
75 operation: "event_reader".into(),
76 message: "in-memory databases do not support standalone connections".into(),
77 })?;
78
79 let conn = rusqlite::Connection::open_with_flags(
80 path,
81 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
82 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
83 | rusqlite::OpenFlags::SQLITE_OPEN_URI,
84 )
85 .map_err(|e| map_err(e, "open_event_reader"))?;
86
87 conn.busy_timeout(config.busy_timeout)
88 .map_err(|e| map_err(e, "open_event_reader"))?;
89 conn.pragma_update(None, "foreign_keys", "ON")
90 .map_err(|e| map_err(e, "open_event_reader"))?;
91 conn.pragma_update(None, "synchronous", "NORMAL")
92 .map_err(|e| map_err(e, "open_event_reader"))?;
93
94 Ok(conn)
95 }
96
97 async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
98 where
99 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
100 R: Send + 'static,
101 {
102 if self.is_file_backed {
103 let conn = self.open_standalone_writer()?;
104 tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
105 .await
106 .map_err(|e| StorageError::driver(StorageCapability::Event, op, e))?
107 } else {
108 let pool = Arc::clone(&self.pool);
109 tokio::task::spawn_blocking(move || {
110 let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
111 f(guard.conn()).map_err(|e| map_err(e, op))
112 })
113 .await
114 .map_err(|e| StorageError::driver(StorageCapability::Event, op, e))?
115 }
116 }
117
118 async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
119 where
120 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
121 R: Send + 'static,
122 {
123 if self.is_file_backed {
124 let conn = self.open_standalone_reader()?;
125 tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
126 .await
127 .map_err(|e| StorageError::driver(StorageCapability::Event, op, e))?
128 } else {
129 let pool = Arc::clone(&self.pool);
130 tokio::task::spawn_blocking(move || {
131 let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
132 f(guard.conn()).map_err(|e| map_err(e, op))
133 })
134 .await
135 .map_err(|e| StorageError::driver(StorageCapability::Event, op, e))?
136 }
137 }
138}
139
140fn substrate_from_str(s: &str) -> Result<SubstrateKind, rusqlite::Error> {
145 s.parse::<SubstrateKind>().map_err(|_| {
146 rusqlite::Error::FromSqlConversionFailure(
147 0,
148 rusqlite::types::Type::Text,
149 format!("unknown SubstrateKind: {s}").into(),
150 )
151 })
152}
153
154fn outcome_from_str(s: &str) -> Result<EventOutcome, rusqlite::Error> {
155 match s {
156 "success" => Ok(EventOutcome::Success),
157 "denied" => Ok(EventOutcome::Denied),
158 "error" => Ok(EventOutcome::Error),
159 other => Err(rusqlite::Error::FromSqlConversionFailure(
160 0,
161 rusqlite::types::Type::Text,
162 format!("unknown EventOutcome: {other}").into(),
163 )),
164 }
165}
166
167fn parse_uuid(s: &str) -> Result<Uuid, rusqlite::Error> {
168 Uuid::parse_str(s).map_err(|e| {
169 rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
170 })
171}
172
173fn read_event(row: &rusqlite::Row<'_>) -> Result<Event, rusqlite::Error> {
176 let id_str: String = row.get(0)?;
177 let namespace: String = row.get(1)?;
178 let verb: String = row.get(2)?;
179 let substrate_str: String = row.get(3)?;
180 let actor: String = row.get(4)?;
181 let outcome_str: String = row.get(5)?;
182 let data_str: Option<String> = row.get(6)?;
183 let duration_us: i64 = row.get(7)?;
184 let target_str: Option<String> = row.get(8)?;
185 let created_at: i64 = row.get(9)?;
186
187 let id = parse_uuid(&id_str)?;
188 let substrate = substrate_from_str(&substrate_str)?;
189 let outcome = outcome_from_str(&outcome_str)?;
190 let data = data_str
191 .as_deref()
192 .map(serde_json::from_str)
193 .transpose()
194 .map_err(|e| {
195 rusqlite::Error::FromSqlConversionFailure(6, rusqlite::types::Type::Text, Box::new(e))
196 })?;
197 let target_id = target_str.as_deref().map(parse_uuid).transpose()?;
198
199 Ok(Event {
200 id,
201 namespace,
202 verb,
203 substrate,
204 actor,
205 outcome,
206 data,
207 duration_us,
208 target_id,
209 created_at,
210 })
211}
212
213fn build_event_filter_sql(
214 default_namespace: &str,
215 filter: &EventFilter,
216) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
217 let mut conditions: Vec<String> = Vec::new();
218 let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
219
220 if filter.namespaces.is_empty() {
222 params.push(Box::new(default_namespace.to_string()));
223 conditions.push(format!("namespace = ?{}", params.len()));
224 } else if filter.namespaces.len() == 1 {
225 params.push(Box::new(filter.namespaces[0].clone()));
226 conditions.push(format!("namespace = ?{}", params.len()));
227 } else {
228 let placeholders: Vec<String> = filter
229 .namespaces
230 .iter()
231 .map(|ns| {
232 params.push(Box::new(ns.clone()));
233 format!("?{}", params.len())
234 })
235 .collect();
236 conditions.push(format!("namespace IN ({})", placeholders.join(",")));
237 }
238
239 if !filter.ids.is_empty() {
240 let placeholders: Vec<String> = filter
241 .ids
242 .iter()
243 .map(|id| {
244 params.push(Box::new(id.to_string()));
245 format!("?{}", params.len())
246 })
247 .collect();
248 conditions.push(format!("id IN ({})", placeholders.join(",")));
249 }
250
251 if !filter.verbs.is_empty() {
252 let placeholders: Vec<String> = filter
253 .verbs
254 .iter()
255 .map(|v| {
256 params.push(Box::new(v.clone()));
257 format!("?{}", params.len())
258 })
259 .collect();
260 conditions.push(format!("verb IN ({})", placeholders.join(",")));
261 }
262
263 if !filter.substrates.is_empty() {
264 let placeholders: Vec<String> = filter
265 .substrates
266 .iter()
267 .map(|s| {
268 params.push(Box::new(s.name().to_string()));
269 format!("?{}", params.len())
270 })
271 .collect();
272 conditions.push(format!("substrate IN ({})", placeholders.join(",")));
273 }
274
275 if !filter.actors.is_empty() {
276 let placeholders: Vec<String> = filter
277 .actors
278 .iter()
279 .map(|a| {
280 params.push(Box::new(a.clone()));
281 format!("?{}", params.len())
282 })
283 .collect();
284 conditions.push(format!("actor IN ({})", placeholders.join(",")));
285 }
286
287 if let Some(after) = filter.after {
288 params.push(Box::new(after));
289 conditions.push(format!("created_at > ?{}", params.len()));
290 }
291
292 if let Some(before) = filter.before {
293 params.push(Box::new(before));
294 conditions.push(format!("created_at < ?{}", params.len()));
295 }
296
297 let clause = format!(" WHERE {}", conditions.join(" AND "));
298 (clause, params)
299}
300
301#[async_trait]
306impl EventStore for SqlEventStore {
307 async fn append_event(&self, event: Event) -> Result<(), StorageError> {
308 let id_str = event.id.to_string();
309 let substrate_str = event.substrate.name().to_string();
310 let outcome_str = event.outcome.name().to_string();
311 let data_str = event.data.as_ref().map(|v| v.to_string());
312 let target_str = event.target_id.map(|u| u.to_string());
313 let ns = event.namespace.clone();
314 let verb = event.verb.clone();
315 let actor = event.actor.clone();
316 let duration_us = event.duration_us;
317 let created_at = event.created_at;
318
319 self.with_writer("append_event", move |conn| {
320 conn.execute(
321 "INSERT INTO events \
322 (id, namespace, verb, substrate, actor, outcome, data, duration_us, target_id, created_at) \
323 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
324 rusqlite::params![
325 id_str,
326 ns,
327 verb,
328 substrate_str,
329 actor,
330 outcome_str,
331 data_str,
332 duration_us,
333 target_str,
334 created_at,
335 ],
336 )?;
337 Ok(())
338 })
339 .await
340 }
341
342 async fn append_events(&self, events: Vec<Event>) -> Result<BatchWriteSummary, StorageError> {
343 let attempted = events.len() as u64;
344
345 self.with_writer("append_events", move |conn| {
346 conn.execute_batch("BEGIN IMMEDIATE")?;
347 let mut affected = 0u64;
348 let mut failed = 0u64;
349 let mut first_error = String::new();
350
351 for event in &events {
352 let id_str = event.id.to_string();
353 let substrate_str = event.substrate.name().to_string();
354 let outcome_str = event.outcome.name().to_string();
355 let data_str = event.data.as_ref().map(|v| v.to_string());
356 let target_str = event.target_id.map(|u| u.to_string());
357
358 match conn.execute(
359 "INSERT INTO events \
360 (id, namespace, verb, substrate, actor, outcome, data, duration_us, target_id, created_at) \
361 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
362 rusqlite::params![
363 id_str,
364 &event.namespace,
365 &event.verb,
366 substrate_str,
367 &event.actor,
368 outcome_str,
369 data_str,
370 event.duration_us,
371 target_str,
372 event.created_at,
373 ],
374 ) {
375 Ok(_) => affected += 1,
376 Err(e) => {
377 if first_error.is_empty() {
378 first_error = e.to_string();
379 }
380 failed += 1;
381 }
382 }
383 }
384
385 if let Err(e) = conn.execute_batch("COMMIT") {
386 let _ = conn.execute_batch("ROLLBACK");
387 return Err(e);
388 }
389 Ok(BatchWriteSummary {
390 attempted,
391 affected,
392 failed,
393 first_error,
394 })
395 })
396 .await
397 }
398
399 async fn get_event(&self, id: Uuid) -> Result<Option<Event>, StorageError> {
400 let namespace = self.namespace.clone();
401 let id_str = id.to_string();
402
403 self.with_reader("get_event", move |conn| {
404 let mut stmt = conn.prepare(
405 "SELECT id, namespace, verb, substrate, actor, outcome, data, duration_us, target_id, created_at \
406 FROM events WHERE namespace = ?1 AND id = ?2",
407 )?;
408 let mut rows = stmt.query(rusqlite::params![namespace, id_str])?;
409 match rows.next()? {
410 Some(row) => Ok(Some(read_event(row)?)),
411 None => Ok(None),
412 }
413 })
414 .await
415 }
416
417 async fn query_events(
418 &self,
419 filter: EventFilter,
420 page: PageRequest,
421 ) -> Result<Page<Event>, StorageError> {
422 let namespace = self.namespace.clone();
423
424 self.with_reader("query_events", move |conn| {
425 let (where_clause, filter_params) = build_event_filter_sql(&namespace, &filter);
426
427 let count_sql = format!("SELECT COUNT(*) FROM events{}", where_clause);
428 let total: i64 = {
429 let mut stmt = conn.prepare(&count_sql)?;
430 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
431 filter_params.iter().map(|p| p.as_ref()).collect();
432 stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
433 };
434
435 let (_, data_filter_params) = build_event_filter_sql(&namespace, &filter);
436 let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = data_filter_params;
437 all_params.push(Box::new(page.limit as i64));
438 all_params.push(Box::new(page.offset as i64));
439
440 let limit_idx = all_params.len() - 1;
441 let offset_idx = all_params.len();
442
443 let data_sql = format!(
444 "SELECT id, namespace, verb, substrate, actor, outcome, data, duration_us, target_id, created_at \
445 FROM events{} ORDER BY created_at DESC LIMIT ?{} OFFSET ?{}",
446 where_clause, limit_idx, offset_idx,
447 );
448
449 let mut stmt = conn.prepare(&data_sql)?;
450 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
451 all_params.iter().map(|p| p.as_ref()).collect();
452 let rows = stmt.query_map(param_refs.as_slice(), read_event)?;
453
454 let mut items = Vec::new();
455 for row in rows {
456 items.push(row?);
457 }
458
459 Ok(Page {
460 items,
461 total: Some(total as u64),
462 })
463 })
464 .await
465 }
466
467 async fn count_events(&self, filter: EventFilter) -> Result<u64, StorageError> {
468 let namespace = self.namespace.clone();
469
470 self.with_reader("count_events", move |conn| {
471 let (where_clause, params) = build_event_filter_sql(&namespace, &filter);
472 let sql = format!("SELECT COUNT(*) FROM events{}", where_clause);
473 let mut stmt = conn.prepare(&sql)?;
474 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
475 params.iter().map(|p| p.as_ref()).collect();
476 let count: i64 = stmt.query_row(param_refs.as_slice(), |row| row.get(0))?;
477 Ok(count as u64)
478 })
479 .await
480 }
481}
482
/// DDL for the `events` table and its secondary indexes.
///
/// Every statement uses `IF NOT EXISTS`. The line continuations strip the
/// newline and leading whitespace, so the runtime string is a single line of
/// `;`-separated statements.
const EVENTS_DDL: &str = "\
    CREATE TABLE IF NOT EXISTS events (\
    id TEXT PRIMARY KEY,\
    namespace TEXT NOT NULL,\
    verb TEXT NOT NULL,\
    substrate TEXT NOT NULL,\
    actor TEXT NOT NULL,\
    outcome TEXT NOT NULL,\
    data TEXT,\
    duration_us INTEGER NOT NULL DEFAULT 0,\
    target_id TEXT,\
    created_at INTEGER NOT NULL\
    );\
    CREATE INDEX IF NOT EXISTS idx_events_namespace ON events(namespace);\
    CREATE INDEX IF NOT EXISTS idx_events_verb ON events(verb);\
    CREATE INDEX IF NOT EXISTS idx_events_substrate ON events(substrate);\
    CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at DESC);\
    CREATE INDEX IF NOT EXISTS idx_events_ns_created ON events(namespace, created_at DESC);\
";
506
507pub(crate) fn ensure_events_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
508 conn.execute_batch(EVENTS_DDL)
509}
510
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pool::PoolConfig;

    /// Builds a pool-backed store over an in-memory database, with the events
    /// schema applied and `"default"` as its namespace.
    fn setup_memory_store() -> SqlEventStore {
        let memory_config = PoolConfig {
            path: None,
            ..PoolConfig::default()
        };
        let pool = Arc::new(ConnectionPool::new(memory_config).unwrap());

        // Scope the writer guard so it is released before the store is used.
        {
            let guard = pool.writer().unwrap();
            guard.conn().execute_batch(EVENTS_DDL).unwrap();
        }

        SqlEventStore::new_scoped(pool, false, "default")
    }

    /// A `search` event on the note substrate issued by `agent:test`.
    fn make_event(namespace: &str) -> Event {
        Event::new(namespace, "search", SubstrateKind::Note, "agent:test")
    }

    /// The first page of up to ten results.
    fn first_ten() -> PageRequest {
        PageRequest {
            limit: 10,
            offset: 0,
        }
    }

    #[tokio::test]
    async fn test_append_and_get_event() {
        let store = setup_memory_store();
        let original = make_event("default");
        let expected_id = original.id;

        store.append_event(original).await.unwrap();

        let lookup = store.get_event(expected_id).await.unwrap();
        assert!(lookup.is_some());
        let stored = lookup.unwrap();
        assert_eq!(stored.id, expected_id);
        assert_eq!(stored.verb, "search");
        assert_eq!(stored.substrate, SubstrateKind::Note);
        assert_eq!(stored.actor, "agent:test");
        assert_eq!(stored.outcome, EventOutcome::Success);
    }

    #[tokio::test]
    async fn test_append_events_batch() {
        let store = setup_memory_store();

        let mut batch: Vec<Event> = Vec::new();
        for _ in 0..3 {
            batch.push(make_event("default"));
        }

        let summary = store.append_events(batch).await.unwrap();
        assert_eq!(summary.attempted, 3);
        assert_eq!(summary.affected, 3);
        assert_eq!(summary.failed, 0);
    }

    #[tokio::test]
    async fn test_count_events() {
        let store = setup_memory_store();

        for _ in 0..3 {
            store.append_event(make_event("default")).await.unwrap();
        }

        let matching = store.count_events(EventFilter::default()).await.unwrap();
        assert_eq!(matching, 3);
    }

    #[tokio::test]
    async fn test_query_events_filter_by_verb() {
        let store = setup_memory_store();

        store.append_event(make_event("default")).await.unwrap();

        let mut other_verb = make_event("default");
        other_verb.verb = "create".to_string();
        store.append_event(other_verb).await.unwrap();

        let only_search = EventFilter {
            verbs: vec!["search".to_string()],
            ..EventFilter::default()
        };
        let page = store.query_events(only_search, first_ten()).await.unwrap();

        assert_eq!(page.items.len(), 1);
        assert_eq!(page.items[0].verb, "search");
    }

    #[tokio::test]
    async fn test_query_events_filter_by_substrate() {
        let store = setup_memory_store();

        store.append_event(make_event("default")).await.unwrap();

        let mut on_entity = make_event("default");
        on_entity.substrate = SubstrateKind::Entity;
        store.append_event(on_entity).await.unwrap();

        let only_entities = EventFilter {
            substrates: vec![SubstrateKind::Entity],
            ..EventFilter::default()
        };
        let page = store
            .query_events(only_entities, first_ten())
            .await
            .unwrap();

        assert_eq!(page.items.len(), 1);
        assert_eq!(page.items[0].substrate, SubstrateKind::Entity);
    }

    #[tokio::test]
    async fn test_outcome_roundtrip() {
        let store = setup_memory_store();

        let mut rejected = make_event("default");
        rejected.outcome = EventOutcome::Denied;
        let rejected_id = rejected.id;
        store.append_event(rejected).await.unwrap();

        let stored = store.get_event(rejected_id).await.unwrap().unwrap();
        assert_eq!(stored.outcome, EventOutcome::Denied);
    }
}
645}