use chrono::{DateTime, Utc};
use cortex_core::{
Event, EventId, EventSource, EventType, SourceAttestation, TraceId,
SCHEMA_MIGRATION_V1_TO_V2_TARGET,
};
use rusqlite::{params, OptionalExtension, Row};
use crate::{Pool, StoreError, StoreResult};
#[derive(Debug)]
pub struct EventRepo<'a> {
pool: &'a Pool,
}
impl<'a> EventRepo<'a> {
#[must_use]
pub const fn new(pool: &'a Pool) -> Self {
Self { pool }
}
pub fn append(&self, event: &Event) -> StoreResult<()> {
let source_json = serde_json::to_string(&event.source)?;
let domain_tags_json = serde_json::to_string(&event.domain_tags)?;
let payload_json = serde_json::to_string(&event.payload)?;
let trace_id = event.trace_id.map(|id| id.to_string());
self.pool.execute(
"INSERT OR IGNORE INTO events (
id, schema_version, observed_at, recorded_at, source_json,
event_type, trace_id, session_id, domain_tags_json, payload_json,
payload_hash, prev_event_hash, event_hash
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13);",
params![
event.id.to_string(),
i64::from(event.schema_version),
event.observed_at.to_rfc3339(),
event.recorded_at.to_rfc3339(),
source_json,
event.event_type.wire_str(),
trace_id,
event.session_id,
domain_tags_json,
payload_json,
event.payload_hash,
event.prev_event_hash,
event.event_hash,
],
)?;
Ok(())
}
pub fn append_schema_v2_with_source_attestation(
&self,
event: &Event,
source_attestation: &SourceAttestation,
) -> StoreResult<()> {
if event.schema_version != SCHEMA_MIGRATION_V1_TO_V2_TARGET {
return Err(StoreError::Validation(format!(
"schema-v2 event append requires schema_version {}; found {}",
SCHEMA_MIGRATION_V1_TO_V2_TARGET, event.schema_version
)));
}
if matches!(source_attestation, SourceAttestation::Missing) {
return Err(StoreError::Validation(
"schema-v2 event append requires explicit source attestation; missing is not persistable"
.into(),
));
}
let source_json = serde_json::to_string(&event.source)?;
let source_attestation_json = serde_json::to_string(source_attestation)?;
let domain_tags_json = serde_json::to_string(&event.domain_tags)?;
let payload_json = serde_json::to_string(&event.payload)?;
let trace_id = event.trace_id.map(|id| id.to_string());
self.pool.execute(
"INSERT INTO events (
id, schema_version, observed_at, recorded_at, source_json,
event_type, trace_id, session_id, domain_tags_json, payload_json,
payload_hash, prev_event_hash, event_hash, source_attestation_json
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14);",
params![
event.id.to_string(),
i64::from(event.schema_version),
event.observed_at.to_rfc3339(),
event.recorded_at.to_rfc3339(),
source_json,
event.event_type.wire_str(),
trace_id,
event.session_id,
domain_tags_json,
payload_json,
event.payload_hash,
event.prev_event_hash,
event.event_hash,
source_attestation_json,
],
)?;
Ok(())
}
pub fn get_by_id(&self, id: &EventId) -> StoreResult<Option<Event>> {
let row = self
.pool
.query_row(
"SELECT id, schema_version, observed_at, recorded_at, source_json,
event_type, trace_id, session_id, domain_tags_json, payload_json,
payload_hash, prev_event_hash, event_hash
FROM events
WHERE id = ?1;",
params![id.to_string()],
event_row,
)
.optional()?;
row.map(TryInto::try_into).transpose()
}
pub fn chain_head(&self) -> StoreResult<Option<Event>> {
let row = self
.pool
.query_row(
"SELECT e.id, e.schema_version, e.observed_at, e.recorded_at, e.source_json,
e.event_type, e.trace_id, e.session_id, e.domain_tags_json,
e.payload_json, e.payload_hash, e.prev_event_hash, e.event_hash
FROM events e
WHERE NOT EXISTS (
SELECT 1 FROM events child WHERE child.prev_event_hash = e.event_hash
)
ORDER BY e.recorded_at DESC, e.id DESC
LIMIT 1;",
[],
event_row,
)
.optional()?;
row.map(TryInto::try_into).transpose()
}
}
#[derive(Debug)]
struct EventRow {
id: String,
schema_version: i64,
observed_at: String,
recorded_at: String,
source_json: String,
event_type: String,
trace_id: Option<String>,
session_id: Option<String>,
domain_tags_json: String,
payload_json: String,
payload_hash: String,
prev_event_hash: Option<String>,
event_hash: String,
}
fn event_row(row: &Row<'_>) -> rusqlite::Result<EventRow> {
Ok(EventRow {
id: row.get(0)?,
schema_version: row.get(1)?,
observed_at: row.get(2)?,
recorded_at: row.get(3)?,
source_json: row.get(4)?,
event_type: row.get(5)?,
trace_id: row.get(6)?,
session_id: row.get(7)?,
domain_tags_json: row.get(8)?,
payload_json: row.get(9)?,
payload_hash: row.get(10)?,
prev_event_hash: row.get(11)?,
event_hash: row.get(12)?,
})
}
impl TryFrom<EventRow> for Event {
type Error = StoreError;
fn try_from(row: EventRow) -> StoreResult<Self> {
let schema_version = u16::try_from(row.schema_version).map_err(|_| {
StoreError::Validation(format!(
"invalid event schema_version {}",
row.schema_version
))
})?;
Ok(Self {
id: row.id.parse::<EventId>()?,
schema_version,
observed_at: parse_utc(&row.observed_at)?,
recorded_at: parse_utc(&row.recorded_at)?,
source: serde_json::from_str::<EventSource>(&row.source_json)?,
event_type: serde_json::from_value::<EventType>(serde_json::Value::String(
row.event_type,
))?,
trace_id: row.trace_id.map(|id| id.parse::<TraceId>()).transpose()?,
session_id: row.session_id,
domain_tags: serde_json::from_str(&row.domain_tags_json)?,
payload: serde_json::from_str(&row.payload_json)?,
payload_hash: row.payload_hash,
prev_event_hash: row.prev_event_hash,
event_hash: row.event_hash,
})
}
}
fn parse_utc(value: &str) -> StoreResult<DateTime<Utc>> {
Ok(DateTime::parse_from_rfc3339(value)?.with_timezone(&Utc))
}