1use std::fs::{File, OpenOptions};
38use std::io::{BufRead, BufReader, Read, Seek, SeekFrom, Write};
39use std::path::{Path, PathBuf};
40
41use chrono::Utc;
42use cortex_core::{
43 attestor::Attestor, canonical::canonical_signing_input, schema_migration_v1_to_v2_event, Event,
44 EventSource, PolicyDecision, PolicyOutcome, SchemaMigrationV1ToV2Payload,
45};
46use thiserror::Error;
47
48use crate::anchor_chain::{row_preimage, GENESIS_PREV_SIGNATURE};
49use crate::hash::seal;
50use crate::signed_row::{b64_decode, b64_encode, RowSignature, SignedRow};
51
52pub const APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID: &str = "ledger.append.event_source_tier_gate";
58pub const APPEND_ATTESTATION_REQUIRED_RULE_ID: &str = "ledger.append.attestation_required";
64pub const APPEND_RUNTIME_MODE_RULE_ID: &str = "ledger.append.runtime_mode";
70pub const APPEND_SIGNED_KEY_STATE_CURRENT_USE_RULE_ID: &str =
74 "ledger.append_signed.key_state_current_use";
75pub const APPEND_SIGNED_TRUST_TIER_MINIMUM_RULE_ID: &str =
79 "ledger.append_signed.trust_tier_minimum";
80pub const SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID: &str =
86 "ledger.schema_migration.authority_class";
87pub const SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID: &str =
93 "ledger.schema_migration.attestation_required";
94pub const SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID: &str =
100 "ledger.schema_migration.current_use_temporal_authority";
101
102#[derive(Debug, Error)]
104pub enum JsonlError {
105 #[error("io error on {path:?}: {source}")]
107 Io {
108 path: PathBuf,
110 #[source]
112 source: std::io::Error,
113 },
114 #[error("json decode error at line {line} in {path:?}: {source}")]
116 Decode {
117 path: PathBuf,
119 line: usize,
121 #[source]
123 source: serde_json::Error,
124 },
125 #[error("json encode error: {0}")]
127 Encode(#[source] serde_json::Error),
128 #[error("chain verification failed: {0}")]
133 ChainBroken(String),
134 #[error("validation failed: {0}")]
136 Validation(String),
137}
138
139#[derive(Debug)]
149pub struct JsonlLog {
150 path: PathBuf,
151 head: Option<String>,
153 len: u64,
155 last_sig_prefix: [u8; 32],
164}
165
166pub(crate) fn ledger_id_for(path: &Path) -> String {
173 path.file_stem()
174 .and_then(|s| s.to_str())
175 .unwrap_or("cortex-jsonl")
176 .to_string()
177}
178
179impl JsonlLog {
180 pub fn open(path: impl AsRef<Path>) -> Result<Self, JsonlError> {
186 let path = path.as_ref().to_path_buf();
187 OpenOptions::new()
189 .create(true)
190 .append(true)
191 .open(&path)
192 .map_err(|source| JsonlError::Io {
193 path: path.clone(),
194 source,
195 })?;
196
197 let f = File::open(&path).map_err(|source| JsonlError::Io {
199 path: path.clone(),
200 source,
201 })?;
202 let reader = BufReader::new(f);
203 let mut head: Option<String> = None;
204 let mut len: u64 = 0;
205 let mut last_sig_prefix: [u8; 32] = GENESIS_PREV_SIGNATURE;
206 for (i, line) in reader.lines().enumerate() {
207 let line = line.map_err(|source| JsonlError::Io {
208 path: path.clone(),
209 source,
210 })?;
211 if line.trim().is_empty() {
212 continue;
213 }
214 let row: SignedRow =
215 serde_json::from_str(&line).map_err(|source| JsonlError::Decode {
216 path: path.clone(),
217 line: i + 1,
218 source,
219 })?;
220 head = Some(row.event.event_hash.clone());
221 len += 1;
222 if let Some(sig) = &row.signature {
223 if let Some(bytes) = b64_decode(&sig.bytes) {
224 if bytes.len() >= 32 {
225 last_sig_prefix.copy_from_slice(&bytes[..32]);
226 } else {
227 }
231 }
232 } else {
233 }
238 }
239
240 Ok(Self {
241 path,
242 head,
243 len,
244 last_sig_prefix,
245 })
246 }
247
248 #[must_use]
250 pub fn path(&self) -> &Path {
251 &self.path
252 }
253
254 #[must_use]
256 pub fn len(&self) -> u64 {
257 self.len
258 }
259
260 #[must_use]
262 pub fn is_empty(&self) -> bool {
263 self.len == 0
264 }
265
266 #[must_use]
270 pub fn head(&self) -> Option<&str> {
271 self.head.as_deref()
272 }
273
274 pub fn append(
303 &mut self,
304 mut event: Event,
305 policy: &PolicyDecision,
306 ) -> Result<String, JsonlError> {
307 require_append_contributors(policy)?;
308 require_event_source_attestation(policy, &event.source)?;
309 require_append_final_outcome(policy, "ledger.append")?;
310
311 event.prev_event_hash = self.head.clone();
312 seal(&mut event);
313 let row = SignedRow::unsigned(event);
314 let line = serde_json::to_string(&row).map_err(JsonlError::Encode)?;
315 self.write_line(&line)?;
316 self.head = Some(row.event.event_hash.clone());
317 self.len += 1;
318 Ok(row.event.event_hash)
320 }
321
322 pub fn append_signed(
351 &mut self,
352 mut event: Event,
353 attestor: &dyn Attestor,
354 policy: &PolicyDecision,
355 ) -> Result<String, JsonlError> {
356 require_append_signed_contributors(policy)?;
357 require_append_signed_key_state_not_break_glassed(policy)?;
358 require_append_final_outcome(policy, "ledger.append_signed")?;
359
360 event.prev_event_hash = self.head.clone();
363 seal(&mut event);
364
365 let signed_at = Utc::now();
366 let key_id = attestor.key_id().to_string();
367 let ledger_id = ledger_id_for(&self.path);
368 let preimage = row_preimage(
369 &event,
370 &self.last_sig_prefix,
371 &ledger_id,
372 &key_id,
373 signed_at,
374 );
375 let signing_input = canonical_signing_input(&preimage);
376 let sig = attestor.sign(&signing_input);
377 let sig_bytes = sig.to_bytes();
378
379 let row = SignedRow {
380 event,
381 signature: Some(RowSignature {
382 schema_version: cortex_core::canonical::SCHEMA_VERSION_ATTESTATION,
383 key_id,
384 signed_at,
385 bytes: b64_encode(&sig_bytes),
386 }),
387 };
388 let line = serde_json::to_string(&row).map_err(JsonlError::Encode)?;
389 self.write_line(&line)?;
390
391 self.head = Some(row.event.event_hash.clone());
392 self.len += 1;
393 self.last_sig_prefix.copy_from_slice(&sig_bytes[..32]);
395 Ok(row.event.event_hash)
396 }
397
398 pub fn append_schema_migration_v1_to_v2(
411 &mut self,
412 payload: SchemaMigrationV1ToV2Payload,
413 policy: &PolicyDecision,
414 ) -> Result<String, JsonlError> {
415 let (head, _event) = self.append_schema_migration_v1_to_v2_with_event(payload, policy)?;
416 Ok(head)
417 }
418
419 pub fn append_schema_migration_v1_to_v2_with_event(
451 &mut self,
452 payload: SchemaMigrationV1ToV2Payload,
453 policy: &PolicyDecision,
454 ) -> Result<(String, Event), JsonlError> {
455 require_schema_migration_contributors(policy)?;
456 require_schema_migration_attestation_not_break_glassed(policy)?;
457 require_schema_migration_current_use_not_break_glassed(policy)?;
458 require_append_final_outcome(policy, "ledger.schema_migration")?;
459
460 let expected_head = payload.previous_v1_head_hash.clone();
461 match self.head.as_deref() {
462 Some(head) if head == expected_head => {}
463 observed => {
464 return Err(JsonlError::Validation(format!(
465 "schema migration boundary previous_v1_head_hash mismatch: observed {observed:?}, expected {expected_head}"
466 )));
467 }
468 }
469
470 let now = Utc::now();
471 let event = schema_migration_v1_to_v2_event(payload, now, now, None)
472 .map_err(|err| JsonlError::Validation(err.to_string()))?;
473 self.append_unchecked_returning_event(event)
478 }
479
480 fn append_unchecked_returning_event(
491 &mut self,
492 mut event: Event,
493 ) -> Result<(String, Event), JsonlError> {
494 event.prev_event_hash = self.head.clone();
495 seal(&mut event);
496 let row = SignedRow::unsigned(event.clone());
497 let line = serde_json::to_string(&row).map_err(JsonlError::Encode)?;
498 self.write_line(&line)?;
499 self.head = Some(event.event_hash.clone());
500 self.len += 1;
501 Ok((event.event_hash.clone(), event))
502 }
503
504 fn write_line(&self, line: &str) -> Result<(), JsonlError> {
506 let mut f = OpenOptions::new()
507 .create(true)
508 .append(true)
509 .open(&self.path)
510 .map_err(|source| JsonlError::Io {
511 path: self.path.clone(),
512 source,
513 })?;
514 f.write_all(line.as_bytes())
515 .map_err(|source| JsonlError::Io {
516 path: self.path.clone(),
517 source,
518 })?;
519 f.write_all(b"\n").map_err(|source| JsonlError::Io {
520 path: self.path.clone(),
521 source,
522 })?;
523 f.sync_all().map_err(|source| JsonlError::Io {
525 path: self.path.clone(),
526 source,
527 })?;
528 Ok(())
529 }
530
531 #[must_use]
536 pub fn last_sig_prefix(&self) -> [u8; 32] {
537 self.last_sig_prefix
538 }
539
540 pub fn iter(&self) -> Result<JsonlIter, JsonlError> {
547 let mut f = File::open(&self.path).map_err(|source| JsonlError::Io {
548 path: self.path.clone(),
549 source,
550 })?;
551 f.seek(SeekFrom::Start(0))
552 .map_err(|source| JsonlError::Io {
553 path: self.path.clone(),
554 source,
555 })?;
556 Ok(JsonlIter {
557 path: self.path.clone(),
558 reader: BufReader::new(Box::new(f) as Box<dyn Read>),
559 line: 0,
560 })
561 }
562
563 pub fn iter_signed(&self) -> Result<SignedJsonlIter, JsonlError> {
570 let f = File::open(&self.path).map_err(|source| JsonlError::Io {
571 path: self.path.clone(),
572 source,
573 })?;
574 Ok(SignedJsonlIter {
575 path: self.path.clone(),
576 reader: BufReader::new(Box::new(f) as Box<dyn Read>),
577 line: 0,
578 })
579 }
580
581 pub fn verify_chain(&self) -> Result<(), JsonlError> {
590 let mut prev: Option<String> = None;
591 for (i, item) in self.iter()?.enumerate() {
592 let e = item?;
593 let expected_payload = crate::hash::payload_hash(&e.payload);
595 if e.payload_hash != expected_payload {
596 return Err(JsonlError::ChainBroken(format!(
597 "row {} payload_hash mismatch",
598 i + 1
599 )));
600 }
601 let expected_event =
602 crate::hash::event_hash(e.prev_event_hash.as_deref(), &e.payload_hash);
603 if e.event_hash != expected_event {
604 return Err(JsonlError::ChainBroken(format!(
605 "row {} event_hash mismatch",
606 i + 1
607 )));
608 }
609 if e.prev_event_hash != prev {
610 return Err(JsonlError::ChainBroken(format!(
611 "row {} prev_event_hash does not point at previous row",
612 i + 1
613 )));
614 }
615 prev = Some(e.event_hash.clone());
616 }
617 Ok(())
618 }
619}
620
621fn require_append_final_outcome(policy: &PolicyDecision, surface: &str) -> Result<(), JsonlError> {
622 match policy.final_outcome {
623 PolicyOutcome::Allow | PolicyOutcome::Warn | PolicyOutcome::BreakGlass => Ok(()),
624 PolicyOutcome::Quarantine | PolicyOutcome::Reject => Err(JsonlError::Validation(format!(
625 "{surface} preflight: composed policy outcome {:?} blocks ledger append",
626 policy.final_outcome,
627 ))),
628 }
629}
630
631fn require_contributor(policy: &PolicyDecision, rule_id: &str) -> Result<(), JsonlError> {
632 let contains_rule = policy
633 .contributing
634 .iter()
635 .chain(policy.discarded.iter())
636 .any(|contribution| contribution.rule_id.as_str() == rule_id);
637 if contains_rule {
638 Ok(())
639 } else {
640 Err(JsonlError::Validation(format!(
641 "policy decision missing required contributor `{rule_id}`; caller skipped ADR 0026 composition",
642 )))
643 }
644}
645
646fn require_append_contributors(policy: &PolicyDecision) -> Result<(), JsonlError> {
647 require_contributor(policy, APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID)?;
648 require_contributor(policy, APPEND_ATTESTATION_REQUIRED_RULE_ID)?;
649 require_contributor(policy, APPEND_RUNTIME_MODE_RULE_ID)?;
650 Ok(())
651}
652
653fn require_append_signed_contributors(policy: &PolicyDecision) -> Result<(), JsonlError> {
654 require_contributor(policy, APPEND_SIGNED_KEY_STATE_CURRENT_USE_RULE_ID)?;
655 require_contributor(policy, APPEND_SIGNED_TRUST_TIER_MINIMUM_RULE_ID)?;
656 Ok(())
657}
658
659fn require_event_source_attestation(
660 policy: &PolicyDecision,
661 source: &EventSource,
662) -> Result<(), JsonlError> {
663 if !matches!(source, EventSource::User) {
668 return Ok(());
669 }
670 let attestation = policy
671 .contributing
672 .iter()
673 .chain(policy.discarded.iter())
674 .find(|contribution| {
675 contribution.rule_id.as_str() == APPEND_ATTESTATION_REQUIRED_RULE_ID
676 })
677 .ok_or_else(|| {
678 JsonlError::Validation(format!(
679 "ledger.append preflight: required attestation contributor `{APPEND_ATTESTATION_REQUIRED_RULE_ID}` is absent from the policy decision for EventSource::User"
680 ))
681 })?;
682 if attestation.outcome == PolicyOutcome::Allow {
683 Ok(())
684 } else {
685 Err(JsonlError::Validation(format!(
686 "ledger.append preflight: attestation contributor `{APPEND_ATTESTATION_REQUIRED_RULE_ID}` returned {:?} for EventSource::User; ADR 0026 §4 forbids BreakGlass substituting for attestation",
687 attestation.outcome,
688 )))
689 }
690}
691
692fn require_schema_migration_contributors(policy: &PolicyDecision) -> Result<(), JsonlError> {
693 require_contributor(policy, SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID)?;
694 require_contributor(policy, SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID)?;
695 require_contributor(
696 policy,
697 SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID,
698 )?;
699 Ok(())
700}
701
702fn require_schema_migration_attestation_not_break_glassed(
703 policy: &PolicyDecision,
704) -> Result<(), JsonlError> {
705 let attestation = policy
711 .contributing
712 .iter()
713 .chain(policy.discarded.iter())
714 .find(|contribution| {
715 contribution.rule_id.as_str() == SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID
716 })
717 .ok_or_else(|| {
718 JsonlError::Validation(format!(
719 "ledger.schema_migration preflight: required attestation contributor `{SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID}` is absent from the policy decision"
720 ))
721 })?;
722 if attestation.outcome == PolicyOutcome::Allow {
723 Ok(())
724 } else {
725 Err(JsonlError::Validation(format!(
726 "ledger.schema_migration preflight: attestation contributor `{SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID}` returned {:?}; ADR 0026 §4 forbids BreakGlass substituting for operator attestation at the migration authority root",
727 attestation.outcome,
728 )))
729 }
730}
731
732fn require_schema_migration_current_use_not_break_glassed(
733 policy: &PolicyDecision,
734) -> Result<(), JsonlError> {
735 let current_use = policy
740 .contributing
741 .iter()
742 .chain(policy.discarded.iter())
743 .find(|contribution| {
744 contribution.rule_id.as_str()
745 == SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID
746 })
747 .ok_or_else(|| {
748 JsonlError::Validation(format!(
749 "ledger.schema_migration preflight: required current-use contributor `{SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID}` is absent from the policy decision"
750 ))
751 })?;
752 if current_use.outcome == PolicyOutcome::Allow {
753 Ok(())
754 } else {
755 Err(JsonlError::Validation(format!(
756 "ledger.schema_migration preflight: current-use contributor `{SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID}` returned {:?}; ADR 0023 forbids historical-only or revoked signing keys at the migration authority root",
757 current_use.outcome,
758 )))
759 }
760}
761
762fn require_append_signed_key_state_not_break_glassed(
763 policy: &PolicyDecision,
764) -> Result<(), JsonlError> {
765 let key_state = policy
770 .contributing
771 .iter()
772 .chain(policy.discarded.iter())
773 .find(|contribution| {
774 contribution.rule_id.as_str() == APPEND_SIGNED_KEY_STATE_CURRENT_USE_RULE_ID
775 })
776 .ok_or_else(|| {
777 JsonlError::Validation(format!(
778 "ledger.append_signed preflight: required current-use contributor `{APPEND_SIGNED_KEY_STATE_CURRENT_USE_RULE_ID}` is absent from the policy decision"
779 ))
780 })?;
781 if key_state.outcome == PolicyOutcome::Allow {
782 Ok(())
783 } else {
784 Err(JsonlError::Validation(format!(
785 "ledger.append_signed preflight: current-use contributor `{APPEND_SIGNED_KEY_STATE_CURRENT_USE_RULE_ID}` returned {:?}; ADR 0023 forbids historical-only or revoked signing keys at the trusted ledger root",
786 key_state.outcome,
787 )))
788 }
789}
790
791#[must_use]
803pub fn append_policy_decision_test_allow() -> PolicyDecision {
804 use cortex_core::{compose_policy_outcomes, PolicyContribution};
805 compose_policy_outcomes(
806 vec![
807 PolicyContribution::new(
808 APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
809 PolicyOutcome::Allow,
810 "test fixture: event source tier gate satisfied",
811 )
812 .expect("static test contribution is valid"),
813 PolicyContribution::new(
814 APPEND_ATTESTATION_REQUIRED_RULE_ID,
815 PolicyOutcome::Allow,
816 "test fixture: attestation requirement satisfied",
817 )
818 .expect("static test contribution is valid"),
819 PolicyContribution::new(
820 APPEND_RUNTIME_MODE_RULE_ID,
821 PolicyOutcome::Allow,
822 "test fixture: runtime mode permits unsigned append",
823 )
824 .expect("static test contribution is valid"),
825 ],
826 None,
827 )
828}
829
830#[must_use]
844pub fn schema_migration_v1_to_v2_policy_decision_test_allow() -> PolicyDecision {
845 use cortex_core::{compose_policy_outcomes, PolicyContribution};
846 compose_policy_outcomes(
847 vec![
848 PolicyContribution::new(
849 SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID,
850 PolicyOutcome::Allow,
851 "test fixture: operator authority class satisfied",
852 )
853 .expect("static test contribution is valid"),
854 PolicyContribution::new(
855 SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID,
856 PolicyOutcome::Allow,
857 "test fixture: operator attestation present",
858 )
859 .expect("static test contribution is valid"),
860 PolicyContribution::new(
861 SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID,
862 PolicyOutcome::Allow,
863 "test fixture: signing key state is current-use",
864 )
865 .expect("static test contribution is valid"),
866 ],
867 None,
868 )
869}
870
871#[must_use]
876pub fn append_signed_policy_decision_test_allow() -> PolicyDecision {
877 use cortex_core::{compose_policy_outcomes, PolicyContribution};
878 compose_policy_outcomes(
879 vec![
880 PolicyContribution::new(
881 APPEND_SIGNED_KEY_STATE_CURRENT_USE_RULE_ID,
882 PolicyOutcome::Allow,
883 "test fixture: signing key state is current-use",
884 )
885 .expect("static test contribution is valid"),
886 PolicyContribution::new(
887 APPEND_SIGNED_TRUST_TIER_MINIMUM_RULE_ID,
888 PolicyOutcome::Allow,
889 "test fixture: signing principal trust tier satisfies minimum",
890 )
891 .expect("static test contribution is valid"),
892 ],
893 None,
894 )
895}
896
897pub struct JsonlIter {
899 path: PathBuf,
900 reader: BufReader<Box<dyn Read>>,
901 line: usize,
902}
903
904impl std::fmt::Debug for JsonlIter {
905 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
906 f.debug_struct("JsonlIter")
907 .field("path", &self.path)
908 .field("line", &self.line)
909 .finish()
910 }
911}
912
913impl Iterator for JsonlIter {
914 type Item = Result<Event, JsonlError>;
915
916 fn next(&mut self) -> Option<Self::Item> {
917 loop {
918 let mut buf = String::new();
919 let n = match self.reader.read_line(&mut buf) {
920 Ok(n) => n,
921 Err(source) => {
922 return Some(Err(JsonlError::Io {
923 path: self.path.clone(),
924 source,
925 }));
926 }
927 };
928 if n == 0 {
929 return None; }
931 self.line += 1;
932 let trimmed = buf.trim();
933 if trimmed.is_empty() {
934 continue;
935 }
936 return Some(
937 serde_json::from_str::<SignedRow>(trimmed)
938 .map(|row| row.event)
939 .map_err(|source| JsonlError::Decode {
940 path: self.path.clone(),
941 line: self.line,
942 source,
943 }),
944 );
945 }
946 }
947}
948
949pub struct SignedJsonlIter {
953 path: PathBuf,
954 reader: BufReader<Box<dyn Read>>,
955 line: usize,
956}
957
958impl std::fmt::Debug for SignedJsonlIter {
959 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
960 f.debug_struct("SignedJsonlIter")
961 .field("path", &self.path)
962 .field("line", &self.line)
963 .finish()
964 }
965}
966
967impl Iterator for SignedJsonlIter {
968 type Item = Result<SignedRow, JsonlError>;
969
970 fn next(&mut self) -> Option<Self::Item> {
971 loop {
972 let mut buf = String::new();
973 let n = match self.reader.read_line(&mut buf) {
974 Ok(n) => n,
975 Err(source) => {
976 return Some(Err(JsonlError::Io {
977 path: self.path.clone(),
978 source,
979 }));
980 }
981 };
982 if n == 0 {
983 return None;
984 }
985 self.line += 1;
986 let trimmed = buf.trim();
987 if trimmed.is_empty() {
988 continue;
989 }
990 return Some(
991 serde_json::from_str::<SignedRow>(trimmed).map_err(|source| JsonlError::Decode {
992 path: self.path.clone(),
993 line: self.line,
994 source,
995 }),
996 );
997 }
998 }
999}
1000
1001#[cfg(test)]
1002mod tests {
1003 use super::*;
1004 use chrono::TimeZone;
1005 use cortex_core::{
1006 compose_policy_outcomes, Event, EventId, EventSource, EventType, PolicyContribution,
1007 SchemaMigrationV1ToV2Payload, SCHEMA_MIGRATION_V1_TO_V2_EVENT_KIND,
1008 SCHEMA_MIGRATION_V1_TO_V2_TARGET, SCHEMA_VERSION,
1009 };
1010 use tempfile::tempdir;
1011
1012 fn fixture_event(seq: u64) -> Event {
1013 Event {
1014 id: EventId::new(),
1015 schema_version: SCHEMA_VERSION,
1016 observed_at: chrono::Utc.with_ymd_and_hms(2026, 1, 1, 12, 0, 0).unwrap(),
1017 recorded_at: chrono::Utc.with_ymd_and_hms(2026, 1, 1, 12, 0, 1).unwrap(),
1018 source: EventSource::User,
1019 event_type: EventType::UserMessage,
1020 trace_id: None,
1021 session_id: None,
1022 domain_tags: vec![],
1023 payload: serde_json::json!({"seq": seq, "text": format!("event {seq}")}),
1024 payload_hash: String::new(),
1025 prev_event_hash: None,
1026 event_hash: String::new(),
1027 }
1028 }
1029
1030 fn allow_policy() -> PolicyDecision {
1031 append_policy_decision_test_allow()
1032 }
1033
1034 fn migration_allow_policy() -> PolicyDecision {
1035 schema_migration_v1_to_v2_policy_decision_test_allow()
1036 }
1037
1038 #[test]
1040 fn append_n_reopen_and_verify() {
1041 let dir = tempdir().unwrap();
1042 let path = dir.path().join("events.jsonl");
1043
1044 let mut log = JsonlLog::open(&path).unwrap();
1046 let mut heads = Vec::new();
1047 let policy = allow_policy();
1048 for i in 0..25u64 {
1049 let head = log.append(fixture_event(i), &policy).unwrap();
1050 heads.push(head);
1051 }
1052 assert_eq!(log.len(), 25);
1053 assert_eq!(log.head(), Some(heads.last().unwrap().as_str()));
1054
1055 let log2 = JsonlLog::open(&path).unwrap();
1057 assert_eq!(log2.len(), 25);
1058 assert_eq!(log2.head(), Some(heads.last().unwrap().as_str()));
1059 log2.verify_chain().expect("chain verifies after reopen");
1060
1061 let mut prev: Option<String> = None;
1063 let mut count = 0;
1064 for item in log2.iter().unwrap() {
1065 let e = item.unwrap();
1066 assert_eq!(e.prev_event_hash, prev);
1067 prev = Some(e.event_hash.clone());
1068 count += 1;
1069 }
1070 assert_eq!(count, 25);
1071 }
1072
1073 #[test]
1074 fn empty_log_verifies() {
1075 let dir = tempdir().unwrap();
1076 let path = dir.path().join("empty.jsonl");
1077 let log = JsonlLog::open(&path).unwrap();
1078 assert_eq!(log.len(), 0);
1079 assert!(log.head().is_none());
1080 log.verify_chain().expect("empty chain is valid");
1081 }
1082
1083 #[test]
1084 fn append_persists_after_drop() {
1085 let dir = tempdir().unwrap();
1086 let path = dir.path().join("persist.jsonl");
1087 {
1088 let mut log = JsonlLog::open(&path).unwrap();
1089 let policy = allow_policy();
1090 log.append(fixture_event(0), &policy).unwrap();
1091 log.append(fixture_event(1), &policy).unwrap();
1092 }
1094 let log2 = JsonlLog::open(&path).unwrap();
1095 assert_eq!(log2.len(), 2);
1096 }
1097
1098 #[test]
1099 fn corrupted_payload_fails_verify() {
1100 let dir = tempdir().unwrap();
1101 let path = dir.path().join("corrupt.jsonl");
1102 let mut log = JsonlLog::open(&path).unwrap();
1103 let policy = allow_policy();
1104 log.append(fixture_event(0), &policy).unwrap();
1105 log.append(fixture_event(1), &policy).unwrap();
1106 log.append(fixture_event(2), &policy).unwrap();
1107
1108 let lines: Vec<String> = std::fs::read_to_string(&path)
1110 .unwrap()
1111 .lines()
1112 .map(|s| s.to_string())
1113 .collect();
1114 let mut bad: Event = serde_json::from_str(&lines[1]).unwrap();
1115 bad.payload = serde_json::json!({"tampered": true});
1116 let mut new_content = String::new();
1117 new_content.push_str(&lines[0]);
1118 new_content.push('\n');
1119 new_content.push_str(&serde_json::to_string(&bad).unwrap());
1120 new_content.push('\n');
1121 new_content.push_str(&lines[2]);
1122 new_content.push('\n');
1123 std::fs::write(&path, new_content).unwrap();
1124
1125 let log2 = JsonlLog::open(&path).unwrap();
1126 let err = log2.verify_chain().unwrap_err();
1127 assert!(matches!(err, JsonlError::ChainBroken(_)));
1128 }
1129
1130 #[test]
1131 fn append_after_reopen_continues_chain() {
1132 let dir = tempdir().unwrap();
1133 let path = dir.path().join("continue.jsonl");
1134 let head_before;
1135 {
1136 let mut log = JsonlLog::open(&path).unwrap();
1137 let policy = allow_policy();
1138 log.append(fixture_event(0), &policy).unwrap();
1139 head_before = log.append(fixture_event(1), &policy).unwrap();
1140 }
1141 let mut log2 = JsonlLog::open(&path).unwrap();
1142 assert_eq!(log2.head(), Some(head_before.as_str()));
1143 let head_after = log2.append(fixture_event(2), &allow_policy()).unwrap();
1144 assert_ne!(head_after, head_before);
1145 log2.verify_chain().expect("continued chain verifies");
1146 }
1147
1148 #[test]
1149 fn schema_migration_v1_to_v2_event_emitted_after_v1_head() {
1150 let dir = tempdir().unwrap();
1151 let path = dir.path().join("schema-boundary.jsonl");
1152 let mut log = JsonlLog::open(&path).unwrap();
1153 let previous_v1_head = log.append(fixture_event(0), &allow_policy()).unwrap();
1154 let payload = SchemaMigrationV1ToV2Payload::new(
1155 previous_v1_head.clone(),
1156 "script-digest",
1157 None,
1158 "fixture-digest",
1159 );
1160
1161 let boundary_head = log
1162 .append_schema_migration_v1_to_v2(payload, &migration_allow_policy())
1163 .expect("boundary event appends");
1164
1165 assert_eq!(log.len(), 2);
1166 assert_eq!(log.head(), Some(boundary_head.as_str()));
1167 log.verify_chain().expect("boundary chain verifies");
1168
1169 let rows = log.iter().unwrap().collect::<Result<Vec<_>, _>>().unwrap();
1170 let boundary = rows.last().expect("boundary row exists");
1171 assert_eq!(boundary.schema_version, SCHEMA_MIGRATION_V1_TO_V2_TARGET);
1172 assert_eq!(boundary.event_type, EventType::SystemNote);
1173 assert_eq!(boundary.source, EventSource::Runtime);
1174 assert_eq!(
1175 boundary.prev_event_hash.as_deref(),
1176 Some(previous_v1_head.as_str())
1177 );
1178 assert_eq!(
1179 boundary.payload["kind"],
1180 SCHEMA_MIGRATION_V1_TO_V2_EVENT_KIND
1181 );
1182 assert_eq!(
1183 boundary.payload["payload"]["previous_v1_head_hash"],
1184 previous_v1_head
1185 );
1186 }
1187
1188 #[test]
1189 fn schema_migration_v1_to_v2_event_rejects_wrong_previous_head() {
1190 let dir = tempdir().unwrap();
1191 let path = dir.path().join("schema-boundary-mismatch.jsonl");
1192 let mut log = JsonlLog::open(&path).unwrap();
1193 log.append(fixture_event(0), &allow_policy()).unwrap();
1194 let payload =
1195 SchemaMigrationV1ToV2Payload::new("not-current-head", "script-digest", None, "fixture");
1196
1197 let err = log
1198 .append_schema_migration_v1_to_v2(payload, &migration_allow_policy())
1199 .expect_err("wrong previous head must fail");
1200
1201 assert!(matches!(err, JsonlError::Validation(_)));
1202 assert_eq!(log.len(), 1);
1203 log.verify_chain()
1204 .expect("rejected boundary append must not corrupt chain");
1205 }
1206
1207 #[test]
1208 fn schema_migration_v1_to_v2_refuses_missing_authority_class_contributor() {
1209 let dir = tempdir().unwrap();
1210 let path = dir.path().join("schema-boundary-missing-auth.jsonl");
1211 let mut log = JsonlLog::open(&path).unwrap();
1212 let previous_v1_head = log.append(fixture_event(0), &allow_policy()).unwrap();
1213 let payload = SchemaMigrationV1ToV2Payload::new(
1214 previous_v1_head,
1215 "script-digest",
1216 None,
1217 "fixture-digest",
1218 );
1219
1220 let policy = compose_policy_outcomes(
1222 vec![
1223 PolicyContribution::new(
1224 SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID,
1225 PolicyOutcome::Allow,
1226 "fixture: attestation present",
1227 )
1228 .unwrap(),
1229 PolicyContribution::new(
1230 SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID,
1231 PolicyOutcome::Allow,
1232 "fixture: current use",
1233 )
1234 .unwrap(),
1235 ],
1236 None,
1237 );
1238
1239 let err = log
1240 .append_schema_migration_v1_to_v2(payload, &policy)
1241 .expect_err("missing authority-class contributor must fail");
1242 assert!(matches!(err, JsonlError::Validation(_)));
1243 assert_eq!(log.len(), 1);
1244 log.verify_chain()
1245 .expect("rejected boundary append must not corrupt chain");
1246 }
1247
1248 #[test]
1249 fn schema_migration_v1_to_v2_refuses_reject_outcome() {
1250 let dir = tempdir().unwrap();
1251 let path = dir.path().join("schema-boundary-reject.jsonl");
1252 let mut log = JsonlLog::open(&path).unwrap();
1253 let previous_v1_head = log.append(fixture_event(0), &allow_policy()).unwrap();
1254 let payload = SchemaMigrationV1ToV2Payload::new(
1255 previous_v1_head,
1256 "script-digest",
1257 None,
1258 "fixture-digest",
1259 );
1260
1261 let policy = compose_policy_outcomes(
1263 vec![
1264 PolicyContribution::new(
1265 SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID,
1266 PolicyOutcome::Reject,
1267 "fixture: non-operator authority class",
1268 )
1269 .unwrap(),
1270 PolicyContribution::new(
1271 SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID,
1272 PolicyOutcome::Allow,
1273 "fixture: attestation present",
1274 )
1275 .unwrap(),
1276 PolicyContribution::new(
1277 SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID,
1278 PolicyOutcome::Allow,
1279 "fixture: current use",
1280 )
1281 .unwrap(),
1282 ],
1283 None,
1284 );
1285 assert_eq!(policy.final_outcome, PolicyOutcome::Reject);
1286
1287 let err = log
1288 .append_schema_migration_v1_to_v2(payload, &policy)
1289 .expect_err("reject outcome must fail closed");
1290 assert!(matches!(err, JsonlError::Validation(_)));
1291 assert_eq!(log.len(), 1);
1292 }
1293
1294 #[test]
1295 fn schema_migration_v1_to_v2_refuses_quarantine_outcome() {
1296 let dir = tempdir().unwrap();
1297 let path = dir.path().join("schema-boundary-quarantine.jsonl");
1298 let mut log = JsonlLog::open(&path).unwrap();
1299 let previous_v1_head = log.append(fixture_event(0), &allow_policy()).unwrap();
1300 let payload = SchemaMigrationV1ToV2Payload::new(
1301 previous_v1_head,
1302 "script-digest",
1303 None,
1304 "fixture-digest",
1305 );
1306
1307 let policy = compose_policy_outcomes(
1308 vec![
1309 PolicyContribution::new(
1310 SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID,
1311 PolicyOutcome::Quarantine,
1312 "fixture: under-trust authority class",
1313 )
1314 .unwrap(),
1315 PolicyContribution::new(
1316 SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID,
1317 PolicyOutcome::Allow,
1318 "fixture: attestation present",
1319 )
1320 .unwrap(),
1321 PolicyContribution::new(
1322 SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID,
1323 PolicyOutcome::Allow,
1324 "fixture: current use",
1325 )
1326 .unwrap(),
1327 ],
1328 None,
1329 );
1330 assert_eq!(policy.final_outcome, PolicyOutcome::Quarantine);
1331
1332 let err = log
1333 .append_schema_migration_v1_to_v2(payload, &policy)
1334 .expect_err("quarantine outcome must fail closed");
1335 assert!(matches!(err, JsonlError::Validation(_)));
1336 assert_eq!(log.len(), 1);
1337 }
1338
1339 #[test]
1340 fn schema_migration_v1_to_v2_refuses_attestation_break_glass() {
1341 let dir = tempdir().unwrap();
1342 let path = dir.path().join("schema-boundary-break-glass.jsonl");
1343 let mut log = JsonlLog::open(&path).unwrap();
1344 let previous_v1_head = log.append(fixture_event(0), &allow_policy()).unwrap();
1345 let payload = SchemaMigrationV1ToV2Payload::new(
1346 previous_v1_head,
1347 "script-digest",
1348 None,
1349 "fixture-digest",
1350 );
1351
1352 let policy = compose_policy_outcomes(
1355 vec![
1356 PolicyContribution::new(
1357 SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID,
1358 PolicyOutcome::Allow,
1359 "fixture: operator authority class",
1360 )
1361 .unwrap(),
1362 PolicyContribution::new(
1363 SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID,
1364 PolicyOutcome::BreakGlass,
1365 "fixture: operator attempted break-glass on attestation",
1366 )
1367 .unwrap(),
1368 PolicyContribution::new(
1369 SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID,
1370 PolicyOutcome::Allow,
1371 "fixture: current use",
1372 )
1373 .unwrap(),
1374 ],
1375 None,
1376 );
1377
1378 let err = log
1379 .append_schema_migration_v1_to_v2(payload, &policy)
1380 .expect_err("BreakGlass on attestation must fail");
1381 assert!(matches!(err, JsonlError::Validation(_)));
1382 assert_eq!(log.len(), 1);
1383 }
1384
1385 #[test]
1386 fn schema_migration_v1_to_v2_refuses_historical_key_current_use() {
1387 let dir = tempdir().unwrap();
1388 let path = dir.path().join("schema-boundary-historical-key.jsonl");
1389 let mut log = JsonlLog::open(&path).unwrap();
1390 let previous_v1_head = log.append(fixture_event(0), &allow_policy()).unwrap();
1391 let payload = SchemaMigrationV1ToV2Payload::new(
1392 previous_v1_head,
1393 "script-digest",
1394 None,
1395 "fixture-digest",
1396 );
1397
1398 let policy = compose_policy_outcomes(
1401 vec![
1402 PolicyContribution::new(
1403 SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID,
1404 PolicyOutcome::Allow,
1405 "fixture: operator authority class",
1406 )
1407 .unwrap(),
1408 PolicyContribution::new(
1409 SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID,
1410 PolicyOutcome::Allow,
1411 "fixture: attestation present",
1412 )
1413 .unwrap(),
1414 PolicyContribution::new(
1415 SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID,
1416 PolicyOutcome::Reject,
1417 "fixture: signing key retired before attestation time",
1418 )
1419 .unwrap(),
1420 ],
1421 None,
1422 );
1423
1424 let err = log
1425 .append_schema_migration_v1_to_v2(payload, &policy)
1426 .expect_err("historical-only signing key must fail");
1427 assert!(matches!(err, JsonlError::Validation(_)));
1428 assert_eq!(log.len(), 1);
1429 }
1430
1431 #[test]
1432 fn append_refuses_policy_decision_missing_contributors() {
1433 let dir = tempdir().unwrap();
1434 let path = dir.path().join("missing-contributor.jsonl");
1435 let mut log = JsonlLog::open(&path).unwrap();
1436 let policy = compose_policy_outcomes(
1438 vec![PolicyContribution::new(
1439 APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
1440 PolicyOutcome::Allow,
1441 "fixture: tier gate only",
1442 )
1443 .unwrap()],
1444 None,
1445 );
1446
1447 let err = log
1448 .append(fixture_event(0), &policy)
1449 .expect_err("missing contributor must fail");
1450 assert!(matches!(err, JsonlError::Validation(_)));
1451 assert_eq!(log.len(), 0);
1452 }
1453
1454 #[test]
1455 fn append_refuses_reject_outcome() {
1456 let dir = tempdir().unwrap();
1457 let path = dir.path().join("reject-outcome.jsonl");
1458 let mut log = JsonlLog::open(&path).unwrap();
1459 let policy = compose_policy_outcomes(
1460 vec![
1461 PolicyContribution::new(
1462 APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
1463 PolicyOutcome::Reject,
1464 "fixture: tier gate refuses",
1465 )
1466 .unwrap(),
1467 PolicyContribution::new(
1468 APPEND_ATTESTATION_REQUIRED_RULE_ID,
1469 PolicyOutcome::Allow,
1470 "fixture: attestation present",
1471 )
1472 .unwrap(),
1473 PolicyContribution::new(
1474 APPEND_RUNTIME_MODE_RULE_ID,
1475 PolicyOutcome::Allow,
1476 "fixture: runtime mode permits unsigned",
1477 )
1478 .unwrap(),
1479 ],
1480 None,
1481 );
1482 assert_eq!(policy.final_outcome, PolicyOutcome::Reject);
1483
1484 let err = log
1485 .append(fixture_event(0), &policy)
1486 .expect_err("reject outcome must fail closed");
1487 assert!(matches!(err, JsonlError::Validation(_)));
1488 assert_eq!(log.len(), 0);
1489 }
1490
1491 #[test]
1492 fn append_refuses_user_event_when_attestation_contributor_not_allow() {
1493 let dir = tempdir().unwrap();
1494 let path = dir.path().join("user-attestation.jsonl");
1495 let mut log = JsonlLog::open(&path).unwrap();
1496 let policy = compose_policy_outcomes(
1499 vec![
1500 PolicyContribution::new(
1501 APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
1502 PolicyOutcome::Allow,
1503 "fixture: tier gate allows",
1504 )
1505 .unwrap(),
1506 PolicyContribution::new(
1507 APPEND_ATTESTATION_REQUIRED_RULE_ID,
1508 PolicyOutcome::Warn,
1509 "fixture: attestation warning",
1510 )
1511 .unwrap(),
1512 PolicyContribution::new(
1513 APPEND_RUNTIME_MODE_RULE_ID,
1514 PolicyOutcome::Allow,
1515 "fixture: runtime mode permits unsigned",
1516 )
1517 .unwrap(),
1518 ],
1519 None,
1520 );
1521 let err = log
1523 .append(fixture_event(0), &policy)
1524 .expect_err("User event without Allow attestation must fail");
1525 assert!(matches!(err, JsonlError::Validation(_)));
1526 assert_eq!(log.len(), 0);
1527 }
1528
1529 #[test]
1530 fn append_allows_warn_outcome() {
1531 let dir = tempdir().unwrap();
1532 let path = dir.path().join("warn-outcome.jsonl");
1533 let mut log = JsonlLog::open(&path).unwrap();
1534 let policy = compose_policy_outcomes(
1537 vec![
1538 PolicyContribution::new(
1539 APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
1540 PolicyOutcome::Allow,
1541 "fixture: tier gate allows",
1542 )
1543 .unwrap(),
1544 PolicyContribution::new(
1545 APPEND_ATTESTATION_REQUIRED_RULE_ID,
1546 PolicyOutcome::Allow,
1547 "fixture: attestation present",
1548 )
1549 .unwrap(),
1550 PolicyContribution::new(
1551 APPEND_RUNTIME_MODE_RULE_ID,
1552 PolicyOutcome::Warn,
1553 "fixture: runtime mode is local-development",
1554 )
1555 .unwrap(),
1556 ],
1557 None,
1558 );
1559 assert_eq!(policy.final_outcome, PolicyOutcome::Warn);
1560 log.append(fixture_event(0), &policy)
1561 .expect("warn outcome must still append");
1562 assert_eq!(log.len(), 1);
1563 }
1564}