1use std::path::{Path, PathBuf};
36
37use cortex_core::{
38 attestor::{verify_rotation, VerifyError},
39 canonical::{canonical_signing_input, SCHEMA_VERSION_ATTESTATION},
40 EventId, EventType, SCHEMA_MIGRATION_V1_TO_V2_EVENT_KIND,
41};
42use ed25519_dalek::{Signature, Verifier, VerifyingKey};
43use serde::{Deserialize, Serialize};
44
45use crate::anchor_chain::{extract_rotation_payload, row_preimage, GENESIS_PREV_SIGNATURE};
46use crate::hash::{event_hash, payload_hash};
47use crate::jsonl::{JsonlError, JsonlLog};
48use crate::signed_row::b64_decode;
49
/// Why a single ledger row failed verification.
///
/// Serialized by serde as an internally tagged value: the variant name is
/// written in `snake_case` under the `"kind"` key.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum FailureReason {
    /// The row could not be decoded. Recorded when the JSONL iterator yields
    /// a `JsonlError::Decode` for the row.
    Decode {
        /// Display text of the underlying decode error.
        message: String,
    },
    /// The row's `schema_version` does not map to any hash framing the
    /// verifier knows how to check.
    UnknownEventSchemaVersion {
        /// The `schema_version` found on the row.
        observed: u16,
        /// The version reported as expected; the verifier in this file always
        /// fills in the v1 event schema version here.
        expected: u16,
    },
    /// NOTE(review): not constructed anywhere in this file. Presumably raised
    /// by a dispatcher that cannot handle a post-cutover v2 row of the given
    /// event type — confirm against the producer.
    PostCutoverV2AuditDispatchUnsupported {
        /// Presumably the row's `schema_version` — TODO confirm.
        observed: u16,
        /// Presumably the row's event type — TODO confirm.
        event_type: EventType,
    },
    /// The row's `prev_event_hash` differs from the hash the verifier carried
    /// forward from earlier rows.
    Orphan {
        /// The `prev_event_hash` stored on the row.
        observed_prev: Option<String>,
        /// The hash the verifier carried forward (`None` before any row has
        /// been accepted).
        expected_prev: Option<String>,
    },
    /// A stored hash differs from the value recomputed by the verifier.
    HashBreak {
        /// Which of the two hashes mismatched.
        which: HashKind,
        /// The hash stored on the row.
        observed: String,
        /// The hash recomputed by the verifier.
        expected: String,
    },
    /// NOTE(review): not constructed anywhere in this file. Presumably a gap
    /// in a per-trace ordinal sequence — confirm against the producer.
    OrdinalGap {
        /// Presumably the trace whose ordinals are out of sequence — TODO confirm.
        trace_id: String,
        /// Presumably the ordinal found on the row — TODO confirm.
        observed: u64,
        /// Presumably the ordinal that was expected next — TODO confirm.
        expected: u64,
    },
    /// The row carries no signature (signed-chain verification only).
    MissingSignature,
    /// The row's signature is malformed (bad base64 or not 64 bytes) or did
    /// not verify under the currently active key.
    BadSignature {
        /// The `key_id` declared in the row's signature field.
        key_id: String,
        /// Human-readable description of what went wrong.
        message: String,
    },
    /// The signature field's `schema_version` is not the attestation schema
    /// version the verifier supports.
    UnknownAttestationSchemaVersion {
        /// The `schema_version` found in the signature field.
        observed: u16,
        /// The attestation schema version the verifier supports.
        expected: u16,
    },
    /// A key-rotation envelope carried by the row was rejected.
    RotationEnvelopeRejected {
        /// Human-readable description of why the envelope was rejected.
        message: String,
    },
}
142
143impl FailureReason {
144 #[must_use]
147 pub fn invariant(&self) -> Option<&'static str> {
148 match self {
149 Self::UnknownEventSchemaVersion { .. } => {
150 Some(UNSUPPORTED_EVENT_SCHEMA_VERSION_INVARIANT)
151 }
152 Self::PostCutoverV2AuditDispatchUnsupported { .. } => {
153 Some(POST_CUTOVER_V2_AUDIT_DISPATCH_UNSUPPORTED_INVARIANT)
154 }
155 _ => None,
156 }
157 }
158}
159
/// Identifies which stored hash failed to match its recomputed value in a
/// [`FailureReason::HashBreak`]. Variant names serialize in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum HashKind {
    /// The row's stored `payload_hash` differs from the hash recomputed from
    /// its payload.
    PayloadHashMismatch,
    /// The row's stored `event_hash` differs from the hash recomputed from
    /// its stored `prev_event_hash` and `payload_hash`.
    EventHashMismatch,
}
169
/// One verification failure, attributed to a single row of the log.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RowFailure {
    /// 1-based position of the row in iteration order.
    pub line: usize,
    /// Id of the event on that row; `None` when the row could not be decoded.
    pub event_id: Option<EventId>,
    /// What went wrong.
    pub reason: FailureReason,
}
180
/// Result of scanning a log: how many rows were visited and every failure
/// that was recorded along the way.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Report {
    /// Path of the log that was verified.
    pub path: PathBuf,
    /// Number of rows visited, including rows that failed to decode.
    pub rows_scanned: usize,
    /// Every failure recorded, in the order rows were visited.
    pub failures: Vec<RowFailure>,
}
194
195impl Report {
196 #[must_use]
198 pub fn ok(&self) -> bool {
199 self.failures.is_empty()
200 }
201}
202
/// Invariant identifier returned by `FailureReason::invariant` for
/// `FailureReason::UnknownEventSchemaVersion`.
pub const UNSUPPORTED_EVENT_SCHEMA_VERSION_INVARIANT: &str =
    "audit.event_schema_version.unsupported";

