1use std::collections::BTreeMap;
9use std::path::Path;
10
11use chrono::{DateTime, Utc};
12use cortex_core::{
13 compose_policy_outcomes, Attestor, Event, EventId, EventSource, EventType, PolicyContribution,
14 PolicyDecision, PolicyOutcome, TraceId,
15};
16use cortex_ledger::{seal, JsonlLog};
17use rusqlite::{params, OptionalExtension, Row};
18
19use crate::{Pool, StoreError, StoreResult};
20
/// Rule id that the ADR 0026 policy composition must contribute (in either
/// `contributing` or `discarded`) before a mirrored append is permitted; see
/// `require_mirror_parity_contributor`.
pub const MIRROR_APPEND_PARITY_INVARIANT_RULE_ID: &str = "mirror.append.parity_invariant";
27
/// A single event id whose JSONL and SQLite copies disagree on hash.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventMismatch {
    // Identifier of the diverging event.
    pub id: EventId,
    // Event hash recorded in the JSONL mirror.
    pub jsonl_event_hash: String,
    // Event hash recorded in SQLite.
    pub sqlite_event_hash: String,
}
38
/// Result of comparing the full event set in the JSONL mirror against SQLite.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventSetParity {
    // Total events read from the JSONL mirror.
    pub jsonl_event_count: usize,
    // Total events read from SQLite.
    pub sqlite_event_count: usize,
    // Ids present in JSONL but absent from SQLite.
    pub missing_in_sqlite: Vec<EventId>,
    // Ids present in SQLite but absent from JSONL.
    pub missing_in_jsonl: Vec<EventId>,
    // Ids present on both sides but with diverging content.
    pub mismatched: Vec<EventMismatch>,
}
53
54impl EventSetParity {
55 #[must_use]
57 pub fn is_consistent(&self) -> bool {
58 self.missing_in_sqlite.is_empty()
59 && self.missing_in_jsonl.is_empty()
60 && self.mismatched.is_empty()
61 }
62}
63
/// Summary of a JSONL-into-SQLite replay, including the post-replay parity check.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplayReport {
    // Events newly inserted into SQLite during the replay.
    pub replayed: usize,
    // Events skipped because SQLite already held an identical copy.
    pub skipped_existing: usize,
    // Full parity comparison taken after the replay transaction committed.
    pub parity: EventSetParity,
}
74
/// Appends `event` to the JSONL ledger and mirrors the sealed copy into SQLite.
///
/// Two-phase write, order-critical: the mirror policy is preflighted, the
/// event is sealed locally against the current JSONL head, appended to the
/// log, the returned head is checked against the local seal, and only then
/// is the sealed event inserted into SQLite inside a transaction.
///
/// # Errors
///
/// Returns [`StoreError::Validation`] when the mirror policy is missing the
/// parity contributor or its outcome blocks the append, when the event id
/// already exists in SQLite, or when the JSONL head after append differs from
/// the locally sealed hash. Storage errors from either backend propagate.
///
/// NOTE(review): if the JSONL append succeeds but the later SQLite insert
/// fails, the two stores diverge until a replay reconciles them — TODO
/// confirm callers' recovery path (e.g. `replay_jsonl_into_sqlite`).
pub fn append_event(
    log: &mut JsonlLog,
    pool: &mut Pool,
    event: Event,
    ledger_policy: &PolicyDecision,
    mirror_policy: &PolicyDecision,
) -> StoreResult<Event> {
    // ADR 0026 preflight: caller must have composed the parity contributor,
    // and the composed outcome must not block the append.
    require_mirror_parity_contributor(mirror_policy)?;
    require_mirror_final_outcome(mirror_policy, "mirror.append")?;

    // Reject duplicate ids before touching the JSONL log.
    if let Some(existing) = select_event_by_id(pool, &event.id)? {
        return Err(StoreError::Validation(format!(
            "event id `{}` already exists in SQLite with hash `{}`",
            existing.id, existing.event_hash
        )));
    }

    // Seal a local copy against the current JSONL head so the log's own
    // chaining can be verified independently below.
    let mut sealed = event.clone();
    sealed.prev_event_hash = log.head().map(str::to_owned);
    seal(&mut sealed);

    let appended_head = log.append(event, ledger_policy).map_err(jsonl_error)?;
    // Parity invariant: the log's new head must equal our sealed hash.
    if appended_head != sealed.event_hash {
        return Err(StoreError::Validation(format!(
            "JSONL append head `{appended_head}` did not match sealed event `{}`",
            sealed.event_hash
        )));
    }

    // Mirror into SQLite only after the JSONL append is confirmed.
    let tx = pool.transaction()?;
    insert_event(&tx, &sealed)?;
    tx.commit()?;
    Ok(sealed)
}
121
/// Appends `event` to the JSONL ledger with an attestor signature and mirrors
/// the sealed copy into SQLite.
///
/// Same order-critical two-phase flow as `append_event`, but the JSONL write
/// goes through `append_signed` with the supplied `attestor`.
///
/// # Errors
///
/// Returns [`StoreError::Validation`] when the mirror policy is missing the
/// parity contributor or its outcome blocks the append, when the event id
/// already exists in SQLite, or when the signed JSONL head differs from the
/// locally sealed hash. Storage errors from either backend propagate.
///
/// NOTE(review): as with `append_event`, a SQLite failure after a successful
/// signed JSONL append leaves the stores diverged until replayed — TODO
/// confirm the recovery story.
pub fn append_signed_event(
    log: &mut JsonlLog,
    pool: &mut Pool,
    event: Event,
    attestor: &dyn Attestor,
    ledger_policy: &PolicyDecision,
    mirror_policy: &PolicyDecision,
) -> StoreResult<Event> {
    // ADR 0026 preflight, same as the unsigned path.
    require_mirror_parity_contributor(mirror_policy)?;
    require_mirror_final_outcome(mirror_policy, "mirror.append_signed")?;

    // Reject duplicate ids before touching the JSONL log.
    if let Some(existing) = select_event_by_id(pool, &event.id)? {
        return Err(StoreError::Validation(format!(
            "event id `{}` already exists in SQLite with hash `{}`",
            existing.id, existing.event_hash
        )));
    }

    // Seal locally against the current head for the parity check below.
    let mut sealed = event.clone();
    sealed.prev_event_hash = log.head().map(str::to_owned);
    seal(&mut sealed);

    let appended_head = log
        .append_signed(event, attestor, ledger_policy)
        .map_err(jsonl_error)?;
    // Parity invariant: signed append must land on the same sealed hash.
    if appended_head != sealed.event_hash {
        return Err(StoreError::Validation(format!(
            "signed JSONL append head `{appended_head}` did not match sealed event `{}`",
            sealed.event_hash
        )));
    }

    // Mirror into SQLite only after the signed JSONL append is confirmed.
    let tx = pool.transaction()?;
    insert_event(&tx, &sealed)?;
    tx.commit()?;
    Ok(sealed)
}
172
173pub fn mirror_single_event_into_sqlite(pool: &mut Pool, event: &Event) -> StoreResult<()> {
186 let tx = pool.transaction()?;
187 match mirror_single_event_into_sqlite_in_tx(&tx, event)? {
188 MirrorSingleEventOutcome::Inserted | MirrorSingleEventOutcome::AlreadyPresent => {
189 tx.commit()?;
190 Ok(())
191 }
192 }
193}
194
/// Outcome of mirroring one event into SQLite within an existing transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MirrorSingleEventOutcome {
    // The event was not present and has been inserted.
    Inserted,
    // An identical copy of the event was already mirrored; nothing written.
    AlreadyPresent,
}
207
208pub fn mirror_single_event_into_sqlite_in_tx(
217 tx: &rusqlite::Transaction<'_>,
218 event: &Event,
219) -> StoreResult<MirrorSingleEventOutcome> {
220 match select_event_by_id(tx, &event.id)? {
221 Some(existing) if existing == *event => Ok(MirrorSingleEventOutcome::AlreadyPresent),
222 Some(existing) => Err(StoreError::Validation(format!(
223 "event id `{}` already mirrored with diverging hash `{}` (expected `{}`); refusing to overwrite",
224 event.id, existing.event_hash, event.event_hash
225 ))),
226 None => {
227 insert_event(tx, event)?;
228 Ok(MirrorSingleEventOutcome::Inserted)
229 }
230 }
231}
232
233fn require_mirror_parity_contributor(policy: &PolicyDecision) -> StoreResult<()> {
234 let contains_rule = policy
235 .contributing
236 .iter()
237 .chain(policy.discarded.iter())
238 .any(|contribution| {
239 contribution.rule_id.as_str() == MIRROR_APPEND_PARITY_INVARIANT_RULE_ID
240 });
241 if contains_rule {
242 Ok(())
243 } else {
244 Err(StoreError::Validation(format!(
245 "policy decision missing required contributor `{MIRROR_APPEND_PARITY_INVARIANT_RULE_ID}`; caller skipped ADR 0026 composition",
246 )))
247 }
248}
249
250fn require_mirror_final_outcome(policy: &PolicyDecision, surface: &str) -> StoreResult<()> {
251 match policy.final_outcome {
252 PolicyOutcome::Allow | PolicyOutcome::Warn | PolicyOutcome::BreakGlass => Ok(()),
253 PolicyOutcome::Quarantine | PolicyOutcome::Reject => Err(StoreError::Validation(format!(
254 "{surface} preflight: composed policy outcome {:?} blocks mirrored append",
255 policy.final_outcome,
256 ))),
257 }
258}
259
260#[must_use]
265pub fn mirror_policy_decision_test_allow() -> PolicyDecision {
266 compose_policy_outcomes(
267 vec![PolicyContribution::new(
268 MIRROR_APPEND_PARITY_INVARIANT_RULE_ID,
269 PolicyOutcome::Allow,
270 "test fixture: JSONL <-> SQLite parity invariant satisfied",
271 )
272 .expect("static test contribution is valid")],
273 None,
274 )
275}
276
277pub fn replay_jsonl_into_sqlite(
283 pool: &mut Pool,
284 jsonl_path: impl AsRef<Path>,
285) -> StoreResult<ReplayReport> {
286 let jsonl_path = jsonl_path.as_ref();
287 let jsonl_events = read_jsonl_events(jsonl_path)?;
288 let mut replayed = 0;
289 let mut skipped_existing = 0;
290
291 {
292 let tx = pool.transaction()?;
293 for event in jsonl_events.values() {
294 match select_event_by_id(&tx, &event.id)? {
295 Some(existing) if existing == *event => skipped_existing += 1,
296 Some(existing) => {
297 return Err(StoreError::Validation(format!(
298 "event id `{}` differs between JSONL hash `{}` and SQLite hash `{}`",
299 event.id, event.event_hash, existing.event_hash
300 )));
301 }
302 None => {
303 insert_event(&tx, event)?;
304 replayed += 1;
305 }
306 }
307 }
308 tx.commit()?;
309 }
310
311 let parity = verify_event_set_parity(pool, jsonl_path)?;
312 Ok(ReplayReport {
313 replayed,
314 skipped_existing,
315 parity,
316 })
317}
318
319pub fn verify_event_set_parity(
321 pool: &Pool,
322 jsonl_path: impl AsRef<Path>,
323) -> StoreResult<EventSetParity> {
324 let jsonl_events = read_jsonl_events(jsonl_path.as_ref())?;
325 let sqlite_events = read_sqlite_events(pool)?;
326
327 let mut missing_in_sqlite = Vec::new();
328 let mut missing_in_jsonl = Vec::new();
329 let mut mismatched = Vec::new();
330
331 for (id, jsonl_event) in &jsonl_events {
332 match sqlite_events.get(id) {
333 Some(sqlite_event) if sqlite_event == jsonl_event => {}
334 Some(sqlite_event) => mismatched.push(EventMismatch {
335 id: jsonl_event.id,
336 jsonl_event_hash: jsonl_event.event_hash.clone(),
337 sqlite_event_hash: sqlite_event.event_hash.clone(),
338 }),
339 None => missing_in_sqlite.push(jsonl_event.id),
340 }
341 }
342
343 for (id, sqlite_event) in &sqlite_events {
344 if !jsonl_events.contains_key(id) {
345 missing_in_jsonl.push(sqlite_event.id);
346 }
347 }
348
349 Ok(EventSetParity {
350 jsonl_event_count: jsonl_events.len(),
351 sqlite_event_count: sqlite_events.len(),
352 missing_in_sqlite,
353 missing_in_jsonl,
354 mismatched,
355 })
356}
357
358fn read_jsonl_events(path: &Path) -> StoreResult<BTreeMap<String, Event>> {
359 if !path.is_file() {
360 return Err(StoreError::Validation(format!(
361 "JSONL mirror does not exist at `{}`",
362 path.display()
363 )));
364 }
365
366 let log = JsonlLog::open(path).map_err(jsonl_error)?;
367 log.verify_chain().map_err(jsonl_error)?;
368 let mut events = BTreeMap::new();
369 for item in log.iter().map_err(jsonl_error)? {
370 let event = item.map_err(jsonl_error)?;
371 let id = event.id.to_string();
372 if let Some(existing) = events.insert(id.clone(), event.clone()) {
373 return Err(StoreError::Validation(format!(
374 "duplicate event id `{id}` in JSONL mirror: hashes `{}` and `{}`",
375 existing.event_hash, event.event_hash
376 )));
377 }
378 }
379 Ok(events)
380}
381
382fn read_sqlite_events(pool: &Pool) -> StoreResult<BTreeMap<String, Event>> {
383 let mut stmt = pool.prepare("SELECT id FROM events ORDER BY id;")?;
384 let ids = stmt
385 .query_map([], |row| row.get::<_, String>(0))?
386 .collect::<Result<Vec<_>, _>>()?;
387
388 let mut events = BTreeMap::new();
389 for id in ids {
390 let event = select_event_by_id(pool, &id.parse::<EventId>()?)?.ok_or_else(|| {
391 StoreError::Validation(format!("event id `{id}` disappeared during parity read"))
392 })?;
393 events.insert(id, event);
394 }
395 Ok(events)
396}
397
/// Inserts one sealed event into the `events` table.
///
/// JSON-typed fields (`source`, `domain_tags`, `payload`) are serialized to
/// strings and timestamps are stored as RFC 3339 text. The `params![]` order
/// must match the column list and `?N` placeholders exactly.
///
/// # Errors
///
/// Propagates serde serialization failures and SQLite errors (including
/// constraint violations on duplicate ids).
fn insert_event(conn: &rusqlite::Connection, event: &Event) -> StoreResult<()> {
    let source_json = serde_json::to_string(&event.source)?;
    let domain_tags_json = serde_json::to_string(&event.domain_tags)?;
    let payload_json = serde_json::to_string(&event.payload)?;
    let trace_id = event.trace_id.map(|id| id.to_string());

    conn.execute(
        "INSERT INTO events (
            id, schema_version, observed_at, recorded_at, source_json,
            event_type, trace_id, session_id, domain_tags_json, payload_json,
            payload_hash, prev_event_hash, event_hash
        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13);",
        params![
            event.id.to_string(),
            // schema_version is u16 in the domain; SQLite stores i64.
            i64::from(event.schema_version),
            event.observed_at.to_rfc3339(),
            event.recorded_at.to_rfc3339(),
            source_json,
            event.event_type.wire_str(),
            trace_id,
            event.session_id,
            domain_tags_json,
            payload_json,
            event.payload_hash,
            event.prev_event_hash,
            event.event_hash,
        ],
    )?;
    Ok(())
}
428
/// Looks up a single event by id, returning `Ok(None)` when absent.
///
/// The SELECT column order must match the ordinals consumed by [`event_row`].
/// A found row is converted into a domain [`Event`] via `TryFrom<EventRow>`,
/// so conversion failures surface as [`StoreError`] rather than panicking.
fn select_event_by_id(conn: &rusqlite::Connection, id: &EventId) -> StoreResult<Option<Event>> {
    conn.query_row(
        "SELECT id, schema_version, observed_at, recorded_at, source_json,
            event_type, trace_id, session_id, domain_tags_json, payload_json,
            payload_hash, prev_event_hash, event_hash
         FROM events
         WHERE id = ?1;",
        params![id.to_string()],
        event_row,
    )
    // `optional()` maps QueryReturnedNoRows to Ok(None).
    .optional()?
    .map(TryInto::try_into)
    .transpose()
}
443
/// Raw, untyped projection of one `events` row, prior to domain conversion.
///
/// Field order mirrors the SELECT column order; conversion into [`Event`]
/// (parsing ids, timestamps, and JSON columns) happens in `TryFrom<EventRow>`.
#[derive(Debug)]
struct EventRow {
    id: String,
    // Stored as i64 by SQLite; narrowed to u16 during conversion.
    schema_version: i64,
    // RFC 3339 text timestamps.
    observed_at: String,
    recorded_at: String,
    source_json: String,
    event_type: String,
    trace_id: Option<String>,
    session_id: Option<String>,
    domain_tags_json: String,
    payload_json: String,
    payload_hash: String,
    prev_event_hash: Option<String>,
    event_hash: String,
}
460
/// Maps a result row into an [`EventRow`].
///
/// Column ordinals 0..=12 must stay in lockstep with the SELECT column lists
/// used by the readers of the `events` table (e.g. `select_event_by_id`).
fn event_row(row: &Row<'_>) -> rusqlite::Result<EventRow> {
    Ok(EventRow {
        id: row.get(0)?,
        schema_version: row.get(1)?,
        observed_at: row.get(2)?,
        recorded_at: row.get(3)?,
        source_json: row.get(4)?,
        event_type: row.get(5)?,
        trace_id: row.get(6)?,
        session_id: row.get(7)?,
        domain_tags_json: row.get(8)?,
        payload_json: row.get(9)?,
        payload_hash: row.get(10)?,
        prev_event_hash: row.get(11)?,
        event_hash: row.get(12)?,
    })
}
478
479impl TryFrom<EventRow> for Event {
480 type Error = StoreError;
481
482 fn try_from(row: EventRow) -> StoreResult<Self> {
483 let schema_version = u16::try_from(row.schema_version).map_err(|_| {
484 StoreError::Validation(format!(
485 "invalid event schema_version {}",
486 row.schema_version
487 ))
488 })?;
489
490 Ok(Self {
491 id: row.id.parse::<EventId>()?,
492 schema_version,
493 observed_at: parse_utc(&row.observed_at)?,
494 recorded_at: parse_utc(&row.recorded_at)?,
495 source: serde_json::from_str::<EventSource>(&row.source_json)?,
496 event_type: serde_json::from_value::<EventType>(serde_json::Value::String(
497 row.event_type,
498 ))?,
499 trace_id: row.trace_id.map(|id| id.parse::<TraceId>()).transpose()?,
500 session_id: row.session_id,
501 domain_tags: serde_json::from_str(&row.domain_tags_json)?,
502 payload: serde_json::from_str(&row.payload_json)?,
503 payload_hash: row.payload_hash,
504 prev_event_hash: row.prev_event_hash,
505 event_hash: row.event_hash,
506 })
507 }
508}
509
510fn parse_utc(value: &str) -> StoreResult<DateTime<Utc>> {
511 Ok(DateTime::parse_from_rfc3339(value)?.with_timezone(&Utc))
512}
513
514fn jsonl_error(err: cortex_ledger::JsonlError) -> StoreError {
515 StoreError::Validation(err.to_string())
516}