Skip to main content

khive_db/stores/
event.rs

1//! SQL-backed `EventStore` implementation.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use uuid::Uuid;
7
8use khive_storage::error::StorageError;
9use khive_storage::event::{Event, EventFilter};
10use khive_storage::types::{BatchWriteSummary, Page, PageRequest};
11use khive_storage::EventStore;
12use khive_storage::StorageCapability;
13use khive_types::{EventOutcome, SubstrateKind};
14
15use crate::error::SqliteError;
16use crate::pool::ConnectionPool;
17
18fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
19    StorageError::driver(StorageCapability::Event, op, e)
20}
21
22fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
23    StorageError::driver(StorageCapability::Event, op, e)
24}
25
26/// An EventStore backed by SQLite tables.
27pub struct SqlEventStore {
28    pool: Arc<ConnectionPool>,
29    is_file_backed: bool,
30    namespace: String,
31}
32
33impl SqlEventStore {
34    /// Create a new store scoped to one namespace.
35    pub fn new_scoped(
36        pool: Arc<ConnectionPool>,
37        is_file_backed: bool,
38        namespace: impl Into<String>,
39    ) -> Self {
40        Self {
41            pool,
42            is_file_backed,
43            namespace: namespace.into(),
44        }
45    }
46
47    fn open_standalone_writer(&self) -> Result<rusqlite::Connection, StorageError> {
48        let config = self.pool.config();
49        let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
50            operation: "event_writer".into(),
51            message: "in-memory databases do not support standalone connections".into(),
52        })?;
53
54        let conn = rusqlite::Connection::open_with_flags(
55            path,
56            rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
57                | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
58                | rusqlite::OpenFlags::SQLITE_OPEN_URI,
59        )
60        .map_err(|e| map_err(e, "open_event_writer"))?;
61
62        conn.busy_timeout(config.busy_timeout)
63            .map_err(|e| map_err(e, "open_event_writer"))?;
64        conn.pragma_update(None, "foreign_keys", "ON")
65            .map_err(|e| map_err(e, "open_event_writer"))?;
66        conn.pragma_update(None, "synchronous", "NORMAL")
67            .map_err(|e| map_err(e, "open_event_writer"))?;
68
69        Ok(conn)
70    }
71
72    fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
73        let config = self.pool.config();
74        let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
75            operation: "event_reader".into(),
76            message: "in-memory databases do not support standalone connections".into(),
77        })?;
78
79        let conn = rusqlite::Connection::open_with_flags(
80            path,
81            rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
82                | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
83                | rusqlite::OpenFlags::SQLITE_OPEN_URI,
84        )
85        .map_err(|e| map_err(e, "open_event_reader"))?;
86
87        conn.busy_timeout(config.busy_timeout)
88            .map_err(|e| map_err(e, "open_event_reader"))?;
89        conn.pragma_update(None, "foreign_keys", "ON")
90            .map_err(|e| map_err(e, "open_event_reader"))?;
91        conn.pragma_update(None, "synchronous", "NORMAL")
92            .map_err(|e| map_err(e, "open_event_reader"))?;
93
94        Ok(conn)
95    }
96
97    async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
98    where
99        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
100        R: Send + 'static,
101    {
102        if self.is_file_backed {
103            let conn = self.open_standalone_writer()?;
104            tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
105                .await
106                .map_err(|e| StorageError::driver(StorageCapability::Event, op, e))?
107        } else {
108            let pool = Arc::clone(&self.pool);
109            tokio::task::spawn_blocking(move || {
110                let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
111                f(guard.conn()).map_err(|e| map_err(e, op))
112            })
113            .await
114            .map_err(|e| StorageError::driver(StorageCapability::Event, op, e))?
115        }
116    }
117
118    async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
119    where
120        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
121        R: Send + 'static,
122    {
123        if self.is_file_backed {
124            let conn = self.open_standalone_reader()?;
125            tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
126                .await
127                .map_err(|e| StorageError::driver(StorageCapability::Event, op, e))?
128        } else {
129            let pool = Arc::clone(&self.pool);
130            tokio::task::spawn_blocking(move || {
131                let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
132                f(guard.conn()).map_err(|e| map_err(e, op))
133            })
134            .await
135            .map_err(|e| StorageError::driver(StorageCapability::Event, op, e))?
136        }
137    }
138}
139
140// =============================================================================
141// Helpers: parse SubstrateKind / EventOutcome from DB strings
142// =============================================================================
143
144fn substrate_from_str(s: &str) -> Result<SubstrateKind, rusqlite::Error> {
145    s.parse::<SubstrateKind>().map_err(|_| {
146        rusqlite::Error::FromSqlConversionFailure(
147            0,
148            rusqlite::types::Type::Text,
149            format!("unknown SubstrateKind: {s}").into(),
150        )
151    })
152}
153
154fn outcome_from_str(s: &str) -> Result<EventOutcome, rusqlite::Error> {
155    match s {
156        "success" => Ok(EventOutcome::Success),
157        "denied" => Ok(EventOutcome::Denied),
158        "error" => Ok(EventOutcome::Error),
159        other => Err(rusqlite::Error::FromSqlConversionFailure(
160            0,
161            rusqlite::types::Type::Text,
162            format!("unknown EventOutcome: {other}").into(),
163        )),
164    }
165}
166
167fn parse_uuid(s: &str) -> Result<Uuid, rusqlite::Error> {
168    Uuid::parse_str(s).map_err(|e| {
169        rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
170    })
171}
172
173// Column order: id(0), namespace(1), verb(2), substrate(3), actor(4),
174//               outcome(5), data(6), duration_us(7), target_id(8), created_at(9)
175fn read_event(row: &rusqlite::Row<'_>) -> Result<Event, rusqlite::Error> {
176    let id_str: String = row.get(0)?;
177    let namespace: String = row.get(1)?;
178    let verb: String = row.get(2)?;
179    let substrate_str: String = row.get(3)?;
180    let actor: String = row.get(4)?;
181    let outcome_str: String = row.get(5)?;
182    let data_str: Option<String> = row.get(6)?;
183    let duration_us: i64 = row.get(7)?;
184    let target_str: Option<String> = row.get(8)?;
185    let created_at: i64 = row.get(9)?;
186
187    let id = parse_uuid(&id_str)?;
188    let substrate = substrate_from_str(&substrate_str)?;
189    let outcome = outcome_from_str(&outcome_str)?;
190    let data = data_str
191        .as_deref()
192        .map(serde_json::from_str)
193        .transpose()
194        .map_err(|e| {
195            rusqlite::Error::FromSqlConversionFailure(6, rusqlite::types::Type::Text, Box::new(e))
196        })?;
197    let target_id = target_str.as_deref().map(parse_uuid).transpose()?;
198
199    Ok(Event {
200        id,
201        namespace,
202        verb,
203        substrate,
204        actor,
205        outcome,
206        data,
207        duration_us,
208        target_id,
209        created_at,
210    })
211}
212
213fn build_event_filter_sql(
214    default_namespace: &str,
215    filter: &EventFilter,
216) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
217    let mut conditions: Vec<String> = Vec::new();
218    let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
219
220    // If filter.namespaces is non-empty, use those; otherwise fall back to default_namespace.
221    if filter.namespaces.is_empty() {
222        params.push(Box::new(default_namespace.to_string()));
223        conditions.push(format!("namespace = ?{}", params.len()));
224    } else if filter.namespaces.len() == 1 {
225        params.push(Box::new(filter.namespaces[0].clone()));
226        conditions.push(format!("namespace = ?{}", params.len()));
227    } else {
228        let placeholders: Vec<String> = filter
229            .namespaces
230            .iter()
231            .map(|ns| {
232                params.push(Box::new(ns.clone()));
233                format!("?{}", params.len())
234            })
235            .collect();
236        conditions.push(format!("namespace IN ({})", placeholders.join(",")));
237    }
238
239    if !filter.ids.is_empty() {
240        let placeholders: Vec<String> = filter
241            .ids
242            .iter()
243            .map(|id| {
244                params.push(Box::new(id.to_string()));
245                format!("?{}", params.len())
246            })
247            .collect();
248        conditions.push(format!("id IN ({})", placeholders.join(",")));
249    }
250
251    if !filter.verbs.is_empty() {
252        let placeholders: Vec<String> = filter
253            .verbs
254            .iter()
255            .map(|v| {
256                params.push(Box::new(v.clone()));
257                format!("?{}", params.len())
258            })
259            .collect();
260        conditions.push(format!("verb IN ({})", placeholders.join(",")));
261    }
262
263    if !filter.substrates.is_empty() {
264        let placeholders: Vec<String> = filter
265            .substrates
266            .iter()
267            .map(|s| {
268                params.push(Box::new(s.name().to_string()));
269                format!("?{}", params.len())
270            })
271            .collect();
272        conditions.push(format!("substrate IN ({})", placeholders.join(",")));
273    }
274
275    if !filter.actors.is_empty() {
276        let placeholders: Vec<String> = filter
277            .actors
278            .iter()
279            .map(|a| {
280                params.push(Box::new(a.clone()));
281                format!("?{}", params.len())
282            })
283            .collect();
284        conditions.push(format!("actor IN ({})", placeholders.join(",")));
285    }
286
287    if let Some(after) = filter.after {
288        params.push(Box::new(after));
289        conditions.push(format!("created_at > ?{}", params.len()));
290    }
291
292    if let Some(before) = filter.before {
293        params.push(Box::new(before));
294        conditions.push(format!("created_at < ?{}", params.len()));
295    }
296
297    let clause = format!(" WHERE {}", conditions.join(" AND "));
298    (clause, params)
299}
300
301// =============================================================================
302// EventStore implementation
303// =============================================================================
304
305#[async_trait]
306impl EventStore for SqlEventStore {
307    async fn append_event(&self, event: Event) -> Result<(), StorageError> {
308        let id_str = event.id.to_string();
309        let substrate_str = event.substrate.name().to_string();
310        let outcome_str = event.outcome.name().to_string();
311        let data_str = event.data.as_ref().map(|v| v.to_string());
312        let target_str = event.target_id.map(|u| u.to_string());
313        let ns = event.namespace.clone();
314        let verb = event.verb.clone();
315        let actor = event.actor.clone();
316        let duration_us = event.duration_us;
317        let created_at = event.created_at;
318
319        self.with_writer("append_event", move |conn| {
320            conn.execute(
321                "INSERT INTO events \
322                 (id, namespace, verb, substrate, actor, outcome, data, duration_us, target_id, created_at) \
323                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
324                rusqlite::params![
325                    id_str,
326                    ns,
327                    verb,
328                    substrate_str,
329                    actor,
330                    outcome_str,
331                    data_str,
332                    duration_us,
333                    target_str,
334                    created_at,
335                ],
336            )?;
337            Ok(())
338        })
339        .await
340    }
341
342    async fn append_events(&self, events: Vec<Event>) -> Result<BatchWriteSummary, StorageError> {
343        let attempted = events.len() as u64;
344
345        self.with_writer("append_events", move |conn| {
346            conn.execute_batch("BEGIN IMMEDIATE")?;
347            let mut affected = 0u64;
348            let mut failed = 0u64;
349            let mut first_error = String::new();
350
351            for event in &events {
352                let id_str = event.id.to_string();
353                let substrate_str = event.substrate.name().to_string();
354                let outcome_str = event.outcome.name().to_string();
355                let data_str = event.data.as_ref().map(|v| v.to_string());
356                let target_str = event.target_id.map(|u| u.to_string());
357
358                match conn.execute(
359                    "INSERT INTO events \
360                     (id, namespace, verb, substrate, actor, outcome, data, duration_us, target_id, created_at) \
361                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
362                    rusqlite::params![
363                        id_str,
364                        &event.namespace,
365                        &event.verb,
366                        substrate_str,
367                        &event.actor,
368                        outcome_str,
369                        data_str,
370                        event.duration_us,
371                        target_str,
372                        event.created_at,
373                    ],
374                ) {
375                    Ok(_) => affected += 1,
376                    Err(e) => {
377                        if first_error.is_empty() {
378                            first_error = e.to_string();
379                        }
380                        failed += 1;
381                    }
382                }
383            }
384
385            if let Err(e) = conn.execute_batch("COMMIT") {
386                let _ = conn.execute_batch("ROLLBACK");
387                return Err(e);
388            }
389            Ok(BatchWriteSummary {
390                attempted,
391                affected,
392                failed,
393                first_error,
394            })
395        })
396        .await
397    }
398
399    async fn get_event(&self, id: Uuid) -> Result<Option<Event>, StorageError> {
400        let namespace = self.namespace.clone();
401        let id_str = id.to_string();
402
403        self.with_reader("get_event", move |conn| {
404            let mut stmt = conn.prepare(
405                "SELECT id, namespace, verb, substrate, actor, outcome, data, duration_us, target_id, created_at \
406                 FROM events WHERE namespace = ?1 AND id = ?2",
407            )?;
408            let mut rows = stmt.query(rusqlite::params![namespace, id_str])?;
409            match rows.next()? {
410                Some(row) => Ok(Some(read_event(row)?)),
411                None => Ok(None),
412            }
413        })
414        .await
415    }
416
417    async fn query_events(
418        &self,
419        filter: EventFilter,
420        page: PageRequest,
421    ) -> Result<Page<Event>, StorageError> {
422        let namespace = self.namespace.clone();
423
424        self.with_reader("query_events", move |conn| {
425            let (where_clause, filter_params) = build_event_filter_sql(&namespace, &filter);
426
427            let count_sql = format!("SELECT COUNT(*) FROM events{}", where_clause);
428            let total: i64 = {
429                let mut stmt = conn.prepare(&count_sql)?;
430                let param_refs: Vec<&dyn rusqlite::types::ToSql> =
431                    filter_params.iter().map(|p| p.as_ref()).collect();
432                stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
433            };
434
435            let (_, data_filter_params) = build_event_filter_sql(&namespace, &filter);
436            let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = data_filter_params;
437            all_params.push(Box::new(page.limit as i64));
438            all_params.push(Box::new(page.offset as i64));
439
440            let limit_idx = all_params.len() - 1;
441            let offset_idx = all_params.len();
442
443            let data_sql = format!(
444                "SELECT id, namespace, verb, substrate, actor, outcome, data, duration_us, target_id, created_at \
445                 FROM events{} ORDER BY created_at DESC LIMIT ?{} OFFSET ?{}",
446                where_clause, limit_idx, offset_idx,
447            );
448
449            let mut stmt = conn.prepare(&data_sql)?;
450            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
451                all_params.iter().map(|p| p.as_ref()).collect();
452            let rows = stmt.query_map(param_refs.as_slice(), read_event)?;
453
454            let mut items = Vec::new();
455            for row in rows {
456                items.push(row?);
457            }
458
459            Ok(Page {
460                items,
461                total: Some(total as u64),
462            })
463        })
464        .await
465    }
466
467    async fn count_events(&self, filter: EventFilter) -> Result<u64, StorageError> {
468        let namespace = self.namespace.clone();
469
470        self.with_reader("count_events", move |conn| {
471            let (where_clause, params) = build_event_filter_sql(&namespace, &filter);
472            let sql = format!("SELECT COUNT(*) FROM events{}", where_clause);
473            let mut stmt = conn.prepare(&sql)?;
474            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
475                params.iter().map(|p| p.as_ref()).collect();
476            let count: i64 = stmt.query_row(param_refs.as_slice(), |row| row.get(0))?;
477            Ok(count as u64)
478        })
479        .await
480    }
481}
482
483// =============================================================================
484// DDL
485// =============================================================================
486
487const EVENTS_DDL: &str = "\
488    CREATE TABLE IF NOT EXISTS events (\
489        id TEXT PRIMARY KEY,\
490        namespace TEXT NOT NULL,\
491        verb TEXT NOT NULL,\
492        substrate TEXT NOT NULL,\
493        actor TEXT NOT NULL,\
494        outcome TEXT NOT NULL,\
495        data TEXT,\
496        duration_us INTEGER NOT NULL DEFAULT 0,\
497        target_id TEXT,\
498        created_at INTEGER NOT NULL\
499    );\
500    CREATE INDEX IF NOT EXISTS idx_events_namespace ON events(namespace);\
501    CREATE INDEX IF NOT EXISTS idx_events_verb ON events(verb);\
502    CREATE INDEX IF NOT EXISTS idx_events_substrate ON events(substrate);\
503    CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at DESC);\
504";
505
506pub(crate) fn ensure_events_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
507    conn.execute_batch(EVENTS_DDL)
508}
509
510#[cfg(test)]
511mod tests {
512    use super::*;
513    use crate::pool::PoolConfig;
514
515    fn setup_memory_store() -> SqlEventStore {
516        let config = PoolConfig {
517            path: None,
518            ..PoolConfig::default()
519        };
520        let pool = Arc::new(ConnectionPool::new(config).unwrap());
521
522        {
523            let writer = pool.writer().unwrap();
524            writer.conn().execute_batch(EVENTS_DDL).unwrap();
525        }
526
527        SqlEventStore::new_scoped(pool, false, "default")
528    }
529
530    fn make_event(namespace: &str) -> Event {
531        Event::new(namespace, "search", SubstrateKind::Note, "agent:test")
532    }
533
534    #[tokio::test]
535    async fn test_append_and_get_event() {
536        let store = setup_memory_store();
537
538        let event = make_event("default");
539        let id = event.id;
540
541        store.append_event(event).await.unwrap();
542
543        let fetched = store.get_event(id).await.unwrap();
544        assert!(fetched.is_some());
545        let fetched = fetched.unwrap();
546        assert_eq!(fetched.id, id);
547        assert_eq!(fetched.verb, "search");
548        assert_eq!(fetched.substrate, SubstrateKind::Note);
549        assert_eq!(fetched.actor, "agent:test");
550        assert_eq!(fetched.outcome, EventOutcome::Success);
551    }
552
553    #[tokio::test]
554    async fn test_append_events_batch() {
555        let store = setup_memory_store();
556
557        let events: Vec<Event> = (0..3).map(|_| make_event("default")).collect();
558        let summary = store.append_events(events).await.unwrap();
559        assert_eq!(summary.attempted, 3);
560        assert_eq!(summary.affected, 3);
561        assert_eq!(summary.failed, 0);
562    }
563
564    #[tokio::test]
565    async fn test_count_events() {
566        let store = setup_memory_store();
567
568        for _ in 0..3 {
569            store.append_event(make_event("default")).await.unwrap();
570        }
571
572        let count = store.count_events(EventFilter::default()).await.unwrap();
573        assert_eq!(count, 3);
574    }
575
576    #[tokio::test]
577    async fn test_query_events_filter_by_verb() {
578        let store = setup_memory_store();
579
580        store.append_event(make_event("default")).await.unwrap();
581
582        let mut create_event = make_event("default");
583        create_event.verb = "create".to_string();
584        store.append_event(create_event).await.unwrap();
585
586        let filter = EventFilter {
587            verbs: vec!["search".to_string()],
588            ..EventFilter::default()
589        };
590        let page = store
591            .query_events(
592                filter,
593                PageRequest {
594                    limit: 10,
595                    offset: 0,
596                },
597            )
598            .await
599            .unwrap();
600        assert_eq!(page.items.len(), 1);
601        assert_eq!(page.items[0].verb, "search");
602    }
603
604    #[tokio::test]
605    async fn test_query_events_filter_by_substrate() {
606        let store = setup_memory_store();
607
608        store.append_event(make_event("default")).await.unwrap();
609
610        let mut entity_event = make_event("default");
611        entity_event.substrate = SubstrateKind::Entity;
612        store.append_event(entity_event).await.unwrap();
613
614        let filter = EventFilter {
615            substrates: vec![SubstrateKind::Entity],
616            ..EventFilter::default()
617        };
618        let page = store
619            .query_events(
620                filter,
621                PageRequest {
622                    limit: 10,
623                    offset: 0,
624                },
625            )
626            .await
627            .unwrap();
628        assert_eq!(page.items.len(), 1);
629        assert_eq!(page.items[0].substrate, SubstrateKind::Entity);
630    }
631
632    #[tokio::test]
633    async fn test_outcome_roundtrip() {
634        let store = setup_memory_store();
635
636        let mut denied = make_event("default");
637        denied.outcome = EventOutcome::Denied;
638        let denied_id = denied.id;
639        store.append_event(denied).await.unwrap();
640
641        let fetched = store.get_event(denied_id).await.unwrap().unwrap();
642        assert_eq!(fetched.outcome, EventOutcome::Denied);
643    }
644}