1use std::sync::Arc;
11
12use async_trait::async_trait;
13use uuid::Uuid;
14
15use khive_storage::error::StorageError;
16use khive_storage::event::{Event, EventFilter, EventObservation, ObservationRole, ReferentKind};
17use khive_storage::types::{BatchWriteSummary, Page, PageRequest};
18use khive_storage::EventStore;
19use khive_storage::StorageCapability;
20use khive_types::{EventKind, EventOutcome, SubstrateKind};
21
22use crate::error::SqliteError;
23use crate::pool::ConnectionPool;
24
25fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
26 StorageError::driver(StorageCapability::Events, op, e)
27}
28
29fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
30 StorageError::driver(StorageCapability::Events, op, e)
31}
32
33pub struct SqlEventStore {
35 pool: Arc<ConnectionPool>,
36 is_file_backed: bool,
37 namespace: String,
38}
39
40impl SqlEventStore {
41 pub fn new_scoped(
43 pool: Arc<ConnectionPool>,
44 is_file_backed: bool,
45 namespace: impl Into<String>,
46 ) -> Self {
47 Self {
48 pool,
49 is_file_backed,
50 namespace: namespace.into(),
51 }
52 }
53
54 fn open_standalone_writer(&self) -> Result<rusqlite::Connection, StorageError> {
55 let config = self.pool.config();
56 let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
57 operation: "event_writer".into(),
58 message: "in-memory databases do not support standalone connections".into(),
59 })?;
60
61 let conn = rusqlite::Connection::open_with_flags(
62 path,
63 rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
64 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
65 | rusqlite::OpenFlags::SQLITE_OPEN_URI,
66 )
67 .map_err(|e| map_err(e, "open_event_writer"))?;
68
69 conn.busy_timeout(config.busy_timeout)
70 .map_err(|e| map_err(e, "open_event_writer"))?;
71 conn.pragma_update(None, "foreign_keys", "ON")
72 .map_err(|e| map_err(e, "open_event_writer"))?;
73 conn.pragma_update(None, "synchronous", "NORMAL")
74 .map_err(|e| map_err(e, "open_event_writer"))?;
75
76 Ok(conn)
77 }
78
79 fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
80 let config = self.pool.config();
81 let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
82 operation: "event_reader".into(),
83 message: "in-memory databases do not support standalone connections".into(),
84 })?;
85
86 let conn = rusqlite::Connection::open_with_flags(
87 path,
88 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
89 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
90 | rusqlite::OpenFlags::SQLITE_OPEN_URI,
91 )
92 .map_err(|e| map_err(e, "open_event_reader"))?;
93
94 conn.busy_timeout(config.busy_timeout)
95 .map_err(|e| map_err(e, "open_event_reader"))?;
96 conn.pragma_update(None, "foreign_keys", "ON")
97 .map_err(|e| map_err(e, "open_event_reader"))?;
98 conn.pragma_update(None, "synchronous", "NORMAL")
99 .map_err(|e| map_err(e, "open_event_reader"))?;
100
101 Ok(conn)
102 }
103
104 async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
105 where
106 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
107 R: Send + 'static,
108 {
109 if self.is_file_backed {
110 let conn = self.open_standalone_writer()?;
111 tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
112 .await
113 .map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
114 } else {
115 let pool = Arc::clone(&self.pool);
116 tokio::task::spawn_blocking(move || {
117 let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
118 f(guard.conn()).map_err(|e| map_err(e, op))
119 })
120 .await
121 .map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
122 }
123 }
124
125 async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
126 where
127 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
128 R: Send + 'static,
129 {
130 if self.is_file_backed {
131 let conn = self.open_standalone_reader()?;
132 tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
133 .await
134 .map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
135 } else {
136 let pool = Arc::clone(&self.pool);
137 tokio::task::spawn_blocking(move || {
138 let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
139 f(guard.conn()).map_err(|e| map_err(e, op))
140 })
141 .await
142 .map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
143 }
144 }
145}
146
147fn substrate_from_str(s: &str) -> Result<SubstrateKind, rusqlite::Error> {
152 s.parse::<SubstrateKind>().map_err(|_| {
153 rusqlite::Error::FromSqlConversionFailure(
154 0,
155 rusqlite::types::Type::Text,
156 format!("unknown SubstrateKind: {s}").into(),
157 )
158 })
159}
160
161fn outcome_from_str(s: &str) -> Result<EventOutcome, rusqlite::Error> {
162 match s {
163 "success" => Ok(EventOutcome::Success),
164 "denied" => Ok(EventOutcome::Denied),
165 "error" => Ok(EventOutcome::Error),
166 other => Err(rusqlite::Error::FromSqlConversionFailure(
167 0,
168 rusqlite::types::Type::Text,
169 format!("unknown EventOutcome: {other}").into(),
170 )),
171 }
172}
173
174fn kind_from_str(s: &str) -> Result<EventKind, rusqlite::Error> {
175 s.parse::<EventKind>().map_err(|_| {
176 rusqlite::Error::FromSqlConversionFailure(
177 0,
178 rusqlite::types::Type::Text,
179 format!("unknown EventKind: {s}").into(),
180 )
181 })
182}
183
184fn parse_uuid(s: &str) -> Result<Uuid, rusqlite::Error> {
185 Uuid::parse_str(s).map_err(|e| {
186 rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
187 })
188}
189
190fn read_event(row: &rusqlite::Row<'_>) -> Result<Event, rusqlite::Error> {
195 let id_str: String = row.get(0)?;
196 let namespace: String = row.get(1)?;
197 let verb: String = row.get(2)?;
198 let substrate_str: String = row.get(3)?;
199 let actor: String = row.get(4)?;
200 let kind_str: String = row.get(5)?;
201 let outcome_str: String = row.get(6)?;
202 let payload_str: String = row.get(7)?;
203 let payload_schema_version: i64 = row.get(8)?;
204 let profile_state_version: Option<i64> = row.get(9)?;
205 let duration_us: i64 = row.get(10)?;
206 let target_str: Option<String> = row.get(11)?;
207 let session_str: Option<String> = row.get(12)?;
208 let aggregate_kind: Option<String> = row.get(13)?;
209 let aggregate_str: Option<String> = row.get(14)?;
210 let created_at: i64 = row.get(15)?;
211
212 let id = parse_uuid(&id_str)?;
213 let substrate = substrate_from_str(&substrate_str)?;
214 let kind = kind_from_str(&kind_str)?;
215 let outcome = outcome_from_str(&outcome_str)?;
216 let payload: serde_json::Value = serde_json::from_str(&payload_str).map_err(|e| {
217 rusqlite::Error::FromSqlConversionFailure(7, rusqlite::types::Type::Text, Box::new(e))
218 })?;
219 let target_id = target_str.as_deref().map(parse_uuid).transpose()?;
220 let session_id = session_str.as_deref().map(parse_uuid).transpose()?;
221 let aggregate_id = aggregate_str.as_deref().map(parse_uuid).transpose()?;
222 let payload_schema_version_u32: u32 = payload_schema_version.try_into().map_err(|_| {
223 rusqlite::Error::FromSqlConversionFailure(
224 8,
225 rusqlite::types::Type::Integer,
226 format!("payload_schema_version {payload_schema_version} out of u32 range").into(),
227 )
228 })?;
229 let profile_state_version_u64: Option<u64> = profile_state_version
230 .map(|v| {
231 u64::try_from(v).map_err(|_| {
232 rusqlite::Error::FromSqlConversionFailure(
233 9,
234 rusqlite::types::Type::Integer,
235 format!("profile_state_version {v} out of u64 range").into(),
236 )
237 })
238 })
239 .transpose()?;
240
241 Ok(Event {
242 id,
243 namespace,
244 verb,
245 substrate,
246 actor,
247 kind,
248 outcome,
249 payload,
250 payload_schema_version: payload_schema_version_u32,
251 profile_state_version: profile_state_version_u64,
252 duration_us,
253 target_id,
254 session_id,
255 aggregate_kind,
256 aggregate_id,
257 created_at,
258 })
259}
260
261fn insert_event_with_observations(
266 conn: &rusqlite::Connection,
267 event: &Event,
268) -> Result<(), rusqlite::Error> {
269 let id_str = event.id.to_string();
270 let substrate_str = event.substrate.name().to_string();
271 let kind_str = event.kind.name().to_string();
272 let outcome_str = event.outcome.name().to_string();
273 let payload_str = event.payload.to_string();
274 let target_str = event.target_id.map(|u| u.to_string());
275 let session_str = event.session_id.map(|u| u.to_string());
276 let aggregate_str = event.aggregate_id.map(|u| u.to_string());
277 let profile_state_version = event.profile_state_version.map(|v| v as i64);
278
279 conn.execute(
280 "INSERT INTO events \
281 (id, namespace, verb, substrate, actor, kind, outcome, payload, payload_schema_version, \
282 profile_state_version, duration_us, target_id, session_id, aggregate_kind, aggregate_id, created_at) \
283 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
284 rusqlite::params![
285 id_str,
286 &event.namespace,
287 &event.verb,
288 substrate_str,
289 &event.actor,
290 kind_str,
291 outcome_str,
292 payload_str,
293 event.payload_schema_version as i64,
294 profile_state_version,
295 event.duration_us,
296 target_str,
297 session_str,
298 &event.aggregate_kind,
299 aggregate_str,
300 event.created_at,
301 ],
302 )?;
303
304 for observation in decode_event_observations(event)? {
305 conn.execute(
306 "INSERT INTO event_observations \
307 (event_id, entity_id, referent_kind, role, position) \
308 VALUES (?1, ?2, ?3, ?4, ?5)",
309 rusqlite::params![
310 observation.event_id.to_string(),
311 observation.entity_id.to_string(),
312 observation.referent_kind.name(),
313 observation.role.name(),
314 observation.position as i64,
315 ],
316 )?;
317 }
318
319 Ok(())
320}
321
322fn decode_event_observations(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
323 match event.kind {
324 EventKind::RerankExecuted => decode_rank_observations(event),
325 EventKind::RecallExecuted | EventKind::SearchExecuted => decode_rank_observations(event),
326 EventKind::LinkCreated => decode_link_observations(event),
327 EventKind::EntityCreated
328 | EventKind::EntityUpdated
329 | EventKind::EntityDeleted
330 | EventKind::NoteCreated
331 | EventKind::NoteUpdated
332 | EventKind::NoteDeleted
333 | EventKind::TaskTransitioned => decode_target_observation(event),
334 EventKind::FeedbackExplicit => decode_signal_observation(event),
335 _ => Ok(Vec::new()),
336 }
337}
338
339fn payload_uuid_array(event: &Event, field: &'static str) -> Result<Vec<Uuid>, rusqlite::Error> {
340 let Some(values) = event.payload.get(field) else {
341 return Ok(Vec::new());
342 };
343 let Some(array) = values.as_array() else {
344 return Err(invalid_payload(event.kind, field, "expected array"));
345 };
346
347 array
348 .iter()
349 .map(|value| {
350 value
351 .as_str()
352 .ok_or_else(|| invalid_payload(event.kind, field, "expected UUID string"))
353 .and_then(|s| Uuid::parse_str(s).map_err(|e| invalid_payload(event.kind, field, e)))
354 })
355 .collect()
356}
357
358fn payload_uuid(event: &Event, field: &'static str) -> Result<Option<Uuid>, rusqlite::Error> {
359 let Some(value) = event.payload.get(field) else {
360 return Ok(None);
361 };
362 let Some(s) = value.as_str() else {
363 return Err(invalid_payload(event.kind, field, "expected UUID string"));
364 };
365 Uuid::parse_str(s)
366 .map(Some)
367 .map_err(|e| invalid_payload(event.kind, field, e))
368}
369
370fn decode_rank_observations(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
371 let mut rows = Vec::new();
372
373 for (position, entity_id) in payload_uuid_array(event, "candidates")?
374 .into_iter()
375 .enumerate()
376 {
377 let position_u32 = u32::try_from(position).map_err(|_| {
378 invalid_payload(
379 event.kind,
380 "candidates[position]",
381 "position out of u32 range",
382 )
383 })?;
384 rows.push(EventObservation {
385 event_id: event.id,
386 entity_id,
387 referent_kind: ReferentKind::Note,
388 role: ObservationRole::Candidate,
389 position: position_u32,
390 });
391 }
392
393 let selected = payload_uuid_array(event, "selected")
394 .or_else(|_| payload_uuid_array(event, "reranked"))
395 .or_else(|_| payload_uuid_array(event, "final_scores"))?;
396 for (position, entity_id) in selected.into_iter().enumerate() {
397 let position_u32 = u32::try_from(position).map_err(|_| {
398 invalid_payload(
399 event.kind,
400 "selected[position]",
401 "position out of u32 range",
402 )
403 })?;
404 rows.push(EventObservation {
405 event_id: event.id,
406 entity_id,
407 referent_kind: ReferentKind::Note,
408 role: ObservationRole::Selected,
409 position: position_u32,
410 });
411 }
412
413 Ok(rows)
414}
415
416fn decode_link_observations(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
417 let mut rows = Vec::new();
418 if let Some(source) = payload_uuid(event, "source_id")? {
419 rows.push(EventObservation {
420 event_id: event.id,
421 entity_id: source,
422 referent_kind: ReferentKind::Entity,
423 role: ObservationRole::Target,
424 position: 0,
425 });
426 }
427 if let Some(target) = payload_uuid(event, "target_id")? {
428 rows.push(EventObservation {
429 event_id: event.id,
430 entity_id: target,
431 referent_kind: ReferentKind::Entity,
432 role: ObservationRole::Target,
433 position: 1,
434 });
435 }
436 Ok(rows)
437}
438
439fn decode_target_observation(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
440 let Some(entity_id) = event.target_id.or(payload_uuid(event, "target_id")?) else {
441 return Ok(Vec::new());
442 };
443 Ok(vec![EventObservation {
444 event_id: event.id,
445 entity_id,
446 referent_kind: if event.substrate == SubstrateKind::Note {
447 ReferentKind::Note
448 } else {
449 ReferentKind::Entity
450 },
451 role: ObservationRole::Target,
452 position: 0,
453 }])
454}
455
456fn decode_signal_observation(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
457 let Some(entity_id) = payload_uuid(event, "about_id")? else {
458 return Ok(Vec::new());
459 };
460 Ok(vec![EventObservation {
461 event_id: event.id,
462 entity_id,
463 referent_kind: ReferentKind::Entity,
464 role: ObservationRole::Signal,
465 position: 0,
466 }])
467}
468
469fn invalid_payload(
470 kind: EventKind,
471 field: &'static str,
472 reason: impl std::fmt::Display,
473) -> rusqlite::Error {
474 rusqlite::Error::ToSqlConversionFailure(
475 format!("invalid payload for {}.{field}: {reason}", kind.name()).into(),
476 )
477}
478
479fn build_event_filter_sql(
484 conn: &rusqlite::Connection,
485 default_namespace: &str,
486 filter: &EventFilter,
487) -> Result<(String, Vec<Box<dyn rusqlite::types::ToSql>>), rusqlite::Error> {
488 reject_missing_event_filter_schema(conn, filter)?;
489
490 let mut conditions: Vec<String> = Vec::new();
491 let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
492
493 params.push(Box::new(default_namespace.to_string()));
494 conditions.push(format!("namespace = ?{}", params.len()));
495
496 push_in_clause(
497 &mut conditions,
498 &mut params,
499 "id",
500 filter.ids.iter().map(Uuid::to_string),
501 );
502 push_in_clause(
503 &mut conditions,
504 &mut params,
505 "kind",
506 filter.kinds.iter().map(|kind| kind.name().to_string()),
507 );
508 push_in_clause(
509 &mut conditions,
510 &mut params,
511 "verb",
512 filter.verbs.iter().cloned(),
513 );
514 push_in_clause(
515 &mut conditions,
516 &mut params,
517 "substrate",
518 filter.substrates.iter().map(|s| s.name().to_string()),
519 );
520 push_in_clause(
521 &mut conditions,
522 &mut params,
523 "actor",
524 filter.actors.iter().cloned(),
525 );
526
527 if let Some(after) = filter.after {
528 params.push(Box::new(after));
529 conditions.push(format!("created_at > ?{}", params.len()));
530 }
531
532 if let Some(before) = filter.before {
533 params.push(Box::new(before));
534 conditions.push(format!("created_at < ?{}", params.len()));
535 }
536
537 if let Some(session_id) = filter.session_id {
538 params.push(Box::new(session_id.to_string()));
539 conditions.push(format!("session_id = ?{}", params.len()));
540 }
541
542 push_observation_exists(&mut conditions, &mut params, "candidate", &filter.observed);
543 push_observation_exists(&mut conditions, &mut params, "selected", &filter.selected);
544
545 if let Some(proposal_id) = filter.payload_proposal_id {
546 params.push(Box::new(proposal_id.to_string()));
547 conditions.push(format!(
548 "json_extract(payload, '$.proposal_id') = ?{}",
549 params.len()
550 ));
551 }
552
553 let clause = format!(" WHERE {}", conditions.join(" AND "));
554 Ok((clause, params))
555}
556
557fn push_in_clause<I>(
558 conditions: &mut Vec<String>,
559 params: &mut Vec<Box<dyn rusqlite::types::ToSql>>,
560 column: &'static str,
561 values: I,
562) where
563 I: IntoIterator<Item = String>,
564{
565 let placeholders: Vec<String> = values
566 .into_iter()
567 .map(|value| {
568 params.push(Box::new(value));
569 format!("?{}", params.len())
570 })
571 .collect();
572 if !placeholders.is_empty() {
573 conditions.push(format!("{column} IN ({})", placeholders.join(",")));
574 }
575}
576
577fn push_observation_exists(
578 conditions: &mut Vec<String>,
579 params: &mut Vec<Box<dyn rusqlite::types::ToSql>>,
580 role: &'static str,
581 entity_ids: &[Uuid],
582) {
583 if entity_ids.is_empty() {
584 return;
585 }
586 let placeholders: Vec<String> = entity_ids
587 .iter()
588 .map(|id| {
589 params.push(Box::new(id.to_string()));
590 format!("?{}", params.len())
591 })
592 .collect();
593 conditions.push(format!(
594 "EXISTS (SELECT 1 FROM event_observations o \
595 WHERE o.event_id = events.id AND o.role = '{role}' AND o.entity_id IN ({}))",
596 placeholders.join(",")
597 ));
598}
599
600fn reject_missing_event_filter_schema(
601 conn: &rusqlite::Connection,
602 filter: &EventFilter,
603) -> Result<(), rusqlite::Error> {
604 if filter.session_id.is_some() && !has_column(conn, "events", "session_id")? {
605 return Err(schema_absent("events.session_id"));
606 }
607 if (!filter.observed.is_empty() || !filter.selected.is_empty())
608 && !has_table(conn, "event_observations")?
609 {
610 return Err(schema_absent("event_observations"));
611 }
612 if filter.payload_proposal_id.is_some() && !has_column(conn, "events", "payload")? {
613 return Err(schema_absent("events.payload"));
614 }
615 Ok(())
616}
617
618fn has_table(conn: &rusqlite::Connection, table: &'static str) -> Result<bool, rusqlite::Error> {
619 conn.query_row(
620 "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type = 'table' AND name = ?1",
621 [table],
622 |row| row.get(0),
623 )
624}
625
626fn has_column(
627 conn: &rusqlite::Connection,
628 table: &'static str,
629 column: &'static str,
630) -> Result<bool, rusqlite::Error> {
631 conn.query_row(
632 "SELECT COUNT(*) > 0 FROM pragma_table_info(?1) WHERE name = ?2",
633 rusqlite::params![table, column],
634 |row| row.get(0),
635 )
636}
637
638fn schema_absent(name: &'static str) -> rusqlite::Error {
639 rusqlite::Error::ToSqlConversionFailure(
640 format!("event filter requires missing schema element {name}; run migrations").into(),
641 )
642}
643
644#[async_trait]
649impl EventStore for SqlEventStore {
650 async fn append_event(&self, event: Event) -> Result<(), StorageError> {
651 self.with_writer("append_event", move |conn| {
652 conn.execute_batch("BEGIN IMMEDIATE")?;
653 if let Err(e) = insert_event_with_observations(conn, &event) {
654 let _ = conn.execute_batch("ROLLBACK");
655 return Err(e);
656 }
657 conn.execute_batch("COMMIT")?;
658 Ok(())
659 })
660 .await
661 }
662
663 async fn append_events(&self, events: Vec<Event>) -> Result<BatchWriteSummary, StorageError> {
664 let attempted = events.len() as u64;
665
666 self.with_writer("append_events", move |conn| {
667 conn.execute_batch("BEGIN IMMEDIATE")?;
668 let mut affected = 0u64;
669
670 for event in &events {
671 if let Err(e) = insert_event_with_observations(conn, event) {
672 let _ = conn.execute_batch("ROLLBACK");
673 return Err(e);
674 }
675 affected += 1;
676 }
677
678 conn.execute_batch("COMMIT")?;
679 Ok(BatchWriteSummary {
680 attempted,
681 affected,
682 failed: 0,
683 first_error: String::new(),
684 })
685 })
686 .await
687 }
688
689 async fn get_event(&self, id: Uuid) -> Result<Option<Event>, StorageError> {
690 let namespace = self.namespace.clone();
691 let id_str = id.to_string();
692
693 self.with_reader("get_event", move |conn| {
694 let mut stmt = conn.prepare(
695 "SELECT id, namespace, verb, substrate, actor, kind, outcome, payload, \
696 payload_schema_version, profile_state_version, duration_us, target_id, \
697 session_id, aggregate_kind, aggregate_id, created_at \
698 FROM events WHERE namespace = ?1 AND id = ?2",
699 )?;
700 let mut rows = stmt.query(rusqlite::params![namespace, id_str])?;
701 match rows.next()? {
702 Some(row) => Ok(Some(read_event(row)?)),
703 None => Ok(None),
704 }
705 })
706 .await
707 }
708
709 async fn query_events(
710 &self,
711 filter: EventFilter,
712 page: PageRequest,
713 ) -> Result<Page<Event>, StorageError> {
714 let namespace = self.namespace.clone();
715
716 self.with_reader("query_events", move |conn| {
717 let (where_clause, filter_params) = build_event_filter_sql(conn, &namespace, &filter)?;
718
719 let count_sql = format!("SELECT COUNT(*) FROM events{}", where_clause);
720 let total: i64 = {
721 let mut stmt = conn.prepare(&count_sql)?;
722 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
723 filter_params.iter().map(|p| p.as_ref()).collect();
724 stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
725 };
726
727 let (_, data_filter_params) = build_event_filter_sql(conn, &namespace, &filter)?;
728 let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = data_filter_params;
729 all_params.push(Box::new(page.limit as i64));
730 all_params.push(Box::new(page.offset as i64));
731
732 let limit_idx = all_params.len() - 1;
733 let offset_idx = all_params.len();
734
735 let data_sql = format!(
736 "SELECT id, namespace, verb, substrate, actor, kind, outcome, payload, \
737 payload_schema_version, profile_state_version, duration_us, target_id, \
738 session_id, aggregate_kind, aggregate_id, created_at \
739 FROM events{} ORDER BY created_at DESC, id DESC LIMIT ?{} OFFSET ?{}",
740 where_clause, limit_idx, offset_idx,
741 );
742
743 let mut stmt = conn.prepare(&data_sql)?;
744 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
745 all_params.iter().map(|p| p.as_ref()).collect();
746 let rows = stmt.query_map(param_refs.as_slice(), read_event)?;
747
748 let mut items = Vec::new();
749 for row in rows {
750 items.push(row?);
751 }
752
753 Ok(Page {
754 items,
755 total: Some(total as u64),
756 })
757 })
758 .await
759 }
760
761 async fn count_events(&self, filter: EventFilter) -> Result<u64, StorageError> {
762 let namespace = self.namespace.clone();
763
764 self.with_reader("count_events", move |conn| {
765 let (where_clause, params) = build_event_filter_sql(conn, &namespace, &filter)?;
766 let sql = format!("SELECT COUNT(*) FROM events{}", where_clause);
767 let mut stmt = conn.prepare(&sql)?;
768 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
769 params.iter().map(|p| p.as_ref()).collect();
770 let count: i64 = stmt.query_row(param_refs.as_slice(), |row| row.get(0))?;
771 Ok(count as u64)
772 })
773 .await
774 }
775}
776
777const EVENTS_DDL: &str = include_str!("../../sql/events-ddl.sql");
782
783pub(crate) fn ensure_events_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
784 conn.execute_batch(EVENTS_DDL)
785}
786
787#[cfg(test)]
788#[path = "event_tests.rs"]
789mod tests;