use std::path::{Path, PathBuf};
use cortex_core::{
attestor::{verify_rotation, VerifyError},
canonical::{canonical_signing_input, SCHEMA_VERSION_ATTESTATION},
EventId, EventType, SCHEMA_MIGRATION_V1_TO_V2_EVENT_KIND,
};
use ed25519_dalek::{Signature, Verifier, VerifyingKey};
use serde::{Deserialize, Serialize};
use crate::anchor_chain::{extract_rotation_payload, row_preimage, GENESIS_PREV_SIGNATURE};
use crate::hash::{event_hash, payload_hash};
use crate::jsonl::{JsonlError, JsonlLog};
use crate::signed_row::b64_decode;
/// Why a single ledger row failed verification.
///
/// Serialized as an internally tagged enum (a `"kind"` field holding the `snake_case`
/// variant name), so variant and field names are part of the report format.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum FailureReason {
    /// The row could not be decoded.
    Decode {
        /// Display text of the underlying decode error.
        message: String,
    },
    /// The event's `schema_version` has no supported hash framing, so its hashes were
    /// not recomputed.
    UnknownEventSchemaVersion {
        /// `schema_version` found on the row.
        observed: u16,
        /// Version reported as expected (the v1 event schema version when emitted by
        /// this module).
        expected: u16,
    },
    /// Post-cutover v2 audit dispatch is unsupported for this event.
    ///
    /// NOTE(review): not constructed by the verifiers in this file; presumably
    /// produced elsewhere — confirm.
    PostCutoverV2AuditDispatchUnsupported {
        observed: u16,
        event_type: EventType,
    },
    /// The row's `prev_event_hash` does not link to the previous row that passed the
    /// schema-framing check.
    Orphan {
        /// `prev_event_hash` stored on the row.
        observed_prev: Option<String>,
        /// `event_hash` of that previous row (`None` when there was none).
        expected_prev: Option<String>,
    },
    /// A stored hash disagrees with its recomputed value.
    HashBreak {
        /// Which hash disagreed.
        which: HashKind,
        /// Hash stored on the row.
        observed: String,
        /// Recomputed hash.
        expected: String,
    },
    /// A gap in a trace's ordinal sequence.
    ///
    /// NOTE(review): not constructed by the verifiers in this file — confirm where it
    /// is produced.
    OrdinalGap {
        trace_id: String,
        observed: u64,
        expected: u64,
    },
    /// A signed-ledger row carries no signature.
    MissingSignature,
    /// The row's signature is malformed or did not verify under the active key.
    BadSignature {
        /// Key id recorded in the row's signature field.
        key_id: String,
        /// Human-readable explanation.
        message: String,
    },
    /// The row's signature field uses an unsupported attestation schema version.
    UnknownAttestationSchemaVersion {
        /// Version found on the signature field.
        observed: u16,
        /// Supported attestation schema version.
        expected: u16,
    },
    /// A key-rotation envelope on a correctly signed row was rejected.
    RotationEnvelopeRejected {
        /// Human-readable reason for the rejection.
        message: String,
    },
}
impl FailureReason {
    /// Returns the stable invariant identifier associated with this failure, if any.
    ///
    /// Only the event-schema dispatch failures carry an invariant name; every other
    /// variant yields `None`. The match is deliberately exhaustive, with no `_` arm, so
    /// adding a variant forces an explicit decision here instead of silently
    /// defaulting to `None`.
    #[must_use]
    pub fn invariant(&self) -> Option<&'static str> {
        match self {
            Self::UnknownEventSchemaVersion { .. } => {
                Some(UNSUPPORTED_EVENT_SCHEMA_VERSION_INVARIANT)
            }
            Self::PostCutoverV2AuditDispatchUnsupported { .. } => {
                Some(POST_CUTOVER_V2_AUDIT_DISPATCH_UNSUPPORTED_INVARIANT)
            }
            Self::Decode { .. }
            | Self::Orphan { .. }
            | Self::HashBreak { .. }
            | Self::OrdinalGap { .. }
            | Self::MissingSignature
            | Self::BadSignature { .. }
            | Self::UnknownAttestationSchemaVersion { .. }
            | Self::RotationEnvelopeRejected { .. } => None,
        }
    }
}
/// Which stored hash on a row disagreed with its recomputed value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum HashKind {
    /// The stored `payload_hash` differs from the hash recomputed over the row's payload.
    PayloadHashMismatch,
    /// The stored `event_hash` differs from the hash recomputed from the row's own
    /// `prev_event_hash` and stored `payload_hash`.
    EventHashMismatch,
}
/// A single verification failure tied to one row of the ledger.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RowFailure {
    /// 1-based index of the row as yielded by the log iterator (reported as its line).
    pub line: usize,
    /// Id of the offending event, or `None` when the row could not be decoded.
    pub event_id: Option<EventId>,
    /// What went wrong.
    pub reason: FailureReason,
}
/// Result of a chain-verification pass over one ledger file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Report {
    /// Path of the ledger that was verified.
    pub path: PathBuf,
    /// Number of rows yielded by the log iterator, including rows that failed to decode.
    pub rows_scanned: usize,
    /// Every failure found, in detection order.
    pub failures: Vec<RowFailure>,
}
impl Report {
    /// Returns `true` when the verification pass recorded no failures at all.
    #[must_use]
    pub fn ok(&self) -> bool {
        matches!(self.failures.as_slice(), [])
    }
}
/// Invariant id returned by [`FailureReason::invariant`] for
/// [`FailureReason::UnknownEventSchemaVersion`].
pub const UNSUPPORTED_EVENT_SCHEMA_VERSION_INVARIANT: &str =
    "audit.event_schema_version.unsupported";
