use std::collections::BTreeMap;
use std::path::Path;
use chrono::{DateTime, Utc};
use cortex_core::{
compose_policy_outcomes, Attestor, Event, EventId, EventSource, EventType, PolicyContribution,
PolicyDecision, PolicyOutcome, TraceId,
};
use cortex_ledger::{seal, JsonlLog};
use rusqlite::{params, OptionalExtension, Row};
use crate::{Pool, StoreError, StoreResult};
/// Rule id that every mirror-append policy decision must contain; checked by
/// `require_mirror_parity_contributor` before any mirrored write.
pub const MIRROR_APPEND_PARITY_INVARIANT_RULE_ID: &str = "mirror.append.parity_invariant";

/// One event id whose JSONL and SQLite copies disagree on the event hash.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventMismatch {
    pub id: EventId,
    // Hash recorded in the JSONL mirror for this id.
    pub jsonl_event_hash: String,
    // Hash recorded in the SQLite mirror for this id.
    pub sqlite_event_hash: String,
}
/// Result of comparing the full event set in the JSONL mirror against the
/// SQLite mirror (see `verify_event_set_parity`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventSetParity {
    // Total events read from the JSONL log.
    pub jsonl_event_count: usize,
    // Total events read from the SQLite `events` table.
    pub sqlite_event_count: usize,
    // Ids present in JSONL but absent from SQLite.
    pub missing_in_sqlite: Vec<EventId>,
    // Ids present in SQLite but absent from JSONL.
    pub missing_in_jsonl: Vec<EventId>,
    // Ids present on both sides but with differing hashes.
    pub mismatched: Vec<EventMismatch>,
}
impl EventSetParity {
    /// True when the two mirrors hold exactly the same event set: nothing is
    /// missing on either side and no id carries diverging hashes.
    #[must_use]
    pub fn is_consistent(&self) -> bool {
        let any_divergence = !self.missing_in_sqlite.is_empty()
            || !self.missing_in_jsonl.is_empty()
            || !self.mismatched.is_empty();
        !any_divergence
    }
}
/// Summary returned by `replay_jsonl_into_sqlite`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplayReport {
    // Events newly inserted into SQLite during the replay.
    pub replayed: usize,
    // Events that were already mirrored with an identical copy and skipped.
    pub skipped_existing: usize,
    // Post-replay parity check between the JSONL and SQLite mirrors.
    pub parity: EventSetParity,
}
/// Append `event` to the JSONL ledger and mirror the sealed copy into SQLite.
///
/// Preflight rejects the write unless `mirror_policy` carries the parity
/// invariant contributor and a non-blocking final outcome, and the event id
/// is not already mirrored. The sealed hash computed locally is cross-checked
/// against the head hash the ledger reports after appending.
///
/// # Errors
/// Returns `StoreError::Validation` on policy preflight failure, duplicate
/// id, head-hash mismatch, or any underlying ledger/SQLite failure.
pub fn append_event(
    log: &mut JsonlLog,
    pool: &mut Pool,
    event: Event,
    ledger_policy: &PolicyDecision,
    mirror_policy: &PolicyDecision,
) -> StoreResult<Event> {
    // ADR 0026 preflight: the composed decision must exist and must not block.
    require_mirror_parity_contributor(mirror_policy)?;
    require_mirror_final_outcome(mirror_policy, "mirror.append")?;
    // Refuse double-appends before touching the JSONL log.
    if let Some(existing) = select_event_by_id(pool, &event.id)? {
        return Err(StoreError::Validation(format!(
            "event id `{}` already exists in SQLite with hash `{}`",
            existing.id, existing.event_hash
        )));
    }
    // Seal a private copy against the current JSONL head so the hash the
    // ledger computes on append can be verified independently.
    let mut mirrored = event.clone();
    mirrored.prev_event_hash = log.head().map(ToOwned::to_owned);
    seal(&mut mirrored);
    let appended_head = log.append(event, ledger_policy).map_err(jsonl_error)?;
    if appended_head != mirrored.event_hash {
        return Err(StoreError::Validation(format!(
            "JSONL append head `{appended_head}` did not match sealed event `{}`",
            mirrored.event_hash
        )));
    }
    // Only mirror into SQLite once the JSONL append is confirmed.
    let tx = pool.transaction()?;
    insert_event(&tx, &mirrored)?;
    tx.commit()?;
    Ok(mirrored)
}
/// Signed variant of `append_event`: appends via `JsonlLog::append_signed`
/// with `attestor`, then mirrors the sealed copy into SQLite.
///
/// The same preflight applies: parity invariant contributor present,
/// non-blocking final outcome, id not already mirrored, and the locally
/// sealed hash must match the head the ledger reports.
///
/// # Errors
/// Returns `StoreError::Validation` on policy preflight failure, duplicate
/// id, head-hash mismatch, or any underlying ledger/SQLite failure.
pub fn append_signed_event(
    log: &mut JsonlLog,
    pool: &mut Pool,
    event: Event,
    attestor: &dyn Attestor,
    ledger_policy: &PolicyDecision,
    mirror_policy: &PolicyDecision,
) -> StoreResult<Event> {
    // ADR 0026 preflight: the composed decision must exist and must not block.
    require_mirror_parity_contributor(mirror_policy)?;
    require_mirror_final_outcome(mirror_policy, "mirror.append_signed")?;
    // Refuse double-appends before touching the JSONL log.
    if let Some(existing) = select_event_by_id(pool, &event.id)? {
        return Err(StoreError::Validation(format!(
            "event id `{}` already exists in SQLite with hash `{}`",
            existing.id, existing.event_hash
        )));
    }
    // Seal a private copy against the current JSONL head for cross-checking.
    let mut mirrored = event.clone();
    mirrored.prev_event_hash = log.head().map(ToOwned::to_owned);
    seal(&mut mirrored);
    let appended_head = log
        .append_signed(event, attestor, ledger_policy)
        .map_err(jsonl_error)?;
    if appended_head != mirrored.event_hash {
        return Err(StoreError::Validation(format!(
            "signed JSONL append head `{appended_head}` did not match sealed event `{}`",
            mirrored.event_hash
        )));
    }
    // Only mirror into SQLite once the signed JSONL append is confirmed.
    let tx = pool.transaction()?;
    insert_event(&tx, &mirrored)?;
    tx.commit()?;
    Ok(mirrored)
}
/// Mirror one already-sealed event into SQLite inside its own transaction.
///
/// Commits whether the event was freshly inserted or was already present as
/// an identical copy; a diverging copy surfaces as an error from the in-tx
/// helper and the transaction is dropped (rolled back).
pub fn mirror_single_event_into_sqlite(pool: &mut Pool, event: &Event) -> StoreResult<()> {
    let tx = pool.transaction()?;
    let outcome = mirror_single_event_into_sqlite_in_tx(&tx, event)?;
    // Exhaustive match keeps this in sync if a new outcome variant is added.
    match outcome {
        MirrorSingleEventOutcome::Inserted | MirrorSingleEventOutcome::AlreadyPresent => {}
    }
    tx.commit()?;
    Ok(())
}
/// Outcome of mirroring a single event into SQLite.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MirrorSingleEventOutcome {
    // The event was not yet mirrored and has been inserted.
    Inserted,
    // An identical copy was already mirrored; nothing was written.
    AlreadyPresent,
}
/// Mirror one event inside an existing transaction.
///
/// Inserts when the id is new; reports `AlreadyPresent` when an identical
/// copy exists; refuses to overwrite a diverging copy with a validation
/// error so mirror corruption is never papered over.
pub fn mirror_single_event_into_sqlite_in_tx(
    tx: &rusqlite::Transaction<'_>,
    event: &Event,
) -> StoreResult<MirrorSingleEventOutcome> {
    // Fresh id: insert and report it.
    let Some(existing) = select_event_by_id(tx, &event.id)? else {
        insert_event(tx, event)?;
        return Ok(MirrorSingleEventOutcome::Inserted);
    };
    if existing == *event {
        Ok(MirrorSingleEventOutcome::AlreadyPresent)
    } else {
        Err(StoreError::Validation(format!(
            "event id `{}` already mirrored with diverging hash `{}` (expected `{}`); refusing to overwrite",
            event.id, existing.event_hash, event.event_hash
        )))
    }
}
/// Reject policy decisions that were composed without the mirror-parity
/// invariant contributor, in either the contributing or the discarded set —
/// its absence means the caller skipped ADR 0026 composition entirely.
fn require_mirror_parity_contributor(policy: &PolicyDecision) -> StoreResult<()> {
    let has_invariant = policy
        .contributing
        .iter()
        .chain(&policy.discarded)
        .any(|c| c.rule_id.as_str() == MIRROR_APPEND_PARITY_INVARIANT_RULE_ID);
    if !has_invariant {
        return Err(StoreError::Validation(format!(
            "policy decision missing required contributor `{MIRROR_APPEND_PARITY_INVARIANT_RULE_ID}`; caller skipped ADR 0026 composition",
        )));
    }
    Ok(())
}
/// Block mirrored appends when the composed policy outcome is Quarantine or
/// Reject; Allow, Warn, and BreakGlass all let the write proceed.
/// The match is deliberately exhaustive so a new `PolicyOutcome` variant
/// forces a decision here at compile time.
fn require_mirror_final_outcome(policy: &PolicyDecision, surface: &str) -> StoreResult<()> {
    match policy.final_outcome {
        PolicyOutcome::Quarantine | PolicyOutcome::Reject => Err(StoreError::Validation(format!(
            "{surface} preflight: composed policy outcome {:?} blocks mirrored append",
            policy.final_outcome,
        ))),
        PolicyOutcome::Allow | PolicyOutcome::Warn | PolicyOutcome::BreakGlass => Ok(()),
    }
}
/// Build an always-allowing mirror policy decision for tests, carrying the
/// required parity-invariant contributor so preflight checks pass.
#[must_use]
pub fn mirror_policy_decision_test_allow() -> PolicyDecision {
    let contribution = PolicyContribution::new(
        MIRROR_APPEND_PARITY_INVARIANT_RULE_ID,
        PolicyOutcome::Allow,
        "test fixture: JSONL <-> SQLite parity invariant satisfied",
    )
    .expect("static test contribution is valid");
    compose_policy_outcomes(vec![contribution], None)
}
/// Replay every event from the JSONL mirror into SQLite, then verify parity.
///
/// Events already mirrored with identical content are counted as skipped;
/// a diverging copy aborts the replay with a validation error and rolls the
/// whole transaction back. Inserts happen atomically in one transaction.
///
/// # Errors
/// Returns `StoreError::Validation` for a missing/invalid JSONL mirror or a
/// diverging mirrored event, plus any underlying SQLite failure.
pub fn replay_jsonl_into_sqlite(
    pool: &mut Pool,
    jsonl_path: impl AsRef<Path>,
) -> StoreResult<ReplayReport> {
    let path = jsonl_path.as_ref();
    let source_events = read_jsonl_events(path)?;
    let mut inserted = 0;
    let mut already_present = 0;
    {
        // Scope the transaction so `pool` is free again for the parity check.
        let tx = pool.transaction()?;
        for event in source_events.values() {
            let Some(existing) = select_event_by_id(&tx, &event.id)? else {
                insert_event(&tx, event)?;
                inserted += 1;
                continue;
            };
            if existing == *event {
                already_present += 1;
            } else {
                return Err(StoreError::Validation(format!(
                    "event id `{}` differs between JSONL hash `{}` and SQLite hash `{}`",
                    event.id, event.event_hash, existing.event_hash
                )));
            }
        }
        tx.commit()?;
    }
    let parity = verify_event_set_parity(pool, path)?;
    Ok(ReplayReport {
        replayed: inserted,
        skipped_existing: already_present,
        parity,
    })
}
pub fn verify_event_set_parity(
pool: &Pool,
jsonl_path: impl AsRef<Path>,
) -> StoreResult<EventSetParity> {
let jsonl_events = read_jsonl_events(jsonl_path.as_ref())?;
let sqlite_events = read_sqlite_events(pool)?;
let mut missing_in_sqlite = Vec::new();
let mut missing_in_jsonl = Vec::new();
let mut mismatched = Vec::new();
for (id, jsonl_event) in &jsonl_events {
match sqlite_events.get(id) {
Some(sqlite_event) if sqlite_event == jsonl_event => {}
Some(sqlite_event) => mismatched.push(EventMismatch {
id: jsonl_event.id,
jsonl_event_hash: jsonl_event.event_hash.clone(),
sqlite_event_hash: sqlite_event.event_hash.clone(),
}),
None => missing_in_sqlite.push(jsonl_event.id),
}
}
for (id, sqlite_event) in &sqlite_events {
if !jsonl_events.contains_key(id) {
missing_in_jsonl.push(sqlite_event.id);
}
}
Ok(EventSetParity {
jsonl_event_count: jsonl_events.len(),
sqlite_event_count: sqlite_events.len(),
missing_in_sqlite,
missing_in_jsonl,
mismatched,
})
}
/// Load and verify the JSONL mirror, returning events keyed by id string.
///
/// The chain is verified before iteration, and a duplicate id anywhere in
/// the log is a validation error.
///
/// # Errors
/// Returns `StoreError::Validation` when the file is missing, the chain is
/// broken, a record fails to parse, or an id appears twice.
fn read_jsonl_events(path: &Path) -> StoreResult<BTreeMap<String, Event>> {
    use std::collections::btree_map::Entry;

    if !path.is_file() {
        return Err(StoreError::Validation(format!(
            "JSONL mirror does not exist at `{}`",
            path.display()
        )));
    }
    let log = JsonlLog::open(path).map_err(jsonl_error)?;
    log.verify_chain().map_err(jsonl_error)?;
    let mut events = BTreeMap::new();
    for item in log.iter().map_err(jsonl_error)? {
        let event = item.map_err(jsonl_error)?;
        // Entry API avoids the per-event `id.clone()`/`event.clone()` the
        // previous insert-then-inspect approach paid on every record.
        match events.entry(event.id.to_string()) {
            Entry::Occupied(slot) => {
                return Err(StoreError::Validation(format!(
                    "duplicate event id `{}` in JSONL mirror: hashes `{}` and `{}`",
                    slot.key(),
                    slot.get().event_hash,
                    event.event_hash
                )));
            }
            Entry::Vacant(slot) => {
                slot.insert(event);
            }
        }
    }
    Ok(events)
}
fn read_sqlite_events(pool: &Pool) -> StoreResult<BTreeMap<String, Event>> {
let mut stmt = pool.prepare("SELECT id FROM events ORDER BY id;")?;
let ids = stmt
.query_map([], |row| row.get::<_, String>(0))?
.collect::<Result<Vec<_>, _>>()?;
let mut events = BTreeMap::new();
for id in ids {
let event = select_event_by_id(pool, &id.parse::<EventId>()?)?.ok_or_else(|| {
StoreError::Validation(format!("event id `{id}` disappeared during parity read"))
})?;
events.insert(id, event);
}
Ok(events)
}
/// Insert one sealed event into the `events` table.
///
/// Structured fields are JSON-encoded up front so serde failures surface
/// before the statement touches the database.
fn insert_event(conn: &rusqlite::Connection, event: &Event) -> StoreResult<()> {
    let source = serde_json::to_string(&event.source)?;
    let tags = serde_json::to_string(&event.domain_tags)?;
    let payload = serde_json::to_string(&event.payload)?;
    conn.execute(
        "INSERT INTO events (
            id, schema_version, observed_at, recorded_at, source_json,
            event_type, trace_id, session_id, domain_tags_json, payload_json,
            payload_hash, prev_event_hash, event_hash
        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13);",
        params![
            event.id.to_string(),
            i64::from(event.schema_version),
            event.observed_at.to_rfc3339(),
            event.recorded_at.to_rfc3339(),
            source,
            event.event_type.wire_str(),
            event.trace_id.map(|id| id.to_string()),
            event.session_id,
            tags,
            payload,
            event.payload_hash,
            event.prev_event_hash,
            event.event_hash,
        ],
    )?;
    Ok(())
}
/// Fetch one event by id, returning `None` when the row does not exist.
///
/// The raw row is converted back to a domain `Event` via `TryFrom<EventRow>`.
fn select_event_by_id(conn: &rusqlite::Connection, id: &EventId) -> StoreResult<Option<Event>> {
    let row = conn
        .query_row(
            "SELECT id, schema_version, observed_at, recorded_at, source_json,
            event_type, trace_id, session_id, domain_tags_json, payload_json,
            payload_hash, prev_event_hash, event_hash
        FROM events
        WHERE id = ?1;",
            params![id.to_string()],
            event_row,
        )
        .optional()?;
    match row {
        Some(raw) => Ok(Some(raw.try_into()?)),
        None => Ok(None),
    }
}
// Raw column values of one `events` row, prior to conversion into a domain
// `Event` (see the `TryFrom<EventRow> for Event` impl). Field order mirrors
// the SELECT column list used by `select_event_by_id`.
#[derive(Debug)]
struct EventRow {
    id: String,
    schema_version: i64,
    // RFC 3339 timestamp strings; parsed by `parse_utc` during conversion.
    observed_at: String,
    recorded_at: String,
    // JSON-encoded columns, deserialized with serde during conversion.
    source_json: String,
    event_type: String,
    trace_id: Option<String>,
    session_id: Option<String>,
    domain_tags_json: String,
    payload_json: String,
    payload_hash: String,
    // `None` only for the first event in a chain.
    prev_event_hash: Option<String>,
    event_hash: String,
}
/// rusqlite row mapper: pull the thirteen columns of an `events` row, in the
/// same order as the SELECT list, into an `EventRow`.
fn event_row(row: &Row<'_>) -> rusqlite::Result<EventRow> {
    let id: String = row.get(0)?;
    let schema_version: i64 = row.get(1)?;
    let observed_at: String = row.get(2)?;
    let recorded_at: String = row.get(3)?;
    let source_json: String = row.get(4)?;
    let event_type: String = row.get(5)?;
    let trace_id: Option<String> = row.get(6)?;
    let session_id: Option<String> = row.get(7)?;
    let domain_tags_json: String = row.get(8)?;
    let payload_json: String = row.get(9)?;
    let payload_hash: String = row.get(10)?;
    let prev_event_hash: Option<String> = row.get(11)?;
    let event_hash: String = row.get(12)?;
    Ok(EventRow {
        id,
        schema_version,
        observed_at,
        recorded_at,
        source_json,
        event_type,
        trace_id,
        session_id,
        domain_tags_json,
        payload_json,
        payload_hash,
        prev_event_hash,
        event_hash,
    })
}
impl TryFrom<EventRow> for Event {
    type Error = StoreError;

