1use std::sync::Arc;
4
5use async_trait::async_trait;
6use uuid::Uuid;
7
8use khive_storage::error::StorageError;
9use khive_storage::event::{Event, EventFilter, EventObservation, ObservationRole, ReferentKind};
10use khive_storage::types::{BatchWriteSummary, Page, PageRequest};
11use khive_storage::EventStore;
12use khive_storage::StorageCapability;
13use khive_types::{EventKind, EventOutcome, SubstrateKind};
14
15use crate::error::SqliteError;
16use crate::pool::ConnectionPool;
17
18fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
19 StorageError::driver(StorageCapability::Events, op, e)
20}
21
22fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
23 StorageError::driver(StorageCapability::Events, op, e)
24}
25
/// SQLite-backed implementation of [`EventStore`].
pub struct SqlEventStore {
    /// Shared connection pool; also the source of the database path and busy
    /// timeout used when opening standalone connections.
    pool: Arc<ConnectionPool>,
    /// When `true`, reads and writes open a standalone connection to the
    /// database file instead of borrowing a connection from `pool`.
    is_file_backed: bool,
    /// Namespace that `get_event`, `query_events` and `count_events` restrict
    /// their results to.
    namespace: String,
}
32
33impl SqlEventStore {
34 pub fn new_scoped(
36 pool: Arc<ConnectionPool>,
37 is_file_backed: bool,
38 namespace: impl Into<String>,
39 ) -> Self {
40 Self {
41 pool,
42 is_file_backed,
43 namespace: namespace.into(),
44 }
45 }
46
47 fn open_standalone_writer(&self) -> Result<rusqlite::Connection, StorageError> {
48 let config = self.pool.config();
49 let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
50 operation: "event_writer".into(),
51 message: "in-memory databases do not support standalone connections".into(),
52 })?;
53
54 let conn = rusqlite::Connection::open_with_flags(
55 path,
56 rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
57 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
58 | rusqlite::OpenFlags::SQLITE_OPEN_URI,
59 )
60 .map_err(|e| map_err(e, "open_event_writer"))?;
61
62 conn.busy_timeout(config.busy_timeout)
63 .map_err(|e| map_err(e, "open_event_writer"))?;
64 conn.pragma_update(None, "foreign_keys", "ON")
65 .map_err(|e| map_err(e, "open_event_writer"))?;
66 conn.pragma_update(None, "synchronous", "NORMAL")
67 .map_err(|e| map_err(e, "open_event_writer"))?;
68
69 Ok(conn)
70 }
71
72 fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
73 let config = self.pool.config();
74 let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
75 operation: "event_reader".into(),
76 message: "in-memory databases do not support standalone connections".into(),
77 })?;
78
79 let conn = rusqlite::Connection::open_with_flags(
80 path,
81 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
82 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
83 | rusqlite::OpenFlags::SQLITE_OPEN_URI,
84 )
85 .map_err(|e| map_err(e, "open_event_reader"))?;
86
87 conn.busy_timeout(config.busy_timeout)
88 .map_err(|e| map_err(e, "open_event_reader"))?;
89 conn.pragma_update(None, "foreign_keys", "ON")
90 .map_err(|e| map_err(e, "open_event_reader"))?;
91 conn.pragma_update(None, "synchronous", "NORMAL")
92 .map_err(|e| map_err(e, "open_event_reader"))?;
93
94 Ok(conn)
95 }
96
97 async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
98 where
99 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
100 R: Send + 'static,
101 {
102 if self.is_file_backed {
103 let conn = self.open_standalone_writer()?;
104 tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
105 .await
106 .map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
107 } else {
108 let pool = Arc::clone(&self.pool);
109 tokio::task::spawn_blocking(move || {
110 let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
111 f(guard.conn()).map_err(|e| map_err(e, op))
112 })
113 .await
114 .map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
115 }
116 }
117
118 async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
119 where
120 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
121 R: Send + 'static,
122 {
123 if self.is_file_backed {
124 let conn = self.open_standalone_reader()?;
125 tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
126 .await
127 .map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
128 } else {
129 let pool = Arc::clone(&self.pool);
130 tokio::task::spawn_blocking(move || {
131 let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
132 f(guard.conn()).map_err(|e| map_err(e, op))
133 })
134 .await
135 .map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
136 }
137 }
138}
139
140fn substrate_from_str(s: &str) -> Result<SubstrateKind, rusqlite::Error> {
145 s.parse::<SubstrateKind>().map_err(|_| {
146 rusqlite::Error::FromSqlConversionFailure(
147 0,
148 rusqlite::types::Type::Text,
149 format!("unknown SubstrateKind: {s}").into(),
150 )
151 })
152}
153
154fn outcome_from_str(s: &str) -> Result<EventOutcome, rusqlite::Error> {
155 match s {
156 "success" => Ok(EventOutcome::Success),
157 "denied" => Ok(EventOutcome::Denied),
158 "error" => Ok(EventOutcome::Error),
159 other => Err(rusqlite::Error::FromSqlConversionFailure(
160 0,
161 rusqlite::types::Type::Text,
162 format!("unknown EventOutcome: {other}").into(),
163 )),
164 }
165}
166
167fn kind_from_str(s: &str) -> Result<EventKind, rusqlite::Error> {
168 s.parse::<EventKind>().map_err(|_| {
169 rusqlite::Error::FromSqlConversionFailure(
170 0,
171 rusqlite::types::Type::Text,
172 format!("unknown EventKind: {s}").into(),
173 )
174 })
175}
176
177fn parse_uuid(s: &str) -> Result<Uuid, rusqlite::Error> {
178 Uuid::parse_str(s).map_err(|e| {
179 rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
180 })
181}
182
183fn read_event(row: &rusqlite::Row<'_>) -> Result<Event, rusqlite::Error> {
188 let id_str: String = row.get(0)?;
189 let namespace: String = row.get(1)?;
190 let verb: String = row.get(2)?;
191 let substrate_str: String = row.get(3)?;
192 let actor: String = row.get(4)?;
193 let kind_str: String = row.get(5)?;
194 let outcome_str: String = row.get(6)?;
195 let payload_str: String = row.get(7)?;
196 let payload_schema_version: i64 = row.get(8)?;
197 let profile_state_version: Option<i64> = row.get(9)?;
198 let duration_us: i64 = row.get(10)?;
199 let target_str: Option<String> = row.get(11)?;
200 let session_str: Option<String> = row.get(12)?;
201 let aggregate_kind: Option<String> = row.get(13)?;
202 let aggregate_str: Option<String> = row.get(14)?;
203 let created_at: i64 = row.get(15)?;
204
205 let id = parse_uuid(&id_str)?;
206 let substrate = substrate_from_str(&substrate_str)?;
207 let kind = kind_from_str(&kind_str)?;
208 let outcome = outcome_from_str(&outcome_str)?;
209 let payload: serde_json::Value = serde_json::from_str(&payload_str).map_err(|e| {
210 rusqlite::Error::FromSqlConversionFailure(7, rusqlite::types::Type::Text, Box::new(e))
211 })?;
212 let target_id = target_str.as_deref().map(parse_uuid).transpose()?;
213 let session_id = session_str.as_deref().map(parse_uuid).transpose()?;
214 let aggregate_id = aggregate_str.as_deref().map(parse_uuid).transpose()?;
215
216 Ok(Event {
217 id,
218 namespace,
219 verb,
220 substrate,
221 actor,
222 kind,
223 outcome,
224 payload,
225 payload_schema_version: payload_schema_version as u32,
226 profile_state_version: profile_state_version.map(|v| v as u64),
227 duration_us,
228 target_id,
229 session_id,
230 aggregate_kind,
231 aggregate_id,
232 created_at,
233 })
234}
235
236fn insert_event_with_observations(
241 conn: &rusqlite::Connection,
242 event: &Event,
243) -> Result<(), rusqlite::Error> {
244 let id_str = event.id.to_string();
245 let substrate_str = event.substrate.name().to_string();
246 let kind_str = event.kind.name().to_string();
247 let outcome_str = event.outcome.name().to_string();
248 let payload_str = event.payload.to_string();
249 let target_str = event.target_id.map(|u| u.to_string());
250 let session_str = event.session_id.map(|u| u.to_string());
251 let aggregate_str = event.aggregate_id.map(|u| u.to_string());
252 let profile_state_version = event.profile_state_version.map(|v| v as i64);
253
254 conn.execute(
255 "INSERT INTO events \
256 (id, namespace, verb, substrate, actor, kind, outcome, payload, payload_schema_version, \
257 profile_state_version, duration_us, target_id, session_id, aggregate_kind, aggregate_id, created_at) \
258 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
259 rusqlite::params![
260 id_str,
261 &event.namespace,
262 &event.verb,
263 substrate_str,
264 &event.actor,
265 kind_str,
266 outcome_str,
267 payload_str,
268 event.payload_schema_version as i64,
269 profile_state_version,
270 event.duration_us,
271 target_str,
272 session_str,
273 &event.aggregate_kind,
274 aggregate_str,
275 event.created_at,
276 ],
277 )?;
278
279 for observation in decode_event_observations(event)? {
280 conn.execute(
281 "INSERT INTO event_observations \
282 (event_id, entity_id, referent_kind, role, position) \
283 VALUES (?1, ?2, ?3, ?4, ?5)",
284 rusqlite::params![
285 observation.event_id.to_string(),
286 observation.entity_id.to_string(),
287 observation.referent_kind.name(),
288 observation.role.name(),
289 observation.position as i64,
290 ],
291 )?;
292 }
293
294 Ok(())
295}
296
297fn decode_event_observations(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
298 match event.kind {
299 EventKind::RerankExecuted => decode_rank_observations(event),
300 EventKind::RecallExecuted | EventKind::SearchExecuted => decode_rank_observations(event),
301 EventKind::LinkCreated => decode_link_observations(event),
302 EventKind::EntityCreated
303 | EventKind::EntityUpdated
304 | EventKind::EntityDeleted
305 | EventKind::NoteCreated
306 | EventKind::NoteUpdated
307 | EventKind::NoteDeleted
308 | EventKind::TaskTransitioned => decode_target_observation(event),
309 EventKind::FeedbackExplicit => decode_signal_observation(event),
310 _ => Ok(Vec::new()),
311 }
312}
313
314fn payload_uuid_array(event: &Event, field: &'static str) -> Result<Vec<Uuid>, rusqlite::Error> {
315 let Some(values) = event.payload.get(field) else {
316 return Ok(Vec::new());
317 };
318 let Some(array) = values.as_array() else {
319 return Err(invalid_payload(event.kind, field, "expected array"));
320 };
321
322 array
323 .iter()
324 .map(|value| {
325 value
326 .as_str()
327 .ok_or_else(|| invalid_payload(event.kind, field, "expected UUID string"))
328 .and_then(|s| Uuid::parse_str(s).map_err(|e| invalid_payload(event.kind, field, e)))
329 })
330 .collect()
331}
332
333fn payload_uuid(event: &Event, field: &'static str) -> Result<Option<Uuid>, rusqlite::Error> {
334 let Some(value) = event.payload.get(field) else {
335 return Ok(None);
336 };
337 let Some(s) = value.as_str() else {
338 return Err(invalid_payload(event.kind, field, "expected UUID string"));
339 };
340 Uuid::parse_str(s)
341 .map(Some)
342 .map_err(|e| invalid_payload(event.kind, field, e))
343}
344
345fn decode_rank_observations(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
346 let mut rows = Vec::new();
347
348 for (position, entity_id) in payload_uuid_array(event, "candidates")?
349 .into_iter()
350 .enumerate()
351 {
352 rows.push(EventObservation {
353 event_id: event.id,
354 entity_id,
355 referent_kind: ReferentKind::Note,
356 role: ObservationRole::Candidate,
357 position: position as u32,
358 });
359 }
360
361 let selected = payload_uuid_array(event, "selected")
362 .or_else(|_| payload_uuid_array(event, "reranked"))
363 .or_else(|_| payload_uuid_array(event, "final_scores"))?;
364 for (position, entity_id) in selected.into_iter().enumerate() {
365 rows.push(EventObservation {
366 event_id: event.id,
367 entity_id,
368 referent_kind: ReferentKind::Note,
369 role: ObservationRole::Selected,
370 position: position as u32,
371 });
372 }
373
374 Ok(rows)
375}
376
377fn decode_link_observations(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
378 let mut rows = Vec::new();
379 if let Some(source) = payload_uuid(event, "source_id")? {
380 rows.push(EventObservation {
381 event_id: event.id,
382 entity_id: source,
383 referent_kind: ReferentKind::Entity,
384 role: ObservationRole::Target,
385 position: 0,
386 });
387 }
388 if let Some(target) = payload_uuid(event, "target_id")? {
389 rows.push(EventObservation {
390 event_id: event.id,
391 entity_id: target,
392 referent_kind: ReferentKind::Entity,
393 role: ObservationRole::Target,
394 position: 1,
395 });
396 }
397 Ok(rows)
398}
399
400fn decode_target_observation(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
401 let Some(entity_id) = event.target_id.or(payload_uuid(event, "target_id")?) else {
402 return Ok(Vec::new());
403 };
404 Ok(vec![EventObservation {
405 event_id: event.id,
406 entity_id,
407 referent_kind: if event.substrate == SubstrateKind::Note {
408 ReferentKind::Note
409 } else {
410 ReferentKind::Entity
411 },
412 role: ObservationRole::Target,
413 position: 0,
414 }])
415}
416
417fn decode_signal_observation(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
418 let Some(entity_id) = payload_uuid(event, "about_id")? else {
419 return Ok(Vec::new());
420 };
421 Ok(vec![EventObservation {
422 event_id: event.id,
423 entity_id,
424 referent_kind: ReferentKind::Entity,
425 role: ObservationRole::Signal,
426 position: 0,
427 }])
428}
429
430fn invalid_payload(
431 kind: EventKind,
432 field: &'static str,
433 reason: impl std::fmt::Display,
434) -> rusqlite::Error {
435 rusqlite::Error::ToSqlConversionFailure(
436 format!("invalid payload for {}.{field}: {reason}", kind.name()).into(),
437 )
438}
439
440fn build_event_filter_sql(
445 conn: &rusqlite::Connection,
446 default_namespace: &str,
447 filter: &EventFilter,
448) -> Result<(String, Vec<Box<dyn rusqlite::types::ToSql>>), rusqlite::Error> {
449 reject_missing_event_filter_schema(conn, filter)?;
450
451 let mut conditions: Vec<String> = Vec::new();
452 let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
453
454 params.push(Box::new(default_namespace.to_string()));
455 conditions.push(format!("namespace = ?{}", params.len()));
456
457 push_in_clause(
458 &mut conditions,
459 &mut params,
460 "id",
461 filter.ids.iter().map(Uuid::to_string),
462 );
463 push_in_clause(
464 &mut conditions,
465 &mut params,
466 "kind",
467 filter.kinds.iter().map(|kind| kind.name().to_string()),
468 );
469 push_in_clause(
470 &mut conditions,
471 &mut params,
472 "verb",
473 filter.verbs.iter().cloned(),
474 );
475 push_in_clause(
476 &mut conditions,
477 &mut params,
478 "substrate",
479 filter.substrates.iter().map(|s| s.name().to_string()),
480 );
481 push_in_clause(
482 &mut conditions,
483 &mut params,
484 "actor",
485 filter.actors.iter().cloned(),
486 );
487
488 if let Some(after) = filter.after {
489 params.push(Box::new(after));
490 conditions.push(format!("created_at > ?{}", params.len()));
491 }
492
493 if let Some(before) = filter.before {
494 params.push(Box::new(before));
495 conditions.push(format!("created_at < ?{}", params.len()));
496 }
497
498 if let Some(session_id) = filter.session_id {
499 params.push(Box::new(session_id.to_string()));
500 conditions.push(format!("session_id = ?{}", params.len()));
501 }
502
503 push_observation_exists(&mut conditions, &mut params, "candidate", &filter.observed);
504 push_observation_exists(&mut conditions, &mut params, "selected", &filter.selected);
505
506 if let Some(proposal_id) = filter.payload_proposal_id {
507 params.push(Box::new(proposal_id.to_string()));
508 conditions.push(format!(
509 "json_extract(payload, '$.proposal_id') = ?{}",
510 params.len()
511 ));
512 }
513
514 let clause = format!(" WHERE {}", conditions.join(" AND "));
515 Ok((clause, params))
516}
517
518fn push_in_clause<I>(
519 conditions: &mut Vec<String>,
520 params: &mut Vec<Box<dyn rusqlite::types::ToSql>>,
521 column: &'static str,
522 values: I,
523) where
524 I: IntoIterator<Item = String>,
525{
526 let placeholders: Vec<String> = values
527 .into_iter()
528 .map(|value| {
529 params.push(Box::new(value));
530 format!("?{}", params.len())
531 })
532 .collect();
533 if !placeholders.is_empty() {
534 conditions.push(format!("{column} IN ({})", placeholders.join(",")));
535 }
536}
537
538fn push_observation_exists(
539 conditions: &mut Vec<String>,
540 params: &mut Vec<Box<dyn rusqlite::types::ToSql>>,
541 role: &'static str,
542 entity_ids: &[Uuid],
543) {
544 if entity_ids.is_empty() {
545 return;
546 }
547 let placeholders: Vec<String> = entity_ids
548 .iter()
549 .map(|id| {
550 params.push(Box::new(id.to_string()));
551 format!("?{}", params.len())
552 })
553 .collect();
554 conditions.push(format!(
555 "EXISTS (SELECT 1 FROM event_observations o \
556 WHERE o.event_id = events.id AND o.role = '{role}' AND o.entity_id IN ({}))",
557 placeholders.join(",")
558 ));
559}
560
561fn reject_missing_event_filter_schema(
562 conn: &rusqlite::Connection,
563 filter: &EventFilter,
564) -> Result<(), rusqlite::Error> {
565 if filter.session_id.is_some() && !has_column(conn, "events", "session_id")? {
566 return Err(schema_absent("events.session_id"));
567 }
568 if (!filter.observed.is_empty() || !filter.selected.is_empty())
569 && !has_table(conn, "event_observations")?
570 {
571 return Err(schema_absent("event_observations"));
572 }
573 if filter.payload_proposal_id.is_some() && !has_column(conn, "events", "payload")? {
574 return Err(schema_absent("events.payload"));
575 }
576 Ok(())
577}
578
579fn has_table(conn: &rusqlite::Connection, table: &'static str) -> Result<bool, rusqlite::Error> {
580 conn.query_row(
581 "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type = 'table' AND name = ?1",
582 [table],
583 |row| row.get(0),
584 )
585}
586
587fn has_column(
588 conn: &rusqlite::Connection,
589 table: &'static str,
590 column: &'static str,
591) -> Result<bool, rusqlite::Error> {
592 conn.query_row(
593 "SELECT COUNT(*) > 0 FROM pragma_table_info(?1) WHERE name = ?2",
594 rusqlite::params![table, column],
595 |row| row.get(0),
596 )
597}
598
599fn schema_absent(name: &'static str) -> rusqlite::Error {
600 rusqlite::Error::ToSqlConversionFailure(
601 format!("event filter requires missing schema element {name}; run migrations").into(),
602 )
603}
604
605#[async_trait]
610impl EventStore for SqlEventStore {
611 async fn append_event(&self, event: Event) -> Result<(), StorageError> {
612 self.with_writer("append_event", move |conn| {
613 conn.execute_batch("BEGIN IMMEDIATE")?;
614 if let Err(e) = insert_event_with_observations(conn, &event) {
615 let _ = conn.execute_batch("ROLLBACK");
616 return Err(e);
617 }
618 conn.execute_batch("COMMIT")?;
619 Ok(())
620 })
621 .await
622 }
623
624 async fn append_events(&self, events: Vec<Event>) -> Result<BatchWriteSummary, StorageError> {
625 let attempted = events.len() as u64;
626
627 self.with_writer("append_events", move |conn| {
628 conn.execute_batch("BEGIN IMMEDIATE")?;
629 let mut affected = 0u64;
630
631 for event in &events {
632 if let Err(e) = insert_event_with_observations(conn, event) {
633 let _ = conn.execute_batch("ROLLBACK");
634 return Err(e);
635 }
636 affected += 1;
637 }
638
639 conn.execute_batch("COMMIT")?;
640 Ok(BatchWriteSummary {
641 attempted,
642 affected,
643 failed: 0,
644 first_error: String::new(),
645 })
646 })
647 .await
648 }
649
650 async fn get_event(&self, id: Uuid) -> Result<Option<Event>, StorageError> {
651 let namespace = self.namespace.clone();
652 let id_str = id.to_string();
653
654 self.with_reader("get_event", move |conn| {
655 let mut stmt = conn.prepare(
656 "SELECT id, namespace, verb, substrate, actor, kind, outcome, payload, \
657 payload_schema_version, profile_state_version, duration_us, target_id, \
658 session_id, aggregate_kind, aggregate_id, created_at \
659 FROM events WHERE namespace = ?1 AND id = ?2",
660 )?;
661 let mut rows = stmt.query(rusqlite::params![namespace, id_str])?;
662 match rows.next()? {
663 Some(row) => Ok(Some(read_event(row)?)),
664 None => Ok(None),
665 }
666 })
667 .await
668 }
669
670 async fn query_events(
671 &self,
672 filter: EventFilter,
673 page: PageRequest,
674 ) -> Result<Page<Event>, StorageError> {
675 let namespace = self.namespace.clone();
676
677 self.with_reader("query_events", move |conn| {
678 let (where_clause, filter_params) = build_event_filter_sql(conn, &namespace, &filter)?;
679
680 let count_sql = format!("SELECT COUNT(*) FROM events{}", where_clause);
681 let total: i64 = {
682 let mut stmt = conn.prepare(&count_sql)?;
683 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
684 filter_params.iter().map(|p| p.as_ref()).collect();
685 stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
686 };
687
688 let (_, data_filter_params) = build_event_filter_sql(conn, &namespace, &filter)?;
689 let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = data_filter_params;
690 all_params.push(Box::new(page.limit as i64));
691 all_params.push(Box::new(page.offset as i64));
692
693 let limit_idx = all_params.len() - 1;
694 let offset_idx = all_params.len();
695
696 let data_sql = format!(
697 "SELECT id, namespace, verb, substrate, actor, kind, outcome, payload, \
698 payload_schema_version, profile_state_version, duration_us, target_id, \
699 session_id, aggregate_kind, aggregate_id, created_at \
700 FROM events{} ORDER BY created_at DESC, id DESC LIMIT ?{} OFFSET ?{}",
701 where_clause, limit_idx, offset_idx,
702 );
703
704 let mut stmt = conn.prepare(&data_sql)?;
705 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
706 all_params.iter().map(|p| p.as_ref()).collect();
707 let rows = stmt.query_map(param_refs.as_slice(), read_event)?;
708
709 let mut items = Vec::new();
710 for row in rows {
711 items.push(row?);
712 }
713
714 Ok(Page {
715 items,
716 total: Some(total as u64),
717 })
718 })
719 .await
720 }
721
722 async fn count_events(&self, filter: EventFilter) -> Result<u64, StorageError> {
723 let namespace = self.namespace.clone();
724
725 self.with_reader("count_events", move |conn| {
726 let (where_clause, params) = build_event_filter_sql(conn, &namespace, &filter)?;
727 let sql = format!("SELECT COUNT(*) FROM events{}", where_clause);
728 let mut stmt = conn.prepare(&sql)?;
729 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
730 params.iter().map(|p| p.as_ref()).collect();
731 let count: i64 = stmt.query_row(param_refs.as_slice(), |row| row.get(0))?;
732 Ok(count as u64)
733 })
734 .await
735 }
736}
737
/// Idempotent DDL for the event store: the `events` table, the
/// `event_observations` table and their indexes.
///
/// Every statement uses `IF NOT EXISTS`, so the batch can be re-run against a
/// database that already has these objects.
const EVENTS_DDL: &str = "\
    CREATE TABLE IF NOT EXISTS events (\
        id TEXT PRIMARY KEY,\
        namespace TEXT NOT NULL,\
        verb TEXT NOT NULL,\
        substrate TEXT NOT NULL,\
        actor TEXT NOT NULL,\
        kind TEXT NOT NULL DEFAULT 'audit',\
        outcome TEXT NOT NULL,\
        payload TEXT NOT NULL DEFAULT '{}',\
        payload_schema_version INTEGER NOT NULL DEFAULT 1,\
        profile_state_version INTEGER,\
        duration_us INTEGER NOT NULL DEFAULT 0,\
        target_id TEXT,\
        session_id TEXT,\
        aggregate_kind TEXT,\
        aggregate_id TEXT,\
        created_at INTEGER NOT NULL\
    );\
    CREATE TABLE IF NOT EXISTS event_observations (\
        event_id TEXT NOT NULL,\
        entity_id TEXT NOT NULL,\
        referent_kind TEXT NOT NULL,\
        role TEXT NOT NULL,\
        position INTEGER NOT NULL,\
        PRIMARY KEY (event_id, role, position)\
    );\
    CREATE INDEX IF NOT EXISTS idx_events_namespace ON events(namespace);\
    CREATE INDEX IF NOT EXISTS idx_events_verb ON events(verb);\
    CREATE INDEX IF NOT EXISTS idx_events_kind ON events(kind);\
    CREATE INDEX IF NOT EXISTS idx_events_substrate ON events(substrate);\
    CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at DESC);\
    CREATE INDEX IF NOT EXISTS idx_events_ns_created_id ON events(namespace, created_at DESC, id DESC);\
    CREATE INDEX IF NOT EXISTS idx_events_session ON events(namespace, session_id, created_at, id);\
    CREATE INDEX IF NOT EXISTS idx_events_payload_proposal_id ON events(json_extract(payload, '$.proposal_id'));\
    CREATE INDEX IF NOT EXISTS idx_event_obs_entity ON event_observations(entity_id, role);\
    CREATE INDEX IF NOT EXISTS idx_event_obs_event_role ON event_observations(event_id, role);\
";
780
781pub(crate) fn ensure_events_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
782 conn.execute_batch(EVENTS_DDL)
783}
784
785#[cfg(test)]
786mod tests {
787 use super::*;
788 use crate::pool::PoolConfig;
789 use serde_json::json;
790
791 fn setup_memory_store() -> SqlEventStore {
792 let config = PoolConfig {
793 path: None,
794 ..PoolConfig::default()
795 };
796 let pool = Arc::new(ConnectionPool::new(config).unwrap());
797
798 {
799 let writer = pool.writer().unwrap();
800 writer.conn().execute_batch(EVENTS_DDL).unwrap();
801 }
802
803 SqlEventStore::new_scoped(pool, false, "default")
804 }
805
806 fn make_event(namespace: &str) -> Event {
807 Event::new(
808 namespace,
809 "search",
810 EventKind::SearchExecuted,
811 SubstrateKind::Note,
812 "agent:test",
813 )
814 }
815
816 #[tokio::test]
817 async fn test_append_and_get_event() {
818 let store = setup_memory_store();
819
820 let event = make_event("default");
821 let id = event.id;
822
823 store.append_event(event).await.unwrap();
824
825 let fetched = store.get_event(id).await.unwrap();
826 assert!(fetched.is_some());
827 let fetched = fetched.unwrap();
828 assert_eq!(fetched.id, id);
829 assert_eq!(fetched.verb, "search");
830 assert_eq!(fetched.substrate, SubstrateKind::Note);
831 assert_eq!(fetched.actor, "agent:test");
832 assert_eq!(fetched.outcome, EventOutcome::Success);
833 }
834
835 #[tokio::test]
836 async fn test_append_events_batch() {
837 let store = setup_memory_store();
838
839 let events: Vec<Event> = (0..3).map(|_| make_event("default")).collect();
840 let summary = store.append_events(events).await.unwrap();
841 assert_eq!(summary.attempted, 3);
842 assert_eq!(summary.affected, 3);
843 assert_eq!(summary.failed, 0);
844 }
845
846 #[tokio::test]
847 async fn test_count_events() {
848 let store = setup_memory_store();
849
850 for _ in 0..3 {
851 store.append_event(make_event("default")).await.unwrap();
852 }
853
854 let count = store.count_events(EventFilter::default()).await.unwrap();
855 assert_eq!(count, 3);
856 }
857
858 #[tokio::test]
859 async fn test_query_events_filter_by_verb() {
860 let store = setup_memory_store();
861
862 store.append_event(make_event("default")).await.unwrap();
863
864 let mut create_event = make_event("default");
865 create_event.verb = "create".to_string();
866 store.append_event(create_event).await.unwrap();
867
868 let filter = EventFilter {
869 verbs: vec!["search".to_string()],
870 ..EventFilter::default()
871 };
872 let page = store
873 .query_events(
874 filter,
875 PageRequest {
876 limit: 10,
877 offset: 0,
878 },
879 )
880 .await
881 .unwrap();
882 assert_eq!(page.items.len(), 1);
883 assert_eq!(page.items[0].verb, "search");
884 }
885
886 #[tokio::test]
887 async fn test_query_events_filter_by_substrate() {
888 let store = setup_memory_store();
889
890 store.append_event(make_event("default")).await.unwrap();
891
892 let mut entity_event = make_event("default");
893 entity_event.substrate = SubstrateKind::Entity;
894 store.append_event(entity_event).await.unwrap();
895
896 let filter = EventFilter {
897 substrates: vec![SubstrateKind::Entity],
898 ..EventFilter::default()
899 };
900 let page = store
901 .query_events(
902 filter,
903 PageRequest {
904 limit: 10,
905 offset: 0,
906 },
907 )
908 .await
909 .unwrap();
910 assert_eq!(page.items.len(), 1);
911 assert_eq!(page.items[0].substrate, SubstrateKind::Entity);
912 }
913
914 #[tokio::test]
915 async fn test_outcome_roundtrip() {
916 let store = setup_memory_store();
917
918 let mut denied = make_event("default");
919 denied.outcome = EventOutcome::Denied;
920 let denied_id = denied.id;
921 store.append_event(denied).await.unwrap();
922
923 let fetched = store.get_event(denied_id).await.unwrap().unwrap();
924 assert_eq!(fetched.outcome, EventOutcome::Denied);
925 }
926
927 #[tokio::test]
928 async fn append_event_writes_observations_atomically() {
929 let store = setup_memory_store();
930 let candidate = Uuid::new_v4();
931 let selected = Uuid::new_v4();
932 let mut event = make_event("default");
933 event.kind = EventKind::RerankExecuted;
934 event.payload = json!({
935 "candidates": [candidate.to_string()],
936 "selected": [selected.to_string()],
937 "served_by_profile_id": "profile-a"
938 });
939 let event_id = event.id;
940
941 store.append_event(event).await.unwrap();
942
943 let fetched = store.get_event(event_id).await.unwrap();
945 assert!(fetched.is_some());
946
947 let pool = Arc::clone(&store.pool);
949 let event_id_str = event_id.to_string();
950 let (candidate_count, selected_count) = tokio::task::spawn_blocking(move || {
951 let guard = pool.reader().unwrap();
952 let conn = guard.conn();
953 let c: i64 = conn
954 .query_row(
955 "SELECT COUNT(*) FROM event_observations WHERE event_id = ?1 AND role = 'candidate'",
956 [&event_id_str],
957 |r| r.get(0),
958 )
959 .unwrap();
960 let s: i64 = conn
961 .query_row(
962 "SELECT COUNT(*) FROM event_observations WHERE event_id = ?1 AND role = 'selected'",
963 [&event_id_str],
964 |r| r.get(0),
965 )
966 .unwrap();
967 (c, s)
968 })
969 .await
970 .unwrap();
971
972 assert_eq!(candidate_count, 1, "expected one candidate observation row");
973 assert_eq!(selected_count, 1, "expected one selected observation row");
974 }
975
976 #[tokio::test]
977 async fn invalid_projection_payload_aborts_event_insert() {
978 let store = setup_memory_store();
979 let mut event = make_event("default");
980 event.kind = EventKind::RerankExecuted;
981 event.payload = json!({ "candidates": "not-array" });
983 let event_id = event.id;
984
985 let result = store.append_event(event).await;
986 assert!(result.is_err(), "invalid payload must return Err");
987
988 let fetched = store.get_event(event_id).await.unwrap();
990 assert!(fetched.is_none(), "event row must not exist after rollback");
991 }
992
993 #[tokio::test]
994 async fn query_events_orders_by_created_at_then_id_desc() {
995 let store = setup_memory_store();
996
997 let ts = chrono::Utc::now().timestamp_micros();
998 let id_low = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
999 let id_high = Uuid::parse_str("ffffffff-ffff-ffff-ffff-ffffffffffff").unwrap();
1000
1001 let pool = Arc::clone(&store.pool);
1003 tokio::task::spawn_blocking(move || {
1004 let guard = pool.try_writer().unwrap();
1005 let conn = guard.conn();
1006 conn.execute_batch("BEGIN IMMEDIATE").unwrap();
1007 for id in [id_low, id_high] {
1008 conn.execute(
1009 "INSERT INTO events \
1010 (id, namespace, verb, substrate, actor, kind, outcome, payload, \
1011 payload_schema_version, duration_us, created_at) \
1012 VALUES (?1, 'default', 'search', 'note', 'test', 'audit', 'success', '{}', 1, 0, ?2)",
1013 rusqlite::params![id.to_string(), ts],
1014 )
1015 .unwrap();
1016 }
1017 conn.execute_batch("COMMIT").unwrap();
1018 })
1019 .await
1020 .unwrap();
1021
1022 let page = store
1023 .query_events(
1024 EventFilter::default(),
1025 PageRequest {
1026 limit: 10,
1027 offset: 0,
1028 },
1029 )
1030 .await
1031 .unwrap();
1032
1033 assert_eq!(page.items.len(), 2);
1034 assert_eq!(
1035 page.items[0].id, id_high,
1036 "higher UUID must come first (id DESC tiebreaker)"
1037 );
1038 assert_eq!(page.items[1].id, id_low);
1039 }
1040
1041 #[tokio::test]
1042 async fn query_events_filters_by_kind() {
1043 let store = setup_memory_store();
1044 store.append_event(make_event("default")).await.unwrap();
1045 let mut recall_event = make_event("default");
1046 recall_event.kind = EventKind::RecallExecuted;
1047 store.append_event(recall_event).await.unwrap();
1048
1049 let filter = EventFilter {
1050 kinds: vec![EventKind::RecallExecuted],
1051 ..EventFilter::default()
1052 };
1053 let page = store
1054 .query_events(
1055 filter,
1056 PageRequest {
1057 limit: 10,
1058 offset: 0,
1059 },
1060 )
1061 .await
1062 .unwrap();
1063 assert_eq!(page.items.len(), 1);
1064 assert_eq!(page.items[0].kind, EventKind::RecallExecuted);
1065 }
1066
1067 #[tokio::test]
1068 async fn query_events_filters_by_session_id() {
1069 let store = setup_memory_store();
1070 let session = Uuid::new_v4();
1071 let mut event = make_event("default");
1072 event.session_id = Some(session);
1073 store.append_event(event).await.unwrap();
1074 store.append_event(make_event("default")).await.unwrap();
1075
1076 let filter = EventFilter {
1077 session_id: Some(session),
1078 ..EventFilter::default()
1079 };
1080 let page = store
1081 .query_events(
1082 filter,
1083 PageRequest {
1084 limit: 10,
1085 offset: 0,
1086 },
1087 )
1088 .await
1089 .unwrap();
1090 assert_eq!(page.items.len(), 1);
1091 assert_eq!(page.items[0].session_id, Some(session));
1092 }
1093
1094 #[tokio::test]
1095 async fn query_events_filters_by_observed() {
1096 let store = setup_memory_store();
1097 let entity_id = Uuid::new_v4();
1098 let mut event = make_event("default");
1099 event.kind = EventKind::RerankExecuted;
1100 event.payload = json!({
1101 "candidates": [entity_id.to_string()],
1102 "selected": []
1103 });
1104 store.append_event(event).await.unwrap();
1105 store.append_event(make_event("default")).await.unwrap();
1106
1107 let filter = EventFilter {
1108 observed: vec![entity_id],
1109 ..EventFilter::default()
1110 };
1111 let page = store
1112 .query_events(
1113 filter,
1114 PageRequest {
1115 limit: 10,
1116 offset: 0,
1117 },
1118 )
1119 .await
1120 .unwrap();
1121 assert_eq!(page.items.len(), 1);
1122 }
1123
1124 #[tokio::test]
1125 async fn query_events_filters_by_selected() {
1126 let store = setup_memory_store();
1127 let entity_id = Uuid::new_v4();
1128 let mut event = make_event("default");
1129 event.kind = EventKind::RerankExecuted;
1130 event.payload = json!({
1131 "candidates": [],
1132 "selected": [entity_id.to_string()]
1133 });
1134 store.append_event(event).await.unwrap();
1135 store.append_event(make_event("default")).await.unwrap();
1136
1137 let filter = EventFilter {
1138 selected: vec![entity_id],
1139 ..EventFilter::default()
1140 };
1141 let page = store
1142 .query_events(
1143 filter,
1144 PageRequest {
1145 limit: 10,
1146 offset: 0,
1147 },
1148 )
1149 .await
1150 .unwrap();
1151 assert_eq!(page.items.len(), 1);
1152 }
1153
1154 #[tokio::test]
1155 async fn query_events_filters_by_payload_proposal_id() {
1156 let store = setup_memory_store();
1157 let proposal_id = Uuid::new_v4();
1158 let mut event = make_event("default");
1159 event.kind = EventKind::ProposalCreated;
1160 event.payload = json!({ "proposal_id": proposal_id.to_string() });
1161 store.append_event(event).await.unwrap();
1162 store.append_event(make_event("default")).await.unwrap();
1163
1164 let filter = EventFilter {
1165 payload_proposal_id: Some(proposal_id),
1166 ..EventFilter::default()
1167 };
1168 let page = store
1169 .query_events(
1170 filter,
1171 PageRequest {
1172 limit: 10,
1173 offset: 0,
1174 },
1175 )
1176 .await
1177 .unwrap();
1178 assert_eq!(page.items.len(), 1);
1179 }
1180
1181 #[tokio::test]
1182 async fn query_events_observed_filter_missing_projection_returns_clean_error() {
1183 let config = PoolConfig {
1185 path: None,
1186 ..PoolConfig::default()
1187 };
1188 let pool = Arc::new(ConnectionPool::new(config).unwrap());
1189 {
1190 let writer = pool.writer().unwrap();
1191 writer.conn().execute_batch(
1193 "CREATE TABLE IF NOT EXISTS events (\
1194 id TEXT PRIMARY KEY, namespace TEXT NOT NULL, verb TEXT NOT NULL,\
1195 substrate TEXT NOT NULL, actor TEXT NOT NULL, kind TEXT NOT NULL DEFAULT 'audit',\
1196 outcome TEXT NOT NULL, payload TEXT NOT NULL DEFAULT '{}',\
1197 payload_schema_version INTEGER NOT NULL DEFAULT 1,\
1198 duration_us INTEGER NOT NULL DEFAULT 0, created_at INTEGER NOT NULL\
1199 );"
1200 ).unwrap();
1201 }
1202 let store = SqlEventStore::new_scoped(pool, false, "default");
1203
1204 let filter = EventFilter {
1205 observed: vec![Uuid::new_v4()],
1206 ..EventFilter::default()
1207 };
1208 let result = store
1209 .query_events(
1210 filter,
1211 PageRequest {
1212 limit: 10,
1213 offset: 0,
1214 },
1215 )
1216 .await;
1217 assert!(result.is_err());
1218 let err_msg = result.unwrap_err().to_string();
1219 assert!(
1220 err_msg.contains("event_observations") && err_msg.contains("run migrations"),
1221 "error should mention event_observations and run migrations, got: {err_msg}"
1222 );
1223 }
1224}