/// Invariant id returned by [`FailureReason::invariant`] for
/// [`FailureReason::PostCutoverV2AuditDispatchUnsupported`].
pub const POST_CUTOVER_V2_AUDIT_DISPATCH_UNSUPPORTED_INVARIANT: &str =
    "audit.post_cutover_v2.dispatch.unsupported";
/// Invariant id reported when a required v1→v2 schema-migration boundary row is absent.
pub const SCHEMA_MIGRATION_V1_TO_V2_BOUNDARY_MISSING_INVARIANT: &str =
    "schema_migration.v1_to_v2.boundary.missing";
/// Invariant id reported when more than one v1→v2 schema-migration boundary row exists.
pub const SCHEMA_MIGRATION_V1_TO_V2_BOUNDARY_DUPLICATE_INVARIANT: &str =
    "schema_migration.v1_to_v2.boundary.duplicate";
/// A row recognised as the v1→v2 schema-migration boundary event.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SchemaMigrationBoundaryRow {
    /// 1-based index of the row as yielded by the log iterator.
    pub line: usize,
    /// Id of the boundary event.
    pub event_id: EventId,
    /// `event_hash` stored on the boundary row.
    pub event_hash: String,
}
/// Structured detail for a schema-migration boundary failure.
///
/// Serialized with an internal `"kind"` tag (`missing` / `duplicate`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum SchemaMigrationBoundaryFailureDetail {
    /// No boundary row was found although one was required.
    Missing {
        /// Whether a boundary row was required for this check.
        required: bool,
        /// Number of boundary rows observed.
        observed: usize,
    },
    /// More than one boundary row was found.
    Duplicate {
        /// Number of boundary rows observed.
        observed: usize,
        /// Row indices (1-based) of every boundary row, in file order.
        lines: Vec<usize>,
    },
}
/// One failed schema-migration boundary invariant.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SchemaMigrationBoundaryFailure {
    /// Invariant identifier, e.g. `SCHEMA_MIGRATION_V1_TO_V2_BOUNDARY_MISSING_INVARIANT`.
    pub invariant: String,
    /// Structured detail describing the violation.
    pub detail: SchemaMigrationBoundaryFailureDetail,
}
/// Result of scanning a ledger for the v1→v2 schema-migration boundary row.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SchemaMigrationBoundaryReport {
    /// Path of the ledger that was scanned.
    pub path: PathBuf,
    /// Whether a boundary row was required to be present.
    pub required: bool,
    /// Number of rows yielded by the log iterator.
    pub rows_scanned: usize,
    /// Every boundary row found, in file order.
    pub boundary_rows: Vec<SchemaMigrationBoundaryRow>,
    /// Invariant violations detected (missing and/or duplicate boundary).
    pub failures: Vec<SchemaMigrationBoundaryFailure>,
}
impl SchemaMigrationBoundaryReport {
    /// Returns `true` when no boundary invariant was violated.
    #[must_use]
    pub fn ok(&self) -> bool {
        self.failures.first().is_none()
    }
}
/// Event `schema_version` verified with the original (v1) hash framing.
const SUPPORTED_V1_EVENT_SCHEMA_VERSION: u16 = 1;
/// Event `schema_version` of the v1→v2 schema-migration boundary row; other rows with
/// this version are also accepted (as `EventHashFraming::V2ExistingEventWire`).
const SUPPORTED_SCHEMA_MIGRATION_BOUNDARY_VERSION: u16 = 2;
/// How an event's hashes are framed, chosen from its `schema_version` and, for v2,
/// whether it is the schema-migration boundary row.
///
/// All three supported framings are currently checked with the same payload/event
/// hash recomputation in `check_hash_chain_fields`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EventHashFraming {
    /// `schema_version == 1`.
    V1,
    /// The v2 `SystemNote` row whose payload `kind` marks the v1→v2 schema migration.
    SchemaMigrationV1ToV2Boundary,
    /// Any other `schema_version == 2` row.
    V2ExistingEventWire,
    /// Any other schema version; hash-chain checks are not attempted.
    Unknown,
}
/// Returns `true` if `e` is the v1→v2 schema-migration boundary event.
///
/// This means a v2 `SystemNote` whose payload has a string `"kind"` equal to
/// `SCHEMA_MIGRATION_V1_TO_V2_EVENT_KIND`.
fn is_schema_migration_v1_to_v2_boundary(e: &cortex_core::Event) -> bool {
    if e.schema_version != SUPPORTED_SCHEMA_MIGRATION_BOUNDARY_VERSION
        || e.event_type != EventType::SystemNote
    {
        return false;
    }
    matches!(
        e.payload.get("kind"),
        Some(serde_json::Value::String(kind)) if kind.as_str() == SCHEMA_MIGRATION_V1_TO_V2_EVENT_KIND
    )
}
fn event_hash_framing(e: &cortex_core::Event) -> EventHashFraming {
match e.schema_version {
SUPPORTED_V1_EVENT_SCHEMA_VERSION => EventHashFraming::V1,
SUPPORTED_SCHEMA_MIGRATION_BOUNDARY_VERSION if is_schema_migration_v1_to_v2_boundary(e) => {
EventHashFraming::SchemaMigrationV1ToV2Boundary
}
SUPPORTED_SCHEMA_MIGRATION_BOUNDARY_VERSION => EventHashFraming::V2ExistingEventWire,
_ => EventHashFraming::Unknown,
}
}
impl EventHashFraming {
    /// Whether rows with this framing can have their hash chain verified.
    fn supports_hash_chain(self) -> bool {
        match self {
            Self::Unknown => false,
            Self::V1 | Self::SchemaMigrationV1ToV2Boundary | Self::V2ExistingEventWire => true,
        }
    }