    /// Rehydrate a domain `Event` from raw SQLite column values, validating
    /// the schema version, id, timestamps, and JSON-encoded fields.
    /// Conversions run in the same order as before so the first failure
    /// reported is unchanged.
    fn try_from(row: EventRow) -> StoreResult<Self> {
        let schema_version = u16::try_from(row.schema_version).map_err(|_| {
            StoreError::Validation(format!(
                "invalid event schema_version {}",
                row.schema_version
            ))
        })?;
        let id = row.id.parse::<EventId>()?;
        let observed_at = parse_utc(&row.observed_at)?;
        let recorded_at = parse_utc(&row.recorded_at)?;
        let source = serde_json::from_str::<EventSource>(&row.source_json)?;
        // `event_type` is stored as its wire string; round-trip through a
        // JSON string value to reuse the serde representation.
        let event_type =
            serde_json::from_value::<EventType>(serde_json::Value::String(row.event_type))?;
        let trace_id = match row.trace_id {
            Some(raw) => Some(raw.parse::<TraceId>()?),
            None => None,
        };
        Ok(Self {
            id,
            schema_version,
            observed_at,
            recorded_at,
            source,
            event_type,
            trace_id,
            session_id: row.session_id,
            domain_tags: serde_json::from_str(&row.domain_tags_json)?,
            payload: serde_json::from_str(&row.payload_json)?,
            payload_hash: row.payload_hash,
            prev_event_hash: row.prev_event_hash,
            event_hash: row.event_hash,
        })
    }
}
/// Parse an RFC 3339 timestamp string and normalize it to UTC.
fn parse_utc(value: &str) -> StoreResult<DateTime<Utc>> {
    let parsed = DateTime::parse_from_rfc3339(value)?;
    Ok(parsed.with_timezone(&Utc))
}
/// Flatten a ledger-layer failure into the store's validation error type.
fn jsonl_error(err: cortex_ledger::JsonlError) -> StoreError {
    let message = err.to_string();
    StoreError::Validation(message)
}