/// Invariant identifier returned by `FailureReason::invariant` for
/// `FailureReason::PostCutoverV2AuditDispatchUnsupported`.
pub const POST_CUTOVER_V2_AUDIT_DISPATCH_UNSUPPORTED_INVARIANT: &str =
    "audit.post_cutover_v2.dispatch.unsupported";

/// Invariant identifier attached to the boundary failure reported when a
/// required v1→v2 schema-migration boundary row is absent.
pub const SCHEMA_MIGRATION_V1_TO_V2_BOUNDARY_MISSING_INVARIANT: &str =
    "schema_migration.v1_to_v2.boundary.missing";

/// Invariant identifier attached to the boundary failure reported when more
/// than one v1→v2 schema-migration boundary row is present.
pub const SCHEMA_MIGRATION_V1_TO_V2_BOUNDARY_DUPLICATE_INVARIANT: &str =
    "schema_migration.v1_to_v2.boundary.duplicate";
219
/// Location and identity of one v1→v2 schema-migration boundary row found in
/// the log.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SchemaMigrationBoundaryRow {
    /// 1-based position of the row in iteration order.
    pub line: usize,
    /// Id of the boundary event.
    pub event_id: EventId,
    /// The `event_hash` stored on the boundary event.
    pub event_hash: String,
}
230
/// Details of a boundary-count violation.
///
/// Serialized by serde as an internally tagged value: the variant name is
/// written in `snake_case` under the `"kind"` key.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum SchemaMigrationBoundaryFailureDetail {
    /// A boundary row was required but none was found.
    Missing {
        /// Whether the caller required a boundary row.
        required: bool,
        /// Number of boundary rows found.
        observed: usize,
    },
    /// More than one boundary row was found.
    Duplicate {
        /// Number of boundary rows found.
        observed: usize,
        /// 1-based line numbers of every boundary row, in log order.
        lines: Vec<usize>,
    },
}
250
/// One boundary-count violation: the invariant that was broken plus the
/// details of how.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SchemaMigrationBoundaryFailure {
    /// Identifier of the violated invariant (one of the
    /// `SCHEMA_MIGRATION_V1_TO_V2_BOUNDARY_*_INVARIANT` constants).
    pub invariant: String,
    /// What was observed.
    pub detail: SchemaMigrationBoundaryFailureDetail,
}
259
/// Result of scanning a log for v1→v2 schema-migration boundary rows.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SchemaMigrationBoundaryReport {
    /// Path of the log that was scanned.
    pub path: PathBuf,
    /// Whether the caller required a boundary row to be present.
    pub required: bool,
    /// Number of rows visited.
    pub rows_scanned: usize,
    /// Every boundary row found, in log order.
    pub boundary_rows: Vec<SchemaMigrationBoundaryRow>,
    /// Boundary-count violations (missing and/or duplicate).
    pub failures: Vec<SchemaMigrationBoundaryFailure>,
}
274
275impl SchemaMigrationBoundaryReport {
276 #[must_use]
278 pub fn ok(&self) -> bool {
279 self.failures.is_empty()
280 }
281}
282
/// Event `schema_version` that selects the v1 hash framing.
const SUPPORTED_V1_EVENT_SCHEMA_VERSION: u16 = 1;
/// Event `schema_version` carried by the v1→v2 migration boundary row; other
/// rows with this version are classified as v2 "existing event wire" rows.
const SUPPORTED_SCHEMA_MIGRATION_BOUNDARY_VERSION: u16 = 2;
285
/// Classification of a row by schema version, used to decide whether the
/// hash-chain checks apply to it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EventHashFraming {
    /// Row with the v1 event schema version.
    V1,
    /// The v1→v2 schema-migration boundary row.
    SchemaMigrationV1ToV2Boundary,
    /// Row with the v2 schema version that is not the boundary row.
    V2ExistingEventWire,
    /// Row with any other schema version; not verifiable.
    Unknown,
}
293
294fn is_schema_migration_v1_to_v2_boundary(e: &cortex_core::Event) -> bool {
295 e.schema_version == SUPPORTED_SCHEMA_MIGRATION_BOUNDARY_VERSION
296 && e.event_type == EventType::SystemNote
297 && e.payload.get("kind").and_then(serde_json::Value::as_str)
298 == Some(SCHEMA_MIGRATION_V1_TO_V2_EVENT_KIND)
299}
300
301fn event_hash_framing(e: &cortex_core::Event) -> EventHashFraming {
302 match e.schema_version {
303 SUPPORTED_V1_EVENT_SCHEMA_VERSION => EventHashFraming::V1,
304 SUPPORTED_SCHEMA_MIGRATION_BOUNDARY_VERSION if is_schema_migration_v1_to_v2_boundary(e) => {
305 EventHashFraming::SchemaMigrationV1ToV2Boundary
306 }
307 SUPPORTED_SCHEMA_MIGRATION_BOUNDARY_VERSION => EventHashFraming::V2ExistingEventWire,
308 _ => EventHashFraming::Unknown,
309 }
310}
311
312impl EventHashFraming {
313 fn supports_hash_chain(self) -> bool {
314 matches!(
315 self,
316 Self::V1 | Self::SchemaMigrationV1ToV2Boundary | Self::V2ExistingEventWire
317 )
318 }
319
320 fn unsupported_reason(self, e: &cortex_core::Event) -> Option<FailureReason> {
321 match self {
322 Self::V1 | Self::SchemaMigrationV1ToV2Boundary | Self::V2ExistingEventWire => None,
323 Self::Unknown => Some(FailureReason::UnknownEventSchemaVersion {
324 observed: e.schema_version,
325 expected: SUPPORTED_V1_EVENT_SCHEMA_VERSION,
326 }),
327 }
328 }
329}
330
331fn unsupported_event_schema_reason(e: &cortex_core::Event) -> FailureReason {
332 event_hash_framing(e)
333 .unsupported_reason(e)
334 .expect("unsupported_event_schema_reason called only for unsupported framing")
335}
336
337fn check_hash_chain_fields(
338 line: usize,
339 e: &cortex_core::Event,
340 prev_event_hash: &Option<String>,
341 failures: &mut Vec<RowFailure>,
342) -> bool {
343 let framing = event_hash_framing(e);
344 if !framing.supports_hash_chain() {
345 failures.push(RowFailure {
346 line,
347 event_id: Some(e.id),
348 reason: unsupported_event_schema_reason(e),
349 });
350 return false;
351 }
352
353 let expected_payload = payload_hash(&e.payload);
354 if e.payload_hash != expected_payload {
355 failures.push(RowFailure {
356 line,
357 event_id: Some(e.id),
358 reason: FailureReason::HashBreak {
359 which: HashKind::PayloadHashMismatch,
360 observed: e.payload_hash.clone(),
361 expected: expected_payload,
362 },
363 });
364 }
365
366 let expected_event = event_hash(e.prev_event_hash.as_deref(), &e.payload_hash);
367 if e.event_hash != expected_event {
368 failures.push(RowFailure {
369 line,
370 event_id: Some(e.id),
371 reason: FailureReason::HashBreak {
372 which: HashKind::EventHashMismatch,
373 observed: e.event_hash.clone(),
374 expected: expected_event,
375 },
376 });
377 }
378
379 if e.prev_event_hash != *prev_event_hash {
380 failures.push(RowFailure {
381 line,
382 event_id: Some(e.id),
383 reason: FailureReason::Orphan {
384 observed_prev: e.prev_event_hash.clone(),
385 expected_prev: prev_event_hash.clone(),
386 },
387 });
388 }
389
390 true
391}
392
393pub fn verify_chain(path: impl AsRef<Path>) -> Result<Report, JsonlError> {
399 let path = path.as_ref().to_path_buf();
400 let log = JsonlLog::open(&path)?;
401 let mut failures = Vec::new();
402 let mut rows_scanned = 0usize;
403 let mut prev_event_hash: Option<String> = None;
404
405 for (i, item) in log.iter()?.enumerate() {
406 let line = i + 1;
407 rows_scanned += 1;
408 let e = match item {
409 Ok(e) => e,
410 Err(JsonlError::Decode { source, .. }) => {
411 failures.push(RowFailure {
412 line,
413 event_id: None,
414 reason: FailureReason::Decode {
415 message: source.to_string(),
416 },
417 });
418 continue;
424 }
425 Err(other) => return Err(other),
426 };
427
428 if !check_hash_chain_fields(line, &e, &prev_event_hash, &mut failures) {
429 continue;
432 }
433 prev_event_hash = Some(e.event_hash.clone());
434 }
435
436 Ok(Report {
437 path,
438 rows_scanned,
439 failures,
440 })
441}
442
443pub fn verify_schema_migration_v1_to_v2_boundary(
454 path: impl AsRef<Path>,
455 required: bool,
456) -> Result<SchemaMigrationBoundaryReport, JsonlError> {
457 let path = path.as_ref().to_path_buf();
458 let log = JsonlLog::open(&path)?;
459 let mut rows_scanned = 0usize;
460 let mut boundary_rows = Vec::new();
461
462 for (i, item) in log.iter()?.enumerate() {
463 let line = i + 1;
464 rows_scanned += 1;
465 let e = item?;
466
467 if is_schema_migration_v1_to_v2_boundary(&e) {
468 boundary_rows.push(SchemaMigrationBoundaryRow {
469 line,
470 event_id: e.id,
471 event_hash: e.event_hash,
472 });
473 }
474 }
475
476 let mut failures = Vec::new();
477 if required && boundary_rows.is_empty() {
478 failures.push(SchemaMigrationBoundaryFailure {
479 invariant: SCHEMA_MIGRATION_V1_TO_V2_BOUNDARY_MISSING_INVARIANT.to_string(),
480 detail: SchemaMigrationBoundaryFailureDetail::Missing {
481 required,
482 observed: boundary_rows.len(),
483 },
484 });
485 }
486 if boundary_rows.len() > 1 {
487 failures.push(SchemaMigrationBoundaryFailure {
488 invariant: SCHEMA_MIGRATION_V1_TO_V2_BOUNDARY_DUPLICATE_INVARIANT.to_string(),
489 detail: SchemaMigrationBoundaryFailureDetail::Duplicate {
490 observed: boundary_rows.len(),
491 lines: boundary_rows.iter().map(|row| row.line).collect(),
492 },
493 });
494 }
495
496 Ok(SchemaMigrationBoundaryReport {
497 path,
498 required,
499 rows_scanned,
500 boundary_rows,
501 failures,
502 })
503}
504
/// Result of `verify_signed_chain`: the per-row report plus the key that was
/// active once the scan finished.
#[derive(Debug, Clone)]
pub struct SignedChainOutcome {
    /// Per-row verification report.
    pub report: Report,
    /// The verifying key in force after the last row: the initial key, or
    /// the `new_pubkey` of the most recent rotation envelope that was accepted.
    pub active_pubkey: VerifyingKey,
}
524
/// Verifies both the hash chain and the per-row Ed25519 signatures of the
/// signed JSONL log at `path`.
///
/// Verification starts with `initial_pubkey` / `initial_key_id` as the active
/// key. For each row, in order:
///
/// 1. decode the row (decode errors are recorded and the row is skipped);
/// 2. run the hash-chain checks (rows with an unsupported schema version are
///    skipped entirely);
/// 3. require a signature with the supported attestation schema version and
///    64 decoded bytes;
/// 4. rebuild the signing input from the event, the first 32 bytes of the
///    previous row's signature (or `GENESIS_PREV_SIGNATURE` at the start),
///    the ledger id, the active key id and the signature's `signed_at`, then
///    verify it under the active key;
/// 5. if the row carries a rotation envelope, validate it and, on success,
///    switch the active key to the envelope's `new_pubkey`.
///
/// All failures are collected into the returned report; they do not abort
/// the scan.
///
/// # Errors
///
/// Returns the `JsonlError` raised when the log cannot be opened or iterated,
/// or when the row iterator yields any error other than `JsonlError::Decode`.
pub fn verify_signed_chain(
    path: impl AsRef<Path>,
    initial_pubkey: &VerifyingKey,
    initial_key_id: &str,
) -> Result<SignedChainOutcome, JsonlError> {
    let path = path.as_ref().to_path_buf();
    let log = JsonlLog::open(&path)?;
    let ledger_id = crate::jsonl::ledger_id_for(&path);
    let mut failures = Vec::new();
    let mut rows_scanned = 0usize;
    // `event_hash` of the most recent row that took part in the hash chain.
    let mut prev_event_hash: Option<String> = None;
    // First 32 bytes of the previous row's signature, bound into each preimage.
    let mut prev_sig_prefix: [u8; 32] = GENESIS_PREV_SIGNATURE;
    // Key (and its id) that the next row's signature is checked against.
    let mut active_pubkey: VerifyingKey = *initial_pubkey;
    let mut active_key_id: String = initial_key_id.to_string();

    for (i, item) in log.iter_signed()?.enumerate() {
        let line = i + 1;
        rows_scanned += 1;
        let row = match item {
            Ok(r) => r,
            Err(JsonlError::Decode { source, .. }) => {
                // Undecodable rows are reported and skipped; the scan goes on.
                failures.push(RowFailure {
                    line,
                    event_id: None,
                    reason: FailureReason::Decode {
                        message: source.to_string(),
                    },
                });
                continue;
            }
            Err(other) => return Err(other),
        };

        let e = &row.event;

        // Unsupported schema versions are skipped without touching either
        // the hash-chain head or the signature prefix.
        if !check_hash_chain_fields(line, e, &prev_event_hash, &mut failures) {
            continue;
        }
        // The hash-chain head advances before any signature check, so the
        // signature failures below do not disturb hash linkage.
        prev_event_hash = Some(e.event_hash.clone());

        let Some(sig_field) = row.signature.as_ref() else {
            // Note: `prev_sig_prefix` is left unchanged for unsigned rows.
            failures.push(RowFailure {
                line,
                event_id: Some(e.id),
                reason: FailureReason::MissingSignature,
            });
            continue;
        };

        if sig_field.schema_version != SCHEMA_VERSION_ATTESTATION {
            failures.push(RowFailure {
                line,
                event_id: Some(e.id),
                reason: FailureReason::UnknownAttestationSchemaVersion {
                    observed: sig_field.schema_version,
                    expected: SCHEMA_VERSION_ATTESTATION,
                },
            });
            continue;
        }

        // An Ed25519 signature is exactly 64 bytes; anything else is malformed.
        let sig_bytes = match b64_decode(&sig_field.bytes) {
            Some(v) if v.len() == 64 => v,
            _ => {
                failures.push(RowFailure {
                    line,
                    event_id: Some(e.id),
                    reason: FailureReason::BadSignature {
                        key_id: sig_field.key_id.clone(),
                        message: "malformed base64 or wrong length".into(),
                    },
                });
                continue;
            }
        };
        // Cannot fail: the match arm above guarantees a length of 64.
        let sig_arr: [u8; 64] = sig_bytes
            .as_slice()
            .try_into()
            .expect("len-64 vec converts to [u8; 64]");
        let signature = Signature::from_bytes(&sig_arr);

        // The preimage uses the verifier's *active* key id, not the key id
        // the row declares in its signature field.
        let preimage = row_preimage(
            e,
            &prev_sig_prefix,
            &ledger_id,
            &active_key_id,
            sig_field.signed_at,
        );
        let signing_input = canonical_signing_input(&preimage);

        if active_pubkey.verify(&signing_input, &signature).is_err() {
            failures.push(RowFailure {
                line,
                event_id: Some(e.id),
                reason: FailureReason::BadSignature {
                    key_id: sig_field.key_id.clone(),
                    message: "signature did not verify under active key".into(),
                },
            });
            // The prefix still advances to this row's signature bytes even
            // though verification failed. Any rotation envelope on this row
            // is ignored because of the `continue`.
            prev_sig_prefix.copy_from_slice(&sig_arr[..32]);
            continue;
        }

        if let Some(payload) = extract_rotation_payload(e) {
            // A rotation must be issued from the key that is active right now.
            if payload.envelope.old_pubkey != active_pubkey.to_bytes() {
                failures.push(RowFailure {
                    line,
                    event_id: Some(e.id),
                    reason: FailureReason::RotationEnvelopeRejected {
                        message: "envelope.old_pubkey does not match currently-active operator key"
                            .into(),
                    },
                });
            } else {
                match verify_rotation(&payload.envelope) {
                    Ok(()) => {
                        match VerifyingKey::from_bytes(&payload.envelope.new_pubkey) {
                            Ok(new_pk) => {
                                // Accepted: later rows are verified under the
                                // new key, whose id is the lowercase hex of
                                // its public-key bytes.
                                active_key_id = hex_lower(&payload.envelope.new_pubkey);
                                active_pubkey = new_pk;
                            }
                            Err(_) => {
                                failures.push(RowFailure {
                                    line,
                                    event_id: Some(e.id),
                                    reason: FailureReason::RotationEnvelopeRejected {
                                        message: "envelope.new_pubkey is not a valid Ed25519 \
                                                  point"
                                            .into(),
                                    },
                                });
                            }
                        }
                    }
                    Err(err) => {
                        failures.push(RowFailure {
                            line,
                            event_id: Some(e.id),
                            reason: FailureReason::RotationEnvelopeRejected {
                                message: rotation_error_message(&err),
                            },
                        });
                    }
                }
            }
        }

        // Chain the next row's preimage to this row's signature.
        prev_sig_prefix.copy_from_slice(&sig_arr[..32]);
    }

    Ok(SignedChainOutcome {
        report: Report {
            path,
            rows_scanned,
            failures,
        },
        active_pubkey,
    })
}
746
747fn rotation_error_message(e: &VerifyError) -> String {
750 match e {
751 VerifyError::UnknownSchemaVersion { found, expected } => {
752 format!("rotation envelope unknown schema_version (found {found}, expected {expected})")
753 }
754 VerifyError::KeyIdMismatch { preimage, expected } => {
755 format!("rotation envelope key_id mismatch (preimage={preimage}, expected={expected})")
756 }
757 VerifyError::BadSignature => "rotation envelope signature did not verify".into(),
758 VerifyError::MalformedSignature => "rotation envelope signature bytes are malformed".into(),
759 }
760}
761
/// Encodes `bytes` as lowercase hexadecimal, two characters per byte and no
/// separators or prefix. An empty slice yields an empty string.
fn hex_lower(bytes: &[u8]) -> String {
    // Indexed by nibble value (0..=15), so the lookups below cannot go out of
    // bounds.
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    // Exactly two output characters per input byte: one allocation up front,
    // and no temporary `String` per byte as `format!` would create.
    let mut s = String::with_capacity(bytes.len() * 2);
    for &b in bytes {
        s.push(char::from(DIGITS[usize::from(b >> 4)]));
        s.push(char::from(DIGITS[usize::from(b & 0x0f)]));
    }
    s
}
771
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use cortex_core::{
        Event, EventId, EventSource, EventType, SchemaMigrationV1ToV2Payload,
        SCHEMA_MIGRATION_V1_TO_V2_EVENT_KIND, SCHEMA_VERSION,
    };
    use std::io::Write;
    use tempfile::tempdir;

    /// Builds an unsealed user-message event whose payload is `{"seq": seq}`.
    /// The hash fields are left empty; presumably `JsonlLog::append` fills
    /// them in — verify against the log implementation.
    fn fixture_event(seq: u64) -> Event {
        Event {
            id: EventId::new(),
            schema_version: SCHEMA_VERSION,
            observed_at: chrono::Utc.with_ymd_and_hms(2026, 1, 1, 12, 0, 0).unwrap(),
            recorded_at: chrono::Utc.with_ymd_and_hms(2026, 1, 1, 12, 0, 1).unwrap(),
            source: EventSource::User,
            event_type: EventType::UserMessage,
            trace_id: None,
            session_id: None,
            domain_tags: vec![],
            payload: serde_json::json!({"seq": seq}),
            payload_hash: String::new(),
            prev_event_hash: None,
            event_hash: String::new(),
        }
    }

    /// Policy decision used for ordinary appends in these tests.
    fn allow_policy() -> cortex_core::PolicyDecision {
        crate::jsonl::append_policy_decision_test_allow()
    }

    /// Policy decision used for schema-migration boundary appends in these tests.
    fn migration_allow_policy() -> cortex_core::PolicyDecision {
        crate::jsonl::schema_migration_v1_to_v2_policy_decision_test_allow()
    }

    /// A log written only through `JsonlLog::append` verifies without failures.
    #[test]
    fn clean_chain_reports_ok() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("clean.jsonl");
        let mut log = JsonlLog::open(&path).unwrap();
        for i in 0..5u64 {
            log.append(fixture_event(i), &allow_policy()).unwrap();
        }
        let report = verify_chain(&path).unwrap();
        let failures = &report.failures;
        assert!(report.ok(), "expected clean chain, got {failures:?}");
        assert_eq!(report.rows_scanned, 5);
    }

    /// Tampers with two rows of a four-row log and pins exactly which
    /// failures each row produces.
    #[test]
    fn corruption_fixture_produces_expected_failure_report() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("corrupt.jsonl");

        let mut log = JsonlLog::open(&path).unwrap();
        for i in 0..4u64 {
            log.append(fixture_event(i), &allow_policy()).unwrap();
        }

        // Read the sealed rows back so they can be edited and rewritten.
        let raw = std::fs::read_to_string(&path).unwrap();
        let mut rows: Vec<Event> = raw
            .lines()
            .filter(|l| !l.trim().is_empty())
            .map(|l| serde_json::from_str::<Event>(l).unwrap())
            .collect();
        assert_eq!(rows.len(), 4);

        // Row 2: change the payload but leave the stored hashes untouched.
        rows[1].payload = serde_json::json!({"tampered": true});

        // Row 4: point at a bogus predecessor, then re-seal. Presumably
        // `seal` recomputes the row's hashes so they stay internally
        // consistent — the assertions below rely on that.
        rows[3].prev_event_hash = Some("0".repeat(64));
        crate::hash::seal(&mut rows[3]);

        // Overwrite the log file with the edited rows.
        let mut f = std::fs::OpenOptions::new()
            .write(true)
            .truncate(true)
            .create(true)
            .open(&path)
            .unwrap();
        for r in &rows {
            writeln!(f, "{}", serde_json::to_string(r).unwrap()).unwrap();
        }
        f.sync_all().unwrap();
        drop(f);

        let report = verify_chain(&path).unwrap();
        assert_eq!(report.rows_scanned, 4);

        // Group failure reasons by 1-based line number.
        let by_line: std::collections::BTreeMap<usize, Vec<&FailureReason>> = report
            .failures
            .iter()
            .fold(std::collections::BTreeMap::new(), |mut m, f| {
                m.entry(f.line).or_default().push(&f.reason);
                m
            });

        assert!(!by_line.contains_key(&1), "row 1 should be clean");

        let r2 = by_line.get(&2).expect("row 2 should have failures");
        assert!(
            r2.iter().any(|r| matches!(
                r,
                FailureReason::HashBreak {
                    which: HashKind::PayloadHashMismatch,
                    ..
                }
            )),
            "row 2 missing PayloadHashMismatch: {r2:?}",
        );
        assert!(
            !r2.iter().any(|r| matches!(
                r,
                FailureReason::HashBreak {
                    which: HashKind::EventHashMismatch,
                    ..
                }
            )),
            "row 2 should not flag EventHashMismatch (event_hash recompute \
             uses stored payload_hash, which is internally consistent): {r2:?}",
        );

        // Row 3 still links to row 2's stored (unchanged) event_hash.
        assert!(
            !by_line.contains_key(&3),
            "row 3 should be clean, got {by_line:?}"
        );

        let r4 = by_line.get(&4).expect("row 4 should have failures");
        assert!(
            r4.iter().any(|r| matches!(r, FailureReason::Orphan { .. })),
            "row 4 missing Orphan: {r4:?}",
        );
        assert!(
            !r4.iter()
                .any(|r| matches!(r, FailureReason::HashBreak { .. })),
            "row 4 should not have HashBreak failures (we re-sealed): {r4:?}",
        );

        assert!(!report.ok());
    }

    /// Editing a row's stored `event_hash` yields an `EventHashMismatch` on
    /// that row and an `Orphan` on the following row.
    #[test]
    fn mutated_event_hash_triggers_event_hash_mismatch() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("ehm.jsonl");
        let mut log = JsonlLog::open(&path).unwrap();
        for i in 0..3u64 {
            log.append(fixture_event(i), &allow_policy()).unwrap();
        }

        let raw = std::fs::read_to_string(&path).unwrap();
        let mut rows: Vec<Event> = raw
            .lines()
            .filter(|l| !l.trim().is_empty())
            .map(|l| serde_json::from_str::<Event>(l).unwrap())
            .collect();
        // Replace the first character of row 2's stored event_hash with `f`.
        // NOTE(review): this is a no-op if the hash already starts with `f`;
        // confirm the fixture's hash cannot start with that character.
        rows[1].event_hash = format!("{}{}", "f", &rows[1].event_hash[1..]);

        let mut f = std::fs::OpenOptions::new()
            .write(true)
            .truncate(true)
            .create(true)
            .open(&path)
            .unwrap();
        for r in &rows {
            writeln!(f, "{}", serde_json::to_string(r).unwrap()).unwrap();
        }
        f.sync_all().unwrap();
        drop(f);

        let report = verify_chain(&path).unwrap();
        let failures = &report.failures;
        assert!(
            failures.iter().any(|r| r.line == 2
                && matches!(
                    r.reason,
                    FailureReason::HashBreak {
                        which: HashKind::EventHashMismatch,
                        ..
                    }
                )),
            "expected EventHashMismatch on row 2: {failures:?}",
        );
        // Row 3 still points at the original hash, which no longer matches
        // the edited hash carried forward from row 2.
        assert!(
            failures
                .iter()
                .any(|r| r.line == 3 && matches!(r.reason, FailureReason::Orphan { .. })),
            "expected Orphan on row 3: {failures:?}",
        );
    }

    /// A row with an unrecognised schema version is reported as such and is
    /// not run through the hash checks, even though its payload was tampered.
    #[test]
    fn unknown_event_schema_version_fails_closed_without_v1_hash_recompute() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("unknown-schema.jsonl");
        let mut log = JsonlLog::open(&path).unwrap();
        log.append(fixture_event(0), &allow_policy()).unwrap();

        let raw = std::fs::read_to_string(&path).unwrap();
        let mut row: Event = serde_json::from_str(raw.lines().next().unwrap()).unwrap();
        // One past the highest supported version; the tampered payload would
        // trigger a HashBreak if the hash checks ran.
        row.schema_version = SUPPORTED_SCHEMA_MIGRATION_BOUNDARY_VERSION + 1;
        row.payload = serde_json::json!({"tampered": true});

        std::fs::write(&path, format!("{}\n", serde_json::to_string(&row).unwrap())).unwrap();

        let report = verify_chain(&path).unwrap();
        let failures = &report.failures;
        // NOTE(review): the literal `3` mirrors
        // `SUPPORTED_SCHEMA_MIGRATION_BOUNDARY_VERSION + 1` used above and
        // must be kept in sync with it.
        let unsupported_schema_failure = failures.iter().find(|failure| {
            matches!(
                failure.reason,
                FailureReason::UnknownEventSchemaVersion {
                    observed,
                    expected
                } if observed == 3 && expected == SUPPORTED_V1_EVENT_SCHEMA_VERSION
            )
        });
        assert!(
            unsupported_schema_failure.is_some(),
            "expected unknown event schema failure: {failures:?}"
        );
        assert_eq!(
            unsupported_schema_failure.unwrap().reason.invariant(),
            Some(UNSUPPORTED_EVENT_SCHEMA_VERSION_INVARIANT)
        );
        assert!(
            !failures
                .iter()
                .any(|failure| matches!(failure.reason, FailureReason::HashBreak { .. })),
            "unknown event schemas must not be verified under v1 hash framing: {failures:?}"
        );
    }

    /// Builds a migration payload that names `previous_head` as the head it
    /// migrates from; the remaining arguments are fixed fixture values.
    fn schema_migration_payload(previous_head: impl Into<String>) -> SchemaMigrationV1ToV2Payload {
        SchemaMigrationV1ToV2Payload::new(previous_head, "script-digest", None, "fixture-digest")
    }

    /// A log with exactly one boundary row passes when a boundary is required.
    #[test]
    fn schema_migration_boundary_exactly_one_passes_when_required() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("boundary-required.jsonl");
        let mut log = JsonlLog::open(&path).unwrap();
        log.append(fixture_event(0), &allow_policy()).unwrap();

        let v1_head = log.head().expect("v1 head").to_string();
        log.append_schema_migration_v1_to_v2(
            schema_migration_payload(v1_head),
            &migration_allow_policy(),
        )
        .expect("append boundary event");

        let report = verify_schema_migration_v1_to_v2_boundary(&path, true).unwrap();
        assert!(report.ok(), "exactly-one boundary should pass: {report:?}");
        assert_eq!(report.rows_scanned, 2);
        assert_eq!(report.boundary_rows.len(), 1);
        assert_eq!(report.boundary_rows[0].line, 2);
        assert!(report.failures.is_empty());
    }

    /// A log without a boundary row fails with the "missing" invariant when
    /// a boundary is required.
    #[test]
    fn schema_migration_boundary_missing_fails_when_required() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("boundary-missing.jsonl");
        let mut log = JsonlLog::open(&path).unwrap();
        log.append(fixture_event(0), &allow_policy()).unwrap();

        let report = verify_schema_migration_v1_to_v2_boundary(&path, true).unwrap();
        assert!(!report.ok(), "missing boundary should fail");
        assert_eq!(report.rows_scanned, 1);
        assert!(report.boundary_rows.is_empty());
        assert_eq!(report.failures.len(), 1);
        assert_eq!(
            report.failures[0].invariant,
            SCHEMA_MIGRATION_V1_TO_V2_BOUNDARY_MISSING_INVARIANT
        );
        assert!(matches!(
            report.failures[0].detail,
            SchemaMigrationBoundaryFailureDetail::Missing {
                required: true,
                observed: 0
            }
        ));
    }

    /// Two boundary rows fail with the "duplicate" invariant and report both
    /// line numbers, while the hash chain itself stays valid.
    #[test]
    fn schema_migration_boundary_duplicate_fails_with_lines() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("boundary-duplicate.jsonl");
        let mut log = JsonlLog::open(&path).unwrap();
        log.append(fixture_event(0), &allow_policy()).unwrap();

        let v1_head = log.head().expect("v1 head").to_string();
        let first_boundary_hash = log
            .append_schema_migration_v1_to_v2(
                schema_migration_payload(v1_head),
                &migration_allow_policy(),
            )
            .expect("append first boundary event");
        log.append_schema_migration_v1_to_v2(
            schema_migration_payload(first_boundary_hash),
            &migration_allow_policy(),
        )
        .expect("append duplicate boundary event");

        // Guard: the fixture must be a valid chain so that the only problem
        // is the boundary count.
        let chain_report = verify_chain(&path).unwrap();
        assert!(
            chain_report.ok(),
            "duplicate fixture should isolate boundary count, not chain damage: {chain_report:?}"
        );

        let report = verify_schema_migration_v1_to_v2_boundary(&path, true).unwrap();
        assert!(!report.ok(), "duplicate boundary should fail");
        assert_eq!(report.rows_scanned, 3);
        assert_eq!(report.boundary_rows.len(), 2);
        assert_eq!(report.failures.len(), 1);
        assert_eq!(
            report.failures[0].invariant,
            SCHEMA_MIGRATION_V1_TO_V2_BOUNDARY_DUPLICATE_INVARIANT
        );
        assert!(matches!(
            &report.failures[0].detail,
            SchemaMigrationBoundaryFailureDetail::Duplicate {
                observed: 2,
                lines
            } if lines == &vec![2, 3]
        ));
    }

    /// Appending the boundary row leaves the existing v1 row untouched and
    /// the combined chain verifies across the version change.
    #[test]
    fn schema_migration_verify_chain_crosses_v1_to_v2_boundary_without_rewriting_v1_row() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("boundary.jsonl");
        let mut log = JsonlLog::open(&path).unwrap();
        log.append(fixture_event(0), &allow_policy()).unwrap();

        // Snapshot the v1 row before the boundary is appended.
        let before_raw = std::fs::read_to_string(&path).unwrap();
        let before_v1: Event = serde_json::from_str(before_raw.lines().next().unwrap()).unwrap();
        let payload = schema_migration_payload(before_v1.event_hash.clone());

        log.append_schema_migration_v1_to_v2(payload, &migration_allow_policy())
            .expect("append boundary event");

        let report = verify_chain(&path).unwrap();
        assert!(report.ok(), "boundary chain should verify: {report:?}");
        assert_eq!(report.rows_scanned, 2);

        let after_raw = std::fs::read_to_string(&path).unwrap();
        let rows: Vec<Event> = after_raw
            .lines()
            .map(|line| serde_json::from_str(line).unwrap())
            .collect();
        assert_eq!(rows.len(), 2);
        // The v1 row is identical to the snapshot taken before the append.
        assert_eq!(rows[0].schema_version, SCHEMA_VERSION);
        assert_eq!(rows[0].payload_hash, before_v1.payload_hash);
        assert_eq!(rows[0].prev_event_hash, before_v1.prev_event_hash);
        assert_eq!(rows[0].event_hash, before_v1.event_hash);
        // The boundary row is v2 and links to the v1 row.
        assert_eq!(rows[1].schema_version, 2);
        assert_eq!(
            rows[1].prev_event_hash.as_deref(),
            Some(rows[0].event_hash.as_str())
        );
        assert_eq!(
            rows[1].payload["kind"],
            SCHEMA_MIGRATION_V1_TO_V2_EVENT_KIND
        );
    }

    /// Pins what happens when the log contains a line that is not valid JSON.
    ///
    /// NOTE(review): the test name says the failure is reported and the chain
    /// continues, but the assertions pin that both `JsonlLog::open` and
    /// `verify_chain` return an error — confirm which behavior is intended.
    #[test]
    fn decode_failure_is_reported_and_chain_continues() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("decode.jsonl");
        let mut log = JsonlLog::open(&path).unwrap();
        log.append(fixture_event(0), &allow_policy()).unwrap();
        // Append a line that is not valid JSON behind the log's back.
        let mut f = std::fs::OpenOptions::new()
            .append(true)
            .open(&path)
            .unwrap();
        writeln!(f, "{{not valid json").unwrap();
        f.sync_all().unwrap();
        drop(f);
        // Re-opening the damaged log fails.
        let reopened = JsonlLog::open(&path);
        assert!(reopened.is_err());

        // `verify_chain` surfaces the problem as a decode error rather than
        // as a report.
        let report = verify_chain(&path);
        assert!(matches!(report, Err(JsonlError::Decode { .. })));
    }
}