Skip to main content

khive_db/stores/
event.rs

1//! SQL-backed `EventStore` implementation.
2//!
3//! FILE SIZE JUSTIFICATION: Event store covers append, query-by-filter,
4//! observation recording, and paginated listing with shared row-mapping and
5//! timestamp serialization helpers. The event schema has complex JSON data
6//! columns (observations, referent kinds, outcomes) whose parsing is shared
7//! across all read paths, making a split impractical without duplicating the
8//! deserialization logic.
9
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use uuid::Uuid;
14
15use khive_storage::error::StorageError;
16use khive_storage::event::{Event, EventFilter, EventObservation, ObservationRole, ReferentKind};
17use khive_storage::types::{BatchWriteSummary, Page, PageRequest};
18use khive_storage::EventStore;
19use khive_storage::StorageCapability;
20use khive_types::{EventKind, EventOutcome, SubstrateKind};
21
22use crate::error::SqliteError;
23use crate::pool::ConnectionPool;
24
25fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
26    StorageError::driver(StorageCapability::Events, op, e)
27}
28
29fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
30    StorageError::driver(StorageCapability::Events, op, e)
31}
32
33/// An EventStore backed by SQLite tables.
34pub struct SqlEventStore {
35    pool: Arc<ConnectionPool>,
36    is_file_backed: bool,
37    namespace: String,
38}
39
40impl SqlEventStore {
41    /// Create a new store scoped to one namespace.
42    pub fn new_scoped(
43        pool: Arc<ConnectionPool>,
44        is_file_backed: bool,
45        namespace: impl Into<String>,
46    ) -> Self {
47        Self {
48            pool,
49            is_file_backed,
50            namespace: namespace.into(),
51        }
52    }
53
54    fn open_standalone_writer(&self) -> Result<rusqlite::Connection, StorageError> {
55        let config = self.pool.config();
56        let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
57            operation: "event_writer".into(),
58            message: "in-memory databases do not support standalone connections".into(),
59        })?;
60
61        let conn = rusqlite::Connection::open_with_flags(
62            path,
63            rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
64                | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
65                | rusqlite::OpenFlags::SQLITE_OPEN_URI,
66        )
67        .map_err(|e| map_err(e, "open_event_writer"))?;
68
69        conn.busy_timeout(config.busy_timeout)
70            .map_err(|e| map_err(e, "open_event_writer"))?;
71        conn.pragma_update(None, "foreign_keys", "ON")
72            .map_err(|e| map_err(e, "open_event_writer"))?;
73        conn.pragma_update(None, "synchronous", "NORMAL")
74            .map_err(|e| map_err(e, "open_event_writer"))?;
75
76        Ok(conn)
77    }
78
79    fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
80        let config = self.pool.config();
81        let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
82            operation: "event_reader".into(),
83            message: "in-memory databases do not support standalone connections".into(),
84        })?;
85
86        let conn = rusqlite::Connection::open_with_flags(
87            path,
88            rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
89                | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
90                | rusqlite::OpenFlags::SQLITE_OPEN_URI,
91        )
92        .map_err(|e| map_err(e, "open_event_reader"))?;
93
94        conn.busy_timeout(config.busy_timeout)
95            .map_err(|e| map_err(e, "open_event_reader"))?;
96        conn.pragma_update(None, "foreign_keys", "ON")
97            .map_err(|e| map_err(e, "open_event_reader"))?;
98        conn.pragma_update(None, "synchronous", "NORMAL")
99            .map_err(|e| map_err(e, "open_event_reader"))?;
100
101        Ok(conn)
102    }
103
104    async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
105    where
106        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
107        R: Send + 'static,
108    {
109        if self.is_file_backed {
110            let conn = self.open_standalone_writer()?;
111            tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
112                .await
113                .map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
114        } else {
115            let pool = Arc::clone(&self.pool);
116            tokio::task::spawn_blocking(move || {
117                let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
118                f(guard.conn()).map_err(|e| map_err(e, op))
119            })
120            .await
121            .map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
122        }
123    }
124
125    async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
126    where
127        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
128        R: Send + 'static,
129    {
130        if self.is_file_backed {
131            let conn = self.open_standalone_reader()?;
132            tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
133                .await
134                .map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
135        } else {
136            let pool = Arc::clone(&self.pool);
137            tokio::task::spawn_blocking(move || {
138                let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
139                f(guard.conn()).map_err(|e| map_err(e, op))
140            })
141            .await
142            .map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
143        }
144    }
145}
146
147// =============================================================================
148// Helpers: parse SubstrateKind / EventOutcome / EventKind from DB strings
149// =============================================================================
150
151fn substrate_from_str(s: &str) -> Result<SubstrateKind, rusqlite::Error> {
152    s.parse::<SubstrateKind>().map_err(|_| {
153        rusqlite::Error::FromSqlConversionFailure(
154            0,
155            rusqlite::types::Type::Text,
156            format!("unknown SubstrateKind: {s}").into(),
157        )
158    })
159}
160
161fn outcome_from_str(s: &str) -> Result<EventOutcome, rusqlite::Error> {
162    match s {
163        "success" => Ok(EventOutcome::Success),
164        "denied" => Ok(EventOutcome::Denied),
165        "error" => Ok(EventOutcome::Error),
166        other => Err(rusqlite::Error::FromSqlConversionFailure(
167            0,
168            rusqlite::types::Type::Text,
169            format!("unknown EventOutcome: {other}").into(),
170        )),
171    }
172}
173
174fn kind_from_str(s: &str) -> Result<EventKind, rusqlite::Error> {
175    s.parse::<EventKind>().map_err(|_| {
176        rusqlite::Error::FromSqlConversionFailure(
177            0,
178            rusqlite::types::Type::Text,
179            format!("unknown EventKind: {s}").into(),
180        )
181    })
182}
183
184fn parse_uuid(s: &str) -> Result<Uuid, rusqlite::Error> {
185    Uuid::parse_str(s).map_err(|e| {
186        rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
187    })
188}
189
190// Column order: id(0), namespace(1), verb(2), substrate(3), actor(4),
191//               kind(5), outcome(6), payload(7), payload_schema_version(8),
192//               profile_state_version(9), duration_us(10), target_id(11),
193//               session_id(12), aggregate_kind(13), aggregate_id(14), created_at(15)
194fn read_event(row: &rusqlite::Row<'_>) -> Result<Event, rusqlite::Error> {
195    let id_str: String = row.get(0)?;
196    let namespace: String = row.get(1)?;
197    let verb: String = row.get(2)?;
198    let substrate_str: String = row.get(3)?;
199    let actor: String = row.get(4)?;
200    let kind_str: String = row.get(5)?;
201    let outcome_str: String = row.get(6)?;
202    let payload_str: String = row.get(7)?;
203    let payload_schema_version: i64 = row.get(8)?;
204    let profile_state_version: Option<i64> = row.get(9)?;
205    let duration_us: i64 = row.get(10)?;
206    let target_str: Option<String> = row.get(11)?;
207    let session_str: Option<String> = row.get(12)?;
208    let aggregate_kind: Option<String> = row.get(13)?;
209    let aggregate_str: Option<String> = row.get(14)?;
210    let created_at: i64 = row.get(15)?;
211
212    let id = parse_uuid(&id_str)?;
213    let substrate = substrate_from_str(&substrate_str)?;
214    let kind = kind_from_str(&kind_str)?;
215    let outcome = outcome_from_str(&outcome_str)?;
216    let payload: serde_json::Value = serde_json::from_str(&payload_str).map_err(|e| {
217        rusqlite::Error::FromSqlConversionFailure(7, rusqlite::types::Type::Text, Box::new(e))
218    })?;
219    let target_id = target_str.as_deref().map(parse_uuid).transpose()?;
220    let session_id = session_str.as_deref().map(parse_uuid).transpose()?;
221    let aggregate_id = aggregate_str.as_deref().map(parse_uuid).transpose()?;
222    let payload_schema_version_u32: u32 = payload_schema_version.try_into().map_err(|_| {
223        rusqlite::Error::FromSqlConversionFailure(
224            8,
225            rusqlite::types::Type::Integer,
226            format!("payload_schema_version {payload_schema_version} out of u32 range").into(),
227        )
228    })?;
229    let profile_state_version_u64: Option<u64> = profile_state_version
230        .map(|v| {
231            u64::try_from(v).map_err(|_| {
232                rusqlite::Error::FromSqlConversionFailure(
233                    9,
234                    rusqlite::types::Type::Integer,
235                    format!("profile_state_version {v} out of u64 range").into(),
236                )
237            })
238        })
239        .transpose()?;
240
241    Ok(Event {
242        id,
243        namespace,
244        verb,
245        substrate,
246        actor,
247        kind,
248        outcome,
249        payload,
250        payload_schema_version: payload_schema_version_u32,
251        profile_state_version: profile_state_version_u64,
252        duration_us,
253        target_id,
254        session_id,
255        aggregate_kind,
256        aggregate_id,
257        created_at,
258    })
259}
260
261// =============================================================================
262// Helpers: observation projection write path
263// =============================================================================
264
265fn insert_event_with_observations(
266    conn: &rusqlite::Connection,
267    event: &Event,
268) -> Result<(), rusqlite::Error> {
269    let id_str = event.id.to_string();
270    let substrate_str = event.substrate.name().to_string();
271    let kind_str = event.kind.name().to_string();
272    let outcome_str = event.outcome.name().to_string();
273    let payload_str = event.payload.to_string();
274    let target_str = event.target_id.map(|u| u.to_string());
275    let session_str = event.session_id.map(|u| u.to_string());
276    let aggregate_str = event.aggregate_id.map(|u| u.to_string());
277    let profile_state_version = event.profile_state_version.map(|v| v as i64);
278
279    conn.execute(
280        "INSERT INTO events \
281         (id, namespace, verb, substrate, actor, kind, outcome, payload, payload_schema_version, \
282          profile_state_version, duration_us, target_id, session_id, aggregate_kind, aggregate_id, created_at) \
283         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
284        rusqlite::params![
285            id_str,
286            &event.namespace,
287            &event.verb,
288            substrate_str,
289            &event.actor,
290            kind_str,
291            outcome_str,
292            payload_str,
293            event.payload_schema_version as i64,
294            profile_state_version,
295            event.duration_us,
296            target_str,
297            session_str,
298            &event.aggregate_kind,
299            aggregate_str,
300            event.created_at,
301        ],
302    )?;
303
304    for observation in decode_event_observations(event)? {
305        conn.execute(
306            "INSERT INTO event_observations \
307             (event_id, entity_id, referent_kind, role, position) \
308             VALUES (?1, ?2, ?3, ?4, ?5)",
309            rusqlite::params![
310                observation.event_id.to_string(),
311                observation.entity_id.to_string(),
312                observation.referent_kind.name(),
313                observation.role.name(),
314                observation.position as i64,
315            ],
316        )?;
317    }
318
319    Ok(())
320}
321
322fn decode_event_observations(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
323    match event.kind {
324        EventKind::RerankExecuted => decode_rank_observations(event),
325        EventKind::RecallExecuted | EventKind::SearchExecuted => decode_rank_observations(event),
326        EventKind::LinkCreated => decode_link_observations(event),
327        EventKind::EntityCreated
328        | EventKind::EntityUpdated
329        | EventKind::EntityDeleted
330        | EventKind::NoteCreated
331        | EventKind::NoteUpdated
332        | EventKind::NoteDeleted
333        | EventKind::TaskTransitioned => decode_target_observation(event),
334        EventKind::FeedbackExplicit => decode_signal_observation(event),
335        _ => Ok(Vec::new()),
336    }
337}
338
339fn payload_uuid_array(event: &Event, field: &'static str) -> Result<Vec<Uuid>, rusqlite::Error> {
340    let Some(values) = event.payload.get(field) else {
341        return Ok(Vec::new());
342    };
343    let Some(array) = values.as_array() else {
344        return Err(invalid_payload(event.kind, field, "expected array"));
345    };
346
347    array
348        .iter()
349        .map(|value| {
350            value
351                .as_str()
352                .ok_or_else(|| invalid_payload(event.kind, field, "expected UUID string"))
353                .and_then(|s| Uuid::parse_str(s).map_err(|e| invalid_payload(event.kind, field, e)))
354        })
355        .collect()
356}
357
358fn payload_uuid(event: &Event, field: &'static str) -> Result<Option<Uuid>, rusqlite::Error> {
359    let Some(value) = event.payload.get(field) else {
360        return Ok(None);
361    };
362    let Some(s) = value.as_str() else {
363        return Err(invalid_payload(event.kind, field, "expected UUID string"));
364    };
365    Uuid::parse_str(s)
366        .map(Some)
367        .map_err(|e| invalid_payload(event.kind, field, e))
368}
369
370fn decode_rank_observations(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
371    let mut rows = Vec::new();
372
373    for (position, entity_id) in payload_uuid_array(event, "candidates")?
374        .into_iter()
375        .enumerate()
376    {
377        let position_u32 = u32::try_from(position).map_err(|_| {
378            invalid_payload(
379                event.kind,
380                "candidates[position]",
381                "position out of u32 range",
382            )
383        })?;
384        rows.push(EventObservation {
385            event_id: event.id,
386            entity_id,
387            referent_kind: ReferentKind::Note,
388            role: ObservationRole::Candidate,
389            position: position_u32,
390        });
391    }
392
393    let selected = payload_uuid_array(event, "selected")
394        .or_else(|_| payload_uuid_array(event, "reranked"))
395        .or_else(|_| payload_uuid_array(event, "final_scores"))?;
396    for (position, entity_id) in selected.into_iter().enumerate() {
397        let position_u32 = u32::try_from(position).map_err(|_| {
398            invalid_payload(
399                event.kind,
400                "selected[position]",
401                "position out of u32 range",
402            )
403        })?;
404        rows.push(EventObservation {
405            event_id: event.id,
406            entity_id,
407            referent_kind: ReferentKind::Note,
408            role: ObservationRole::Selected,
409            position: position_u32,
410        });
411    }
412
413    Ok(rows)
414}
415
416fn decode_link_observations(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
417    let mut rows = Vec::new();
418    if let Some(source) = payload_uuid(event, "source_id")? {
419        rows.push(EventObservation {
420            event_id: event.id,
421            entity_id: source,
422            referent_kind: ReferentKind::Entity,
423            role: ObservationRole::Target,
424            position: 0,
425        });
426    }
427    if let Some(target) = payload_uuid(event, "target_id")? {
428        rows.push(EventObservation {
429            event_id: event.id,
430            entity_id: target,
431            referent_kind: ReferentKind::Entity,
432            role: ObservationRole::Target,
433            position: 1,
434        });
435    }
436    Ok(rows)
437}
438
439fn decode_target_observation(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
440    let Some(entity_id) = event.target_id.or(payload_uuid(event, "target_id")?) else {
441        return Ok(Vec::new());
442    };
443    Ok(vec![EventObservation {
444        event_id: event.id,
445        entity_id,
446        referent_kind: if event.substrate == SubstrateKind::Note {
447            ReferentKind::Note
448        } else {
449            ReferentKind::Entity
450        },
451        role: ObservationRole::Target,
452        position: 0,
453    }])
454}
455
456fn decode_signal_observation(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
457    let Some(entity_id) = payload_uuid(event, "about_id")? else {
458        return Ok(Vec::new());
459    };
460    Ok(vec![EventObservation {
461        event_id: event.id,
462        entity_id,
463        referent_kind: ReferentKind::Entity,
464        role: ObservationRole::Signal,
465        position: 0,
466    }])
467}
468
469fn invalid_payload(
470    kind: EventKind,
471    field: &'static str,
472    reason: impl std::fmt::Display,
473) -> rusqlite::Error {
474    rusqlite::Error::ToSqlConversionFailure(
475        format!("invalid payload for {}.{field}: {reason}", kind.name()).into(),
476    )
477}
478
479// =============================================================================
480// Helpers: filter SQL builder
481// =============================================================================
482
483fn build_event_filter_sql(
484    conn: &rusqlite::Connection,
485    default_namespace: &str,
486    filter: &EventFilter,
487) -> Result<(String, Vec<Box<dyn rusqlite::types::ToSql>>), rusqlite::Error> {
488    reject_missing_event_filter_schema(conn, filter)?;
489
490    let mut conditions: Vec<String> = Vec::new();
491    let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
492
493    params.push(Box::new(default_namespace.to_string()));
494    conditions.push(format!("namespace = ?{}", params.len()));
495
496    push_in_clause(
497        &mut conditions,
498        &mut params,
499        "id",
500        filter.ids.iter().map(Uuid::to_string),
501    );
502    push_in_clause(
503        &mut conditions,
504        &mut params,
505        "kind",
506        filter.kinds.iter().map(|kind| kind.name().to_string()),
507    );
508    push_in_clause(
509        &mut conditions,
510        &mut params,
511        "verb",
512        filter.verbs.iter().cloned(),
513    );
514    push_in_clause(
515        &mut conditions,
516        &mut params,
517        "substrate",
518        filter.substrates.iter().map(|s| s.name().to_string()),
519    );
520    push_in_clause(
521        &mut conditions,
522        &mut params,
523        "actor",
524        filter.actors.iter().cloned(),
525    );
526
527    if let Some(after) = filter.after {
528        params.push(Box::new(after));
529        conditions.push(format!("created_at > ?{}", params.len()));
530    }
531
532    if let Some(before) = filter.before {
533        params.push(Box::new(before));
534        conditions.push(format!("created_at < ?{}", params.len()));
535    }
536
537    if let Some(session_id) = filter.session_id {
538        params.push(Box::new(session_id.to_string()));
539        conditions.push(format!("session_id = ?{}", params.len()));
540    }
541
542    push_observation_exists(&mut conditions, &mut params, "candidate", &filter.observed);
543    push_observation_exists(&mut conditions, &mut params, "selected", &filter.selected);
544
545    if let Some(proposal_id) = filter.payload_proposal_id {
546        params.push(Box::new(proposal_id.to_string()));
547        conditions.push(format!(
548            "json_extract(payload, '$.proposal_id') = ?{}",
549            params.len()
550        ));
551    }
552
553    let clause = format!(" WHERE {}", conditions.join(" AND "));
554    Ok((clause, params))
555}
556
557fn push_in_clause<I>(
558    conditions: &mut Vec<String>,
559    params: &mut Vec<Box<dyn rusqlite::types::ToSql>>,
560    column: &'static str,
561    values: I,
562) where
563    I: IntoIterator<Item = String>,
564{
565    let placeholders: Vec<String> = values
566        .into_iter()
567        .map(|value| {
568            params.push(Box::new(value));
569            format!("?{}", params.len())
570        })
571        .collect();
572    if !placeholders.is_empty() {
573        conditions.push(format!("{column} IN ({})", placeholders.join(",")));
574    }
575}
576
577fn push_observation_exists(
578    conditions: &mut Vec<String>,
579    params: &mut Vec<Box<dyn rusqlite::types::ToSql>>,
580    role: &'static str,
581    entity_ids: &[Uuid],
582) {
583    if entity_ids.is_empty() {
584        return;
585    }
586    let placeholders: Vec<String> = entity_ids
587        .iter()
588        .map(|id| {
589            params.push(Box::new(id.to_string()));
590            format!("?{}", params.len())
591        })
592        .collect();
593    conditions.push(format!(
594        "EXISTS (SELECT 1 FROM event_observations o \
595         WHERE o.event_id = events.id AND o.role = '{role}' AND o.entity_id IN ({}))",
596        placeholders.join(",")
597    ));
598}
599
600fn reject_missing_event_filter_schema(
601    conn: &rusqlite::Connection,
602    filter: &EventFilter,
603) -> Result<(), rusqlite::Error> {
604    if filter.session_id.is_some() && !has_column(conn, "events", "session_id")? {
605        return Err(schema_absent("events.session_id"));
606    }
607    if (!filter.observed.is_empty() || !filter.selected.is_empty())
608        && !has_table(conn, "event_observations")?
609    {
610        return Err(schema_absent("event_observations"));
611    }
612    if filter.payload_proposal_id.is_some() && !has_column(conn, "events", "payload")? {
613        return Err(schema_absent("events.payload"));
614    }
615    Ok(())
616}
617
618fn has_table(conn: &rusqlite::Connection, table: &'static str) -> Result<bool, rusqlite::Error> {
619    conn.query_row(
620        "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type = 'table' AND name = ?1",
621        [table],
622        |row| row.get(0),
623    )
624}
625
626fn has_column(
627    conn: &rusqlite::Connection,
628    table: &'static str,
629    column: &'static str,
630) -> Result<bool, rusqlite::Error> {
631    conn.query_row(
632        "SELECT COUNT(*) > 0 FROM pragma_table_info(?1) WHERE name = ?2",
633        rusqlite::params![table, column],
634        |row| row.get(0),
635    )
636}
637
638fn schema_absent(name: &'static str) -> rusqlite::Error {
639    rusqlite::Error::ToSqlConversionFailure(
640        format!("event filter requires missing schema element {name}; run migrations").into(),
641    )
642}
643
644// =============================================================================
645// EventStore implementation
646// =============================================================================
647
648#[async_trait]
649impl EventStore for SqlEventStore {
650    async fn append_event(&self, event: Event) -> Result<(), StorageError> {
651        self.with_writer("append_event", move |conn| {
652            conn.execute_batch("BEGIN IMMEDIATE")?;
653            if let Err(e) = insert_event_with_observations(conn, &event) {
654                let _ = conn.execute_batch("ROLLBACK");
655                return Err(e);
656            }
657            conn.execute_batch("COMMIT")?;
658            Ok(())
659        })
660        .await
661    }
662
663    async fn append_events(&self, events: Vec<Event>) -> Result<BatchWriteSummary, StorageError> {
664        let attempted = events.len() as u64;
665
666        self.with_writer("append_events", move |conn| {
667            conn.execute_batch("BEGIN IMMEDIATE")?;
668            let mut affected = 0u64;
669
670            for event in &events {
671                if let Err(e) = insert_event_with_observations(conn, event) {
672                    let _ = conn.execute_batch("ROLLBACK");
673                    return Err(e);
674                }
675                affected += 1;
676            }
677
678            conn.execute_batch("COMMIT")?;
679            Ok(BatchWriteSummary {
680                attempted,
681                affected,
682                failed: 0,
683                first_error: String::new(),
684            })
685        })
686        .await
687    }
688
689    async fn get_event(&self, id: Uuid) -> Result<Option<Event>, StorageError> {
690        let namespace = self.namespace.clone();
691        let id_str = id.to_string();
692
693        self.with_reader("get_event", move |conn| {
694            let mut stmt = conn.prepare(
695                "SELECT id, namespace, verb, substrate, actor, kind, outcome, payload, \
696                        payload_schema_version, profile_state_version, duration_us, target_id, \
697                        session_id, aggregate_kind, aggregate_id, created_at \
698                 FROM events WHERE namespace = ?1 AND id = ?2",
699            )?;
700            let mut rows = stmt.query(rusqlite::params![namespace, id_str])?;
701            match rows.next()? {
702                Some(row) => Ok(Some(read_event(row)?)),
703                None => Ok(None),
704            }
705        })
706        .await
707    }
708
709    async fn query_events(
710        &self,
711        filter: EventFilter,
712        page: PageRequest,
713    ) -> Result<Page<Event>, StorageError> {
714        let namespace = self.namespace.clone();
715
716        self.with_reader("query_events", move |conn| {
717            let (where_clause, filter_params) = build_event_filter_sql(conn, &namespace, &filter)?;
718
719            let count_sql = format!("SELECT COUNT(*) FROM events{}", where_clause);
720            let total: i64 = {
721                let mut stmt = conn.prepare(&count_sql)?;
722                let param_refs: Vec<&dyn rusqlite::types::ToSql> =
723                    filter_params.iter().map(|p| p.as_ref()).collect();
724                stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
725            };
726
727            let (_, data_filter_params) = build_event_filter_sql(conn, &namespace, &filter)?;
728            let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = data_filter_params;
729            all_params.push(Box::new(page.limit as i64));
730            all_params.push(Box::new(page.offset as i64));
731
732            let limit_idx = all_params.len() - 1;
733            let offset_idx = all_params.len();
734
735            let data_sql = format!(
736                "SELECT id, namespace, verb, substrate, actor, kind, outcome, payload, \
737                        payload_schema_version, profile_state_version, duration_us, target_id, \
738                        session_id, aggregate_kind, aggregate_id, created_at \
739                 FROM events{} ORDER BY created_at DESC, id DESC LIMIT ?{} OFFSET ?{}",
740                where_clause, limit_idx, offset_idx,
741            );
742
743            let mut stmt = conn.prepare(&data_sql)?;
744            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
745                all_params.iter().map(|p| p.as_ref()).collect();
746            let rows = stmt.query_map(param_refs.as_slice(), read_event)?;
747
748            let mut items = Vec::new();
749            for row in rows {
750                items.push(row?);
751            }
752
753            Ok(Page {
754                items,
755                total: Some(total as u64),
756            })
757        })
758        .await
759    }
760
761    async fn count_events(&self, filter: EventFilter) -> Result<u64, StorageError> {
762        let namespace = self.namespace.clone();
763
764        self.with_reader("count_events", move |conn| {
765            let (where_clause, params) = build_event_filter_sql(conn, &namespace, &filter)?;
766            let sql = format!("SELECT COUNT(*) FROM events{}", where_clause);
767            let mut stmt = conn.prepare(&sql)?;
768            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
769                params.iter().map(|p| p.as_ref()).collect();
770            let count: i64 = stmt.query_row(param_refs.as_slice(), |row| row.get(0))?;
771            Ok(count as u64)
772        })
773        .await
774    }
775}
776
777// =============================================================================
778// DDL
779// =============================================================================
780
781const EVENTS_DDL: &str = include_str!("../../sql/events-ddl.sql");
782
783pub(crate) fn ensure_events_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
784    conn.execute_batch(EVENTS_DDL)
785}
786
787#[cfg(test)]
788#[path = "event_tests.rs"]
789mod tests;