1use std::sync::Arc;
11
12use async_trait::async_trait;
13use uuid::Uuid;
14
15use khive_storage::error::StorageError;
16use khive_storage::event::{Event, EventFilter, EventObservation, ObservationRole, ReferentKind};
17use khive_storage::types::{BatchWriteSummary, Page, PageRequest};
18use khive_storage::EventStore;
19use khive_storage::StorageCapability;
20use khive_types::{EventKind, EventOutcome, SubstrateKind};
21
22use crate::error::SqliteError;
23use crate::pool::ConnectionPool;
24
25fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
26 StorageError::driver(StorageCapability::Events, op, e)
27}
28
29fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
30 StorageError::driver(StorageCapability::Events, op, e)
31}
32
/// SQLite-backed event store whose lookups, queries and counts are scoped to a
/// single namespace.
pub struct SqlEventStore {
    /// Shared connection pool. Its configuration (path, busy timeout) is used to
    /// open standalone connections; its pooled connections are used otherwise.
    pool: Arc<ConnectionPool>,
    /// When `true`, every operation opens its own standalone connection to the
    /// database file instead of borrowing one from the pool.
    is_file_backed: bool,
    /// Namespace bound into the `WHERE namespace = ?` condition of every read.
    namespace: String,
}
39
40impl SqlEventStore {
41 pub fn new_scoped(
43 pool: Arc<ConnectionPool>,
44 is_file_backed: bool,
45 namespace: impl Into<String>,
46 ) -> Self {
47 Self {
48 pool,
49 is_file_backed,
50 namespace: namespace.into(),
51 }
52 }
53
54 fn open_standalone_writer(&self) -> Result<rusqlite::Connection, StorageError> {
55 let config = self.pool.config();
56 let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
57 operation: "event_writer".into(),
58 message: "in-memory databases do not support standalone connections".into(),
59 })?;
60
61 let conn = rusqlite::Connection::open_with_flags(
62 path,
63 rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
64 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
65 | rusqlite::OpenFlags::SQLITE_OPEN_URI,
66 )
67 .map_err(|e| map_err(e, "open_event_writer"))?;
68
69 conn.busy_timeout(config.busy_timeout)
70 .map_err(|e| map_err(e, "open_event_writer"))?;
71 conn.pragma_update(None, "foreign_keys", "ON")
72 .map_err(|e| map_err(e, "open_event_writer"))?;
73 conn.pragma_update(None, "synchronous", "NORMAL")
74 .map_err(|e| map_err(e, "open_event_writer"))?;
75
76 Ok(conn)
77 }
78
79 fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
80 let config = self.pool.config();
81 let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
82 operation: "event_reader".into(),
83 message: "in-memory databases do not support standalone connections".into(),
84 })?;
85
86 let conn = rusqlite::Connection::open_with_flags(
87 path,
88 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
89 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
90 | rusqlite::OpenFlags::SQLITE_OPEN_URI,
91 )
92 .map_err(|e| map_err(e, "open_event_reader"))?;
93
94 conn.busy_timeout(config.busy_timeout)
95 .map_err(|e| map_err(e, "open_event_reader"))?;
96 conn.pragma_update(None, "foreign_keys", "ON")
97 .map_err(|e| map_err(e, "open_event_reader"))?;
98 conn.pragma_update(None, "synchronous", "NORMAL")
99 .map_err(|e| map_err(e, "open_event_reader"))?;
100
101 Ok(conn)
102 }
103
104 async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
105 where
106 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
107 R: Send + 'static,
108 {
109 if self.is_file_backed {
110 let conn = self.open_standalone_writer()?;
111 tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
112 .await
113 .map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
114 } else {
115 let pool = Arc::clone(&self.pool);
116 tokio::task::spawn_blocking(move || {
117 let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
118 f(guard.conn()).map_err(|e| map_err(e, op))
119 })
120 .await
121 .map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
122 }
123 }
124
125 async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
126 where
127 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
128 R: Send + 'static,
129 {
130 if self.is_file_backed {
131 let conn = self.open_standalone_reader()?;
132 tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
133 .await
134 .map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
135 } else {
136 let pool = Arc::clone(&self.pool);
137 tokio::task::spawn_blocking(move || {
138 let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
139 f(guard.conn()).map_err(|e| map_err(e, op))
140 })
141 .await
142 .map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
143 }
144 }
145}
146
147fn substrate_from_str(s: &str) -> Result<SubstrateKind, rusqlite::Error> {
152 s.parse::<SubstrateKind>().map_err(|_| {
153 rusqlite::Error::FromSqlConversionFailure(
154 0,
155 rusqlite::types::Type::Text,
156 format!("unknown SubstrateKind: {s}").into(),
157 )
158 })
159}
160
161fn outcome_from_str(s: &str) -> Result<EventOutcome, rusqlite::Error> {
162 match s {
163 "success" => Ok(EventOutcome::Success),
164 "denied" => Ok(EventOutcome::Denied),
165 "error" => Ok(EventOutcome::Error),
166 other => Err(rusqlite::Error::FromSqlConversionFailure(
167 0,
168 rusqlite::types::Type::Text,
169 format!("unknown EventOutcome: {other}").into(),
170 )),
171 }
172}
173
174fn kind_from_str(s: &str) -> Result<EventKind, rusqlite::Error> {
175 s.parse::<EventKind>().map_err(|_| {
176 rusqlite::Error::FromSqlConversionFailure(
177 0,
178 rusqlite::types::Type::Text,
179 format!("unknown EventKind: {s}").into(),
180 )
181 })
182}
183
184fn parse_uuid(s: &str) -> Result<Uuid, rusqlite::Error> {
185 Uuid::parse_str(s).map_err(|e| {
186 rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
187 })
188}
189
190fn read_event(row: &rusqlite::Row<'_>) -> Result<Event, rusqlite::Error> {
195 let id_str: String = row.get(0)?;
196 let namespace: String = row.get(1)?;
197 let verb: String = row.get(2)?;
198 let substrate_str: String = row.get(3)?;
199 let actor: String = row.get(4)?;
200 let kind_str: String = row.get(5)?;
201 let outcome_str: String = row.get(6)?;
202 let payload_str: String = row.get(7)?;
203 let payload_schema_version: i64 = row.get(8)?;
204 let profile_state_version: Option<i64> = row.get(9)?;
205 let duration_us: i64 = row.get(10)?;
206 let target_str: Option<String> = row.get(11)?;
207 let session_str: Option<String> = row.get(12)?;
208 let aggregate_kind: Option<String> = row.get(13)?;
209 let aggregate_str: Option<String> = row.get(14)?;
210 let created_at: i64 = row.get(15)?;
211
212 let id = parse_uuid(&id_str)?;
213 let substrate = substrate_from_str(&substrate_str)?;
214 let kind = kind_from_str(&kind_str)?;
215 let outcome = outcome_from_str(&outcome_str)?;
216 let payload: serde_json::Value = serde_json::from_str(&payload_str).map_err(|e| {
217 rusqlite::Error::FromSqlConversionFailure(7, rusqlite::types::Type::Text, Box::new(e))
218 })?;
219 let target_id = target_str.as_deref().map(parse_uuid).transpose()?;
220 let session_id = session_str.as_deref().map(parse_uuid).transpose()?;
221 let aggregate_id = aggregate_str.as_deref().map(parse_uuid).transpose()?;
222
223 Ok(Event {
224 id,
225 namespace,
226 verb,
227 substrate,
228 actor,
229 kind,
230 outcome,
231 payload,
232 payload_schema_version: payload_schema_version as u32,
233 profile_state_version: profile_state_version.map(|v| v as u64),
234 duration_us,
235 target_id,
236 session_id,
237 aggregate_kind,
238 aggregate_id,
239 created_at,
240 })
241}
242
243fn insert_event_with_observations(
248 conn: &rusqlite::Connection,
249 event: &Event,
250) -> Result<(), rusqlite::Error> {
251 let id_str = event.id.to_string();
252 let substrate_str = event.substrate.name().to_string();
253 let kind_str = event.kind.name().to_string();
254 let outcome_str = event.outcome.name().to_string();
255 let payload_str = event.payload.to_string();
256 let target_str = event.target_id.map(|u| u.to_string());
257 let session_str = event.session_id.map(|u| u.to_string());
258 let aggregate_str = event.aggregate_id.map(|u| u.to_string());
259 let profile_state_version = event.profile_state_version.map(|v| v as i64);
260
261 conn.execute(
262 "INSERT INTO events \
263 (id, namespace, verb, substrate, actor, kind, outcome, payload, payload_schema_version, \
264 profile_state_version, duration_us, target_id, session_id, aggregate_kind, aggregate_id, created_at) \
265 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
266 rusqlite::params![
267 id_str,
268 &event.namespace,
269 &event.verb,
270 substrate_str,
271 &event.actor,
272 kind_str,
273 outcome_str,
274 payload_str,
275 event.payload_schema_version as i64,
276 profile_state_version,
277 event.duration_us,
278 target_str,
279 session_str,
280 &event.aggregate_kind,
281 aggregate_str,
282 event.created_at,
283 ],
284 )?;
285
286 for observation in decode_event_observations(event)? {
287 conn.execute(
288 "INSERT INTO event_observations \
289 (event_id, entity_id, referent_kind, role, position) \
290 VALUES (?1, ?2, ?3, ?4, ?5)",
291 rusqlite::params![
292 observation.event_id.to_string(),
293 observation.entity_id.to_string(),
294 observation.referent_kind.name(),
295 observation.role.name(),
296 observation.position as i64,
297 ],
298 )?;
299 }
300
301 Ok(())
302}
303
304fn decode_event_observations(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
305 match event.kind {
306 EventKind::RerankExecuted => decode_rank_observations(event),
307 EventKind::RecallExecuted | EventKind::SearchExecuted => decode_rank_observations(event),
308 EventKind::LinkCreated => decode_link_observations(event),
309 EventKind::EntityCreated
310 | EventKind::EntityUpdated
311 | EventKind::EntityDeleted
312 | EventKind::NoteCreated
313 | EventKind::NoteUpdated
314 | EventKind::NoteDeleted
315 | EventKind::TaskTransitioned => decode_target_observation(event),
316 EventKind::FeedbackExplicit => decode_signal_observation(event),
317 _ => Ok(Vec::new()),
318 }
319}
320
321fn payload_uuid_array(event: &Event, field: &'static str) -> Result<Vec<Uuid>, rusqlite::Error> {
322 let Some(values) = event.payload.get(field) else {
323 return Ok(Vec::new());
324 };
325 let Some(array) = values.as_array() else {
326 return Err(invalid_payload(event.kind, field, "expected array"));
327 };
328
329 array
330 .iter()
331 .map(|value| {
332 value
333 .as_str()
334 .ok_or_else(|| invalid_payload(event.kind, field, "expected UUID string"))
335 .and_then(|s| Uuid::parse_str(s).map_err(|e| invalid_payload(event.kind, field, e)))
336 })
337 .collect()
338}
339
340fn payload_uuid(event: &Event, field: &'static str) -> Result<Option<Uuid>, rusqlite::Error> {
341 let Some(value) = event.payload.get(field) else {
342 return Ok(None);
343 };
344 let Some(s) = value.as_str() else {
345 return Err(invalid_payload(event.kind, field, "expected UUID string"));
346 };
347 Uuid::parse_str(s)
348 .map(Some)
349 .map_err(|e| invalid_payload(event.kind, field, e))
350}
351
352fn decode_rank_observations(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
353 let mut rows = Vec::new();
354
355 for (position, entity_id) in payload_uuid_array(event, "candidates")?
356 .into_iter()
357 .enumerate()
358 {
359 rows.push(EventObservation {
360 event_id: event.id,
361 entity_id,
362 referent_kind: ReferentKind::Note,
363 role: ObservationRole::Candidate,
364 position: position as u32,
365 });
366 }
367
368 let selected = payload_uuid_array(event, "selected")
369 .or_else(|_| payload_uuid_array(event, "reranked"))
370 .or_else(|_| payload_uuid_array(event, "final_scores"))?;
371 for (position, entity_id) in selected.into_iter().enumerate() {
372 rows.push(EventObservation {
373 event_id: event.id,
374 entity_id,
375 referent_kind: ReferentKind::Note,
376 role: ObservationRole::Selected,
377 position: position as u32,
378 });
379 }
380
381 Ok(rows)
382}
383
384fn decode_link_observations(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
385 let mut rows = Vec::new();
386 if let Some(source) = payload_uuid(event, "source_id")? {
387 rows.push(EventObservation {
388 event_id: event.id,
389 entity_id: source,
390 referent_kind: ReferentKind::Entity,
391 role: ObservationRole::Target,
392 position: 0,
393 });
394 }
395 if let Some(target) = payload_uuid(event, "target_id")? {
396 rows.push(EventObservation {
397 event_id: event.id,
398 entity_id: target,
399 referent_kind: ReferentKind::Entity,
400 role: ObservationRole::Target,
401 position: 1,
402 });
403 }
404 Ok(rows)
405}
406
407fn decode_target_observation(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
408 let Some(entity_id) = event.target_id.or(payload_uuid(event, "target_id")?) else {
409 return Ok(Vec::new());
410 };
411 Ok(vec![EventObservation {
412 event_id: event.id,
413 entity_id,
414 referent_kind: if event.substrate == SubstrateKind::Note {
415 ReferentKind::Note
416 } else {
417 ReferentKind::Entity
418 },
419 role: ObservationRole::Target,
420 position: 0,
421 }])
422}
423
424fn decode_signal_observation(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
425 let Some(entity_id) = payload_uuid(event, "about_id")? else {
426 return Ok(Vec::new());
427 };
428 Ok(vec![EventObservation {
429 event_id: event.id,
430 entity_id,
431 referent_kind: ReferentKind::Entity,
432 role: ObservationRole::Signal,
433 position: 0,
434 }])
435}
436
437fn invalid_payload(
438 kind: EventKind,
439 field: &'static str,
440 reason: impl std::fmt::Display,
441) -> rusqlite::Error {
442 rusqlite::Error::ToSqlConversionFailure(
443 format!("invalid payload for {}.{field}: {reason}", kind.name()).into(),
444 )
445}
446
447fn build_event_filter_sql(
452 conn: &rusqlite::Connection,
453 default_namespace: &str,
454 filter: &EventFilter,
455) -> Result<(String, Vec<Box<dyn rusqlite::types::ToSql>>), rusqlite::Error> {
456 reject_missing_event_filter_schema(conn, filter)?;
457
458 let mut conditions: Vec<String> = Vec::new();
459 let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
460
461 params.push(Box::new(default_namespace.to_string()));
462 conditions.push(format!("namespace = ?{}", params.len()));
463
464 push_in_clause(
465 &mut conditions,
466 &mut params,
467 "id",
468 filter.ids.iter().map(Uuid::to_string),
469 );
470 push_in_clause(
471 &mut conditions,
472 &mut params,
473 "kind",
474 filter.kinds.iter().map(|kind| kind.name().to_string()),
475 );
476 push_in_clause(
477 &mut conditions,
478 &mut params,
479 "verb",
480 filter.verbs.iter().cloned(),
481 );
482 push_in_clause(
483 &mut conditions,
484 &mut params,
485 "substrate",
486 filter.substrates.iter().map(|s| s.name().to_string()),
487 );
488 push_in_clause(
489 &mut conditions,
490 &mut params,
491 "actor",
492 filter.actors.iter().cloned(),
493 );
494
495 if let Some(after) = filter.after {
496 params.push(Box::new(after));
497 conditions.push(format!("created_at > ?{}", params.len()));
498 }
499
500 if let Some(before) = filter.before {
501 params.push(Box::new(before));
502 conditions.push(format!("created_at < ?{}", params.len()));
503 }
504
505 if let Some(session_id) = filter.session_id {
506 params.push(Box::new(session_id.to_string()));
507 conditions.push(format!("session_id = ?{}", params.len()));
508 }
509
510 push_observation_exists(&mut conditions, &mut params, "candidate", &filter.observed);
511 push_observation_exists(&mut conditions, &mut params, "selected", &filter.selected);
512
513 if let Some(proposal_id) = filter.payload_proposal_id {
514 params.push(Box::new(proposal_id.to_string()));
515 conditions.push(format!(
516 "json_extract(payload, '$.proposal_id') = ?{}",
517 params.len()
518 ));
519 }
520
521 let clause = format!(" WHERE {}", conditions.join(" AND "));
522 Ok((clause, params))
523}
524
525fn push_in_clause<I>(
526 conditions: &mut Vec<String>,
527 params: &mut Vec<Box<dyn rusqlite::types::ToSql>>,
528 column: &'static str,
529 values: I,
530) where
531 I: IntoIterator<Item = String>,
532{
533 let placeholders: Vec<String> = values
534 .into_iter()
535 .map(|value| {
536 params.push(Box::new(value));
537 format!("?{}", params.len())
538 })
539 .collect();
540 if !placeholders.is_empty() {
541 conditions.push(format!("{column} IN ({})", placeholders.join(",")));
542 }
543}
544
545fn push_observation_exists(
546 conditions: &mut Vec<String>,
547 params: &mut Vec<Box<dyn rusqlite::types::ToSql>>,
548 role: &'static str,
549 entity_ids: &[Uuid],
550) {
551 if entity_ids.is_empty() {
552 return;
553 }
554 let placeholders: Vec<String> = entity_ids
555 .iter()
556 .map(|id| {
557 params.push(Box::new(id.to_string()));
558 format!("?{}", params.len())
559 })
560 .collect();
561 conditions.push(format!(
562 "EXISTS (SELECT 1 FROM event_observations o \
563 WHERE o.event_id = events.id AND o.role = '{role}' AND o.entity_id IN ({}))",
564 placeholders.join(",")
565 ));
566}
567
568fn reject_missing_event_filter_schema(
569 conn: &rusqlite::Connection,
570 filter: &EventFilter,
571) -> Result<(), rusqlite::Error> {
572 if filter.session_id.is_some() && !has_column(conn, "events", "session_id")? {
573 return Err(schema_absent("events.session_id"));
574 }
575 if (!filter.observed.is_empty() || !filter.selected.is_empty())
576 && !has_table(conn, "event_observations")?
577 {
578 return Err(schema_absent("event_observations"));
579 }
580 if filter.payload_proposal_id.is_some() && !has_column(conn, "events", "payload")? {
581 return Err(schema_absent("events.payload"));
582 }
583 Ok(())
584}
585
586fn has_table(conn: &rusqlite::Connection, table: &'static str) -> Result<bool, rusqlite::Error> {
587 conn.query_row(
588 "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type = 'table' AND name = ?1",
589 [table],
590 |row| row.get(0),
591 )
592}
593
594fn has_column(
595 conn: &rusqlite::Connection,
596 table: &'static str,
597 column: &'static str,
598) -> Result<bool, rusqlite::Error> {
599 conn.query_row(
600 "SELECT COUNT(*) > 0 FROM pragma_table_info(?1) WHERE name = ?2",
601 rusqlite::params![table, column],
602 |row| row.get(0),
603 )
604}
605
606fn schema_absent(name: &'static str) -> rusqlite::Error {
607 rusqlite::Error::ToSqlConversionFailure(
608 format!("event filter requires missing schema element {name}; run migrations").into(),
609 )
610}
611
612#[async_trait]
617impl EventStore for SqlEventStore {
618 async fn append_event(&self, event: Event) -> Result<(), StorageError> {
619 self.with_writer("append_event", move |conn| {
620 conn.execute_batch("BEGIN IMMEDIATE")?;
621 if let Err(e) = insert_event_with_observations(conn, &event) {
622 let _ = conn.execute_batch("ROLLBACK");
623 return Err(e);
624 }
625 conn.execute_batch("COMMIT")?;
626 Ok(())
627 })
628 .await
629 }
630
631 async fn append_events(&self, events: Vec<Event>) -> Result<BatchWriteSummary, StorageError> {
632 let attempted = events.len() as u64;
633
634 self.with_writer("append_events", move |conn| {
635 conn.execute_batch("BEGIN IMMEDIATE")?;
636 let mut affected = 0u64;
637
638 for event in &events {
639 if let Err(e) = insert_event_with_observations(conn, event) {
640 let _ = conn.execute_batch("ROLLBACK");
641 return Err(e);
642 }
643 affected += 1;
644 }
645
646 conn.execute_batch("COMMIT")?;
647 Ok(BatchWriteSummary {
648 attempted,
649 affected,
650 failed: 0,
651 first_error: String::new(),
652 })
653 })
654 .await
655 }
656
657 async fn get_event(&self, id: Uuid) -> Result<Option<Event>, StorageError> {
658 let namespace = self.namespace.clone();
659 let id_str = id.to_string();
660
661 self.with_reader("get_event", move |conn| {
662 let mut stmt = conn.prepare(
663 "SELECT id, namespace, verb, substrate, actor, kind, outcome, payload, \
664 payload_schema_version, profile_state_version, duration_us, target_id, \
665 session_id, aggregate_kind, aggregate_id, created_at \
666 FROM events WHERE namespace = ?1 AND id = ?2",
667 )?;
668 let mut rows = stmt.query(rusqlite::params![namespace, id_str])?;
669 match rows.next()? {
670 Some(row) => Ok(Some(read_event(row)?)),
671 None => Ok(None),
672 }
673 })
674 .await
675 }
676
677 async fn query_events(
678 &self,
679 filter: EventFilter,
680 page: PageRequest,
681 ) -> Result<Page<Event>, StorageError> {
682 let namespace = self.namespace.clone();
683
684 self.with_reader("query_events", move |conn| {
685 let (where_clause, filter_params) = build_event_filter_sql(conn, &namespace, &filter)?;
686
687 let count_sql = format!("SELECT COUNT(*) FROM events{}", where_clause);
688 let total: i64 = {
689 let mut stmt = conn.prepare(&count_sql)?;
690 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
691 filter_params.iter().map(|p| p.as_ref()).collect();
692 stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
693 };
694
695 let (_, data_filter_params) = build_event_filter_sql(conn, &namespace, &filter)?;
696 let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = data_filter_params;
697 all_params.push(Box::new(page.limit as i64));
698 all_params.push(Box::new(page.offset as i64));
699
700 let limit_idx = all_params.len() - 1;
701 let offset_idx = all_params.len();
702
703 let data_sql = format!(
704 "SELECT id, namespace, verb, substrate, actor, kind, outcome, payload, \
705 payload_schema_version, profile_state_version, duration_us, target_id, \
706 session_id, aggregate_kind, aggregate_id, created_at \
707 FROM events{} ORDER BY created_at DESC, id DESC LIMIT ?{} OFFSET ?{}",
708 where_clause, limit_idx, offset_idx,
709 );
710
711 let mut stmt = conn.prepare(&data_sql)?;
712 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
713 all_params.iter().map(|p| p.as_ref()).collect();
714 let rows = stmt.query_map(param_refs.as_slice(), read_event)?;
715
716 let mut items = Vec::new();
717 for row in rows {
718 items.push(row?);
719 }
720
721 Ok(Page {
722 items,
723 total: Some(total as u64),
724 })
725 })
726 .await
727 }
728
729 async fn count_events(&self, filter: EventFilter) -> Result<u64, StorageError> {
730 let namespace = self.namespace.clone();
731
732 self.with_reader("count_events", move |conn| {
733 let (where_clause, params) = build_event_filter_sql(conn, &namespace, &filter)?;
734 let sql = format!("SELECT COUNT(*) FROM events{}", where_clause);
735 let mut stmt = conn.prepare(&sql)?;
736 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
737 params.iter().map(|p| p.as_ref()).collect();
738 let count: i64 = stmt.query_row(param_refs.as_slice(), |row| row.get(0))?;
739 Ok(count as u64)
740 })
741 .await
742 }
743}
744
/// Idempotent DDL for the `events` and `event_observations` tables and their
/// indexes. Every statement uses `IF NOT EXISTS`, so the batch can be re-run.
///
/// The fragments are concatenated with no separator, so the whole batch is a
/// single line of SQL.
const EVENTS_DDL: &str = concat!(
    "CREATE TABLE IF NOT EXISTS events (",
    "id TEXT PRIMARY KEY,",
    "namespace TEXT NOT NULL,",
    "verb TEXT NOT NULL,",
    "substrate TEXT NOT NULL,",
    "actor TEXT NOT NULL,",
    "kind TEXT NOT NULL DEFAULT 'audit',",
    "outcome TEXT NOT NULL,",
    "payload TEXT NOT NULL DEFAULT '{}',",
    "payload_schema_version INTEGER NOT NULL DEFAULT 1,",
    "profile_state_version INTEGER,",
    "duration_us INTEGER NOT NULL DEFAULT 0,",
    "target_id TEXT,",
    "session_id TEXT,",
    "aggregate_kind TEXT,",
    "aggregate_id TEXT,",
    "created_at INTEGER NOT NULL",
    ");",
    "CREATE TABLE IF NOT EXISTS event_observations (",
    "event_id TEXT NOT NULL,",
    "entity_id TEXT NOT NULL,",
    "referent_kind TEXT NOT NULL,",
    "role TEXT NOT NULL,",
    "position INTEGER NOT NULL,",
    "PRIMARY KEY (event_id, role, position)",
    ");",
    "CREATE INDEX IF NOT EXISTS idx_events_namespace ON events(namespace);",
    "CREATE INDEX IF NOT EXISTS idx_events_verb ON events(verb);",
    "CREATE INDEX IF NOT EXISTS idx_events_kind ON events(kind);",
    "CREATE INDEX IF NOT EXISTS idx_events_substrate ON events(substrate);",
    "CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at DESC);",
    "CREATE INDEX IF NOT EXISTS idx_events_ns_created_id ON events(namespace, created_at DESC, id DESC);",
    "CREATE INDEX IF NOT EXISTS idx_events_session ON events(namespace, session_id, created_at, id);",
    "CREATE INDEX IF NOT EXISTS idx_events_payload_proposal_id ON events(json_extract(payload, '$.proposal_id'));",
    "CREATE INDEX IF NOT EXISTS idx_event_obs_entity ON event_observations(entity_id, role);",
    "CREATE INDEX IF NOT EXISTS idx_event_obs_event_role ON event_observations(event_id, role);",
);
787
788pub(crate) fn ensure_events_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
789 conn.execute_batch(EVENTS_DDL)
790}
791
792#[cfg(test)]
793#[path = "event_tests.rs"]
794mod tests;