    /// Failure reason for an unsupported framing, or `None` when the framing is supported.
    fn unsupported_reason(self, e: &cortex_core::Event) -> Option<FailureReason> {
        let observed = e.schema_version;
        match self {
            Self::Unknown => Some(FailureReason::UnknownEventSchemaVersion {
                observed,
                expected: SUPPORTED_V1_EVENT_SCHEMA_VERSION,
            }),
            Self::V1 | Self::SchemaMigrationV1ToV2Boundary | Self::V2ExistingEventWire => None,
        }
    }
}
/// Builds the failure reason for an event whose hash framing is unsupported.
///
/// # Panics
///
/// Panics if the event's framing is in fact supported. Callers must only use this
/// after `supports_hash_chain()` returned `false`.
fn unsupported_event_schema_reason(e: &cortex_core::Event) -> FailureReason {
    let framing = event_hash_framing(e);
    let reason = framing.unsupported_reason(e);
    reason.expect("unsupported_event_schema_reason called only for unsupported framing")
}
fn check_hash_chain_fields(
line: usize,
e: &cortex_core::Event,
prev_event_hash: &Option<String>,
failures: &mut Vec<RowFailure>,
) -> bool {
let framing = event_hash_framing(e);
if !framing.supports_hash_chain() {
failures.push(RowFailure {
line,
event_id: Some(e.id),
reason: unsupported_event_schema_reason(e),
});
return false;
}
let expected_payload = payload_hash(&e.payload);
if e.payload_hash != expected_payload {
failures.push(RowFailure {
line,
event_id: Some(e.id),
reason: FailureReason::HashBreak {
which: HashKind::PayloadHashMismatch,
observed: e.payload_hash.clone(),
expected: expected_payload,
},
});
}
let expected_event = event_hash(e.prev_event_hash.as_deref(), &e.payload_hash);
if e.event_hash != expected_event {
failures.push(RowFailure {
line,
event_id: Some(e.id),
reason: FailureReason::HashBreak {
which: HashKind::EventHashMismatch,
observed: e.event_hash.clone(),
expected: expected_event,
},
});
}
if e.prev_event_hash != *prev_event_hash {
failures.push(RowFailure {
line,
event_id: Some(e.id),
reason: FailureReason::Orphan {
observed_prev: e.prev_event_hash.clone(),
expected_prev: prev_event_hash.clone(),
},
});
}
true
}
pub fn verify_chain(path: impl AsRef<Path>) -> Result<Report, JsonlError> {
let path = path.as_ref().to_path_buf();
let log = JsonlLog::open(&path)?;
let mut failures = Vec::new();
let mut rows_scanned = 0usize;
let mut prev_event_hash: Option<String> = None;
for (i, item) in log.iter()?.enumerate() {
let line = i + 1;
rows_scanned += 1;
let e = match item {
Ok(e) => e,
Err(JsonlError::Decode { source, .. }) => {
failures.push(RowFailure {
line,
event_id: None,
reason: FailureReason::Decode {
message: source.to_string(),
},
});
continue;
}
Err(other) => return Err(other),
};
if !check_hash_chain_fields(line, &e, &prev_event_hash, &mut failures) {
continue;
}
prev_event_hash = Some(e.event_hash.clone());
}
Ok(Report {
path,
rows_scanned,
failures,
})
}
pub fn verify_schema_migration_v1_to_v2_boundary(
path: impl AsRef<Path>,
required: bool,
) -> Result<SchemaMigrationBoundaryReport, JsonlError> {
let path = path.as_ref().to_path_buf();
let log = JsonlLog::open(&path)?;
let mut rows_scanned = 0usize;
let mut boundary_rows = Vec::new();
for (i, item) in log.iter()?.enumerate() {
let line = i + 1;
rows_scanned += 1;
let e = item?;
if is_schema_migration_v1_to_v2_boundary(&e) {
boundary_rows.push(SchemaMigrationBoundaryRow {
line,
event_id: e.id,
event_hash: e.event_hash,
});
}
}
let mut failures = Vec::new();
if required && boundary_rows.is_empty() {
failures.push(SchemaMigrationBoundaryFailure {
invariant: SCHEMA_MIGRATION_V1_TO_V2_BOUNDARY_MISSING_INVARIANT.to_string(),
detail: SchemaMigrationBoundaryFailureDetail::Missing {
required,
observed: boundary_rows.len(),
},
});
}
if boundary_rows.len() > 1 {
failures.push(SchemaMigrationBoundaryFailure {
invariant: SCHEMA_MIGRATION_V1_TO_V2_BOUNDARY_DUPLICATE_INVARIANT.to_string(),
detail: SchemaMigrationBoundaryFailureDetail::Duplicate {
observed: boundary_rows.len(),
lines: boundary_rows.iter().map(|row| row.line).collect(),
},
});
}
Ok(SchemaMigrationBoundaryReport {
path,
required,
rows_scanned,
boundary_rows,
failures,
})
}
/// Result of [`verify_signed_chain`].
#[derive(Debug, Clone)]
pub struct SignedChainOutcome {
    /// Hash-chain and signature failures found in the ledger.
    pub report: Report,
    /// Operator key in force after the last row: the initial key, advanced through
    /// every rotation whose row signature and envelope both verified.
    pub active_pubkey: VerifyingKey,
}
/// Verifies both the hash chain and the per-row Ed25519 signatures of the signed JSONL
/// ledger at `path`.
///
/// Verification starts under `initial_pubkey` / `initial_key_id`. Each row's signing
/// preimage is built with `row_preimage` from these inputs:
/// * the event;
/// * the first 32 bytes of the previous row's signature (starting from
///   `GENESIS_PREV_SIGNATURE`);
/// * the ledger id derived from `path`;
/// * the currently active key id;
/// * the row's `signed_at`.
///
/// A row carrying a key-rotation payload switches the active key only when all of
/// the following hold:
/// * the row's own signature verifies;
/// * the envelope's `old_pubkey` equals the active key;
/// * `verify_rotation` accepts the envelope.
///
/// Per-row problems are collected into the returned report, and the final active key
/// is returned alongside it.
///
/// # Errors
///
/// Returns the [`JsonlError`] from opening or iterating the log, or any per-row error
/// other than [`JsonlError::Decode`], which is recorded as a failure instead.
pub fn verify_signed_chain(
    path: impl AsRef<Path>,
    initial_pubkey: &VerifyingKey,
    initial_key_id: &str,
) -> Result<SignedChainOutcome, JsonlError> {
    let path = path.as_ref().to_path_buf();
    let log = JsonlLog::open(&path)?;
    let ledger_id = crate::jsonl::ledger_id_for(&path);
    let mut failures = Vec::new();
    let mut rows_scanned = 0usize;
    // `event_hash` of the last row that passed the schema-framing check.
    let mut prev_event_hash: Option<String> = None;
    // First 32 bytes of the previous row's signature; bound into the next row's preimage.
    let mut prev_sig_prefix: [u8; 32] = GENESIS_PREV_SIGNATURE;
    // Key (and key id) the current row must be signed with; advanced by accepted rotations.
    let mut active_pubkey: VerifyingKey = *initial_pubkey;
    let mut active_key_id: String = initial_key_id.to_string();
    for (i, item) in log.iter_signed()?.enumerate() {
        let line = i + 1;
        rows_scanned += 1;
        let row = match item {
            Ok(r) => r,
            Err(JsonlError::Decode { source, .. }) => {
                failures.push(RowFailure {
                    line,
                    event_id: None,
                    reason: FailureReason::Decode {
                        message: source.to_string(),
                    },
                });
                continue;
            }
            Err(other) => return Err(other),
        };
        let e = &row.event;
        // Rows with an unsupported schema framing are reported and skipped entirely:
        // no signature check, and neither chain cursor advances.
        if !check_hash_chain_fields(line, e, &prev_event_hash, &mut failures) {
            continue;
        }
        prev_event_hash = Some(e.event_hash.clone());
        // The three early `continue`s below happen before any signature bytes are
        // available. `prev_sig_prefix` therefore stays at the last row whose signature
        // field decoded to 64 bytes.
        let Some(sig_field) = row.signature.as_ref() else {
            failures.push(RowFailure {
                line,
                event_id: Some(e.id),
                reason: FailureReason::MissingSignature,
            });
            continue;
        };
        if sig_field.schema_version != SCHEMA_VERSION_ATTESTATION {
            failures.push(RowFailure {
                line,
                event_id: Some(e.id),
                reason: FailureReason::UnknownAttestationSchemaVersion {
                    observed: sig_field.schema_version,
                    expected: SCHEMA_VERSION_ATTESTATION,
                },
            });
            continue;
        }
        let sig_bytes = match b64_decode(&sig_field.bytes) {
            Some(v) if v.len() == 64 => v,
            _ => {
                failures.push(RowFailure {
                    line,
                    event_id: Some(e.id),
                    reason: FailureReason::BadSignature {
                        key_id: sig_field.key_id.clone(),
                        message: "malformed base64 or wrong length".into(),
                    },
                });
                continue;
            }
        };
        // Cannot fail: the match arm above guarantees exactly 64 bytes.
        let sig_arr: [u8; 64] = sig_bytes
            .as_slice()
            .try_into()
            .expect("len-64 vec converts to [u8; 64]");
        let signature = Signature::from_bytes(&sig_arr);
        // NOTE(review): the preimage binds `active_key_id`, while `sig_field.key_id` is
        // only echoed in failure reports; the two are never compared. Confirm this is
        // intended.
        let preimage = row_preimage(
            e,
            &prev_sig_prefix,
            &ledger_id,
            &active_key_id,
            sig_field.signed_at,
        );
        let signing_input = canonical_signing_input(&preimage);
        if active_pubkey.verify(&signing_input, &signature).is_err() {
            failures.push(RowFailure {
                line,
                event_id: Some(e.id),
                reason: FailureReason::BadSignature {
                    key_id: sig_field.key_id.clone(),
                    message: "signature did not verify under active key".into(),
                },
            });
            // Advance the prefix anyway, so the next row's preimage uses this row's
            // on-disk signature. Skip rotation handling: an unverified row must not
            // change the active key.
            prev_sig_prefix.copy_from_slice(&sig_arr[..32]);
            continue;
        }
        if let Some(payload) = extract_rotation_payload(e) {
            if payload.envelope.old_pubkey != active_pubkey.to_bytes() {
                failures.push(RowFailure {
                    line,
                    event_id: Some(e.id),
                    reason: FailureReason::RotationEnvelopeRejected {
                        message: "envelope.old_pubkey does not match currently-active operator key"
                            .into(),
                    },
                });
            } else {
                match verify_rotation(&payload.envelope) {
                    Ok(()) => {
                        match VerifyingKey::from_bytes(&payload.envelope.new_pubkey) {
                            Ok(new_pk) => {
                                // The rotated-in key id is the lowercase hex of the new
                                // public key. NOTE(review): confirm this matches how
                                // callers derive `initial_key_id`.
                                active_key_id = hex_lower(&payload.envelope.new_pubkey);
                                active_pubkey = new_pk;
                            }
                            Err(_) => {
                                failures.push(RowFailure {
                                    line,
                                    event_id: Some(e.id),
                                    reason: FailureReason::RotationEnvelopeRejected {
                                        message: "envelope.new_pubkey is not a valid Ed25519 \
                                                  point"
                                            .into(),
                                    },
                                });
                            }
                        }
                    }
                    Err(err) => {
                        failures.push(RowFailure {
                            line,
                            event_id: Some(e.id),
                            reason: FailureReason::RotationEnvelopeRejected {
                                message: rotation_error_message(&err),
                            },
                        });
                    }
                }
            }
        }
        prev_sig_prefix.copy_from_slice(&sig_arr[..32]);
    }
    Ok(SignedChainOutcome {
        report: Report {
            path,
            rows_scanned,
            failures,
        },
        active_pubkey,
    })
}
/// Renders a [`VerifyError`] from rotation-envelope verification as a report message.
fn rotation_error_message(e: &VerifyError) -> String {
    match e {
        VerifyError::BadSignature => String::from("rotation envelope signature did not verify"),
        VerifyError::MalformedSignature => {
            String::from("rotation envelope signature bytes are malformed")
        }
        VerifyError::UnknownSchemaVersion { found, expected } => format!(
            "rotation envelope unknown schema_version (found {found}, expected {expected})"
        ),
        VerifyError::KeyIdMismatch { preimage, expected } => format!(
            "rotation envelope key_id mismatch (preimage={preimage}, expected={expected})"
        ),
    }
}
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
///
/// This is used to derive the key id of a rotated-in operator key from its public-key
/// bytes. Each nibble is taken directly from a lookup table, which avoids allocating a
/// temporary `String` per byte through `format!`.
fn hex_lower(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
#[cfg(test)]
mod tests {
    //! Tests for hash-chain verification and schema-migration boundary detection.
    use super::*;
    use chrono::TimeZone;
    use cortex_core::{
        Event, EventId, EventSource, EventType, SchemaMigrationV1ToV2Payload,
        SCHEMA_MIGRATION_V1_TO_V2_EVENT_KIND, SCHEMA_VERSION,
    };
    use std::io::Write;
    use tempfile::tempdir;

    /// Builds an unsealed `UserMessage` event with payload `{"seq": seq}` and fixed
    /// timestamps. The hash fields are left empty; `JsonlLog::append` is expected to
    /// seal them.
    fn fixture_event(seq: u64) -> Event {
        Event {
            id: EventId::new(),
            schema_version: SCHEMA_VERSION,
            observed_at: chrono::Utc.with_ymd_and_hms(2026, 1, 1, 12, 0, 0).unwrap(),
            recorded_at: chrono::Utc.with_ymd_and_hms(2026, 1, 1, 12, 0, 1).unwrap(),
            source: EventSource::User,
            event_type: EventType::UserMessage,
            trace_id: None,
            session_id: None,
            domain_tags: vec![],
            payload: serde_json::json!({"seq": seq}),
            payload_hash: String::new(),
            prev_event_hash: None,
            event_hash: String::new(),
        }
    }

    fn allow_policy() -> cortex_core::PolicyDecision {
        crate::jsonl::append_policy_decision_test_allow()
    }

    fn migration_allow_policy() -> cortex_core::PolicyDecision {
        crate::jsonl::schema_migration_v1_to_v2_policy_decision_test_allow()
    }

    /// Reads every non-blank line of the ledger at `path` as a raw [`Event`].
    fn read_events(path: &Path) -> Vec<Event> {
        std::fs::read_to_string(path)
            .unwrap()
            .lines()
            .filter(|l| !l.trim().is_empty())
            .map(|l| serde_json::from_str::<Event>(l).unwrap())
            .collect()
    }

    /// Overwrites the ledger at `path` with `rows`, one JSON object per line. This
    /// bypasses `JsonlLog` so tests can plant corruption.
    fn rewrite_events(path: &Path, rows: &[Event]) {
        let mut f = std::fs::OpenOptions::new()
            .write(true)
            .truncate(true)
            .create(true)
            .open(path)
            .unwrap();
        for r in rows {
            writeln!(f, "{}", serde_json::to_string(r).unwrap()).unwrap();
        }
        f.sync_all().unwrap();
    }

    #[test]
    fn clean_chain_reports_ok() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("clean.jsonl");
        let mut log = JsonlLog::open(&path).unwrap();
        for i in 0..5u64 {
            log.append(fixture_event(i), &allow_policy()).unwrap();
        }
        let report = verify_chain(&path).unwrap();
        let failures = &report.failures;
        assert!(report.ok(), "expected clean chain, got {failures:?}");
        assert_eq!(report.rows_scanned, 5);
    }

    #[test]
    fn corruption_fixture_produces_expected_failure_report() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("corrupt.jsonl");
        let mut log = JsonlLog::open(&path).unwrap();
        for i in 0..4u64 {
            log.append(fixture_event(i), &allow_policy()).unwrap();
        }
        let mut rows = read_events(&path);
        assert_eq!(rows.len(), 4);
        // Row 2: tamper with the payload but leave its stored hashes untouched.
        rows[1].payload = serde_json::json!({"tampered": true});
        // Row 4: point at a bogus predecessor, then re-seal so its own hashes agree.
        rows[3].prev_event_hash = Some("0".repeat(64));
        crate::hash::seal(&mut rows[3]);
        rewrite_events(&path, &rows);
        let report = verify_chain(&path).unwrap();
        assert_eq!(report.rows_scanned, 4);
        let by_line: std::collections::BTreeMap<usize, Vec<&FailureReason>> = report
            .failures
            .iter()
            .fold(std::collections::BTreeMap::new(), |mut m, f| {
                m.entry(f.line).or_default().push(&f.reason);
                m
            });
        assert!(!by_line.contains_key(&1), "row 1 should be clean");
        let r2 = by_line.get(&2).expect("row 2 should have failures");
        assert!(
            r2.iter().any(|r| matches!(
                r,
                FailureReason::HashBreak {
                    which: HashKind::PayloadHashMismatch,
                    ..
                }
            )),
            "row 2 missing PayloadHashMismatch: {r2:?}",
        );
        assert!(
            !r2.iter().any(|r| matches!(
                r,
                FailureReason::HashBreak {
                    which: HashKind::EventHashMismatch,
                    ..
                }
            )),
            "row 2 should not flag EventHashMismatch (event_hash recompute \
             uses stored payload_hash, which is internally consistent): {r2:?}",
        );
        assert!(
            !by_line.contains_key(&3),
            "row 3 should be clean, got {by_line:?}"
        );
        let r4 = by_line.get(&4).expect("row 4 should have failures");
        assert!(
            r4.iter().any(|r| matches!(r, FailureReason::Orphan { .. })),
            "row 4 missing Orphan: {r4:?}",
        );
        assert!(
            !r4.iter()
                .any(|r| matches!(r, FailureReason::HashBreak { .. })),
            "row 4 should not have HashBreak failures (we re-sealed): {r4:?}",
        );
        assert!(!report.ok());
    }

    #[test]
    fn mutated_event_hash_triggers_event_hash_mismatch() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("ehm.jsonl");
        let mut log = JsonlLog::open(&path).unwrap();
        for i in 0..3u64 {
            log.append(fixture_event(i), &allow_policy()).unwrap();
        }
        let mut rows = read_events(&path);
        // Replace the first hex digit with one that is guaranteed to differ, so the
        // mutation is never a no-op.
        let original = rows[1].event_hash.clone();
        let replacement = if original.starts_with('f') { "0" } else { "f" };
        rows[1].event_hash = format!("{replacement}{}", &original[1..]);
        assert_ne!(rows[1].event_hash, original, "mutation must change the hash");
        rewrite_events(&path, &rows);
        let report = verify_chain(&path).unwrap();
        let failures = &report.failures;
        assert!(
            failures.iter().any(|r| r.line == 2
                && matches!(
                    r.reason,
                    FailureReason::HashBreak {
                        which: HashKind::EventHashMismatch,
                        ..
                    }
                )),
            "expected EventHashMismatch on row 2: {failures:?}",
        );
        assert!(
            failures
                .iter()
                .any(|r| r.line == 3 && matches!(r.reason, FailureReason::Orphan { .. })),
            "expected Orphan on row 3: {failures:?}",
        );
    }

    #[test]
    fn unknown_event_schema_version_fails_closed_without_v1_hash_recompute() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("unknown-schema.jsonl");
        let mut log = JsonlLog::open(&path).unwrap();
        log.append(fixture_event(0), &allow_policy()).unwrap();
        let unknown_version = SUPPORTED_SCHEMA_MIGRATION_BOUNDARY_VERSION + 1;
        let raw = std::fs::read_to_string(&path).unwrap();
        let mut row: Event = serde_json::from_str(raw.lines().next().unwrap()).unwrap();
        row.schema_version = unknown_version;
        // Also tamper with the payload: if v1 framing were wrongly applied, a
        // HashBreak would appear.
        row.payload = serde_json::json!({"tampered": true});
        std::fs::write(&path, format!("{}\n", serde_json::to_string(&row).unwrap())).unwrap();
        let report = verify_chain(&path).unwrap();
        let failures = &report.failures;
        let unsupported_schema_failure = failures.iter().find(|failure| {
            matches!(
                failure.reason,
                FailureReason::UnknownEventSchemaVersion {
                    observed,
                    expected
                } if observed == unknown_version && expected == SUPPORTED_V1_EVENT_SCHEMA_VERSION
            )
        });
        assert!(
            unsupported_schema_failure.is_some(),
            "expected unknown event schema failure: {failures:?}"
        );
        assert_eq!(
            unsupported_schema_failure.unwrap().reason.invariant(),
            Some(UNSUPPORTED_EVENT_SCHEMA_VERSION_INVARIANT)
        );
        assert!(
            !failures
                .iter()
                .any(|failure| matches!(failure.reason, FailureReason::HashBreak { .. })),
            "unknown event schemas must not be verified under v1 hash framing: {failures:?}"
        );
    }

    fn schema_migration_payload(previous_head: impl Into<String>) -> SchemaMigrationV1ToV2Payload {
        SchemaMigrationV1ToV2Payload::new(previous_head, "script-digest", None, "fixture-digest")
    }

    #[test]
    fn schema_migration_boundary_exactly_one_passes_when_required() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("boundary-required.jsonl");
        let mut log = JsonlLog::open(&path).unwrap();
        log.append(fixture_event(0), &allow_policy()).unwrap();
        let v1_head = log.head().expect("v1 head").to_string();
        log.append_schema_migration_v1_to_v2(
            schema_migration_payload(v1_head),
            &migration_allow_policy(),
        )
        .expect("append boundary event");
        let report = verify_schema_migration_v1_to_v2_boundary(&path, true).unwrap();
        assert!(report.ok(), "exactly-one boundary should pass: {report:?}");
        assert_eq!(report.rows_scanned, 2);
        assert_eq!(report.boundary_rows.len(), 1);
        assert_eq!(report.boundary_rows[0].line, 2);
        assert!(report.failures.is_empty());
    }

    #[test]
    fn schema_migration_boundary_missing_fails_when_required() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("boundary-missing.jsonl");
        let mut log = JsonlLog::open(&path).unwrap();
        log.append(fixture_event(0), &allow_policy()).unwrap();
        let report = verify_schema_migration_v1_to_v2_boundary(&path, true).unwrap();
        assert!(!report.ok(), "missing boundary should fail");
        assert_eq!(report.rows_scanned, 1);
        assert!(report.boundary_rows.is_empty());
        assert_eq!(report.failures.len(), 1);
        assert_eq!(
            report.failures[0].invariant,
            SCHEMA_MIGRATION_V1_TO_V2_BOUNDARY_MISSING_INVARIANT
        );
        assert!(matches!(
            report.failures[0].detail,
            SchemaMigrationBoundaryFailureDetail::Missing {
                required: true,
                observed: 0
            }
        ));
    }

    #[test]
    fn schema_migration_boundary_duplicate_fails_with_lines() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("boundary-duplicate.jsonl");
        let mut log = JsonlLog::open(&path).unwrap();
        log.append(fixture_event(0), &allow_policy()).unwrap();
        let v1_head = log.head().expect("v1 head").to_string();
        let first_boundary_hash = log
            .append_schema_migration_v1_to_v2(
                schema_migration_payload(v1_head),
                &migration_allow_policy(),
            )
            .expect("append first boundary event");
        log.append_schema_migration_v1_to_v2(
            schema_migration_payload(first_boundary_hash),
            &migration_allow_policy(),
        )
        .expect("append duplicate boundary event");
        let chain_report = verify_chain(&path).unwrap();
        assert!(
            chain_report.ok(),
            "duplicate fixture should isolate boundary count, not chain damage: {chain_report:?}"
        );
        let report = verify_schema_migration_v1_to_v2_boundary(&path, true).unwrap();
        assert!(!report.ok(), "duplicate boundary should fail");
        assert_eq!(report.rows_scanned, 3);
        assert_eq!(report.boundary_rows.len(), 2);
        assert_eq!(report.failures.len(), 1);
        assert_eq!(
            report.failures[0].invariant,
            SCHEMA_MIGRATION_V1_TO_V2_BOUNDARY_DUPLICATE_INVARIANT
        );
        assert!(matches!(
            &report.failures[0].detail,
            SchemaMigrationBoundaryFailureDetail::Duplicate {
                observed: 2,
                lines
            } if lines == &vec![2, 3]
        ));
    }

    #[test]
    fn schema_migration_verify_chain_crosses_v1_to_v2_boundary_without_rewriting_v1_row() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("boundary.jsonl");
        let mut log = JsonlLog::open(&path).unwrap();
        log.append(fixture_event(0), &allow_policy()).unwrap();
        let before_raw = std::fs::read_to_string(&path).unwrap();
        let before_v1: Event = serde_json::from_str(before_raw.lines().next().unwrap()).unwrap();
        let payload = schema_migration_payload(before_v1.event_hash.clone());
        log.append_schema_migration_v1_to_v2(payload, &migration_allow_policy())
            .expect("append boundary event");
        let report = verify_chain(&path).unwrap();
        assert!(report.ok(), "boundary chain should verify: {report:?}");
        assert_eq!(report.rows_scanned, 2);
        let after_raw = std::fs::read_to_string(&path).unwrap();
        let rows: Vec<Event> = after_raw
            .lines()
            .map(|line| serde_json::from_str(line).unwrap())
            .collect();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].schema_version, SCHEMA_VERSION);
        assert_eq!(rows[0].payload_hash, before_v1.payload_hash);
        assert_eq!(rows[0].prev_event_hash, before_v1.prev_event_hash);
        assert_eq!(rows[0].event_hash, before_v1.event_hash);
        assert_eq!(rows[1].schema_version, 2);
        assert_eq!(
            rows[1].prev_event_hash.as_deref(),
            Some(rows[0].event_hash.as_str())
        );
        assert_eq!(
            rows[1].payload["kind"],
            SCHEMA_MIGRATION_V1_TO_V2_EVENT_KIND
        );
    }

    /// A garbage line appended to an existing ledger makes `JsonlLog::open` fail.
    /// `verify_chain` therefore returns the decode error instead of reporting it per row.
    #[test]
    fn decode_failure_in_existing_log_is_returned_as_error() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("decode.jsonl");
        let mut log = JsonlLog::open(&path).unwrap();
        log.append(fixture_event(0), &allow_policy()).unwrap();
        let mut f = std::fs::OpenOptions::new()
            .append(true)
            .open(&path)
            .unwrap();
        writeln!(f, "{{not valid json").unwrap();
        f.sync_all().unwrap();
        drop(f);
        let reopened = JsonlLog::open(&path);
        assert!(reopened.is_err());
        let report = verify_chain(&path);
        assert!(matches!(report, Err(JsonlError::Decode { .. })));
    }
}