Skip to main content

khive_db/stores/
event.rs

1//! SQL-backed `EventStore` implementation.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use uuid::Uuid;
7
8use khive_storage::error::StorageError;
9use khive_storage::event::{Event, EventFilter};
10use khive_storage::types::{BatchWriteSummary, Page, PageRequest};
11use khive_storage::EventStore;
12use khive_storage::StorageCapability;
13use khive_types::{EventOutcome, SubstrateKind};
14
15use crate::error::SqliteError;
16use crate::pool::ConnectionPool;
17
18fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
19    StorageError::driver(StorageCapability::Event, op, e)
20}
21
22fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
23    StorageError::driver(StorageCapability::Event, op, e)
24}
25
26/// An EventStore backed by SQLite tables.
27pub struct SqlEventStore {
28    pool: Arc<ConnectionPool>,
29    is_file_backed: bool,
30    namespace: String,
31}
32
33impl SqlEventStore {
34    /// Create a new store scoped to one namespace.
35    pub fn new_scoped(
36        pool: Arc<ConnectionPool>,
37        is_file_backed: bool,
38        namespace: impl Into<String>,
39    ) -> Self {
40        Self {
41            pool,
42            is_file_backed,
43            namespace: namespace.into(),
44        }
45    }
46
47    fn open_standalone_writer(&self) -> Result<rusqlite::Connection, StorageError> {
48        let config = self.pool.config();
49        let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
50            operation: "event_writer".into(),
51            message: "in-memory databases do not support standalone connections".into(),
52        })?;
53
54        let conn = rusqlite::Connection::open_with_flags(
55            path,
56            rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
57                | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
58                | rusqlite::OpenFlags::SQLITE_OPEN_URI,
59        )
60        .map_err(|e| map_err(e, "open_event_writer"))?;
61
62        conn.busy_timeout(config.busy_timeout)
63            .map_err(|e| map_err(e, "open_event_writer"))?;
64        conn.pragma_update(None, "foreign_keys", "ON")
65            .map_err(|e| map_err(e, "open_event_writer"))?;
66        conn.pragma_update(None, "synchronous", "NORMAL")
67            .map_err(|e| map_err(e, "open_event_writer"))?;
68
69        Ok(conn)
70    }
71
72    fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
73        let config = self.pool.config();
74        let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
75            operation: "event_reader".into(),
76            message: "in-memory databases do not support standalone connections".into(),
77        })?;
78
79        let conn = rusqlite::Connection::open_with_flags(
80            path,
81            rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
82                | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
83                | rusqlite::OpenFlags::SQLITE_OPEN_URI,
84        )
85        .map_err(|e| map_err(e, "open_event_reader"))?;
86
87        conn.busy_timeout(config.busy_timeout)
88            .map_err(|e| map_err(e, "open_event_reader"))?;
89        conn.pragma_update(None, "foreign_keys", "ON")
90            .map_err(|e| map_err(e, "open_event_reader"))?;
91        conn.pragma_update(None, "synchronous", "NORMAL")
92            .map_err(|e| map_err(e, "open_event_reader"))?;
93
94        Ok(conn)
95    }
96
97    async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
98    where
99        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
100        R: Send + 'static,
101    {
102        if self.is_file_backed {
103            let conn = self.open_standalone_writer()?;
104            tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
105                .await
106                .map_err(|e| StorageError::driver(StorageCapability::Event, op, e))?
107        } else {
108            let pool = Arc::clone(&self.pool);
109            tokio::task::spawn_blocking(move || {
110                let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
111                f(guard.conn()).map_err(|e| map_err(e, op))
112            })
113            .await
114            .map_err(|e| StorageError::driver(StorageCapability::Event, op, e))?
115        }
116    }
117
118    async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
119    where
120        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
121        R: Send + 'static,
122    {
123        if self.is_file_backed {
124            let conn = self.open_standalone_reader()?;
125            tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
126                .await
127                .map_err(|e| StorageError::driver(StorageCapability::Event, op, e))?
128        } else {
129            let pool = Arc::clone(&self.pool);
130            tokio::task::spawn_blocking(move || {
131                let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
132                f(guard.conn()).map_err(|e| map_err(e, op))
133            })
134            .await
135            .map_err(|e| StorageError::driver(StorageCapability::Event, op, e))?
136        }
137    }
138}
139
140// =============================================================================
141// Helpers: parse SubstrateKind / EventOutcome from DB strings
142// =============================================================================
143
144fn substrate_from_str(s: &str) -> Result<SubstrateKind, rusqlite::Error> {
145    s.parse::<SubstrateKind>().map_err(|_| {
146        rusqlite::Error::FromSqlConversionFailure(
147            0,
148            rusqlite::types::Type::Text,
149            format!("unknown SubstrateKind: {s}").into(),
150        )
151    })
152}
153
154fn outcome_from_str(s: &str) -> Result<EventOutcome, rusqlite::Error> {
155    match s {
156        "success" => Ok(EventOutcome::Success),
157        "denied" => Ok(EventOutcome::Denied),
158        "error" => Ok(EventOutcome::Error),
159        other => Err(rusqlite::Error::FromSqlConversionFailure(
160            0,
161            rusqlite::types::Type::Text,
162            format!("unknown EventOutcome: {other}").into(),
163        )),
164    }
165}
166
167fn parse_uuid(s: &str) -> Result<Uuid, rusqlite::Error> {
168    Uuid::parse_str(s).map_err(|e| {
169        rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
170    })
171}
172
173// Column order: id(0), namespace(1), verb(2), substrate(3), actor(4),
174//               outcome(5), data(6), duration_us(7), target_id(8), created_at(9)
175fn read_event(row: &rusqlite::Row<'_>) -> Result<Event, rusqlite::Error> {
176    let id_str: String = row.get(0)?;
177    let namespace: String = row.get(1)?;
178    let verb: String = row.get(2)?;
179    let substrate_str: String = row.get(3)?;
180    let actor: String = row.get(4)?;
181    let outcome_str: String = row.get(5)?;
182    let data_str: Option<String> = row.get(6)?;
183    let duration_us: i64 = row.get(7)?;
184    let target_str: Option<String> = row.get(8)?;
185    let created_at: i64 = row.get(9)?;
186
187    let id = parse_uuid(&id_str)?;
188    let substrate = substrate_from_str(&substrate_str)?;
189    let outcome = outcome_from_str(&outcome_str)?;
190    let data = data_str
191        .as_deref()
192        .map(serde_json::from_str)
193        .transpose()
194        .map_err(|e| {
195            rusqlite::Error::FromSqlConversionFailure(6, rusqlite::types::Type::Text, Box::new(e))
196        })?;
197    let target_id = target_str.as_deref().map(parse_uuid).transpose()?;
198
199    Ok(Event {
200        id,
201        namespace,
202        verb,
203        substrate,
204        actor,
205        outcome,
206        data,
207        duration_us,
208        target_id,
209        created_at,
210    })
211}
212
213fn build_event_filter_sql(
214    default_namespace: &str,
215    filter: &EventFilter,
216) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
217    let mut conditions: Vec<String> = Vec::new();
218    let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
219
220    // If filter.namespaces is non-empty, use those; otherwise fall back to default_namespace.
221    if filter.namespaces.is_empty() {
222        params.push(Box::new(default_namespace.to_string()));
223        conditions.push(format!("namespace = ?{}", params.len()));
224    } else if filter.namespaces.len() == 1 {
225        params.push(Box::new(filter.namespaces[0].clone()));
226        conditions.push(format!("namespace = ?{}", params.len()));
227    } else {
228        let placeholders: Vec<String> = filter
229            .namespaces
230            .iter()
231            .map(|ns| {
232                params.push(Box::new(ns.clone()));
233                format!("?{}", params.len())
234            })
235            .collect();
236        conditions.push(format!("namespace IN ({})", placeholders.join(",")));
237    }
238
239    if !filter.ids.is_empty() {
240        let placeholders: Vec<String> = filter
241            .ids
242            .iter()
243            .map(|id| {
244                params.push(Box::new(id.to_string()));
245                format!("?{}", params.len())
246            })
247            .collect();
248        conditions.push(format!("id IN ({})", placeholders.join(",")));
249    }
250
251    if !filter.verbs.is_empty() {
252        let placeholders: Vec<String> = filter
253            .verbs
254            .iter()
255            .map(|v| {
256                params.push(Box::new(v.clone()));
257                format!("?{}", params.len())
258            })
259            .collect();
260        conditions.push(format!("verb IN ({})", placeholders.join(",")));
261    }
262
263    if !filter.substrates.is_empty() {
264        let placeholders: Vec<String> = filter
265            .substrates
266            .iter()
267            .map(|s| {
268                params.push(Box::new(s.name().to_string()));
269                format!("?{}", params.len())
270            })
271            .collect();
272        conditions.push(format!("substrate IN ({})", placeholders.join(",")));
273    }
274
275    if !filter.actors.is_empty() {
276        let placeholders: Vec<String> = filter
277            .actors
278            .iter()
279            .map(|a| {
280                params.push(Box::new(a.clone()));
281                format!("?{}", params.len())
282            })
283            .collect();
284        conditions.push(format!("actor IN ({})", placeholders.join(",")));
285    }
286
287    if let Some(after) = filter.after {
288        params.push(Box::new(after));
289        conditions.push(format!("created_at > ?{}", params.len()));
290    }
291
292    if let Some(before) = filter.before {
293        params.push(Box::new(before));
294        conditions.push(format!("created_at < ?{}", params.len()));
295    }
296
297    let clause = format!(" WHERE {}", conditions.join(" AND "));
298    (clause, params)
299}
300
301// =============================================================================
302// EventStore implementation
303// =============================================================================
304
305#[async_trait]
306impl EventStore for SqlEventStore {
307    async fn append_event(&self, event: Event) -> Result<(), StorageError> {
308        let id_str = event.id.to_string();
309        let substrate_str = event.substrate.name().to_string();
310        let outcome_str = event.outcome.name().to_string();
311        let data_str = event.data.as_ref().map(|v| v.to_string());
312        let target_str = event.target_id.map(|u| u.to_string());
313        let ns = event.namespace.clone();
314        let verb = event.verb.clone();
315        let actor = event.actor.clone();
316        let duration_us = event.duration_us;
317        let created_at = event.created_at;
318
319        self.with_writer("append_event", move |conn| {
320            conn.execute(
321                "INSERT INTO events \
322                 (id, namespace, verb, substrate, actor, outcome, data, duration_us, target_id, created_at) \
323                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
324                rusqlite::params![
325                    id_str,
326                    ns,
327                    verb,
328                    substrate_str,
329                    actor,
330                    outcome_str,
331                    data_str,
332                    duration_us,
333                    target_str,
334                    created_at,
335                ],
336            )?;
337            Ok(())
338        })
339        .await
340    }
341
342    async fn append_events(&self, events: Vec<Event>) -> Result<BatchWriteSummary, StorageError> {
343        let attempted = events.len() as u64;
344
345        self.with_writer("append_events", move |conn| {
346            conn.execute_batch("BEGIN IMMEDIATE")?;
347            let mut affected = 0u64;
348            let mut failed = 0u64;
349            let mut first_error = String::new();
350
351            for event in &events {
352                let id_str = event.id.to_string();
353                let substrate_str = event.substrate.name().to_string();
354                let outcome_str = event.outcome.name().to_string();
355                let data_str = event.data.as_ref().map(|v| v.to_string());
356                let target_str = event.target_id.map(|u| u.to_string());
357
358                match conn.execute(
359                    "INSERT INTO events \
360                     (id, namespace, verb, substrate, actor, outcome, data, duration_us, target_id, created_at) \
361                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
362                    rusqlite::params![
363                        id_str,
364                        &event.namespace,
365                        &event.verb,
366                        substrate_str,
367                        &event.actor,
368                        outcome_str,
369                        data_str,
370                        event.duration_us,
371                        target_str,
372                        event.created_at,
373                    ],
374                ) {
375                    Ok(_) => affected += 1,
376                    Err(e) => {
377                        if first_error.is_empty() {
378                            first_error = e.to_string();
379                        }
380                        failed += 1;
381                    }
382                }
383            }
384
385            if let Err(e) = conn.execute_batch("COMMIT") {
386                let _ = conn.execute_batch("ROLLBACK");
387                return Err(e);
388            }
389            Ok(BatchWriteSummary {
390                attempted,
391                affected,
392                failed,
393                first_error,
394            })
395        })
396        .await
397    }
398
399    async fn get_event(&self, id: Uuid) -> Result<Option<Event>, StorageError> {
400        let namespace = self.namespace.clone();
401        let id_str = id.to_string();
402
403        self.with_reader("get_event", move |conn| {
404            let mut stmt = conn.prepare(
405                "SELECT id, namespace, verb, substrate, actor, outcome, data, duration_us, target_id, created_at \
406                 FROM events WHERE namespace = ?1 AND id = ?2",
407            )?;
408            let mut rows = stmt.query(rusqlite::params![namespace, id_str])?;
409            match rows.next()? {
410                Some(row) => Ok(Some(read_event(row)?)),
411                None => Ok(None),
412            }
413        })
414        .await
415    }
416
417    async fn query_events(
418        &self,
419        filter: EventFilter,
420        page: PageRequest,
421    ) -> Result<Page<Event>, StorageError> {
422        let namespace = self.namespace.clone();
423
424        self.with_reader("query_events", move |conn| {
425            let (where_clause, filter_params) = build_event_filter_sql(&namespace, &filter);
426
427            let count_sql = format!("SELECT COUNT(*) FROM events{}", where_clause);
428            let total: i64 = {
429                let mut stmt = conn.prepare(&count_sql)?;
430                let param_refs: Vec<&dyn rusqlite::types::ToSql> =
431                    filter_params.iter().map(|p| p.as_ref()).collect();
432                stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
433            };
434
435            let (_, data_filter_params) = build_event_filter_sql(&namespace, &filter);
436            let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = data_filter_params;
437            all_params.push(Box::new(page.limit as i64));
438            all_params.push(Box::new(page.offset as i64));
439
440            let limit_idx = all_params.len() - 1;
441            let offset_idx = all_params.len();
442
443            let data_sql = format!(
444                "SELECT id, namespace, verb, substrate, actor, outcome, data, duration_us, target_id, created_at \
445                 FROM events{} ORDER BY created_at DESC LIMIT ?{} OFFSET ?{}",
446                where_clause, limit_idx, offset_idx,
447            );
448
449            let mut stmt = conn.prepare(&data_sql)?;
450            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
451                all_params.iter().map(|p| p.as_ref()).collect();
452            let rows = stmt.query_map(param_refs.as_slice(), read_event)?;
453
454            let mut items = Vec::new();
455            for row in rows {
456                items.push(row?);
457            }
458
459            Ok(Page {
460                items,
461                total: Some(total as u64),
462            })
463        })
464        .await
465    }
466
467    async fn count_events(&self, filter: EventFilter) -> Result<u64, StorageError> {
468        let namespace = self.namespace.clone();
469
470        self.with_reader("count_events", move |conn| {
471            let (where_clause, params) = build_event_filter_sql(&namespace, &filter);
472            let sql = format!("SELECT COUNT(*) FROM events{}", where_clause);
473            let mut stmt = conn.prepare(&sql)?;
474            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
475                params.iter().map(|p| p.as_ref()).collect();
476            let count: i64 = stmt.query_row(param_refs.as_slice(), |row| row.get(0))?;
477            Ok(count as u64)
478        })
479        .await
480    }
481}
482
483// =============================================================================
484// DDL
485// =============================================================================
486
487const EVENTS_DDL: &str = "\
488    CREATE TABLE IF NOT EXISTS events (\
489        id TEXT PRIMARY KEY,\
490        namespace TEXT NOT NULL,\
491        verb TEXT NOT NULL,\
492        substrate TEXT NOT NULL,\
493        actor TEXT NOT NULL,\
494        outcome TEXT NOT NULL,\
495        data TEXT,\
496        duration_us INTEGER NOT NULL DEFAULT 0,\
497        target_id TEXT,\
498        created_at INTEGER NOT NULL\
499    );\
500    CREATE INDEX IF NOT EXISTS idx_events_namespace ON events(namespace);\
501    CREATE INDEX IF NOT EXISTS idx_events_verb ON events(verb);\
502    CREATE INDEX IF NOT EXISTS idx_events_substrate ON events(substrate);\
503    CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at DESC);\
504    CREATE INDEX IF NOT EXISTS idx_events_ns_created ON events(namespace, created_at DESC);\
505";
506
507pub(crate) fn ensure_events_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
508    conn.execute_batch(EVENTS_DDL)
509}
510
511#[cfg(test)]
512mod tests {
513    use super::*;
514    use crate::pool::PoolConfig;
515
516    fn setup_memory_store() -> SqlEventStore {
517        let config = PoolConfig {
518            path: None,
519            ..PoolConfig::default()
520        };
521        let pool = Arc::new(ConnectionPool::new(config).unwrap());
522
523        {
524            let writer = pool.writer().unwrap();
525            writer.conn().execute_batch(EVENTS_DDL).unwrap();
526        }
527
528        SqlEventStore::new_scoped(pool, false, "default")
529    }
530
531    fn make_event(namespace: &str) -> Event {
532        Event::new(namespace, "search", SubstrateKind::Note, "agent:test")
533    }
534
535    #[tokio::test]
536    async fn test_append_and_get_event() {
537        let store = setup_memory_store();
538
539        let event = make_event("default");
540        let id = event.id;
541
542        store.append_event(event).await.unwrap();
543
544        let fetched = store.get_event(id).await.unwrap();
545        assert!(fetched.is_some());
546        let fetched = fetched.unwrap();
547        assert_eq!(fetched.id, id);
548        assert_eq!(fetched.verb, "search");
549        assert_eq!(fetched.substrate, SubstrateKind::Note);
550        assert_eq!(fetched.actor, "agent:test");
551        assert_eq!(fetched.outcome, EventOutcome::Success);
552    }
553
554    #[tokio::test]
555    async fn test_append_events_batch() {
556        let store = setup_memory_store();
557
558        let events: Vec<Event> = (0..3).map(|_| make_event("default")).collect();
559        let summary = store.append_events(events).await.unwrap();
560        assert_eq!(summary.attempted, 3);
561        assert_eq!(summary.affected, 3);
562        assert_eq!(summary.failed, 0);
563    }
564
565    #[tokio::test]
566    async fn test_count_events() {
567        let store = setup_memory_store();
568
569        for _ in 0..3 {
570            store.append_event(make_event("default")).await.unwrap();
571        }
572
573        let count = store.count_events(EventFilter::default()).await.unwrap();
574        assert_eq!(count, 3);
575    }
576
577    #[tokio::test]
578    async fn test_query_events_filter_by_verb() {
579        let store = setup_memory_store();
580
581        store.append_event(make_event("default")).await.unwrap();
582
583        let mut create_event = make_event("default");
584        create_event.verb = "create".to_string();
585        store.append_event(create_event).await.unwrap();
586
587        let filter = EventFilter {
588            verbs: vec!["search".to_string()],
589            ..EventFilter::default()
590        };
591        let page = store
592            .query_events(
593                filter,
594                PageRequest {
595                    limit: 10,
596                    offset: 0,
597                },
598            )
599            .await
600            .unwrap();
601        assert_eq!(page.items.len(), 1);
602        assert_eq!(page.items[0].verb, "search");
603    }
604
605    #[tokio::test]
606    async fn test_query_events_filter_by_substrate() {
607        let store = setup_memory_store();
608
609        store.append_event(make_event("default")).await.unwrap();
610
611        let mut entity_event = make_event("default");
612        entity_event.substrate = SubstrateKind::Entity;
613        store.append_event(entity_event).await.unwrap();
614
615        let filter = EventFilter {
616            substrates: vec![SubstrateKind::Entity],
617            ..EventFilter::default()
618        };
619        let page = store
620            .query_events(
621                filter,
622                PageRequest {
623                    limit: 10,
624                    offset: 0,
625                },
626            )
627            .await
628            .unwrap();
629        assert_eq!(page.items.len(), 1);
630        assert_eq!(page.items[0].substrate, SubstrateKind::Entity);
631    }
632
633    #[tokio::test]
634    async fn test_outcome_roundtrip() {
635        let store = setup_memory_store();
636
637        let mut denied = make_event("default");
638        denied.outcome = EventOutcome::Denied;
639        let denied_id = denied.id;
640        store.append_event(denied).await.unwrap();
641
642        let fetched = store.get_event(denied_id).await.unwrap().unwrap();
643        assert_eq!(fetched.outcome, EventOutcome::Denied);
644    }
645}