Skip to main content

khive_db/stores/
event.rs

1//! SQL-backed `EventStore` implementation.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use uuid::Uuid;
7
8use khive_storage::error::StorageError;
9use khive_storage::event::{Event, EventFilter, EventObservation, ObservationRole, ReferentKind};
10use khive_storage::types::{BatchWriteSummary, Page, PageRequest};
11use khive_storage::EventStore;
12use khive_storage::StorageCapability;
13use khive_types::{EventKind, EventOutcome, SubstrateKind};
14
15use crate::error::SqliteError;
16use crate::pool::ConnectionPool;
17
18fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
19    StorageError::driver(StorageCapability::Events, op, e)
20}
21
22fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
23    StorageError::driver(StorageCapability::Events, op, e)
24}
25
/// An EventStore backed by SQLite tables.
///
/// Reads (`get_event`, `query_events`, `count_events`) are restricted to
/// `namespace`; appended events are stored under the namespace carried by
/// each event.
pub struct SqlEventStore {
    /// Shared pool. Supplies connections directly when the store is not
    /// file-backed, and the database path / busy timeout otherwise.
    pool: Arc<ConnectionPool>,
    /// When `true`, every operation opens its own standalone connection
    /// instead of borrowing one from `pool`.
    is_file_backed: bool,
    /// Namespace that read queries are scoped to.
    namespace: String,
}
32
33impl SqlEventStore {
34    /// Create a new store scoped to one namespace.
35    pub fn new_scoped(
36        pool: Arc<ConnectionPool>,
37        is_file_backed: bool,
38        namespace: impl Into<String>,
39    ) -> Self {
40        Self {
41            pool,
42            is_file_backed,
43            namespace: namespace.into(),
44        }
45    }
46
47    fn open_standalone_writer(&self) -> Result<rusqlite::Connection, StorageError> {
48        let config = self.pool.config();
49        let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
50            operation: "event_writer".into(),
51            message: "in-memory databases do not support standalone connections".into(),
52        })?;
53
54        let conn = rusqlite::Connection::open_with_flags(
55            path,
56            rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
57                | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
58                | rusqlite::OpenFlags::SQLITE_OPEN_URI,
59        )
60        .map_err(|e| map_err(e, "open_event_writer"))?;
61
62        conn.busy_timeout(config.busy_timeout)
63            .map_err(|e| map_err(e, "open_event_writer"))?;
64        conn.pragma_update(None, "foreign_keys", "ON")
65            .map_err(|e| map_err(e, "open_event_writer"))?;
66        conn.pragma_update(None, "synchronous", "NORMAL")
67            .map_err(|e| map_err(e, "open_event_writer"))?;
68
69        Ok(conn)
70    }
71
72    fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
73        let config = self.pool.config();
74        let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
75            operation: "event_reader".into(),
76            message: "in-memory databases do not support standalone connections".into(),
77        })?;
78
79        let conn = rusqlite::Connection::open_with_flags(
80            path,
81            rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
82                | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
83                | rusqlite::OpenFlags::SQLITE_OPEN_URI,
84        )
85        .map_err(|e| map_err(e, "open_event_reader"))?;
86
87        conn.busy_timeout(config.busy_timeout)
88            .map_err(|e| map_err(e, "open_event_reader"))?;
89        conn.pragma_update(None, "foreign_keys", "ON")
90            .map_err(|e| map_err(e, "open_event_reader"))?;
91        conn.pragma_update(None, "synchronous", "NORMAL")
92            .map_err(|e| map_err(e, "open_event_reader"))?;
93
94        Ok(conn)
95    }
96
97    async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
98    where
99        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
100        R: Send + 'static,
101    {
102        if self.is_file_backed {
103            let conn = self.open_standalone_writer()?;
104            tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
105                .await
106                .map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
107        } else {
108            let pool = Arc::clone(&self.pool);
109            tokio::task::spawn_blocking(move || {
110                let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
111                f(guard.conn()).map_err(|e| map_err(e, op))
112            })
113            .await
114            .map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
115        }
116    }
117
118    async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
119    where
120        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
121        R: Send + 'static,
122    {
123        if self.is_file_backed {
124            let conn = self.open_standalone_reader()?;
125            tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
126                .await
127                .map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
128        } else {
129            let pool = Arc::clone(&self.pool);
130            tokio::task::spawn_blocking(move || {
131                let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
132                f(guard.conn()).map_err(|e| map_err(e, op))
133            })
134            .await
135            .map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
136        }
137    }
138}
139
140// =============================================================================
141// Helpers: parse SubstrateKind / EventOutcome / EventKind from DB strings
142// =============================================================================
143
144fn substrate_from_str(s: &str) -> Result<SubstrateKind, rusqlite::Error> {
145    s.parse::<SubstrateKind>().map_err(|_| {
146        rusqlite::Error::FromSqlConversionFailure(
147            0,
148            rusqlite::types::Type::Text,
149            format!("unknown SubstrateKind: {s}").into(),
150        )
151    })
152}
153
154fn outcome_from_str(s: &str) -> Result<EventOutcome, rusqlite::Error> {
155    match s {
156        "success" => Ok(EventOutcome::Success),
157        "denied" => Ok(EventOutcome::Denied),
158        "error" => Ok(EventOutcome::Error),
159        other => Err(rusqlite::Error::FromSqlConversionFailure(
160            0,
161            rusqlite::types::Type::Text,
162            format!("unknown EventOutcome: {other}").into(),
163        )),
164    }
165}
166
167fn kind_from_str(s: &str) -> Result<EventKind, rusqlite::Error> {
168    s.parse::<EventKind>().map_err(|_| {
169        rusqlite::Error::FromSqlConversionFailure(
170            0,
171            rusqlite::types::Type::Text,
172            format!("unknown EventKind: {s}").into(),
173        )
174    })
175}
176
177fn parse_uuid(s: &str) -> Result<Uuid, rusqlite::Error> {
178    Uuid::parse_str(s).map_err(|e| {
179        rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
180    })
181}
182
183// Column order: id(0), namespace(1), verb(2), substrate(3), actor(4),
184//               kind(5), outcome(6), payload(7), payload_schema_version(8),
185//               profile_state_version(9), duration_us(10), target_id(11),
186//               session_id(12), aggregate_kind(13), aggregate_id(14), created_at(15)
187fn read_event(row: &rusqlite::Row<'_>) -> Result<Event, rusqlite::Error> {
188    let id_str: String = row.get(0)?;
189    let namespace: String = row.get(1)?;
190    let verb: String = row.get(2)?;
191    let substrate_str: String = row.get(3)?;
192    let actor: String = row.get(4)?;
193    let kind_str: String = row.get(5)?;
194    let outcome_str: String = row.get(6)?;
195    let payload_str: String = row.get(7)?;
196    let payload_schema_version: i64 = row.get(8)?;
197    let profile_state_version: Option<i64> = row.get(9)?;
198    let duration_us: i64 = row.get(10)?;
199    let target_str: Option<String> = row.get(11)?;
200    let session_str: Option<String> = row.get(12)?;
201    let aggregate_kind: Option<String> = row.get(13)?;
202    let aggregate_str: Option<String> = row.get(14)?;
203    let created_at: i64 = row.get(15)?;
204
205    let id = parse_uuid(&id_str)?;
206    let substrate = substrate_from_str(&substrate_str)?;
207    let kind = kind_from_str(&kind_str)?;
208    let outcome = outcome_from_str(&outcome_str)?;
209    let payload: serde_json::Value = serde_json::from_str(&payload_str).map_err(|e| {
210        rusqlite::Error::FromSqlConversionFailure(7, rusqlite::types::Type::Text, Box::new(e))
211    })?;
212    let target_id = target_str.as_deref().map(parse_uuid).transpose()?;
213    let session_id = session_str.as_deref().map(parse_uuid).transpose()?;
214    let aggregate_id = aggregate_str.as_deref().map(parse_uuid).transpose()?;
215
216    Ok(Event {
217        id,
218        namespace,
219        verb,
220        substrate,
221        actor,
222        kind,
223        outcome,
224        payload,
225        payload_schema_version: payload_schema_version as u32,
226        profile_state_version: profile_state_version.map(|v| v as u64),
227        duration_us,
228        target_id,
229        session_id,
230        aggregate_kind,
231        aggregate_id,
232        created_at,
233    })
234}
235
236// =============================================================================
237// Helpers: observation projection write path
238// =============================================================================
239
240fn insert_event_with_observations(
241    conn: &rusqlite::Connection,
242    event: &Event,
243) -> Result<(), rusqlite::Error> {
244    let id_str = event.id.to_string();
245    let substrate_str = event.substrate.name().to_string();
246    let kind_str = event.kind.name().to_string();
247    let outcome_str = event.outcome.name().to_string();
248    let payload_str = event.payload.to_string();
249    let target_str = event.target_id.map(|u| u.to_string());
250    let session_str = event.session_id.map(|u| u.to_string());
251    let aggregate_str = event.aggregate_id.map(|u| u.to_string());
252    let profile_state_version = event.profile_state_version.map(|v| v as i64);
253
254    conn.execute(
255        "INSERT INTO events \
256         (id, namespace, verb, substrate, actor, kind, outcome, payload, payload_schema_version, \
257          profile_state_version, duration_us, target_id, session_id, aggregate_kind, aggregate_id, created_at) \
258         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
259        rusqlite::params![
260            id_str,
261            &event.namespace,
262            &event.verb,
263            substrate_str,
264            &event.actor,
265            kind_str,
266            outcome_str,
267            payload_str,
268            event.payload_schema_version as i64,
269            profile_state_version,
270            event.duration_us,
271            target_str,
272            session_str,
273            &event.aggregate_kind,
274            aggregate_str,
275            event.created_at,
276        ],
277    )?;
278
279    for observation in decode_event_observations(event)? {
280        conn.execute(
281            "INSERT INTO event_observations \
282             (event_id, entity_id, referent_kind, role, position) \
283             VALUES (?1, ?2, ?3, ?4, ?5)",
284            rusqlite::params![
285                observation.event_id.to_string(),
286                observation.entity_id.to_string(),
287                observation.referent_kind.name(),
288                observation.role.name(),
289                observation.position as i64,
290            ],
291        )?;
292    }
293
294    Ok(())
295}
296
297fn decode_event_observations(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
298    match event.kind {
299        EventKind::RerankExecuted => decode_rank_observations(event),
300        EventKind::RecallExecuted | EventKind::SearchExecuted => decode_rank_observations(event),
301        EventKind::LinkCreated => decode_link_observations(event),
302        EventKind::EntityCreated
303        | EventKind::EntityUpdated
304        | EventKind::EntityDeleted
305        | EventKind::NoteCreated
306        | EventKind::NoteUpdated
307        | EventKind::NoteDeleted
308        | EventKind::TaskTransitioned => decode_target_observation(event),
309        EventKind::FeedbackExplicit => decode_signal_observation(event),
310        _ => Ok(Vec::new()),
311    }
312}
313
314fn payload_uuid_array(event: &Event, field: &'static str) -> Result<Vec<Uuid>, rusqlite::Error> {
315    let Some(values) = event.payload.get(field) else {
316        return Ok(Vec::new());
317    };
318    let Some(array) = values.as_array() else {
319        return Err(invalid_payload(event.kind, field, "expected array"));
320    };
321
322    array
323        .iter()
324        .map(|value| {
325            value
326                .as_str()
327                .ok_or_else(|| invalid_payload(event.kind, field, "expected UUID string"))
328                .and_then(|s| Uuid::parse_str(s).map_err(|e| invalid_payload(event.kind, field, e)))
329        })
330        .collect()
331}
332
333fn payload_uuid(event: &Event, field: &'static str) -> Result<Option<Uuid>, rusqlite::Error> {
334    let Some(value) = event.payload.get(field) else {
335        return Ok(None);
336    };
337    let Some(s) = value.as_str() else {
338        return Err(invalid_payload(event.kind, field, "expected UUID string"));
339    };
340    Uuid::parse_str(s)
341        .map(Some)
342        .map_err(|e| invalid_payload(event.kind, field, e))
343}
344
345fn decode_rank_observations(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
346    let mut rows = Vec::new();
347
348    for (position, entity_id) in payload_uuid_array(event, "candidates")?
349        .into_iter()
350        .enumerate()
351    {
352        rows.push(EventObservation {
353            event_id: event.id,
354            entity_id,
355            referent_kind: ReferentKind::Note,
356            role: ObservationRole::Candidate,
357            position: position as u32,
358        });
359    }
360
361    let selected = payload_uuid_array(event, "selected")
362        .or_else(|_| payload_uuid_array(event, "reranked"))
363        .or_else(|_| payload_uuid_array(event, "final_scores"))?;
364    for (position, entity_id) in selected.into_iter().enumerate() {
365        rows.push(EventObservation {
366            event_id: event.id,
367            entity_id,
368            referent_kind: ReferentKind::Note,
369            role: ObservationRole::Selected,
370            position: position as u32,
371        });
372    }
373
374    Ok(rows)
375}
376
377fn decode_link_observations(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
378    let mut rows = Vec::new();
379    if let Some(source) = payload_uuid(event, "source_id")? {
380        rows.push(EventObservation {
381            event_id: event.id,
382            entity_id: source,
383            referent_kind: ReferentKind::Entity,
384            role: ObservationRole::Target,
385            position: 0,
386        });
387    }
388    if let Some(target) = payload_uuid(event, "target_id")? {
389        rows.push(EventObservation {
390            event_id: event.id,
391            entity_id: target,
392            referent_kind: ReferentKind::Entity,
393            role: ObservationRole::Target,
394            position: 1,
395        });
396    }
397    Ok(rows)
398}
399
400fn decode_target_observation(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
401    let Some(entity_id) = event.target_id.or(payload_uuid(event, "target_id")?) else {
402        return Ok(Vec::new());
403    };
404    Ok(vec![EventObservation {
405        event_id: event.id,
406        entity_id,
407        referent_kind: if event.substrate == SubstrateKind::Note {
408            ReferentKind::Note
409        } else {
410            ReferentKind::Entity
411        },
412        role: ObservationRole::Target,
413        position: 0,
414    }])
415}
416
417fn decode_signal_observation(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
418    let Some(entity_id) = payload_uuid(event, "about_id")? else {
419        return Ok(Vec::new());
420    };
421    Ok(vec![EventObservation {
422        event_id: event.id,
423        entity_id,
424        referent_kind: ReferentKind::Entity,
425        role: ObservationRole::Signal,
426        position: 0,
427    }])
428}
429
430fn invalid_payload(
431    kind: EventKind,
432    field: &'static str,
433    reason: impl std::fmt::Display,
434) -> rusqlite::Error {
435    rusqlite::Error::ToSqlConversionFailure(
436        format!("invalid payload for {}.{field}: {reason}", kind.name()).into(),
437    )
438}
439
440// =============================================================================
441// Helpers: filter SQL builder
442// =============================================================================
443
444fn build_event_filter_sql(
445    conn: &rusqlite::Connection,
446    default_namespace: &str,
447    filter: &EventFilter,
448) -> Result<(String, Vec<Box<dyn rusqlite::types::ToSql>>), rusqlite::Error> {
449    reject_missing_event_filter_schema(conn, filter)?;
450
451    let mut conditions: Vec<String> = Vec::new();
452    let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
453
454    params.push(Box::new(default_namespace.to_string()));
455    conditions.push(format!("namespace = ?{}", params.len()));
456
457    push_in_clause(
458        &mut conditions,
459        &mut params,
460        "id",
461        filter.ids.iter().map(Uuid::to_string),
462    );
463    push_in_clause(
464        &mut conditions,
465        &mut params,
466        "kind",
467        filter.kinds.iter().map(|kind| kind.name().to_string()),
468    );
469    push_in_clause(
470        &mut conditions,
471        &mut params,
472        "verb",
473        filter.verbs.iter().cloned(),
474    );
475    push_in_clause(
476        &mut conditions,
477        &mut params,
478        "substrate",
479        filter.substrates.iter().map(|s| s.name().to_string()),
480    );
481    push_in_clause(
482        &mut conditions,
483        &mut params,
484        "actor",
485        filter.actors.iter().cloned(),
486    );
487
488    if let Some(after) = filter.after {
489        params.push(Box::new(after));
490        conditions.push(format!("created_at > ?{}", params.len()));
491    }
492
493    if let Some(before) = filter.before {
494        params.push(Box::new(before));
495        conditions.push(format!("created_at < ?{}", params.len()));
496    }
497
498    if let Some(session_id) = filter.session_id {
499        params.push(Box::new(session_id.to_string()));
500        conditions.push(format!("session_id = ?{}", params.len()));
501    }
502
503    push_observation_exists(&mut conditions, &mut params, "candidate", &filter.observed);
504    push_observation_exists(&mut conditions, &mut params, "selected", &filter.selected);
505
506    if let Some(proposal_id) = filter.payload_proposal_id {
507        params.push(Box::new(proposal_id.to_string()));
508        conditions.push(format!(
509            "json_extract(payload, '$.proposal_id') = ?{}",
510            params.len()
511        ));
512    }
513
514    let clause = format!(" WHERE {}", conditions.join(" AND "));
515    Ok((clause, params))
516}
517
518fn push_in_clause<I>(
519    conditions: &mut Vec<String>,
520    params: &mut Vec<Box<dyn rusqlite::types::ToSql>>,
521    column: &'static str,
522    values: I,
523) where
524    I: IntoIterator<Item = String>,
525{
526    let placeholders: Vec<String> = values
527        .into_iter()
528        .map(|value| {
529            params.push(Box::new(value));
530            format!("?{}", params.len())
531        })
532        .collect();
533    if !placeholders.is_empty() {
534        conditions.push(format!("{column} IN ({})", placeholders.join(",")));
535    }
536}
537
538fn push_observation_exists(
539    conditions: &mut Vec<String>,
540    params: &mut Vec<Box<dyn rusqlite::types::ToSql>>,
541    role: &'static str,
542    entity_ids: &[Uuid],
543) {
544    if entity_ids.is_empty() {
545        return;
546    }
547    let placeholders: Vec<String> = entity_ids
548        .iter()
549        .map(|id| {
550            params.push(Box::new(id.to_string()));
551            format!("?{}", params.len())
552        })
553        .collect();
554    conditions.push(format!(
555        "EXISTS (SELECT 1 FROM event_observations o \
556         WHERE o.event_id = events.id AND o.role = '{role}' AND o.entity_id IN ({}))",
557        placeholders.join(",")
558    ));
559}
560
561fn reject_missing_event_filter_schema(
562    conn: &rusqlite::Connection,
563    filter: &EventFilter,
564) -> Result<(), rusqlite::Error> {
565    if filter.session_id.is_some() && !has_column(conn, "events", "session_id")? {
566        return Err(schema_absent("events.session_id"));
567    }
568    if (!filter.observed.is_empty() || !filter.selected.is_empty())
569        && !has_table(conn, "event_observations")?
570    {
571        return Err(schema_absent("event_observations"));
572    }
573    if filter.payload_proposal_id.is_some() && !has_column(conn, "events", "payload")? {
574        return Err(schema_absent("events.payload"));
575    }
576    Ok(())
577}
578
579fn has_table(conn: &rusqlite::Connection, table: &'static str) -> Result<bool, rusqlite::Error> {
580    conn.query_row(
581        "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type = 'table' AND name = ?1",
582        [table],
583        |row| row.get(0),
584    )
585}
586
587fn has_column(
588    conn: &rusqlite::Connection,
589    table: &'static str,
590    column: &'static str,
591) -> Result<bool, rusqlite::Error> {
592    conn.query_row(
593        "SELECT COUNT(*) > 0 FROM pragma_table_info(?1) WHERE name = ?2",
594        rusqlite::params![table, column],
595        |row| row.get(0),
596    )
597}
598
599fn schema_absent(name: &'static str) -> rusqlite::Error {
600    rusqlite::Error::ToSqlConversionFailure(
601        format!("event filter requires missing schema element {name}; run migrations").into(),
602    )
603}
604
605// =============================================================================
606// EventStore implementation
607// =============================================================================
608
609#[async_trait]
610impl EventStore for SqlEventStore {
611    async fn append_event(&self, event: Event) -> Result<(), StorageError> {
612        self.with_writer("append_event", move |conn| {
613            conn.execute_batch("BEGIN IMMEDIATE")?;
614            if let Err(e) = insert_event_with_observations(conn, &event) {
615                let _ = conn.execute_batch("ROLLBACK");
616                return Err(e);
617            }
618            conn.execute_batch("COMMIT")?;
619            Ok(())
620        })
621        .await
622    }
623
624    async fn append_events(&self, events: Vec<Event>) -> Result<BatchWriteSummary, StorageError> {
625        let attempted = events.len() as u64;
626
627        self.with_writer("append_events", move |conn| {
628            conn.execute_batch("BEGIN IMMEDIATE")?;
629            let mut affected = 0u64;
630
631            for event in &events {
632                if let Err(e) = insert_event_with_observations(conn, event) {
633                    let _ = conn.execute_batch("ROLLBACK");
634                    return Err(e);
635                }
636                affected += 1;
637            }
638
639            conn.execute_batch("COMMIT")?;
640            Ok(BatchWriteSummary {
641                attempted,
642                affected,
643                failed: 0,
644                first_error: String::new(),
645            })
646        })
647        .await
648    }
649
650    async fn get_event(&self, id: Uuid) -> Result<Option<Event>, StorageError> {
651        let namespace = self.namespace.clone();
652        let id_str = id.to_string();
653
654        self.with_reader("get_event", move |conn| {
655            let mut stmt = conn.prepare(
656                "SELECT id, namespace, verb, substrate, actor, kind, outcome, payload, \
657                        payload_schema_version, profile_state_version, duration_us, target_id, \
658                        session_id, aggregate_kind, aggregate_id, created_at \
659                 FROM events WHERE namespace = ?1 AND id = ?2",
660            )?;
661            let mut rows = stmt.query(rusqlite::params![namespace, id_str])?;
662            match rows.next()? {
663                Some(row) => Ok(Some(read_event(row)?)),
664                None => Ok(None),
665            }
666        })
667        .await
668    }
669
670    async fn query_events(
671        &self,
672        filter: EventFilter,
673        page: PageRequest,
674    ) -> Result<Page<Event>, StorageError> {
675        let namespace = self.namespace.clone();
676
677        self.with_reader("query_events", move |conn| {
678            let (where_clause, filter_params) = build_event_filter_sql(conn, &namespace, &filter)?;
679
680            let count_sql = format!("SELECT COUNT(*) FROM events{}", where_clause);
681            let total: i64 = {
682                let mut stmt = conn.prepare(&count_sql)?;
683                let param_refs: Vec<&dyn rusqlite::types::ToSql> =
684                    filter_params.iter().map(|p| p.as_ref()).collect();
685                stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
686            };
687
688            let (_, data_filter_params) = build_event_filter_sql(conn, &namespace, &filter)?;
689            let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = data_filter_params;
690            all_params.push(Box::new(page.limit as i64));
691            all_params.push(Box::new(page.offset as i64));
692
693            let limit_idx = all_params.len() - 1;
694            let offset_idx = all_params.len();
695
696            let data_sql = format!(
697                "SELECT id, namespace, verb, substrate, actor, kind, outcome, payload, \
698                        payload_schema_version, profile_state_version, duration_us, target_id, \
699                        session_id, aggregate_kind, aggregate_id, created_at \
700                 FROM events{} ORDER BY created_at DESC, id DESC LIMIT ?{} OFFSET ?{}",
701                where_clause, limit_idx, offset_idx,
702            );
703
704            let mut stmt = conn.prepare(&data_sql)?;
705            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
706                all_params.iter().map(|p| p.as_ref()).collect();
707            let rows = stmt.query_map(param_refs.as_slice(), read_event)?;
708
709            let mut items = Vec::new();
710            for row in rows {
711                items.push(row?);
712            }
713
714            Ok(Page {
715                items,
716                total: Some(total as u64),
717            })
718        })
719        .await
720    }
721
722    async fn count_events(&self, filter: EventFilter) -> Result<u64, StorageError> {
723        let namespace = self.namespace.clone();
724
725        self.with_reader("count_events", move |conn| {
726            let (where_clause, params) = build_event_filter_sql(conn, &namespace, &filter)?;
727            let sql = format!("SELECT COUNT(*) FROM events{}", where_clause);
728            let mut stmt = conn.prepare(&sql)?;
729            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
730                params.iter().map(|p| p.as_ref()).collect();
731            let count: i64 = stmt.query_row(param_refs.as_slice(), |row| row.get(0))?;
732            Ok(count as u64)
733        })
734        .await
735    }
736}
737
738// =============================================================================
739// DDL
740// =============================================================================
741
/// Schema for the event log: the `events` table, the `event_observations`
/// table, and their indexes.
///
/// Every statement uses `IF NOT EXISTS`, so the batch can be executed
/// repeatedly against the same database. Trailing backslashes join the lines
/// into a single string without embedded newlines or indentation.
const EVENTS_DDL: &str = "\
    CREATE TABLE IF NOT EXISTS events (\
        id TEXT PRIMARY KEY,\
        namespace TEXT NOT NULL,\
        verb TEXT NOT NULL,\
        substrate TEXT NOT NULL,\
        actor TEXT NOT NULL,\
        kind TEXT NOT NULL DEFAULT 'audit',\
        outcome TEXT NOT NULL,\
        payload TEXT NOT NULL DEFAULT '{}',\
        payload_schema_version INTEGER NOT NULL DEFAULT 1,\
        profile_state_version INTEGER,\
        duration_us INTEGER NOT NULL DEFAULT 0,\
        target_id TEXT,\
        session_id TEXT,\
        aggregate_kind TEXT,\
        aggregate_id TEXT,\
        created_at INTEGER NOT NULL\
    );\
    CREATE TABLE IF NOT EXISTS event_observations (\
        event_id TEXT NOT NULL,\
        entity_id TEXT NOT NULL,\
        referent_kind TEXT NOT NULL,\
        role TEXT NOT NULL,\
        position INTEGER NOT NULL,\
        PRIMARY KEY (event_id, role, position)\
    );\
    CREATE INDEX IF NOT EXISTS idx_events_namespace ON events(namespace);\
    CREATE INDEX IF NOT EXISTS idx_events_verb ON events(verb);\
    CREATE INDEX IF NOT EXISTS idx_events_kind ON events(kind);\
    CREATE INDEX IF NOT EXISTS idx_events_substrate ON events(substrate);\
    CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at DESC);\
    CREATE INDEX IF NOT EXISTS idx_events_ns_created_id ON events(namespace, created_at DESC, id DESC);\
    CREATE INDEX IF NOT EXISTS idx_events_session ON events(namespace, session_id, created_at, id);\
    CREATE INDEX IF NOT EXISTS idx_events_payload_proposal_id ON events(json_extract(payload, '$.proposal_id'));\
    CREATE INDEX IF NOT EXISTS idx_event_obs_entity ON event_observations(entity_id, role);\
    CREATE INDEX IF NOT EXISTS idx_event_obs_event_role ON event_observations(event_id, role);\
";
780
781pub(crate) fn ensure_events_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
782    conn.execute_batch(EVENTS_DDL)
783}
784
785#[cfg(test)]
786mod tests {
787    use super::*;
788    use crate::pool::PoolConfig;
789    use serde_json::json;
790
791    fn setup_memory_store() -> SqlEventStore {
792        let config = PoolConfig {
793            path: None,
794            ..PoolConfig::default()
795        };
796        let pool = Arc::new(ConnectionPool::new(config).unwrap());
797
798        {
799            let writer = pool.writer().unwrap();
800            writer.conn().execute_batch(EVENTS_DDL).unwrap();
801        }
802
803        SqlEventStore::new_scoped(pool, false, "default")
804    }
805
806    fn make_event(namespace: &str) -> Event {
807        Event::new(
808            namespace,
809            "search",
810            EventKind::SearchExecuted,
811            SubstrateKind::Note,
812            "agent:test",
813        )
814    }
815
816    #[tokio::test]
817    async fn test_append_and_get_event() {
818        let store = setup_memory_store();
819
820        let event = make_event("default");
821        let id = event.id;
822
823        store.append_event(event).await.unwrap();
824
825        let fetched = store.get_event(id).await.unwrap();
826        assert!(fetched.is_some());
827        let fetched = fetched.unwrap();
828        assert_eq!(fetched.id, id);
829        assert_eq!(fetched.verb, "search");
830        assert_eq!(fetched.substrate, SubstrateKind::Note);
831        assert_eq!(fetched.actor, "agent:test");
832        assert_eq!(fetched.outcome, EventOutcome::Success);
833    }
834
835    #[tokio::test]
836    async fn test_append_events_batch() {
837        let store = setup_memory_store();
838
839        let events: Vec<Event> = (0..3).map(|_| make_event("default")).collect();
840        let summary = store.append_events(events).await.unwrap();
841        assert_eq!(summary.attempted, 3);
842        assert_eq!(summary.affected, 3);
843        assert_eq!(summary.failed, 0);
844    }
845
846    #[tokio::test]
847    async fn test_count_events() {
848        let store = setup_memory_store();
849
850        for _ in 0..3 {
851            store.append_event(make_event("default")).await.unwrap();
852        }
853
854        let count = store.count_events(EventFilter::default()).await.unwrap();
855        assert_eq!(count, 3);
856    }
857
858    #[tokio::test]
859    async fn test_query_events_filter_by_verb() {
860        let store = setup_memory_store();
861
862        store.append_event(make_event("default")).await.unwrap();
863
864        let mut create_event = make_event("default");
865        create_event.verb = "create".to_string();
866        store.append_event(create_event).await.unwrap();
867
868        let filter = EventFilter {
869            verbs: vec!["search".to_string()],
870            ..EventFilter::default()
871        };
872        let page = store
873            .query_events(
874                filter,
875                PageRequest {
876                    limit: 10,
877                    offset: 0,
878                },
879            )
880            .await
881            .unwrap();
882        assert_eq!(page.items.len(), 1);
883        assert_eq!(page.items[0].verb, "search");
884    }
885
886    #[tokio::test]
887    async fn test_query_events_filter_by_substrate() {
888        let store = setup_memory_store();
889
890        store.append_event(make_event("default")).await.unwrap();
891
892        let mut entity_event = make_event("default");
893        entity_event.substrate = SubstrateKind::Entity;
894        store.append_event(entity_event).await.unwrap();
895
896        let filter = EventFilter {
897            substrates: vec![SubstrateKind::Entity],
898            ..EventFilter::default()
899        };
900        let page = store
901            .query_events(
902                filter,
903                PageRequest {
904                    limit: 10,
905                    offset: 0,
906                },
907            )
908            .await
909            .unwrap();
910        assert_eq!(page.items.len(), 1);
911        assert_eq!(page.items[0].substrate, SubstrateKind::Entity);
912    }
913
914    #[tokio::test]
915    async fn test_outcome_roundtrip() {
916        let store = setup_memory_store();
917
918        let mut denied = make_event("default");
919        denied.outcome = EventOutcome::Denied;
920        let denied_id = denied.id;
921        store.append_event(denied).await.unwrap();
922
923        let fetched = store.get_event(denied_id).await.unwrap().unwrap();
924        assert_eq!(fetched.outcome, EventOutcome::Denied);
925    }
926
927    #[tokio::test]
928    async fn append_event_writes_observations_atomically() {
929        let store = setup_memory_store();
930        let candidate = Uuid::new_v4();
931        let selected = Uuid::new_v4();
932        let mut event = make_event("default");
933        event.kind = EventKind::RerankExecuted;
934        event.payload = json!({
935            "candidates": [candidate.to_string()],
936            "selected": [selected.to_string()],
937            "served_by_profile_id": "profile-a"
938        });
939        let event_id = event.id;
940
941        store.append_event(event).await.unwrap();
942
943        // Verify event was inserted.
944        let fetched = store.get_event(event_id).await.unwrap();
945        assert!(fetched.is_some());
946
947        // Verify observations were written.
948        let pool = Arc::clone(&store.pool);
949        let event_id_str = event_id.to_string();
950        let (candidate_count, selected_count) = tokio::task::spawn_blocking(move || {
951            let guard = pool.reader().unwrap();
952            let conn = guard.conn();
953            let c: i64 = conn
954                .query_row(
955                    "SELECT COUNT(*) FROM event_observations WHERE event_id = ?1 AND role = 'candidate'",
956                    [&event_id_str],
957                    |r| r.get(0),
958                )
959                .unwrap();
960            let s: i64 = conn
961                .query_row(
962                    "SELECT COUNT(*) FROM event_observations WHERE event_id = ?1 AND role = 'selected'",
963                    [&event_id_str],
964                    |r| r.get(0),
965                )
966                .unwrap();
967            (c, s)
968        })
969        .await
970        .unwrap();
971
972        assert_eq!(candidate_count, 1, "expected one candidate observation row");
973        assert_eq!(selected_count, 1, "expected one selected observation row");
974    }
975
976    #[tokio::test]
977    async fn invalid_projection_payload_aborts_event_insert() {
978        let store = setup_memory_store();
979        let mut event = make_event("default");
980        event.kind = EventKind::RerankExecuted;
981        // "candidates" must be an array of UUID strings, not a plain string.
982        event.payload = json!({ "candidates": "not-array" });
983        let event_id = event.id;
984
985        let result = store.append_event(event).await;
986        assert!(result.is_err(), "invalid payload must return Err");
987
988        // The event row must not exist — transaction was rolled back.
989        let fetched = store.get_event(event_id).await.unwrap();
990        assert!(fetched.is_none(), "event row must not exist after rollback");
991    }
992
993    #[tokio::test]
994    async fn query_events_orders_by_created_at_then_id_desc() {
995        let store = setup_memory_store();
996
997        let ts = chrono::Utc::now().timestamp_micros();
998        let id_low = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
999        let id_high = Uuid::parse_str("ffffffff-ffff-ffff-ffff-ffffffffffff").unwrap();
1000
1001        // Insert both events with identical created_at via direct SQL to bypass UUID generation.
1002        let pool = Arc::clone(&store.pool);
1003        tokio::task::spawn_blocking(move || {
1004            let guard = pool.try_writer().unwrap();
1005            let conn = guard.conn();
1006            conn.execute_batch("BEGIN IMMEDIATE").unwrap();
1007            for id in [id_low, id_high] {
1008                conn.execute(
1009                    "INSERT INTO events \
1010                     (id, namespace, verb, substrate, actor, kind, outcome, payload, \
1011                      payload_schema_version, duration_us, created_at) \
1012                     VALUES (?1, 'default', 'search', 'note', 'test', 'audit', 'success', '{}', 1, 0, ?2)",
1013                    rusqlite::params![id.to_string(), ts],
1014                )
1015                .unwrap();
1016            }
1017            conn.execute_batch("COMMIT").unwrap();
1018        })
1019        .await
1020        .unwrap();
1021
1022        let page = store
1023            .query_events(
1024                EventFilter::default(),
1025                PageRequest {
1026                    limit: 10,
1027                    offset: 0,
1028                },
1029            )
1030            .await
1031            .unwrap();
1032
1033        assert_eq!(page.items.len(), 2);
1034        assert_eq!(
1035            page.items[0].id, id_high,
1036            "higher UUID must come first (id DESC tiebreaker)"
1037        );
1038        assert_eq!(page.items[1].id, id_low);
1039    }
1040
1041    #[tokio::test]
1042    async fn query_events_filters_by_kind() {
1043        let store = setup_memory_store();
1044        store.append_event(make_event("default")).await.unwrap();
1045        let mut recall_event = make_event("default");
1046        recall_event.kind = EventKind::RecallExecuted;
1047        store.append_event(recall_event).await.unwrap();
1048
1049        let filter = EventFilter {
1050            kinds: vec![EventKind::RecallExecuted],
1051            ..EventFilter::default()
1052        };
1053        let page = store
1054            .query_events(
1055                filter,
1056                PageRequest {
1057                    limit: 10,
1058                    offset: 0,
1059                },
1060            )
1061            .await
1062            .unwrap();
1063        assert_eq!(page.items.len(), 1);
1064        assert_eq!(page.items[0].kind, EventKind::RecallExecuted);
1065    }
1066
1067    #[tokio::test]
1068    async fn query_events_filters_by_session_id() {
1069        let store = setup_memory_store();
1070        let session = Uuid::new_v4();
1071        let mut event = make_event("default");
1072        event.session_id = Some(session);
1073        store.append_event(event).await.unwrap();
1074        store.append_event(make_event("default")).await.unwrap();
1075
1076        let filter = EventFilter {
1077            session_id: Some(session),
1078            ..EventFilter::default()
1079        };
1080        let page = store
1081            .query_events(
1082                filter,
1083                PageRequest {
1084                    limit: 10,
1085                    offset: 0,
1086                },
1087            )
1088            .await
1089            .unwrap();
1090        assert_eq!(page.items.len(), 1);
1091        assert_eq!(page.items[0].session_id, Some(session));
1092    }
1093
1094    #[tokio::test]
1095    async fn query_events_filters_by_observed() {
1096        let store = setup_memory_store();
1097        let entity_id = Uuid::new_v4();
1098        let mut event = make_event("default");
1099        event.kind = EventKind::RerankExecuted;
1100        event.payload = json!({
1101            "candidates": [entity_id.to_string()],
1102            "selected": []
1103        });
1104        store.append_event(event).await.unwrap();
1105        store.append_event(make_event("default")).await.unwrap();
1106
1107        let filter = EventFilter {
1108            observed: vec![entity_id],
1109            ..EventFilter::default()
1110        };
1111        let page = store
1112            .query_events(
1113                filter,
1114                PageRequest {
1115                    limit: 10,
1116                    offset: 0,
1117                },
1118            )
1119            .await
1120            .unwrap();
1121        assert_eq!(page.items.len(), 1);
1122    }
1123
1124    #[tokio::test]
1125    async fn query_events_filters_by_selected() {
1126        let store = setup_memory_store();
1127        let entity_id = Uuid::new_v4();
1128        let mut event = make_event("default");
1129        event.kind = EventKind::RerankExecuted;
1130        event.payload = json!({
1131            "candidates": [],
1132            "selected": [entity_id.to_string()]
1133        });
1134        store.append_event(event).await.unwrap();
1135        store.append_event(make_event("default")).await.unwrap();
1136
1137        let filter = EventFilter {
1138            selected: vec![entity_id],
1139            ..EventFilter::default()
1140        };
1141        let page = store
1142            .query_events(
1143                filter,
1144                PageRequest {
1145                    limit: 10,
1146                    offset: 0,
1147                },
1148            )
1149            .await
1150            .unwrap();
1151        assert_eq!(page.items.len(), 1);
1152    }
1153
1154    #[tokio::test]
1155    async fn query_events_filters_by_payload_proposal_id() {
1156        let store = setup_memory_store();
1157        let proposal_id = Uuid::new_v4();
1158        let mut event = make_event("default");
1159        event.kind = EventKind::ProposalCreated;
1160        event.payload = json!({ "proposal_id": proposal_id.to_string() });
1161        store.append_event(event).await.unwrap();
1162        store.append_event(make_event("default")).await.unwrap();
1163
1164        let filter = EventFilter {
1165            payload_proposal_id: Some(proposal_id),
1166            ..EventFilter::default()
1167        };
1168        let page = store
1169            .query_events(
1170                filter,
1171                PageRequest {
1172                    limit: 10,
1173                    offset: 0,
1174                },
1175            )
1176            .await
1177            .unwrap();
1178        assert_eq!(page.items.len(), 1);
1179    }
1180
1181    #[tokio::test]
1182    async fn query_events_observed_filter_missing_projection_returns_clean_error() {
1183        // Set up a legacy-schema store (no event_observations table).
1184        let config = PoolConfig {
1185            path: None,
1186            ..PoolConfig::default()
1187        };
1188        let pool = Arc::new(ConnectionPool::new(config).unwrap());
1189        {
1190            let writer = pool.writer().unwrap();
1191            // Create only the events table, without event_observations.
1192            writer.conn().execute_batch(
1193                "CREATE TABLE IF NOT EXISTS events (\
1194                     id TEXT PRIMARY KEY, namespace TEXT NOT NULL, verb TEXT NOT NULL,\
1195                     substrate TEXT NOT NULL, actor TEXT NOT NULL, kind TEXT NOT NULL DEFAULT 'audit',\
1196                     outcome TEXT NOT NULL, payload TEXT NOT NULL DEFAULT '{}',\
1197                     payload_schema_version INTEGER NOT NULL DEFAULT 1,\
1198                     duration_us INTEGER NOT NULL DEFAULT 0, created_at INTEGER NOT NULL\
1199                 );"
1200            ).unwrap();
1201        }
1202        let store = SqlEventStore::new_scoped(pool, false, "default");
1203
1204        let filter = EventFilter {
1205            observed: vec![Uuid::new_v4()],
1206            ..EventFilter::default()
1207        };
1208        let result = store
1209            .query_events(
1210                filter,
1211                PageRequest {
1212                    limit: 10,
1213                    offset: 0,
1214                },
1215            )
1216            .await;
1217        assert!(result.is_err());
1218        let err_msg = result.unwrap_err().to_string();
1219        assert!(
1220            err_msg.contains("event_observations") && err_msg.contains("run migrations"),
1221            "error should mention event_observations and run migrations, got: {err_msg}"
1222        );
1223    }
1224}