Skip to main content

khive_db/stores/
event.rs

1//! SQL-backed `EventStore` implementation.
2//!
3//! FILE SIZE JUSTIFICATION: Event store covers append, query-by-filter,
4//! observation recording, and paginated listing with shared row-mapping and
5//! timestamp serialization helpers. The event schema has complex JSON data
6//! columns (observations, referent kinds, outcomes) whose parsing is shared
7//! across all read paths, making a split impractical without duplicating the
8//! deserialization logic.
9
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use uuid::Uuid;
14
15use khive_storage::error::StorageError;
16use khive_storage::event::{Event, EventFilter, EventObservation, ObservationRole, ReferentKind};
17use khive_storage::types::{BatchWriteSummary, Page, PageRequest};
18use khive_storage::EventStore;
19use khive_storage::StorageCapability;
20use khive_types::{EventKind, EventOutcome, SubstrateKind};
21
22use crate::error::SqliteError;
23use crate::pool::ConnectionPool;
24
25fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
26    StorageError::driver(StorageCapability::Events, op, e)
27}
28
29fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
30    StorageError::driver(StorageCapability::Events, op, e)
31}
32
33/// An EventStore backed by SQLite tables.
34pub struct SqlEventStore {
35    pool: Arc<ConnectionPool>,
36    is_file_backed: bool,
37    namespace: String,
38}
39
40impl SqlEventStore {
41    /// Create a new store scoped to one namespace.
42    pub fn new_scoped(
43        pool: Arc<ConnectionPool>,
44        is_file_backed: bool,
45        namespace: impl Into<String>,
46    ) -> Self {
47        Self {
48            pool,
49            is_file_backed,
50            namespace: namespace.into(),
51        }
52    }
53
54    fn open_standalone_writer(&self) -> Result<rusqlite::Connection, StorageError> {
55        let config = self.pool.config();
56        let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
57            operation: "event_writer".into(),
58            message: "in-memory databases do not support standalone connections".into(),
59        })?;
60
61        let conn = rusqlite::Connection::open_with_flags(
62            path,
63            rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
64                | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
65                | rusqlite::OpenFlags::SQLITE_OPEN_URI,
66        )
67        .map_err(|e| map_err(e, "open_event_writer"))?;
68
69        conn.busy_timeout(config.busy_timeout)
70            .map_err(|e| map_err(e, "open_event_writer"))?;
71        conn.pragma_update(None, "foreign_keys", "ON")
72            .map_err(|e| map_err(e, "open_event_writer"))?;
73        conn.pragma_update(None, "synchronous", "NORMAL")
74            .map_err(|e| map_err(e, "open_event_writer"))?;
75
76        Ok(conn)
77    }
78
79    fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
80        let config = self.pool.config();
81        let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
82            operation: "event_reader".into(),
83            message: "in-memory databases do not support standalone connections".into(),
84        })?;
85
86        let conn = rusqlite::Connection::open_with_flags(
87            path,
88            rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
89                | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
90                | rusqlite::OpenFlags::SQLITE_OPEN_URI,
91        )
92        .map_err(|e| map_err(e, "open_event_reader"))?;
93
94        conn.busy_timeout(config.busy_timeout)
95            .map_err(|e| map_err(e, "open_event_reader"))?;
96        conn.pragma_update(None, "foreign_keys", "ON")
97            .map_err(|e| map_err(e, "open_event_reader"))?;
98        conn.pragma_update(None, "synchronous", "NORMAL")
99            .map_err(|e| map_err(e, "open_event_reader"))?;
100
101        Ok(conn)
102    }
103
104    async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
105    where
106        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
107        R: Send + 'static,
108    {
109        if self.is_file_backed {
110            let conn = self.open_standalone_writer()?;
111            tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
112                .await
113                .map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
114        } else {
115            let pool = Arc::clone(&self.pool);
116            tokio::task::spawn_blocking(move || {
117                let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
118                f(guard.conn()).map_err(|e| map_err(e, op))
119            })
120            .await
121            .map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
122        }
123    }
124
125    async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
126    where
127        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
128        R: Send + 'static,
129    {
130        if self.is_file_backed {
131            let conn = self.open_standalone_reader()?;
132            tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
133                .await
134                .map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
135        } else {
136            let pool = Arc::clone(&self.pool);
137            tokio::task::spawn_blocking(move || {
138                let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
139                f(guard.conn()).map_err(|e| map_err(e, op))
140            })
141            .await
142            .map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
143        }
144    }
145}
146
147// =============================================================================
148// Helpers: parse SubstrateKind / EventOutcome / EventKind from DB strings
149// =============================================================================
150
151fn substrate_from_str(s: &str) -> Result<SubstrateKind, rusqlite::Error> {
152    s.parse::<SubstrateKind>().map_err(|_| {
153        rusqlite::Error::FromSqlConversionFailure(
154            0,
155            rusqlite::types::Type::Text,
156            format!("unknown SubstrateKind: {s}").into(),
157        )
158    })
159}
160
161fn outcome_from_str(s: &str) -> Result<EventOutcome, rusqlite::Error> {
162    match s {
163        "success" => Ok(EventOutcome::Success),
164        "denied" => Ok(EventOutcome::Denied),
165        "error" => Ok(EventOutcome::Error),
166        other => Err(rusqlite::Error::FromSqlConversionFailure(
167            0,
168            rusqlite::types::Type::Text,
169            format!("unknown EventOutcome: {other}").into(),
170        )),
171    }
172}
173
174fn kind_from_str(s: &str) -> Result<EventKind, rusqlite::Error> {
175    s.parse::<EventKind>().map_err(|_| {
176        rusqlite::Error::FromSqlConversionFailure(
177            0,
178            rusqlite::types::Type::Text,
179            format!("unknown EventKind: {s}").into(),
180        )
181    })
182}
183
184fn parse_uuid(s: &str) -> Result<Uuid, rusqlite::Error> {
185    Uuid::parse_str(s).map_err(|e| {
186        rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
187    })
188}
189
190// Column order: id(0), namespace(1), verb(2), substrate(3), actor(4),
191//               kind(5), outcome(6), payload(7), payload_schema_version(8),
192//               profile_state_version(9), duration_us(10), target_id(11),
193//               session_id(12), aggregate_kind(13), aggregate_id(14), created_at(15)
194fn read_event(row: &rusqlite::Row<'_>) -> Result<Event, rusqlite::Error> {
195    let id_str: String = row.get(0)?;
196    let namespace: String = row.get(1)?;
197    let verb: String = row.get(2)?;
198    let substrate_str: String = row.get(3)?;
199    let actor: String = row.get(4)?;
200    let kind_str: String = row.get(5)?;
201    let outcome_str: String = row.get(6)?;
202    let payload_str: String = row.get(7)?;
203    let payload_schema_version: i64 = row.get(8)?;
204    let profile_state_version: Option<i64> = row.get(9)?;
205    let duration_us: i64 = row.get(10)?;
206    let target_str: Option<String> = row.get(11)?;
207    let session_str: Option<String> = row.get(12)?;
208    let aggregate_kind: Option<String> = row.get(13)?;
209    let aggregate_str: Option<String> = row.get(14)?;
210    let created_at: i64 = row.get(15)?;
211
212    let id = parse_uuid(&id_str)?;
213    let substrate = substrate_from_str(&substrate_str)?;
214    let kind = kind_from_str(&kind_str)?;
215    let outcome = outcome_from_str(&outcome_str)?;
216    let payload: serde_json::Value = serde_json::from_str(&payload_str).map_err(|e| {
217        rusqlite::Error::FromSqlConversionFailure(7, rusqlite::types::Type::Text, Box::new(e))
218    })?;
219    let target_id = target_str.as_deref().map(parse_uuid).transpose()?;
220    let session_id = session_str.as_deref().map(parse_uuid).transpose()?;
221    let aggregate_id = aggregate_str.as_deref().map(parse_uuid).transpose()?;
222
223    Ok(Event {
224        id,
225        namespace,
226        verb,
227        substrate,
228        actor,
229        kind,
230        outcome,
231        payload,
232        payload_schema_version: payload_schema_version as u32,
233        profile_state_version: profile_state_version.map(|v| v as u64),
234        duration_us,
235        target_id,
236        session_id,
237        aggregate_kind,
238        aggregate_id,
239        created_at,
240    })
241}
242
243// =============================================================================
244// Helpers: observation projection write path
245// =============================================================================
246
247fn insert_event_with_observations(
248    conn: &rusqlite::Connection,
249    event: &Event,
250) -> Result<(), rusqlite::Error> {
251    let id_str = event.id.to_string();
252    let substrate_str = event.substrate.name().to_string();
253    let kind_str = event.kind.name().to_string();
254    let outcome_str = event.outcome.name().to_string();
255    let payload_str = event.payload.to_string();
256    let target_str = event.target_id.map(|u| u.to_string());
257    let session_str = event.session_id.map(|u| u.to_string());
258    let aggregate_str = event.aggregate_id.map(|u| u.to_string());
259    let profile_state_version = event.profile_state_version.map(|v| v as i64);
260
261    conn.execute(
262        "INSERT INTO events \
263         (id, namespace, verb, substrate, actor, kind, outcome, payload, payload_schema_version, \
264          profile_state_version, duration_us, target_id, session_id, aggregate_kind, aggregate_id, created_at) \
265         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
266        rusqlite::params![
267            id_str,
268            &event.namespace,
269            &event.verb,
270            substrate_str,
271            &event.actor,
272            kind_str,
273            outcome_str,
274            payload_str,
275            event.payload_schema_version as i64,
276            profile_state_version,
277            event.duration_us,
278            target_str,
279            session_str,
280            &event.aggregate_kind,
281            aggregate_str,
282            event.created_at,
283        ],
284    )?;
285
286    for observation in decode_event_observations(event)? {
287        conn.execute(
288            "INSERT INTO event_observations \
289             (event_id, entity_id, referent_kind, role, position) \
290             VALUES (?1, ?2, ?3, ?4, ?5)",
291            rusqlite::params![
292                observation.event_id.to_string(),
293                observation.entity_id.to_string(),
294                observation.referent_kind.name(),
295                observation.role.name(),
296                observation.position as i64,
297            ],
298        )?;
299    }
300
301    Ok(())
302}
303
304fn decode_event_observations(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
305    match event.kind {
306        EventKind::RerankExecuted => decode_rank_observations(event),
307        EventKind::RecallExecuted | EventKind::SearchExecuted => decode_rank_observations(event),
308        EventKind::LinkCreated => decode_link_observations(event),
309        EventKind::EntityCreated
310        | EventKind::EntityUpdated
311        | EventKind::EntityDeleted
312        | EventKind::NoteCreated
313        | EventKind::NoteUpdated
314        | EventKind::NoteDeleted
315        | EventKind::TaskTransitioned => decode_target_observation(event),
316        EventKind::FeedbackExplicit => decode_signal_observation(event),
317        _ => Ok(Vec::new()),
318    }
319}
320
321fn payload_uuid_array(event: &Event, field: &'static str) -> Result<Vec<Uuid>, rusqlite::Error> {
322    let Some(values) = event.payload.get(field) else {
323        return Ok(Vec::new());
324    };
325    let Some(array) = values.as_array() else {
326        return Err(invalid_payload(event.kind, field, "expected array"));
327    };
328
329    array
330        .iter()
331        .map(|value| {
332            value
333                .as_str()
334                .ok_or_else(|| invalid_payload(event.kind, field, "expected UUID string"))
335                .and_then(|s| Uuid::parse_str(s).map_err(|e| invalid_payload(event.kind, field, e)))
336        })
337        .collect()
338}
339
340fn payload_uuid(event: &Event, field: &'static str) -> Result<Option<Uuid>, rusqlite::Error> {
341    let Some(value) = event.payload.get(field) else {
342        return Ok(None);
343    };
344    let Some(s) = value.as_str() else {
345        return Err(invalid_payload(event.kind, field, "expected UUID string"));
346    };
347    Uuid::parse_str(s)
348        .map(Some)
349        .map_err(|e| invalid_payload(event.kind, field, e))
350}
351
352fn decode_rank_observations(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
353    let mut rows = Vec::new();
354
355    for (position, entity_id) in payload_uuid_array(event, "candidates")?
356        .into_iter()
357        .enumerate()
358    {
359        rows.push(EventObservation {
360            event_id: event.id,
361            entity_id,
362            referent_kind: ReferentKind::Note,
363            role: ObservationRole::Candidate,
364            position: position as u32,
365        });
366    }
367
368    let selected = payload_uuid_array(event, "selected")
369        .or_else(|_| payload_uuid_array(event, "reranked"))
370        .or_else(|_| payload_uuid_array(event, "final_scores"))?;
371    for (position, entity_id) in selected.into_iter().enumerate() {
372        rows.push(EventObservation {
373            event_id: event.id,
374            entity_id,
375            referent_kind: ReferentKind::Note,
376            role: ObservationRole::Selected,
377            position: position as u32,
378        });
379    }
380
381    Ok(rows)
382}
383
384fn decode_link_observations(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
385    let mut rows = Vec::new();
386    if let Some(source) = payload_uuid(event, "source_id")? {
387        rows.push(EventObservation {
388            event_id: event.id,
389            entity_id: source,
390            referent_kind: ReferentKind::Entity,
391            role: ObservationRole::Target,
392            position: 0,
393        });
394    }
395    if let Some(target) = payload_uuid(event, "target_id")? {
396        rows.push(EventObservation {
397            event_id: event.id,
398            entity_id: target,
399            referent_kind: ReferentKind::Entity,
400            role: ObservationRole::Target,
401            position: 1,
402        });
403    }
404    Ok(rows)
405}
406
407fn decode_target_observation(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
408    let Some(entity_id) = event.target_id.or(payload_uuid(event, "target_id")?) else {
409        return Ok(Vec::new());
410    };
411    Ok(vec![EventObservation {
412        event_id: event.id,
413        entity_id,
414        referent_kind: if event.substrate == SubstrateKind::Note {
415            ReferentKind::Note
416        } else {
417            ReferentKind::Entity
418        },
419        role: ObservationRole::Target,
420        position: 0,
421    }])
422}
423
424fn decode_signal_observation(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
425    let Some(entity_id) = payload_uuid(event, "about_id")? else {
426        return Ok(Vec::new());
427    };
428    Ok(vec![EventObservation {
429        event_id: event.id,
430        entity_id,
431        referent_kind: ReferentKind::Entity,
432        role: ObservationRole::Signal,
433        position: 0,
434    }])
435}
436
437fn invalid_payload(
438    kind: EventKind,
439    field: &'static str,
440    reason: impl std::fmt::Display,
441) -> rusqlite::Error {
442    rusqlite::Error::ToSqlConversionFailure(
443        format!("invalid payload for {}.{field}: {reason}", kind.name()).into(),
444    )
445}
446
447// =============================================================================
448// Helpers: filter SQL builder
449// =============================================================================
450
451fn build_event_filter_sql(
452    conn: &rusqlite::Connection,
453    default_namespace: &str,
454    filter: &EventFilter,
455) -> Result<(String, Vec<Box<dyn rusqlite::types::ToSql>>), rusqlite::Error> {
456    reject_missing_event_filter_schema(conn, filter)?;
457
458    let mut conditions: Vec<String> = Vec::new();
459    let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
460
461    params.push(Box::new(default_namespace.to_string()));
462    conditions.push(format!("namespace = ?{}", params.len()));
463
464    push_in_clause(
465        &mut conditions,
466        &mut params,
467        "id",
468        filter.ids.iter().map(Uuid::to_string),
469    );
470    push_in_clause(
471        &mut conditions,
472        &mut params,
473        "kind",
474        filter.kinds.iter().map(|kind| kind.name().to_string()),
475    );
476    push_in_clause(
477        &mut conditions,
478        &mut params,
479        "verb",
480        filter.verbs.iter().cloned(),
481    );
482    push_in_clause(
483        &mut conditions,
484        &mut params,
485        "substrate",
486        filter.substrates.iter().map(|s| s.name().to_string()),
487    );
488    push_in_clause(
489        &mut conditions,
490        &mut params,
491        "actor",
492        filter.actors.iter().cloned(),
493    );
494
495    if let Some(after) = filter.after {
496        params.push(Box::new(after));
497        conditions.push(format!("created_at > ?{}", params.len()));
498    }
499
500    if let Some(before) = filter.before {
501        params.push(Box::new(before));
502        conditions.push(format!("created_at < ?{}", params.len()));
503    }
504
505    if let Some(session_id) = filter.session_id {
506        params.push(Box::new(session_id.to_string()));
507        conditions.push(format!("session_id = ?{}", params.len()));
508    }
509
510    push_observation_exists(&mut conditions, &mut params, "candidate", &filter.observed);
511    push_observation_exists(&mut conditions, &mut params, "selected", &filter.selected);
512
513    if let Some(proposal_id) = filter.payload_proposal_id {
514        params.push(Box::new(proposal_id.to_string()));
515        conditions.push(format!(
516            "json_extract(payload, '$.proposal_id') = ?{}",
517            params.len()
518        ));
519    }
520
521    let clause = format!(" WHERE {}", conditions.join(" AND "));
522    Ok((clause, params))
523}
524
525fn push_in_clause<I>(
526    conditions: &mut Vec<String>,
527    params: &mut Vec<Box<dyn rusqlite::types::ToSql>>,
528    column: &'static str,
529    values: I,
530) where
531    I: IntoIterator<Item = String>,
532{
533    let placeholders: Vec<String> = values
534        .into_iter()
535        .map(|value| {
536            params.push(Box::new(value));
537            format!("?{}", params.len())
538        })
539        .collect();
540    if !placeholders.is_empty() {
541        conditions.push(format!("{column} IN ({})", placeholders.join(",")));
542    }
543}
544
545fn push_observation_exists(
546    conditions: &mut Vec<String>,
547    params: &mut Vec<Box<dyn rusqlite::types::ToSql>>,
548    role: &'static str,
549    entity_ids: &[Uuid],
550) {
551    if entity_ids.is_empty() {
552        return;
553    }
554    let placeholders: Vec<String> = entity_ids
555        .iter()
556        .map(|id| {
557            params.push(Box::new(id.to_string()));
558            format!("?{}", params.len())
559        })
560        .collect();
561    conditions.push(format!(
562        "EXISTS (SELECT 1 FROM event_observations o \
563         WHERE o.event_id = events.id AND o.role = '{role}' AND o.entity_id IN ({}))",
564        placeholders.join(",")
565    ));
566}
567
568fn reject_missing_event_filter_schema(
569    conn: &rusqlite::Connection,
570    filter: &EventFilter,
571) -> Result<(), rusqlite::Error> {
572    if filter.session_id.is_some() && !has_column(conn, "events", "session_id")? {
573        return Err(schema_absent("events.session_id"));
574    }
575    if (!filter.observed.is_empty() || !filter.selected.is_empty())
576        && !has_table(conn, "event_observations")?
577    {
578        return Err(schema_absent("event_observations"));
579    }
580    if filter.payload_proposal_id.is_some() && !has_column(conn, "events", "payload")? {
581        return Err(schema_absent("events.payload"));
582    }
583    Ok(())
584}
585
586fn has_table(conn: &rusqlite::Connection, table: &'static str) -> Result<bool, rusqlite::Error> {
587    conn.query_row(
588        "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type = 'table' AND name = ?1",
589        [table],
590        |row| row.get(0),
591    )
592}
593
594fn has_column(
595    conn: &rusqlite::Connection,
596    table: &'static str,
597    column: &'static str,
598) -> Result<bool, rusqlite::Error> {
599    conn.query_row(
600        "SELECT COUNT(*) > 0 FROM pragma_table_info(?1) WHERE name = ?2",
601        rusqlite::params![table, column],
602        |row| row.get(0),
603    )
604}
605
606fn schema_absent(name: &'static str) -> rusqlite::Error {
607    rusqlite::Error::ToSqlConversionFailure(
608        format!("event filter requires missing schema element {name}; run migrations").into(),
609    )
610}
611
612// =============================================================================
613// EventStore implementation
614// =============================================================================
615
616#[async_trait]
617impl EventStore for SqlEventStore {
618    async fn append_event(&self, event: Event) -> Result<(), StorageError> {
619        self.with_writer("append_event", move |conn| {
620            conn.execute_batch("BEGIN IMMEDIATE")?;
621            if let Err(e) = insert_event_with_observations(conn, &event) {
622                let _ = conn.execute_batch("ROLLBACK");
623                return Err(e);
624            }
625            conn.execute_batch("COMMIT")?;
626            Ok(())
627        })
628        .await
629    }
630
631    async fn append_events(&self, events: Vec<Event>) -> Result<BatchWriteSummary, StorageError> {
632        let attempted = events.len() as u64;
633
634        self.with_writer("append_events", move |conn| {
635            conn.execute_batch("BEGIN IMMEDIATE")?;
636            let mut affected = 0u64;
637
638            for event in &events {
639                if let Err(e) = insert_event_with_observations(conn, event) {
640                    let _ = conn.execute_batch("ROLLBACK");
641                    return Err(e);
642                }
643                affected += 1;
644            }
645
646            conn.execute_batch("COMMIT")?;
647            Ok(BatchWriteSummary {
648                attempted,
649                affected,
650                failed: 0,
651                first_error: String::new(),
652            })
653        })
654        .await
655    }
656
657    async fn get_event(&self, id: Uuid) -> Result<Option<Event>, StorageError> {
658        let namespace = self.namespace.clone();
659        let id_str = id.to_string();
660
661        self.with_reader("get_event", move |conn| {
662            let mut stmt = conn.prepare(
663                "SELECT id, namespace, verb, substrate, actor, kind, outcome, payload, \
664                        payload_schema_version, profile_state_version, duration_us, target_id, \
665                        session_id, aggregate_kind, aggregate_id, created_at \
666                 FROM events WHERE namespace = ?1 AND id = ?2",
667            )?;
668            let mut rows = stmt.query(rusqlite::params![namespace, id_str])?;
669            match rows.next()? {
670                Some(row) => Ok(Some(read_event(row)?)),
671                None => Ok(None),
672            }
673        })
674        .await
675    }
676
677    async fn query_events(
678        &self,
679        filter: EventFilter,
680        page: PageRequest,
681    ) -> Result<Page<Event>, StorageError> {
682        let namespace = self.namespace.clone();
683
684        self.with_reader("query_events", move |conn| {
685            let (where_clause, filter_params) = build_event_filter_sql(conn, &namespace, &filter)?;
686
687            let count_sql = format!("SELECT COUNT(*) FROM events{}", where_clause);
688            let total: i64 = {
689                let mut stmt = conn.prepare(&count_sql)?;
690                let param_refs: Vec<&dyn rusqlite::types::ToSql> =
691                    filter_params.iter().map(|p| p.as_ref()).collect();
692                stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
693            };
694
695            let (_, data_filter_params) = build_event_filter_sql(conn, &namespace, &filter)?;
696            let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = data_filter_params;
697            all_params.push(Box::new(page.limit as i64));
698            all_params.push(Box::new(page.offset as i64));
699
700            let limit_idx = all_params.len() - 1;
701            let offset_idx = all_params.len();
702
703            let data_sql = format!(
704                "SELECT id, namespace, verb, substrate, actor, kind, outcome, payload, \
705                        payload_schema_version, profile_state_version, duration_us, target_id, \
706                        session_id, aggregate_kind, aggregate_id, created_at \
707                 FROM events{} ORDER BY created_at DESC, id DESC LIMIT ?{} OFFSET ?{}",
708                where_clause, limit_idx, offset_idx,
709            );
710
711            let mut stmt = conn.prepare(&data_sql)?;
712            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
713                all_params.iter().map(|p| p.as_ref()).collect();
714            let rows = stmt.query_map(param_refs.as_slice(), read_event)?;
715
716            let mut items = Vec::new();
717            for row in rows {
718                items.push(row?);
719            }
720
721            Ok(Page {
722                items,
723                total: Some(total as u64),
724            })
725        })
726        .await
727    }
728
729    async fn count_events(&self, filter: EventFilter) -> Result<u64, StorageError> {
730        let namespace = self.namespace.clone();
731
732        self.with_reader("count_events", move |conn| {
733            let (where_clause, params) = build_event_filter_sql(conn, &namespace, &filter)?;
734            let sql = format!("SELECT COUNT(*) FROM events{}", where_clause);
735            let mut stmt = conn.prepare(&sql)?;
736            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
737                params.iter().map(|p| p.as_ref()).collect();
738            let count: i64 = stmt.query_row(param_refs.as_slice(), |row| row.get(0))?;
739            Ok(count as u64)
740        })
741        .await
742    }
743}
744
745// =============================================================================
746// DDL
747// =============================================================================
748
749const EVENTS_DDL: &str = "\
750    CREATE TABLE IF NOT EXISTS events (\
751        id TEXT PRIMARY KEY,\
752        namespace TEXT NOT NULL,\
753        verb TEXT NOT NULL,\
754        substrate TEXT NOT NULL,\
755        actor TEXT NOT NULL,\
756        kind TEXT NOT NULL DEFAULT 'audit',\
757        outcome TEXT NOT NULL,\
758        payload TEXT NOT NULL DEFAULT '{}',\
759        payload_schema_version INTEGER NOT NULL DEFAULT 1,\
760        profile_state_version INTEGER,\
761        duration_us INTEGER NOT NULL DEFAULT 0,\
762        target_id TEXT,\
763        session_id TEXT,\
764        aggregate_kind TEXT,\
765        aggregate_id TEXT,\
766        created_at INTEGER NOT NULL\
767    );\
768    CREATE TABLE IF NOT EXISTS event_observations (\
769        event_id TEXT NOT NULL,\
770        entity_id TEXT NOT NULL,\
771        referent_kind TEXT NOT NULL,\
772        role TEXT NOT NULL,\
773        position INTEGER NOT NULL,\
774        PRIMARY KEY (event_id, role, position)\
775    );\
776    CREATE INDEX IF NOT EXISTS idx_events_namespace ON events(namespace);\
777    CREATE INDEX IF NOT EXISTS idx_events_verb ON events(verb);\
778    CREATE INDEX IF NOT EXISTS idx_events_kind ON events(kind);\
779    CREATE INDEX IF NOT EXISTS idx_events_substrate ON events(substrate);\
780    CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at DESC);\
781    CREATE INDEX IF NOT EXISTS idx_events_ns_created_id ON events(namespace, created_at DESC, id DESC);\
782    CREATE INDEX IF NOT EXISTS idx_events_session ON events(namespace, session_id, created_at, id);\
783    CREATE INDEX IF NOT EXISTS idx_events_payload_proposal_id ON events(json_extract(payload, '$.proposal_id'));\
784    CREATE INDEX IF NOT EXISTS idx_event_obs_entity ON event_observations(entity_id, role);\
785    CREATE INDEX IF NOT EXISTS idx_event_obs_event_role ON event_observations(event_id, role);\
786";
787
788pub(crate) fn ensure_events_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
789    conn.execute_batch(EVENTS_DDL)
790}
791
792#[cfg(test)]
793#[path = "event_tests.rs"]
794mod tests;