use std::io::Write;
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};
use chrono::{TimeZone, Utc};
use cortex_core::{
Event, EventSource, EventType, InMemoryAttestor, PolicyDecision, SCHEMA_VERSION,
};
use cortex_ledger::{
append_policy_decision_test_allow, append_signed_policy_decision_test_allow, JsonlLog,
};
use cortex_store::migrate::apply_pending;
use cortex_store::mirror::{
append_event, append_signed_event, mirror_policy_decision_test_allow, replay_jsonl_into_sqlite,
verify_event_set_parity,
};
use cortex_store::Pool;
use rusqlite::{Connection, OptionalExtension};
use serde_json::json;
/// Opens an in-memory SQLite connection and runs `apply_pending` against it.
///
/// # Panics
///
/// Panics if the connection cannot be opened or if `apply_pending` fails.
fn test_pool() -> Pool {
    let conn = Connection::open_in_memory().expect("open in-memory sqlite");
    apply_pending(&conn).expect("apply migrations");
    conn
}
/// Builds a scratch path for a JSONL mirror file under the system temp directory.
///
/// The file name embeds `label`, the current process id, and the number of
/// nanoseconds since the Unix epoch. Nothing is created on disk here.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn jsonl_path(label: &str) -> PathBuf {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock after unix epoch");
    let nanos = since_epoch.as_nanos();
    let pid = std::process::id();

    let mut path = std::env::temp_dir();
    path.push(format!("cortex-store-mirror-{label}-{}-{nanos}.jsonl", pid));
    path
}
/// Returns the UTC timestamp 2026-01-01 12:00:`second`.
///
/// # Panics
///
/// Panics (via `unwrap`) if chrono does not yield a usable result for the
/// given `second`.
fn at(second: u32) -> chrono::DateTime<Utc> {
    let (year, month, day, hour, minute) = (2026, 1, 1, 12, 0);
    Utc.with_ymd_and_hms(year, month, day, hour, minute, second)
        .unwrap()
}
/// Builds an `Event` fixture with the given id, timestamped at `at(second)`.
///
/// `observed_at` and `recorded_at` share the same timestamp, the payload
/// records `second`, and the hash fields (`payload_hash`, `event_hash`,
/// `prev_event_hash`) are left empty / `None`.
///
/// # Panics
///
/// Panics if `id` does not parse into the event id type.
fn event(id: &str, second: u32) -> Event {
    // Parse first so an invalid id is reported before anything else is built.
    let id = id.parse().unwrap();
    let timestamp = at(second);
    let source = EventSource::Tool {
        name: "mirror-test".into(),
    };

    Event {
        id,
        schema_version: SCHEMA_VERSION,
        observed_at: timestamp,
        recorded_at: timestamp,
        source,
        event_type: EventType::ToolResult,
        trace_id: None,
        session_id: Some("session-mirror".into()),
        domain_tags: vec!["store".into(), "mirror".into()],
        payload: json!({"ok": true, "second": second}),
        payload_hash: String::new(),
        prev_event_hash: None,
        event_hash: String::new(),
    }
}
/// Creates an `InMemoryAttestor` from a 32-byte seed in which every byte is `seed`.
fn fresh_attestor(seed: u8) -> InMemoryAttestor {
    let seed_bytes = [seed; 32];
    InMemoryAttestor::from_seed(&seed_bytes)
}
/// Policy decision passed as the ledger policy to `append_event` in these tests.
///
/// Thin alias for `append_policy_decision_test_allow`.
fn ledger_policy() -> PolicyDecision {
    append_policy_decision_test_allow()
}
/// Policy decision passed as the ledger policy to `append_signed_event` in these tests.
///
/// Thin alias for `append_signed_policy_decision_test_allow`.
fn signed_ledger_policy() -> PolicyDecision {
    append_signed_policy_decision_test_allow()
}
/// Policy decision passed as the mirror policy to the mirrored append helpers in these tests.
///
/// Thin alias for `mirror_policy_decision_test_allow`.
fn mirror_policy() -> PolicyDecision {
    mirror_policy_decision_test_allow()
}
/// Appends `events` to the JSONL log at `path`, then reopens the log and
/// returns every event read back from it.
///
/// The returned events are the ones stored in the file, not the inputs.
///
/// # Panics
///
/// Panics if opening, appending to, or reading the log fails.
fn append_jsonl(path: &Path, events: &[Event]) -> Vec<Event> {
    let mut writer = JsonlLog::open(path).expect("open JSONL mirror");
    let allow = append_policy_decision_test_allow();
    for pending in events.iter().cloned() {
        writer.append(pending, &allow).expect("append JSONL event");
    }
    // Release the writing handle before reading the file back.
    drop(writer);

    let reader = JsonlLog::open(path).expect("reopen JSONL mirror");
    let stored: Result<Vec<_>, _> = reader.iter().expect("iterate JSONL mirror").collect();
    stored.expect("read JSONL events")
}
/// Returns the number of rows in the SQLite `events` table.
///
/// # Panics
///
/// Panics if the query fails, or if the reported count cannot be represented
/// as `usize` (negative or too large for the platform).
fn sqlite_event_count(pool: &Pool) -> usize {
    let count = pool
        .query_row("SELECT COUNT(*) FROM events;", [], |row| {
            row.get::<_, i64>(0)
        })
        .expect("count SQLite events");
    // Checked conversion: an `as` cast would silently wrap a negative value
    // or truncate on targets where `usize` is narrower than 64 bits.
    usize::try_from(count).expect("SQLite event count fits in usize")
}
/// Looks up the `event_hash` column of the `events` row with the given `id`.
///
/// Returns `None` when no such row exists.
///
/// # Panics
///
/// Panics if the query fails for any reason other than "no rows".
fn sqlite_event_hash(pool: &Pool, id: &str) -> Option<String> {
    let lookup = pool
        .query_row(
            "SELECT event_hash FROM events WHERE id = ?1;",
            [id],
            |row| row.get::<_, String>(0),
        )
        .optional();
    lookup.expect("read SQLite event hash")
}
/// With two events in JSONL and none in SQLite, the parity check reports every
/// event as missing; a replay then inserts them all and parity is consistent.
#[test]
fn mirror_replay_restores_missing_sqlite_events_and_reaches_parity() {
    const FIRST_ID: &str = "evt_01ARZ3NDEKTSV4RRFFQ69G5FAV";
    const SECOND_ID: &str = "evt_01ARZ3NDEKTSV4RRFFQ69G5FAW";

    let path = jsonl_path("replay");
    let mut pool = test_pool();
    let sealed = append_jsonl(&path, &[event(FIRST_ID, 1), event(SECOND_ID, 2)]);

    // Before replay: JSONL is ahead, SQLite is empty.
    let before = verify_event_set_parity(&pool, &path).expect("verify parity before replay");
    assert_eq!(before.jsonl_event_count, sealed.len());
    assert_eq!(before.sqlite_event_count, 0);
    assert_eq!(before.missing_in_sqlite.len(), sealed.len());
    assert!(!before.is_consistent());

    // After replay: everything was inserted and nothing was skipped.
    let outcome = replay_jsonl_into_sqlite(&mut pool, &path).expect("replay JSONL into SQLite");
    assert_eq!(outcome.replayed, sealed.len());
    assert_eq!(outcome.skipped_existing, 0);
    assert!(outcome.parity.is_consistent());

    // Best-effort cleanup of the scratch file.
    let _ = std::fs::remove_file(path);
}
/// Replaying the same JSONL file twice inserts nothing the second time: every
/// event is counted as skipped and parity stays consistent.
#[test]
fn mirror_replay_is_idempotent_for_existing_identical_events() {
    const FIRST_ID: &str = "evt_01ARZ3NDEKTSV4RRFFQ69G5FAV";
    const SECOND_ID: &str = "evt_01ARZ3NDEKTSV4RRFFQ69G5FAW";

    let path = jsonl_path("idempotent");
    let mut pool = test_pool();
    let sealed = append_jsonl(&path, &[event(FIRST_ID, 1), event(SECOND_ID, 2)]);

    replay_jsonl_into_sqlite(&mut pool, &path).expect("initial replay");
    let repeat = replay_jsonl_into_sqlite(&mut pool, &path).expect("repeat replay");

    assert_eq!(repeat.replayed, 0);
    assert_eq!(repeat.skipped_existing, sealed.len());
    assert!(repeat.parity.is_consistent());

    // Best-effort cleanup of the scratch file.
    let _ = std::fs::remove_file(path);
}
/// After the SQLite row's `event_hash` is overwritten, the parity check
/// reports one mismatch and a further replay fails with a
/// "differs between JSONL" error.
#[test]
fn mirror_replay_rejects_same_id_with_different_sqlite_content() {
    const EVENT_ID: &str = "evt_01ARZ3NDEKTSV4RRFFQ69G5FAV";

    let path = jsonl_path("conflict");
    let mut pool = test_pool();
    append_jsonl(&path, &[event(EVENT_ID, 1)]);
    replay_jsonl_into_sqlite(&mut pool, &path).expect("initial replay");

    // Make the SQLite copy diverge from the JSONL copy.
    pool.execute(
        "UPDATE events SET event_hash = ?1 WHERE id = ?2;",
        ["tampered-hash", EVENT_ID],
    )
    .expect("tamper SQLite event row for deterministic conflict");

    let check = verify_event_set_parity(&pool, &path).expect("verify mismatched parity");
    assert_eq!(check.mismatched.len(), 1);
    assert!(!check.is_consistent());

    let err = replay_jsonl_into_sqlite(&mut pool, &path).expect_err("conflict must fail");
    let message = err.to_string();
    assert!(
        message.contains("differs between JSONL"),
        "unexpected error: {err}"
    );

    // Best-effort cleanup of the scratch file.
    let _ = std::fs::remove_file(path);
}
/// A JSONL file holding two different events under the same id makes replay
/// fail with a "duplicate event id" error and leaves SQLite empty.
#[test]
fn mirror_replay_rejects_divergent_duplicate_jsonl_ids() {
    const SHARED_ID: &str = "evt_01ARZ3NDEKTSV4RRFFQ69G5FAV";

    let path = jsonl_path("duplicate-jsonl-id");
    let mut pool = test_pool();
    // Same id, different `second`, so the two events differ in content.
    append_jsonl(&path, &[event(SHARED_ID, 1), event(SHARED_ID, 2)]);

    let err = replay_jsonl_into_sqlite(&mut pool, &path).expect_err("duplicate id must fail");
    let message = err.to_string();
    assert!(
        message.contains("duplicate event id"),
        "unexpected error: {err}"
    );
    assert_eq!(sqlite_event_count(&pool), 0);

    // Best-effort cleanup of the scratch file.
    let _ = std::fs::remove_file(path);
}
/// Sets up SQLite so the first JSONL event is missing and the second one
/// conflicts, then checks that the failed replay leaves SQLite as it was:
/// the missing row is still absent and the tampered row is not overwritten.
#[test]
fn mirror_replay_rolls_back_missing_rows_when_later_conflict_is_detected() {
    const MISSING_ID: &str = "evt_01ARZ3NDEKTSV4RRFFQ69G5FAV";
    const TAMPERED_ID: &str = "evt_01ARZ3NDEKTSV4RRFFQ69G5FAW";

    let path = jsonl_path("rollback-conflict");
    let mut pool = test_pool();
    append_jsonl(&path, &[event(MISSING_ID, 1), event(TAMPERED_ID, 2)]);
    replay_jsonl_into_sqlite(&mut pool, &path).expect("initial replay");

    // First event: delete it so the next replay has a row to insert.
    pool.execute("DELETE FROM events WHERE id = ?1;", [MISSING_ID])
        .expect("remove first row to create missing replay candidate");
    // Second event: corrupt its hash so the next replay hits a conflict.
    pool.execute(
        "UPDATE events SET event_hash = ?1 WHERE id = ?2;",
        ["tampered-hash", TAMPERED_ID],
    )
    .expect("tamper later row to create conflict");

    let err = replay_jsonl_into_sqlite(&mut pool, &path).expect_err("conflict must fail");
    let message = err.to_string();
    assert!(
        message.contains("differs between JSONL"),
        "unexpected error: {err}"
    );

    assert_eq!(sqlite_event_count(&pool), 1);
    assert_eq!(
        sqlite_event_hash(&pool, MISSING_ID),
        None,
        "missing row insertion must roll back with the failed replay transaction"
    );
    let surviving_hash = sqlite_event_hash(&pool, TAMPERED_ID);
    assert_eq!(
        surviving_hash.as_deref(),
        Some("tampered-hash"),
        "existing divergent immutable row must not be overwritten"
    );

    // Best-effort cleanup of the scratch file.
    let _ = std::fs::remove_file(path);
}
/// Appends a truncated JSON row after one complete event and checks that
/// replay fails with a "json decode error" and inserts nothing into SQLite.
#[test]
fn mirror_replay_rejects_partial_jsonl_tail_without_sqlite_side_effects() {
    // An incomplete row: the JSON object is never closed.
    const PARTIAL_ROW: &[u8] = br#"{"event":{"id":"evt_01ARZ3NDEKTSV4RRFFQ69G5FAW""#;

    let path = jsonl_path("partial-tail");
    let mut pool = test_pool();
    append_jsonl(&path, &[event("evt_01ARZ3NDEKTSV4RRFFQ69G5FAV", 1)]);

    let mut file = std::fs::OpenOptions::new()
        .append(true)
        .open(&path)
        .expect("open JSONL mirror for partial append");
    file.write_all(PARTIAL_ROW)
        .expect("write partial JSONL row");
    file.sync_all().expect("fsync partial JSONL row");

    let err = replay_jsonl_into_sqlite(&mut pool, &path).expect_err("partial row must fail closed");
    let message = err.to_string();
    assert!(
        message.contains("json decode error"),
        "unexpected error: {err}"
    );
    assert_eq!(sqlite_event_count(&pool), 0);

    // Best-effort cleanup of the scratch file.
    let _ = std::fs::remove_file(path);
}
/// Runs the "JSONL written, SQLite empty" scenario 100 times with a fresh
/// file and database each time. Every iteration must replay both events,
/// reach consistent parity, and leave the JSONL contents and hash-chain
/// links unchanged.
#[test]
fn mirror_deterministic_crash_window_replay_recovers_acknowledged_jsonl_x100() {
    const FIRST_ID: &str = "evt_01ARZ3NDEKTSV4RRFFQ69G5FAV";
    const SECOND_ID: &str = "evt_01ARZ3NDEKTSV4RRFFQ69G5FAW";

    for iteration in 0..100 {
        let path = jsonl_path(&format!("crash-window-{iteration}"));
        let mut pool = test_pool();
        let acknowledged = append_jsonl(&path, &[event(FIRST_ID, 1), event(SECOND_ID, 2)]);

        // SQLite starts empty, so every acknowledged event is missing there.
        let pre_replay = verify_event_set_parity(&pool, &path).expect("verify pre-replay parity");
        assert_eq!(pre_replay.missing_in_sqlite.len(), acknowledged.len());
        assert!(
            !pre_replay.is_consistent(),
            "iteration {iteration} should model JSONL-ahead crash window"
        );

        let outcome = replay_jsonl_into_sqlite(&mut pool, &path)
            .expect("replay acknowledged JSONL rows into SQLite");
        assert_eq!(outcome.replayed, acknowledged.len());
        assert_eq!(outcome.skipped_existing, 0);
        assert!(
            outcome.parity.is_consistent(),
            "iteration {iteration} should recover exact event-set parity"
        );

        // The JSONL file still holds the same events with the same chain links.
        let reopened = JsonlLog::open(&path).expect("reopen JSONL mirror");
        let read_back: Result<Vec<_>, _> = reopened
            .iter()
            .expect("iterate recovered JSONL mirror")
            .collect();
        let recovered = read_back.expect("read recovered JSONL events");
        assert_eq!(recovered, acknowledged);
        assert_eq!(recovered[0].prev_event_hash, None);
        assert_eq!(
            recovered[1].prev_event_hash.as_deref(),
            Some(recovered[0].event_hash.as_str()),
            "iteration {iteration} should preserve hash-chain order"
        );

        // Best-effort cleanup of the scratch file.
        let _ = std::fs::remove_file(path);
    }
}
/// A single `append_event` call leaves JSONL and SQLite in parity, and the
/// JSONL file contains exactly the event that `append_event` returned.
#[test]
fn mirrored_append_writes_the_same_sealed_event_to_both_stores() {
    let path = jsonl_path("append");
    let mut pool = test_pool();
    let mut log = JsonlLog::open(&path).expect("open JSONL mirror");

    let candidate = event("evt_01ARZ3NDEKTSV4RRFFQ69G5FAV", 1);
    let sealed = append_event(
        &mut log,
        &mut pool,
        candidate,
        &ledger_policy(),
        &mirror_policy(),
    )
    .expect("append mirrored event");

    let check = verify_event_set_parity(&pool, &path).expect("verify parity");
    assert!(check.is_consistent());

    // Read the file back through a fresh handle and compare with the return value.
    let reopened = JsonlLog::open(&path).expect("reopen JSONL mirror");
    let read_back: Result<Vec<_>, _> = reopened.iter().expect("iterate JSONL mirror").collect();
    let jsonl_events = read_back.expect("read JSONL events");
    assert_eq!(jsonl_events, vec![sealed]);

    // Best-effort cleanup of the scratch file.
    let _ = std::fs::remove_file(path);
}
/// A second `append_event` with an id already stored in SQLite fails with an
/// "already exists in SQLite" error; the JSONL log keeps its previous length
/// and its chain still verifies.
#[test]
fn mirrored_append_rejects_existing_sqlite_id_before_jsonl_append() {
    const EVENT_ID: &str = "evt_01ARZ3NDEKTSV4RRFFQ69G5FAV";

    let path = jsonl_path("append-duplicate-preflight");
    let mut pool = test_pool();
    let mut log = JsonlLog::open(&path).expect("open JSONL mirror");

    let original = event(EVENT_ID, 1);
    append_event(
        &mut log,
        &mut pool,
        original,
        &ledger_policy(),
        &mirror_policy(),
    )
    .expect("append initial mirrored event");

    // Snapshot the on-disk length through a separate handle.
    let len_before = JsonlLog::open(&path).expect("reopen JSONL mirror").len();

    // Same id, different content.
    let duplicate = event(EVENT_ID, 2);
    let err = append_event(
        &mut log,
        &mut pool,
        duplicate,
        &ledger_policy(),
        &mirror_policy(),
    )
    .expect_err("existing SQLite id must fail before JSONL append");
    let message = err.to_string();
    assert!(
        message.contains("already exists in SQLite"),
        "unexpected error: {err}"
    );

    let reopened = JsonlLog::open(&path).expect("reopen JSONL mirror after rejected append");
    assert_eq!(reopened.len(), len_before);
    reopened
        .verify_chain()
        .expect("JSONL chain remains valid after rejected append");

    // Best-effort cleanup of the scratch file.
    let _ = std::fs::remove_file(path);
}
/// A single `append_signed_event` call leaves both stores in parity, stores
/// the returned event hash in SQLite, is skipped by a later replay, and is
/// the only event in the JSONL file.
#[test]
fn signed_mirrored_append_writes_replayable_sqlite_visible_row() {
    const EVENT_ID: &str = "evt_01ARZ3NDEKTSV4RRFFQ69G5FAV";

    let path = jsonl_path("signed-append");
    let mut pool = test_pool();
    let mut log = JsonlLog::open(&path).expect("open JSONL mirror");
    let attestor = fresh_attestor(11);

    let candidate = event(EVENT_ID, 1);
    let sealed = append_signed_event(
        &mut log,
        &mut pool,
        candidate,
        &attestor,
        &signed_ledger_policy(),
        &mirror_policy(),
    )
    .expect("append signed mirrored event");

    // Both stores agree and SQLite holds the returned hash.
    let check = verify_event_set_parity(&pool, &path).expect("verify parity");
    assert!(check.is_consistent());
    assert_eq!(sqlite_event_count(&pool), 1);
    let stored_hash = sqlite_event_hash(&pool, EVENT_ID);
    assert_eq!(stored_hash.as_deref(), Some(sealed.event_hash.as_str()));

    // A replay finds the row already present and inserts nothing.
    let outcome = replay_jsonl_into_sqlite(&mut pool, &path).expect("replay signed JSONL mirror");
    assert_eq!(outcome.replayed, 0);
    assert_eq!(outcome.skipped_existing, 1);
    assert!(outcome.parity.is_consistent());

    let reopened = JsonlLog::open(&path).expect("reopen JSONL mirror");
    let read_back: Result<Vec<_>, _> = reopened.iter().expect("iterate JSONL mirror").collect();
    let jsonl_events = read_back.expect("read JSONL events");
    assert_eq!(jsonl_events, vec![sealed]);

    // Best-effort cleanup of the scratch file.
    let _ = std::fs::remove_file(path);
}
/// A second `append_signed_event` with an id already stored in SQLite fails
/// with an "already exists in SQLite" error; the JSONL log keeps its previous
/// length and its chain still verifies.
#[test]
fn signed_mirrored_append_rejects_existing_sqlite_id_before_jsonl_append() {
    const EVENT_ID: &str = "evt_01ARZ3NDEKTSV4RRFFQ69G5FAV";

    let path = jsonl_path("signed-append-duplicate-preflight");
    let mut pool = test_pool();
    let mut log = JsonlLog::open(&path).expect("open JSONL mirror");
    let attestor = fresh_attestor(12);

    let original = event(EVENT_ID, 1);
    append_signed_event(
        &mut log,
        &mut pool,
        original,
        &attestor,
        &signed_ledger_policy(),
        &mirror_policy(),
    )
    .expect("append initial signed mirrored event");

    // Snapshot the on-disk length through a separate handle.
    let len_before = JsonlLog::open(&path).expect("reopen JSONL mirror").len();

    // Same id, different content.
    let duplicate = event(EVENT_ID, 2);
    let err = append_signed_event(
        &mut log,
        &mut pool,
        duplicate,
        &attestor,
        &signed_ledger_policy(),
        &mirror_policy(),
    )
    .expect_err("existing SQLite id must fail before signed JSONL append");
    let message = err.to_string();
    assert!(
        message.contains("already exists in SQLite"),
        "unexpected error: {err}"
    );

    let reopened = JsonlLog::open(&path).expect("reopen JSONL mirror after rejected append");
    assert_eq!(reopened.len(), len_before);
    reopened
        .verify_chain()
        .expect("JSONL chain remains valid after rejected append");

    // Best-effort cleanup of the scratch file.
    let _ = std::fs::remove_file(path);
}