use std::sync::Arc;
use async_trait::async_trait;
use uuid::Uuid;
use khive_storage::error::StorageError;
use khive_storage::event::{Event, EventFilter, EventObservation, ObservationRole, ReferentKind};
use khive_storage::types::{BatchWriteSummary, Page, PageRequest};
use khive_storage::EventStore;
use khive_storage::StorageCapability;
use khive_types::{EventKind, EventOutcome, SubstrateKind};
use crate::error::SqliteError;
use crate::pool::ConnectionPool;
fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
StorageError::driver(StorageCapability::Events, op, e)
}
fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
StorageError::driver(StorageCapability::Events, op, e)
}
pub struct SqlEventStore {
pool: Arc<ConnectionPool>,
is_file_backed: bool,
namespace: String,
}
impl SqlEventStore {
pub fn new_scoped(
pool: Arc<ConnectionPool>,
is_file_backed: bool,
namespace: impl Into<String>,
) -> Self {
Self {
pool,
is_file_backed,
namespace: namespace.into(),
}
}
fn open_standalone_writer(&self) -> Result<rusqlite::Connection, StorageError> {
let config = self.pool.config();
let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
operation: "event_writer".into(),
message: "in-memory databases do not support standalone connections".into(),
})?;
let conn = rusqlite::Connection::open_with_flags(
path,
rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
| rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
| rusqlite::OpenFlags::SQLITE_OPEN_URI,
)
.map_err(|e| map_err(e, "open_event_writer"))?;
conn.busy_timeout(config.busy_timeout)
.map_err(|e| map_err(e, "open_event_writer"))?;
conn.pragma_update(None, "foreign_keys", "ON")
.map_err(|e| map_err(e, "open_event_writer"))?;
conn.pragma_update(None, "synchronous", "NORMAL")
.map_err(|e| map_err(e, "open_event_writer"))?;
Ok(conn)
}
fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
let config = self.pool.config();
let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
operation: "event_reader".into(),
message: "in-memory databases do not support standalone connections".into(),
})?;
let conn = rusqlite::Connection::open_with_flags(
path,
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
| rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
| rusqlite::OpenFlags::SQLITE_OPEN_URI,
)
.map_err(|e| map_err(e, "open_event_reader"))?;
conn.busy_timeout(config.busy_timeout)
.map_err(|e| map_err(e, "open_event_reader"))?;
conn.pragma_update(None, "foreign_keys", "ON")
.map_err(|e| map_err(e, "open_event_reader"))?;
conn.pragma_update(None, "synchronous", "NORMAL")
.map_err(|e| map_err(e, "open_event_reader"))?;
Ok(conn)
}
async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
where
F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
R: Send + 'static,
{
if self.is_file_backed {
let conn = self.open_standalone_writer()?;
tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
.await
.map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
} else {
let pool = Arc::clone(&self.pool);
tokio::task::spawn_blocking(move || {
let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
f(guard.conn()).map_err(|e| map_err(e, op))
})
.await
.map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
}
}
async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
where
F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
R: Send + 'static,
{
if self.is_file_backed {
let conn = self.open_standalone_reader()?;
tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
.await
.map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
} else {
let pool = Arc::clone(&self.pool);
tokio::task::spawn_blocking(move || {
let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
f(guard.conn()).map_err(|e| map_err(e, op))
})
.await
.map_err(|e| StorageError::driver(StorageCapability::Events, op, e))?
}
}
}
fn substrate_from_str(s: &str) -> Result<SubstrateKind, rusqlite::Error> {
s.parse::<SubstrateKind>().map_err(|_| {
rusqlite::Error::FromSqlConversionFailure(
0,
rusqlite::types::Type::Text,
format!("unknown SubstrateKind: {s}").into(),
)
})
}
fn outcome_from_str(s: &str) -> Result<EventOutcome, rusqlite::Error> {
match s {
"success" => Ok(EventOutcome::Success),
"denied" => Ok(EventOutcome::Denied),
"error" => Ok(EventOutcome::Error),
other => Err(rusqlite::Error::FromSqlConversionFailure(
0,
rusqlite::types::Type::Text,
format!("unknown EventOutcome: {other}").into(),
)),
}
}
fn kind_from_str(s: &str) -> Result<EventKind, rusqlite::Error> {
s.parse::<EventKind>().map_err(|_| {
rusqlite::Error::FromSqlConversionFailure(
0,
rusqlite::types::Type::Text,
format!("unknown EventKind: {s}").into(),
)
})
}
fn parse_uuid(s: &str) -> Result<Uuid, rusqlite::Error> {
Uuid::parse_str(s).map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
})
}
fn read_event(row: &rusqlite::Row<'_>) -> Result<Event, rusqlite::Error> {
let id_str: String = row.get(0)?;
let namespace: String = row.get(1)?;
let verb: String = row.get(2)?;
let substrate_str: String = row.get(3)?;
let actor: String = row.get(4)?;
let kind_str: String = row.get(5)?;
let outcome_str: String = row.get(6)?;
let payload_str: String = row.get(7)?;
let payload_schema_version: i64 = row.get(8)?;
let profile_state_version: Option<i64> = row.get(9)?;
let duration_us: i64 = row.get(10)?;
let target_str: Option<String> = row.get(11)?;
let session_str: Option<String> = row.get(12)?;
let aggregate_kind: Option<String> = row.get(13)?;
let aggregate_str: Option<String> = row.get(14)?;
let created_at: i64 = row.get(15)?;
let id = parse_uuid(&id_str)?;
let substrate = substrate_from_str(&substrate_str)?;
let kind = kind_from_str(&kind_str)?;
let outcome = outcome_from_str(&outcome_str)?;
let payload: serde_json::Value = serde_json::from_str(&payload_str).map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(7, rusqlite::types::Type::Text, Box::new(e))
})?;
let target_id = target_str.as_deref().map(parse_uuid).transpose()?;
let session_id = session_str.as_deref().map(parse_uuid).transpose()?;
let aggregate_id = aggregate_str.as_deref().map(parse_uuid).transpose()?;
Ok(Event {
id,
namespace,
verb,
substrate,
actor,
kind,
outcome,
payload,
payload_schema_version: payload_schema_version as u32,
profile_state_version: profile_state_version.map(|v| v as u64),
duration_us,
target_id,
session_id,
aggregate_kind,
aggregate_id,
created_at,
})
}
fn insert_event_with_observations(
conn: &rusqlite::Connection,
event: &Event,
) -> Result<(), rusqlite::Error> {
let id_str = event.id.to_string();
let substrate_str = event.substrate.name().to_string();
let kind_str = event.kind.name().to_string();
let outcome_str = event.outcome.name().to_string();
let payload_str = event.payload.to_string();
let target_str = event.target_id.map(|u| u.to_string());
let session_str = event.session_id.map(|u| u.to_string());
let aggregate_str = event.aggregate_id.map(|u| u.to_string());
let profile_state_version = event.profile_state_version.map(|v| v as i64);
conn.execute(
"INSERT INTO events \
(id, namespace, verb, substrate, actor, kind, outcome, payload, payload_schema_version, \
profile_state_version, duration_us, target_id, session_id, aggregate_kind, aggregate_id, created_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
rusqlite::params![
id_str,
&event.namespace,
&event.verb,
substrate_str,
&event.actor,
kind_str,
outcome_str,
payload_str,
event.payload_schema_version as i64,
profile_state_version,
event.duration_us,
target_str,
session_str,
&event.aggregate_kind,
aggregate_str,
event.created_at,
],
)?;
for observation in decode_event_observations(event)? {
conn.execute(
"INSERT INTO event_observations \
(event_id, entity_id, referent_kind, role, position) \
VALUES (?1, ?2, ?3, ?4, ?5)",
rusqlite::params![
observation.event_id.to_string(),
observation.entity_id.to_string(),
observation.referent_kind.name(),
observation.role.name(),
observation.position as i64,
],
)?;
}
Ok(())
}
fn decode_event_observations(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
match event.kind {
EventKind::RerankExecuted => decode_rank_observations(event),
EventKind::RecallExecuted | EventKind::SearchExecuted => decode_rank_observations(event),
EventKind::LinkCreated => decode_link_observations(event),
EventKind::EntityCreated
| EventKind::EntityUpdated
| EventKind::EntityDeleted
| EventKind::NoteCreated
| EventKind::NoteUpdated
| EventKind::NoteDeleted
| EventKind::TaskTransitioned => decode_target_observation(event),
EventKind::FeedbackExplicit => decode_signal_observation(event),
_ => Ok(Vec::new()),
}
}
fn payload_uuid_array(event: &Event, field: &'static str) -> Result<Vec<Uuid>, rusqlite::Error> {
let Some(values) = event.payload.get(field) else {
return Ok(Vec::new());
};
let Some(array) = values.as_array() else {
return Err(invalid_payload(event.kind, field, "expected array"));
};
array
.iter()
.map(|value| {
value
.as_str()
.ok_or_else(|| invalid_payload(event.kind, field, "expected UUID string"))
.and_then(|s| Uuid::parse_str(s).map_err(|e| invalid_payload(event.kind, field, e)))
})
.collect()
}
fn payload_uuid(event: &Event, field: &'static str) -> Result<Option<Uuid>, rusqlite::Error> {
let Some(value) = event.payload.get(field) else {
return Ok(None);
};
let Some(s) = value.as_str() else {
return Err(invalid_payload(event.kind, field, "expected UUID string"));
};
Uuid::parse_str(s)
.map(Some)
.map_err(|e| invalid_payload(event.kind, field, e))
}
fn decode_rank_observations(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
let mut rows = Vec::new();
for (position, entity_id) in payload_uuid_array(event, "candidates")?
.into_iter()
.enumerate()
{
rows.push(EventObservation {
event_id: event.id,
entity_id,
referent_kind: ReferentKind::Note,
role: ObservationRole::Candidate,
position: position as u32,
});
}
let selected = payload_uuid_array(event, "selected")
.or_else(|_| payload_uuid_array(event, "reranked"))
.or_else(|_| payload_uuid_array(event, "final_scores"))?;
for (position, entity_id) in selected.into_iter().enumerate() {
rows.push(EventObservation {
event_id: event.id,
entity_id,
referent_kind: ReferentKind::Note,
role: ObservationRole::Selected,
position: position as u32,
});
}
Ok(rows)
}
fn decode_link_observations(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
let mut rows = Vec::new();
if let Some(source) = payload_uuid(event, "source_id")? {
rows.push(EventObservation {
event_id: event.id,
entity_id: source,
referent_kind: ReferentKind::Entity,
role: ObservationRole::Target,
position: 0,
});
}
if let Some(target) = payload_uuid(event, "target_id")? {
rows.push(EventObservation {
event_id: event.id,
entity_id: target,
referent_kind: ReferentKind::Entity,
role: ObservationRole::Target,
position: 1,
});
}
Ok(rows)
}
fn decode_target_observation(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
let Some(entity_id) = event.target_id.or(payload_uuid(event, "target_id")?) else {
return Ok(Vec::new());
};
Ok(vec![EventObservation {
event_id: event.id,
entity_id,
referent_kind: if event.substrate == SubstrateKind::Note {
ReferentKind::Note
} else {
ReferentKind::Entity
},
role: ObservationRole::Target,
position: 0,
}])
}
fn decode_signal_observation(event: &Event) -> Result<Vec<EventObservation>, rusqlite::Error> {
let Some(entity_id) = payload_uuid(event, "about_id")? else {
return Ok(Vec::new());
};
Ok(vec![EventObservation {
event_id: event.id,
entity_id,
referent_kind: ReferentKind::Entity,
role: ObservationRole::Signal,
position: 0,
}])
}
fn invalid_payload(
kind: EventKind,
field: &'static str,
reason: impl std::fmt::Display,
) -> rusqlite::Error {
rusqlite::Error::ToSqlConversionFailure(
format!("invalid payload for {}.{field}: {reason}", kind.name()).into(),
)
}
fn build_event_filter_sql(
conn: &rusqlite::Connection,
default_namespace: &str,
filter: &EventFilter,
) -> Result<(String, Vec<Box<dyn rusqlite::types::ToSql>>), rusqlite::Error> {
reject_missing_event_filter_schema(conn, filter)?;
let mut conditions: Vec<String> = Vec::new();
let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
params.push(Box::new(default_namespace.to_string()));
conditions.push(format!("namespace = ?{}", params.len()));
push_in_clause(
&mut conditions,
&mut params,
"id",
filter.ids.iter().map(Uuid::to_string),
);
push_in_clause(
&mut conditions,
&mut params,
"kind",
filter.kinds.iter().map(|kind| kind.name().to_string()),
);
push_in_clause(
&mut conditions,
&mut params,
"verb",
filter.verbs.iter().cloned(),
);
push_in_clause(
&mut conditions,
&mut params,
"substrate",
filter.substrates.iter().map(|s| s.name().to_string()),
);
push_in_clause(
&mut conditions,
&mut params,
"actor",
filter.actors.iter().cloned(),
);
if let Some(after) = filter.after {
params.push(Box::new(after));
conditions.push(format!("created_at > ?{}", params.len()));
}
if let Some(before) = filter.before {
params.push(Box::new(before));
conditions.push(format!("created_at < ?{}", params.len()));
}
if let Some(session_id) = filter.session_id {
params.push(Box::new(session_id.to_string()));
conditions.push(format!("session_id = ?{}", params.len()));
}
push_observation_exists(&mut conditions, &mut params, "candidate", &filter.observed);
push_observation_exists(&mut conditions, &mut params, "selected", &filter.selected);
if let Some(proposal_id) = filter.payload_proposal_id {
params.push(Box::new(proposal_id.to_string()));
conditions.push(format!(
"json_extract(payload, '$.proposal_id') = ?{}",
params.len()
));
}
let clause = format!(" WHERE {}", conditions.join(" AND "));
Ok((clause, params))
}
fn push_in_clause<I>(
conditions: &mut Vec<String>,
params: &mut Vec<Box<dyn rusqlite::types::ToSql>>,
column: &'static str,
values: I,
) where
I: IntoIterator<Item = String>,
{
let placeholders: Vec<String> = values
.into_iter()
.map(|value| {
params.push(Box::new(value));
format!("?{}", params.len())
})
.collect();
if !placeholders.is_empty() {
conditions.push(format!("{column} IN ({})", placeholders.join(",")));
}
}
fn push_observation_exists(
conditions: &mut Vec<String>,
params: &mut Vec<Box<dyn rusqlite::types::ToSql>>,
role: &'static str,
entity_ids: &[Uuid],
) {
if entity_ids.is_empty() {
return;
}
let placeholders: Vec<String> = entity_ids
.iter()
.map(|id| {
params.push(Box::new(id.to_string()));
format!("?{}", params.len())
})
.collect();
conditions.push(format!(
"EXISTS (SELECT 1 FROM event_observations o \
WHERE o.event_id = events.id AND o.role = '{role}' AND o.entity_id IN ({}))",
placeholders.join(",")
));
}
fn reject_missing_event_filter_schema(
conn: &rusqlite::Connection,
filter: &EventFilter,
) -> Result<(), rusqlite::Error> {
if filter.session_id.is_some() && !has_column(conn, "events", "session_id")? {
return Err(schema_absent("events.session_id"));
}
if (!filter.observed.is_empty() || !filter.selected.is_empty())
&& !has_table(conn, "event_observations")?
{
return Err(schema_absent("event_observations"));
}
if filter.payload_proposal_id.is_some() && !has_column(conn, "events", "payload")? {
return Err(schema_absent("events.payload"));
}
Ok(())
}
fn has_table(conn: &rusqlite::Connection, table: &'static str) -> Result<bool, rusqlite::Error> {
conn.query_row(
"SELECT COUNT(*) > 0 FROM sqlite_master WHERE type = 'table' AND name = ?1",
[table],
|row| row.get(0),
)
}
fn has_column(
conn: &rusqlite::Connection,
table: &'static str,
column: &'static str,
) -> Result<bool, rusqlite::Error> {
conn.query_row(
"SELECT COUNT(*) > 0 FROM pragma_table_info(?1) WHERE name = ?2",
rusqlite::params![table, column],
|row| row.get(0),
)
}
fn schema_absent(name: &'static str) -> rusqlite::Error {
rusqlite::Error::ToSqlConversionFailure(
format!("event filter requires missing schema element {name}; run migrations").into(),
)
}
#[async_trait]
impl EventStore for SqlEventStore {
async fn append_event(&self, event: Event) -> Result<(), StorageError> {
self.with_writer("append_event", move |conn| {
conn.execute_batch("BEGIN IMMEDIATE")?;
if let Err(e) = insert_event_with_observations(conn, &event) {
let _ = conn.execute_batch("ROLLBACK");
return Err(e);
}
conn.execute_batch("COMMIT")?;
Ok(())
})
.await
}
async fn append_events(&self, events: Vec<Event>) -> Result<BatchWriteSummary, StorageError> {
let attempted = events.len() as u64;
self.with_writer("append_events", move |conn| {
conn.execute_batch("BEGIN IMMEDIATE")?;
let mut affected = 0u64;
for event in &events {
if let Err(e) = insert_event_with_observations(conn, event) {
let _ = conn.execute_batch("ROLLBACK");
return Err(e);
}
affected += 1;
}
conn.execute_batch("COMMIT")?;
Ok(BatchWriteSummary {
attempted,
affected,
failed: 0,
first_error: String::new(),
})
})
.await
}
async fn get_event(&self, id: Uuid) -> Result<Option<Event>, StorageError> {
let namespace = self.namespace.clone();
let id_str = id.to_string();
self.with_reader("get_event", move |conn| {
let mut stmt = conn.prepare(
"SELECT id, namespace, verb, substrate, actor, kind, outcome, payload, \
payload_schema_version, profile_state_version, duration_us, target_id, \
session_id, aggregate_kind, aggregate_id, created_at \
FROM events WHERE namespace = ?1 AND id = ?2",
)?;
let mut rows = stmt.query(rusqlite::params![namespace, id_str])?;
match rows.next()? {
Some(row) => Ok(Some(read_event(row)?)),
None => Ok(None),
}
})
.await
}
async fn query_events(
&self,
filter: EventFilter,
page: PageRequest,
) -> Result<Page<Event>, StorageError> {
let namespace = self.namespace.clone();
self.with_reader("query_events", move |conn| {
let (where_clause, filter_params) = build_event_filter_sql(conn, &namespace, &filter)?;
let count_sql = format!("SELECT COUNT(*) FROM events{}", where_clause);
let total: i64 = {
let mut stmt = conn.prepare(&count_sql)?;
let param_refs: Vec<&dyn rusqlite::types::ToSql> =
filter_params.iter().map(|p| p.as_ref()).collect();
stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
};
let (_, data_filter_params) = build_event_filter_sql(conn, &namespace, &filter)?;
let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = data_filter_params;
all_params.push(Box::new(page.limit as i64));
all_params.push(Box::new(page.offset as i64));
let limit_idx = all_params.len() - 1;
let offset_idx = all_params.len();
let data_sql = format!(
"SELECT id, namespace, verb, substrate, actor, kind, outcome, payload, \
payload_schema_version, profile_state_version, duration_us, target_id, \
session_id, aggregate_kind, aggregate_id, created_at \
FROM events{} ORDER BY created_at DESC, id DESC LIMIT ?{} OFFSET ?{}",
where_clause, limit_idx, offset_idx,
);
let mut stmt = conn.prepare(&data_sql)?;
let param_refs: Vec<&dyn rusqlite::types::ToSql> =
all_params.iter().map(|p| p.as_ref()).collect();
let rows = stmt.query_map(param_refs.as_slice(), read_event)?;
let mut items = Vec::new();
for row in rows {
items.push(row?);
}
Ok(Page {
items,
total: Some(total as u64),
})
})
.await
}
async fn count_events(&self, filter: EventFilter) -> Result<u64, StorageError> {
let namespace = self.namespace.clone();
self.with_reader("count_events", move |conn| {
let (where_clause, params) = build_event_filter_sql(conn, &namespace, &filter)?;
let sql = format!("SELECT COUNT(*) FROM events{}", where_clause);
let mut stmt = conn.prepare(&sql)?;
let param_refs: Vec<&dyn rusqlite::types::ToSql> =
params.iter().map(|p| p.as_ref()).collect();
let count: i64 = stmt.query_row(param_refs.as_slice(), |row| row.get(0))?;
Ok(count as u64)
})
.await
}
}
const EVENTS_DDL: &str = "\
CREATE TABLE IF NOT EXISTS events (\
id TEXT PRIMARY KEY,\
namespace TEXT NOT NULL,\
verb TEXT NOT NULL,\
substrate TEXT NOT NULL,\
actor TEXT NOT NULL,\
kind TEXT NOT NULL DEFAULT 'audit',\
outcome TEXT NOT NULL,\
payload TEXT NOT NULL DEFAULT '{}',\
payload_schema_version INTEGER NOT NULL DEFAULT 1,\
profile_state_version INTEGER,\
duration_us INTEGER NOT NULL DEFAULT 0,\
target_id TEXT,\
session_id TEXT,\
aggregate_kind TEXT,\
aggregate_id TEXT,\
created_at INTEGER NOT NULL\
);\
CREATE TABLE IF NOT EXISTS event_observations (\
event_id TEXT NOT NULL,\
entity_id TEXT NOT NULL,\
referent_kind TEXT NOT NULL,\
role TEXT NOT NULL,\
position INTEGER NOT NULL,\
PRIMARY KEY (event_id, role, position)\
);\
CREATE INDEX IF NOT EXISTS idx_events_namespace ON events(namespace);\
CREATE INDEX IF NOT EXISTS idx_events_verb ON events(verb);\
CREATE INDEX IF NOT EXISTS idx_events_kind ON events(kind);\
CREATE INDEX IF NOT EXISTS idx_events_substrate ON events(substrate);\
CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at DESC);\
CREATE INDEX IF NOT EXISTS idx_events_ns_created_id ON events(namespace, created_at DESC, id DESC);\
CREATE INDEX IF NOT EXISTS idx_events_session ON events(namespace, session_id, created_at, id);\
CREATE INDEX IF NOT EXISTS idx_events_payload_proposal_id ON events(json_extract(payload, '$.proposal_id'));\
CREATE INDEX IF NOT EXISTS idx_event_obs_entity ON event_observations(entity_id, role);\
CREATE INDEX IF NOT EXISTS idx_event_obs_event_role ON event_observations(event_id, role);\
";
pub(crate) fn ensure_events_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
conn.execute_batch(EVENTS_DDL)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::pool::PoolConfig;
use serde_json::json;
fn setup_memory_store() -> SqlEventStore {
let config = PoolConfig {
path: None,
..PoolConfig::default()
};
let pool = Arc::new(ConnectionPool::new(config).unwrap());
{
let writer = pool.writer().unwrap();
writer.conn().execute_batch(EVENTS_DDL).unwrap();
}
SqlEventStore::new_scoped(pool, false, "default")
}
fn make_event(namespace: &str) -> Event {
Event::new(
namespace,
"search",
EventKind::SearchExecuted,
SubstrateKind::Note,
"agent:test",
)
}
#[tokio::test]
async fn test_append_and_get_event() {
let store = setup_memory_store();
let event = make_event("default");
let id = event.id;
store.append_event(event).await.unwrap();
let fetched = store.get_event(id).await.unwrap();
assert!(fetched.is_some());
let fetched = fetched.unwrap();
assert_eq!(fetched.id, id);
assert_eq!(fetched.verb, "search");
assert_eq!(fetched.substrate, SubstrateKind::Note);
assert_eq!(fetched.actor, "agent:test");
assert_eq!(fetched.outcome, EventOutcome::Success);
}
#[tokio::test]
async fn test_append_events_batch() {
let store = setup_memory_store();
let events: Vec<Event> = (0..3).map(|_| make_event("default")).collect();
let summary = store.append_events(events).await.unwrap();
assert_eq!(summary.attempted, 3);
assert_eq!(summary.affected, 3);
assert_eq!(summary.failed, 0);
}
#[tokio::test]
async fn test_count_events() {
let store = setup_memory_store();
for _ in 0..3 {
store.append_event(make_event("default")).await.unwrap();
}
let count = store.count_events(EventFilter::default()).await.unwrap();
assert_eq!(count, 3);
}
#[tokio::test]
async fn test_query_events_filter_by_verb() {
let store = setup_memory_store();
store.append_event(make_event("default")).await.unwrap();
let mut create_event = make_event("default");
create_event.verb = "create".to_string();
store.append_event(create_event).await.unwrap();
let filter = EventFilter {
verbs: vec!["search".to_string()],
..EventFilter::default()
};
let page = store
.query_events(
filter,
PageRequest {
limit: 10,
offset: 0,
},
)
.await
.unwrap();
assert_eq!(page.items.len(), 1);
assert_eq!(page.items[0].verb, "search");
}
#[tokio::test]
async fn test_query_events_filter_by_substrate() {
let store = setup_memory_store();
store.append_event(make_event("default")).await.unwrap();
let mut entity_event = make_event("default");
entity_event.substrate = SubstrateKind::Entity;
store.append_event(entity_event).await.unwrap();
let filter = EventFilter {
substrates: vec![SubstrateKind::Entity],
..EventFilter::default()
};
let page = store
.query_events(
filter,
PageRequest {
limit: 10,
offset: 0,
},
)
.await
.unwrap();
assert_eq!(page.items.len(), 1);
assert_eq!(page.items[0].substrate, SubstrateKind::Entity);
}
#[tokio::test]
async fn test_outcome_roundtrip() {
let store = setup_memory_store();
let mut denied = make_event("default");
denied.outcome = EventOutcome::Denied;
let denied_id = denied.id;
store.append_event(denied).await.unwrap();
let fetched = store.get_event(denied_id).await.unwrap().unwrap();
assert_eq!(fetched.outcome, EventOutcome::Denied);
}
#[tokio::test]
async fn append_event_writes_observations_atomically() {
let store = setup_memory_store();
let candidate = Uuid::new_v4();
let selected = Uuid::new_v4();
let mut event = make_event("default");
event.kind = EventKind::RerankExecuted;
event.payload = json!({
"candidates": [candidate.to_string()],
"selected": [selected.to_string()],
"served_by_profile_id": "profile-a"
});
let event_id = event.id;
store.append_event(event).await.unwrap();
let fetched = store.get_event(event_id).await.unwrap();
assert!(fetched.is_some());
let pool = Arc::clone(&store.pool);
let event_id_str = event_id.to_string();
let (candidate_count, selected_count) = tokio::task::spawn_blocking(move || {
let guard = pool.reader().unwrap();
let conn = guard.conn();
let c: i64 = conn
.query_row(
"SELECT COUNT(*) FROM event_observations WHERE event_id = ?1 AND role = 'candidate'",
[&event_id_str],
|r| r.get(0),
)
.unwrap();
let s: i64 = conn
.query_row(
"SELECT COUNT(*) FROM event_observations WHERE event_id = ?1 AND role = 'selected'",
[&event_id_str],
|r| r.get(0),
)
.unwrap();
(c, s)
})
.await
.unwrap();
assert_eq!(candidate_count, 1, "expected one candidate observation row");
assert_eq!(selected_count, 1, "expected one selected observation row");
}
#[tokio::test]
async fn invalid_projection_payload_aborts_event_insert() {
let store = setup_memory_store();
let mut event = make_event("default");
event.kind = EventKind::RerankExecuted;
event.payload = json!({ "candidates": "not-array" });
let event_id = event.id;
let result = store.append_event(event).await;
assert!(result.is_err(), "invalid payload must return Err");
let fetched = store.get_event(event_id).await.unwrap();
assert!(fetched.is_none(), "event row must not exist after rollback");
}
#[tokio::test]
async fn query_events_orders_by_created_at_then_id_desc() {
let store = setup_memory_store();
let ts = chrono::Utc::now().timestamp_micros();
let id_low = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
let id_high = Uuid::parse_str("ffffffff-ffff-ffff-ffff-ffffffffffff").unwrap();
let pool = Arc::clone(&store.pool);
tokio::task::spawn_blocking(move || {
let guard = pool.try_writer().unwrap();
let conn = guard.conn();
conn.execute_batch("BEGIN IMMEDIATE").unwrap();
for id in [id_low, id_high] {
conn.execute(
"INSERT INTO events \
(id, namespace, verb, substrate, actor, kind, outcome, payload, \
payload_schema_version, duration_us, created_at) \
VALUES (?1, 'default', 'search', 'note', 'test', 'audit', 'success', '{}', 1, 0, ?2)",
rusqlite::params![id.to_string(), ts],
)
.unwrap();
}
conn.execute_batch("COMMIT").unwrap();
})
.await
.unwrap();
let page = store
.query_events(
EventFilter::default(),
PageRequest {
limit: 10,
offset: 0,
},
)
.await
.unwrap();
assert_eq!(page.items.len(), 2);
assert_eq!(
page.items[0].id, id_high,
"higher UUID must come first (id DESC tiebreaker)"
);
assert_eq!(page.items[1].id, id_low);
}
#[tokio::test]
async fn query_events_filters_by_kind() {
let store = setup_memory_store();
store.append_event(make_event("default")).await.unwrap();
let mut recall_event = make_event("default");
recall_event.kind = EventKind::RecallExecuted;
store.append_event(recall_event).await.unwrap();
let filter = EventFilter {
kinds: vec![EventKind::RecallExecuted],
..EventFilter::default()
};
let page = store
.query_events(
filter,
PageRequest {
limit: 10,
offset: 0,
},
)
.await
.unwrap();
assert_eq!(page.items.len(), 1);
assert_eq!(page.items[0].kind, EventKind::RecallExecuted);
}
#[tokio::test]
async fn query_events_filters_by_session_id() {
let store = setup_memory_store();
let session = Uuid::new_v4();
let mut event = make_event("default");
event.session_id = Some(session);
store.append_event(event).await.unwrap();
store.append_event(make_event("default")).await.unwrap();
let filter = EventFilter {
session_id: Some(session),
..EventFilter::default()
};
let page = store
.query_events(
filter,
PageRequest {
limit: 10,
offset: 0,
},
)
.await
.unwrap();
assert_eq!(page.items.len(), 1);
assert_eq!(page.items[0].session_id, Some(session));
}
#[tokio::test]
async fn query_events_filters_by_observed() {
let store = setup_memory_store();
let entity_id = Uuid::new_v4();
let mut event = make_event("default");
event.kind = EventKind::RerankExecuted;
event.payload = json!({
"candidates": [entity_id.to_string()],
"selected": []
});
store.append_event(event).await.unwrap();
store.append_event(make_event("default")).await.unwrap();
let filter = EventFilter {
observed: vec![entity_id],
..EventFilter::default()
};
let page = store
.query_events(
filter,
PageRequest {
limit: 10,
offset: 0,
},
)
.await
.unwrap();
assert_eq!(page.items.len(), 1);
}
#[tokio::test]
async fn query_events_filters_by_selected() {
let store = setup_memory_store();
let entity_id = Uuid::new_v4();
let mut event = make_event("default");
event.kind = EventKind::RerankExecuted;
event.payload = json!({
"candidates": [],
"selected": [entity_id.to_string()]
});
store.append_event(event).await.unwrap();
store.append_event(make_event("default")).await.unwrap();
let filter = EventFilter {
selected: vec![entity_id],
..EventFilter::default()
};
let page = store
.query_events(
filter,
PageRequest {
limit: 10,
offset: 0,
},
)
.await
.unwrap();
assert_eq!(page.items.len(), 1);
}
#[tokio::test]
async fn query_events_filters_by_payload_proposal_id() {
let store = setup_memory_store();
let proposal_id = Uuid::new_v4();
let mut event = make_event("default");
event.kind = EventKind::ProposalCreated;
event.payload = json!({ "proposal_id": proposal_id.to_string() });
store.append_event(event).await.unwrap();
store.append_event(make_event("default")).await.unwrap();
let filter = EventFilter {
payload_proposal_id: Some(proposal_id),
..EventFilter::default()
};
let page = store
.query_events(
filter,
PageRequest {
limit: 10,
offset: 0,
},
)
.await
.unwrap();
assert_eq!(page.items.len(), 1);
}
#[tokio::test]
async fn query_events_observed_filter_missing_projection_returns_clean_error() {
let config = PoolConfig {
path: None,
..PoolConfig::default()
};
let pool = Arc::new(ConnectionPool::new(config).unwrap());
{
let writer = pool.writer().unwrap();
writer.conn().execute_batch(
"CREATE TABLE IF NOT EXISTS events (\
id TEXT PRIMARY KEY, namespace TEXT NOT NULL, verb TEXT NOT NULL,\
substrate TEXT NOT NULL, actor TEXT NOT NULL, kind TEXT NOT NULL DEFAULT 'audit',\
outcome TEXT NOT NULL, payload TEXT NOT NULL DEFAULT '{}',\
payload_schema_version INTEGER NOT NULL DEFAULT 1,\
duration_us INTEGER NOT NULL DEFAULT 0, created_at INTEGER NOT NULL\
);"
).unwrap();
}
let store = SqlEventStore::new_scoped(pool, false, "default");
let filter = EventFilter {
observed: vec![Uuid::new_v4()],
..EventFilter::default()
};
let result = store
.query_events(
filter,
PageRequest {
limit: 10,
offset: 0,
},
)
.await;
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("event_observations") && err_msg.contains("run migrations"),
"error should mention event_observations and run migrations, got: {err_msg}"
);
}
}