use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use chrono::Utc;
use cortex_core::{
attestor::Attestor, canonical::canonical_signing_input, schema_migration_v1_to_v2_event, Event,
EventSource, PolicyDecision, PolicyOutcome, SchemaMigrationV1ToV2Payload,
};
use thiserror::Error;
use crate::anchor_chain::{row_preimage, GENESIS_PREV_SIGNATURE};
use crate::hash::seal;
use crate::signed_row::{b64_decode, b64_encode, RowSignature, SignedRow};
/// Rule id of the event-source tier gate. `JsonlLog::append` refuses a policy
/// decision that carries no contribution with this id.
pub const APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID: &str = "ledger.append.event_source_tier_gate";
/// Rule id of the attestation requirement for `JsonlLog::append`. It must be
/// present in the decision and, for `EventSource::User` events, its own
/// outcome must be `Allow`.
pub const APPEND_ATTESTATION_REQUIRED_RULE_ID: &str = "ledger.append.attestation_required";
/// Rule id of the runtime-mode contribution that `JsonlLog::append` requires
/// to be present in the policy decision.
pub const APPEND_RUNTIME_MODE_RULE_ID: &str = "ledger.append.runtime_mode";
/// Rule id of the signing-key current-use contribution for
/// `JsonlLog::append_signed`. It must be present and its outcome must be `Allow`.
pub const APPEND_SIGNED_KEY_STATE_CURRENT_USE_RULE_ID: &str =
"ledger.append_signed.key_state_current_use";
/// Rule id of the trust-tier minimum contribution that
/// `JsonlLog::append_signed` requires to be present in the policy decision.
pub const APPEND_SIGNED_TRUST_TIER_MINIMUM_RULE_ID: &str =
"ledger.append_signed.trust_tier_minimum";
/// Rule id of the authority-class contribution that the schema-migration
/// append requires to be present in the policy decision.
pub const SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID: &str =
"ledger.schema_migration.authority_class";
/// Rule id of the attestation contribution for the schema-migration append.
/// It must be present and its outcome must be `Allow`.
pub const SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID: &str =
"ledger.schema_migration.attestation_required";
/// Rule id of the current-use temporal-authority contribution for the
/// schema-migration append. It must be present and its outcome must be `Allow`.
pub const SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID: &str =
"ledger.schema_migration.current_use_temporal_authority";
/// Errors produced by the JSONL ledger in this file.
#[derive(Debug, Error)]
pub enum JsonlError {
/// A filesystem operation on `path` failed (open, read, write, seek or sync).
#[error("io error on {path:?}: {source}")]
Io {
/// File the failing operation was performed on.
path: PathBuf,
/// Underlying I/O error.
#[source]
source: std::io::Error,
},
/// A non-blank line of the file could not be deserialized as a `SignedRow`.
#[error("json decode error at line {line} in {path:?}: {source}")]
Decode {
/// File the line was read from.
path: PathBuf,
/// 1-based physical line number (blank lines are counted).
line: usize,
/// Underlying JSON error.
#[source]
source: serde_json::Error,
},
/// A row could not be serialized to JSON before being written.
#[error("json encode error: {0}")]
Encode(#[source] serde_json::Error),
/// `JsonlLog::verify_chain` found a row whose hashes or back-link are wrong.
#[error("chain verification failed: {0}")]
ChainBroken(String),
/// A policy preflight or an append precondition was not satisfied.
#[error("validation failed: {0}")]
Validation(String),
}
/// Handle on a JSONL ledger file plus the chain state cached from it.
#[derive(Debug)]
pub struct JsonlLog {
// File that backs the log; reopened for every read and every append.
path: PathBuf,
// `event_hash` of the most recent row, or `None` while the log has no rows.
head: Option<String>,
// Number of non-blank rows seen at open time plus rows appended since.
len: u64,
// First 32 bytes of the most recent usable row signature; starts at
// `GENESIS_PREV_SIGNATURE` and is fed into the next signed row's preimage.
last_sig_prefix: [u8; 32],
}
/// Derives the ledger id from a log path.
///
/// The id is the file stem of `path` (the file name without its final
/// extension). When the path has no file stem, or the stem is not valid
/// UTF-8, the fixed fallback id `"cortex-jsonl"` is returned instead.
pub(crate) fn ledger_id_for(path: &Path) -> String {
    match path.file_stem().and_then(|stem| stem.to_str()) {
        Some(stem) => stem.to_owned(),
        None => String::from("cortex-jsonl"),
    }
}
impl JsonlLog {
pub fn open(path: impl AsRef<Path>) -> Result<Self, JsonlError> {
let path = path.as_ref().to_path_buf();
OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.map_err(|source| JsonlError::Io {
path: path.clone(),
source,
})?;
let f = File::open(&path).map_err(|source| JsonlError::Io {
path: path.clone(),
source,
})?;
let reader = BufReader::new(f);
let mut head: Option<String> = None;
let mut len: u64 = 0;
let mut last_sig_prefix: [u8; 32] = GENESIS_PREV_SIGNATURE;
for (i, line) in reader.lines().enumerate() {
let line = line.map_err(|source| JsonlError::Io {
path: path.clone(),
source,
})?;
if line.trim().is_empty() {
continue;
}
let row: SignedRow =
serde_json::from_str(&line).map_err(|source| JsonlError::Decode {
path: path.clone(),
line: i + 1,
source,
})?;
head = Some(row.event.event_hash.clone());
len += 1;
if let Some(sig) = &row.signature {
if let Some(bytes) = b64_decode(&sig.bytes) {
if bytes.len() >= 32 {
last_sig_prefix.copy_from_slice(&bytes[..32]);
} else {
}
}
} else {
}
}
Ok(Self {
path,
head,
len,
last_sig_prefix,
})
}
/// Returns the path of the file backing this log.
#[must_use]
pub fn path(&self) -> &Path {
&self.path
}
/// Returns the cached row count: rows found when the log was opened plus
/// rows appended through this handle.
#[must_use]
pub fn len(&self) -> u64 {
self.len
}
/// Returns `true` when the cached row count is zero.
#[must_use]
pub fn is_empty(&self) -> bool {
self.len == 0
}
/// Returns the `event_hash` of the most recent row, or `None` when the log
/// has no rows.
#[must_use]
pub fn head(&self) -> Option<&str> {
self.head.as_deref()
}
/// Appends `event` as an unsigned row and returns its sealed `event_hash`.
///
/// The policy decision is checked first: all three append rule ids must be
/// present, a `EventSource::User` event needs an `Allow` attestation
/// contribution, and the composed outcome must not be `Quarantine` or
/// `Reject`. Only then is the event linked to the current head, sealed,
/// serialized and written.
///
/// # Errors
///
/// * `JsonlError::Validation` if any policy preflight check fails.
/// * `JsonlError::Encode` if the row cannot be serialized.
/// * `JsonlError::Io` if the row cannot be written.
pub fn append(
&mut self,
mut event: Event,
policy: &PolicyDecision,
) -> Result<String, JsonlError> {
// Preflight: nothing is written unless every check passes.
require_append_contributors(policy)?;
require_event_source_attestation(policy, &event.source)?;
require_append_final_outcome(policy, "ledger.append")?;
// Link to the current head before sealing.
event.prev_event_hash = self.head.clone();
seal(&mut event);
let row = SignedRow::unsigned(event);
let line = serde_json::to_string(&row).map_err(JsonlError::Encode)?;
// Cached state is only advanced after the write succeeded.
self.write_line(&line)?;
self.head = Some(row.event.event_hash.clone());
self.len += 1;
Ok(row.event.event_hash)
}
/// Appends `event` as a signed row and returns its sealed `event_hash`.
///
/// The policy decision is checked first: both append-signed rule ids must be
/// present, the key-state contribution must be `Allow`, and the composed
/// outcome must not be `Quarantine` or `Reject`. The event is then linked to
/// the current head, sealed, and signed by `attestor` over the canonical
/// signing input of a preimage built from the event, the previous signature
/// prefix, the ledger id, the key id and the signing timestamp.
///
/// # Errors
///
/// * `JsonlError::Validation` if any policy preflight check fails.
/// * `JsonlError::Encode` if the row cannot be serialized.
/// * `JsonlError::Io` if the row cannot be written.
pub fn append_signed(
&mut self,
mut event: Event,
attestor: &dyn Attestor,
policy: &PolicyDecision,
) -> Result<String, JsonlError> {
// Preflight: nothing is signed or written unless every check passes.
require_append_signed_contributors(policy)?;
require_append_signed_key_state_not_break_glassed(policy)?;
require_append_final_outcome(policy, "ledger.append_signed")?;
// Link to the current head before sealing.
event.prev_event_hash = self.head.clone();
seal(&mut event);
// The same timestamp goes into the preimage and into the stored signature.
let signed_at = Utc::now();
let key_id = attestor.key_id().to_string();
let ledger_id = ledger_id_for(&self.path);
let preimage = row_preimage(
&event,
&self.last_sig_prefix,
&ledger_id,
&key_id,
signed_at,
);
let signing_input = canonical_signing_input(&preimage);
let sig = attestor.sign(&signing_input);
let sig_bytes = sig.to_bytes();
let row = SignedRow {
event,
signature: Some(RowSignature {
schema_version: cortex_core::canonical::SCHEMA_VERSION_ATTESTATION,
key_id,
signed_at,
bytes: b64_encode(&sig_bytes),
}),
};
let line = serde_json::to_string(&row).map_err(JsonlError::Encode)?;
// Cached state is only advanced after the write succeeded.
self.write_line(&line)?;
self.head = Some(row.event.event_hash.clone());
self.len += 1;
// NOTE(review): assumes the signature is at least 32 bytes long; a shorter
// one would make this slice panic after the row is already on disk.
self.last_sig_prefix.copy_from_slice(&sig_bytes[..32]);
Ok(row.event.event_hash)
}
/// Appends the v1 → v2 schema-migration boundary event and returns the new
/// head hash.
///
/// Thin wrapper over `append_schema_migration_v1_to_v2_with_event` that
/// discards the returned event.
///
/// # Errors
///
/// Propagates every error of `append_schema_migration_v1_to_v2_with_event`.
pub fn append_schema_migration_v1_to_v2(
    &mut self,
    payload: SchemaMigrationV1ToV2Payload,
    policy: &PolicyDecision,
) -> Result<String, JsonlError> {
    self.append_schema_migration_v1_to_v2_with_event(payload, policy)
        .map(|(new_head, _boundary_event)| new_head)
}
/// Appends the v1 → v2 schema-migration boundary event and returns both the
/// new head hash and the sealed event.
///
/// The policy decision must contain all three schema-migration rule ids, the
/// attestation and current-use contributions must be `Allow`, and the
/// composed outcome must not be `Quarantine` or `Reject`. The payload's
/// `previous_v1_head_hash` must equal the current head of this log.
///
/// # Errors
///
/// * `JsonlError::Validation` if a policy check fails, the head does not
///   match, or the boundary event cannot be built from the payload.
/// * `JsonlError::Encode` / `JsonlError::Io` if the row cannot be persisted.
pub fn append_schema_migration_v1_to_v2_with_event(
    &mut self,
    payload: SchemaMigrationV1ToV2Payload,
    policy: &PolicyDecision,
) -> Result<(String, Event), JsonlError> {
    require_schema_migration_contributors(policy)?;
    require_schema_migration_attestation_not_break_glassed(policy)?;
    require_schema_migration_current_use_not_break_glassed(policy)?;
    require_append_final_outcome(policy, "ledger.schema_migration")?;

    // The boundary may only be written directly on top of the head it names.
    let expected_head = payload.previous_v1_head_hash.clone();
    let observed = self.head.as_deref();
    if !matches!(observed, Some(head) if head == expected_head) {
        return Err(JsonlError::Validation(format!(
            "schema migration boundary previous_v1_head_hash mismatch: observed {observed:?}, expected {expected_head}"
        )));
    }

    // A single timestamp is passed for both time arguments of the builder.
    let stamp = Utc::now();
    let boundary_event = schema_migration_v1_to_v2_event(payload, stamp, stamp, None)
        .map_err(|err| JsonlError::Validation(err.to_string()))?;
    self.append_unchecked_returning_event(boundary_event)
}
/// Links, seals and writes `event` as an unsigned row without any policy
/// preflight, returning the new head hash together with the sealed event.
///
/// Callers are responsible for having performed their own checks.
///
/// # Errors
///
/// `JsonlError::Encode` if the row cannot be serialized, `JsonlError::Io` if
/// it cannot be written.
fn append_unchecked_returning_event(
    &mut self,
    mut event: Event,
) -> Result<(String, Event), JsonlError> {
    event.prev_event_hash = self.head.clone();
    seal(&mut event);

    let unsigned_row = SignedRow::unsigned(event.clone());
    let encoded = serde_json::to_string(&unsigned_row).map_err(JsonlError::Encode)?;
    self.write_line(&encoded)?;

    // Cached state is only advanced after the write succeeded.
    let new_head = event.event_hash.clone();
    self.head = Some(new_head.clone());
    self.len += 1;
    Ok((new_head, event))
}
fn write_line(&self, line: &str) -> Result<(), JsonlError> {
let mut f = OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)
.map_err(|source| JsonlError::Io {
path: self.path.clone(),
source,
})?;
f.write_all(line.as_bytes())
.map_err(|source| JsonlError::Io {
path: self.path.clone(),
source,
})?;
f.write_all(b"\n").map_err(|source| JsonlError::Io {
path: self.path.clone(),
source,
})?;
f.sync_all().map_err(|source| JsonlError::Io {
path: self.path.clone(),
source,
})?;
Ok(())
}
/// Returns a copy of the cached 32-byte signature prefix that the next
/// signed append will feed into its preimage.
#[must_use]
pub fn last_sig_prefix(&self) -> [u8; 32] {
self.last_sig_prefix
}
pub fn iter(&self) -> Result<JsonlIter, JsonlError> {
let mut f = File::open(&self.path).map_err(|source| JsonlError::Io {
path: self.path.clone(),
source,
})?;
f.seek(SeekFrom::Start(0))
.map_err(|source| JsonlError::Io {
path: self.path.clone(),
source,
})?;
Ok(JsonlIter {
path: self.path.clone(),
reader: BufReader::new(Box::new(f) as Box<dyn Read>),
line: 0,
})
}
/// Opens the log file for reading and returns an iterator over its full
/// rows, signatures included.
///
/// # Errors
///
/// `JsonlError::Io` if the file cannot be opened.
pub fn iter_signed(&self) -> Result<SignedJsonlIter, JsonlError> {
    let file = match File::open(&self.path) {
        Ok(file) => file,
        Err(source) => {
            return Err(JsonlError::Io {
                path: self.path.clone(),
                source,
            });
        }
    };
    let source: Box<dyn Read> = Box::new(file);
    Ok(SignedJsonlIter {
        path: self.path.clone(),
        reader: BufReader::new(source),
        line: 0,
    })
}
/// Re-reads the whole file and checks the hash chain row by row.
///
/// For every row, in order, it checks that the stored `payload_hash`
/// matches the hash recomputed from the payload, that the stored
/// `event_hash` matches the hash recomputed from `prev_event_hash` and
/// `payload_hash`, and that `prev_event_hash` equals the previous row's
/// `event_hash` (`None` for the first row). An empty log verifies.
///
/// # Errors
///
/// * `JsonlError::ChainBroken` naming the 1-based row and the failed check.
/// * `JsonlError::Io` / `JsonlError::Decode` if the file cannot be read.
pub fn verify_chain(&self) -> Result<(), JsonlError> {
    let broken =
        |row: usize, what: &str| JsonlError::ChainBroken(format!("row {row} {what}"));

    let mut expected_prev: Option<String> = None;
    for (idx, item) in self.iter()?.enumerate() {
        let row = idx + 1;
        let event = item?;

        if event.payload_hash != crate::hash::payload_hash(&event.payload) {
            return Err(broken(row, "payload_hash mismatch"));
        }
        let recomputed =
            crate::hash::event_hash(event.prev_event_hash.as_deref(), &event.payload_hash);
        if event.event_hash != recomputed {
            return Err(broken(row, "event_hash mismatch"));
        }
        if event.prev_event_hash != expected_prev {
            return Err(broken(
                row,
                "prev_event_hash does not point at previous row",
            ));
        }
        expected_prev = Some(event.event_hash.clone());
    }
    Ok(())
}
}
/// Rejects a policy decision whose composed outcome blocks a ledger write.
///
/// `Quarantine` and `Reject` fail with `JsonlError::Validation` (the message
/// is prefixed with `surface`); `Allow`, `Warn` and `BreakGlass` pass.
fn require_append_final_outcome(policy: &PolicyDecision, surface: &str) -> Result<(), JsonlError> {
    match &policy.final_outcome {
        blocked @ (PolicyOutcome::Quarantine | PolicyOutcome::Reject) => {
            Err(JsonlError::Validation(format!(
                "{surface} preflight: composed policy outcome {:?} blocks ledger append",
                blocked,
            )))
        }
        PolicyOutcome::Allow | PolicyOutcome::Warn | PolicyOutcome::BreakGlass => Ok(()),
    }
}
/// Checks that `policy` carries a contribution with the given `rule_id`,
/// looking at both the contributing and the discarded contributions.
///
/// Returns `JsonlError::Validation` when no such contribution exists.
fn require_contributor(policy: &PolicyDecision, rule_id: &str) -> Result<(), JsonlError> {
    let all_contributions = policy.contributing.iter().chain(policy.discarded.iter());
    for contribution in all_contributions {
        if contribution.rule_id.as_str() == rule_id {
            return Ok(());
        }
    }
    Err(JsonlError::Validation(format!(
        "policy decision missing required contributor `{rule_id}`; caller skipped ADR 0026 composition",
    )))
}
/// Requires the three rule ids of the unsigned append surface to be present
/// in `policy`; fails on the first missing one, checked in declaration order.
fn require_append_contributors(policy: &PolicyDecision) -> Result<(), JsonlError> {
    for rule_id in [
        APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
        APPEND_ATTESTATION_REQUIRED_RULE_ID,
        APPEND_RUNTIME_MODE_RULE_ID,
    ] {
        require_contributor(policy, rule_id)?;
    }
    Ok(())
}
/// Requires the two rule ids of the signed append surface to be present in
/// `policy`; fails on the first missing one, checked in declaration order.
fn require_append_signed_contributors(policy: &PolicyDecision) -> Result<(), JsonlError> {
    for rule_id in [
        APPEND_SIGNED_KEY_STATE_CURRENT_USE_RULE_ID,
        APPEND_SIGNED_TRUST_TIER_MINIMUM_RULE_ID,
    ] {
        require_contributor(policy, rule_id)?;
    }
    Ok(())
}
/// For `EventSource::User` events, requires the attestation contribution of
/// the append surface to be present with an `Allow` outcome.
///
/// Events from any other source pass unconditionally. The contribution is
/// searched in both the contributing and the discarded lists, and any
/// outcome other than `Allow` is refused.
///
/// # Errors
///
/// `JsonlError::Validation` when the contribution is absent or not `Allow`.
fn require_event_source_attestation(
    policy: &PolicyDecision,
    source: &EventSource,
) -> Result<(), JsonlError> {
    if !matches!(source, EventSource::User) {
        return Ok(());
    }
    let attestation = policy
        .contributing
        .iter()
        .chain(policy.discarded.iter())
        .find(|contribution| {
            contribution.rule_id.as_str() == APPEND_ATTESTATION_REQUIRED_RULE_ID
        })
        .ok_or_else(|| {
            JsonlError::Validation(format!(
                "ledger.append preflight: required attestation contributor `{APPEND_ATTESTATION_REQUIRED_RULE_ID}` is absent from the policy decision for EventSource::User"
            ))
        })?;
    if attestation.outcome == PolicyOutcome::Allow {
        Ok(())
    } else {
        // The section sign was previously stored as mis-decoded bytes ("ยง").
        Err(JsonlError::Validation(format!(
            "ledger.append preflight: attestation contributor `{APPEND_ATTESTATION_REQUIRED_RULE_ID}` returned {:?} for EventSource::User; ADR 0026 §4 forbids BreakGlass substituting for attestation",
            attestation.outcome,
        )))
    }
}
/// Requires the three rule ids of the schema-migration surface to be present
/// in `policy`; fails on the first missing one, checked in declaration order.
fn require_schema_migration_contributors(policy: &PolicyDecision) -> Result<(), JsonlError> {
    for rule_id in [
        SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID,
        SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID,
        SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID,
    ] {
        require_contributor(policy, rule_id)?;
    }
    Ok(())
}
/// Requires the schema-migration attestation contribution to be present
/// with an `Allow` outcome.
///
/// The contribution is searched in both the contributing and the discarded
/// lists, and any outcome other than `Allow` is refused.
///
/// # Errors
///
/// `JsonlError::Validation` when the contribution is absent or not `Allow`.
fn require_schema_migration_attestation_not_break_glassed(
    policy: &PolicyDecision,
) -> Result<(), JsonlError> {
    let attestation = policy
        .contributing
        .iter()
        .chain(policy.discarded.iter())
        .find(|contribution| {
            contribution.rule_id.as_str() == SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID
        })
        .ok_or_else(|| {
            JsonlError::Validation(format!(
                "ledger.schema_migration preflight: required attestation contributor `{SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID}` is absent from the policy decision"
            ))
        })?;
    if attestation.outcome == PolicyOutcome::Allow {
        Ok(())
    } else {
        // The section sign was previously stored as mis-decoded bytes ("ยง").
        Err(JsonlError::Validation(format!(
            "ledger.schema_migration preflight: attestation contributor `{SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID}` returned {:?}; ADR 0026 §4 forbids BreakGlass substituting for operator attestation at the migration authority root",
            attestation.outcome,
        )))
    }
}
/// Requires the schema-migration current-use contribution to be present
/// with an `Allow` outcome.
///
/// The contribution is searched in both the contributing and the discarded
/// lists; absence or any outcome other than `Allow` yields
/// `JsonlError::Validation`.
fn require_schema_migration_current_use_not_break_glassed(
    policy: &PolicyDecision,
) -> Result<(), JsonlError> {
    let found = policy
        .contributing
        .iter()
        .chain(policy.discarded.iter())
        .find(|candidate| {
            candidate.rule_id.as_str()
                == SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID
        });
    match found {
        None => Err(JsonlError::Validation(format!(
            "ledger.schema_migration preflight: required current-use contributor `{SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID}` is absent from the policy decision"
        ))),
        Some(current_use) if current_use.outcome == PolicyOutcome::Allow => Ok(()),
        Some(current_use) => Err(JsonlError::Validation(format!(
            "ledger.schema_migration preflight: current-use contributor `{SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID}` returned {:?}; ADR 0023 forbids historical-only or revoked signing keys at the migration authority root",
            current_use.outcome,
        ))),
    }
}
/// Requires the signed-append key-state contribution to be present with an
/// `Allow` outcome.
///
/// The contribution is searched in both the contributing and the discarded
/// lists; absence or any outcome other than `Allow` yields
/// `JsonlError::Validation`.
fn require_append_signed_key_state_not_break_glassed(
    policy: &PolicyDecision,
) -> Result<(), JsonlError> {
    let found = policy
        .contributing
        .iter()
        .chain(policy.discarded.iter())
        .find(|candidate| {
            candidate.rule_id.as_str() == APPEND_SIGNED_KEY_STATE_CURRENT_USE_RULE_ID
        });
    match found {
        None => Err(JsonlError::Validation(format!(
            "ledger.append_signed preflight: required current-use contributor `{APPEND_SIGNED_KEY_STATE_CURRENT_USE_RULE_ID}` is absent from the policy decision"
        ))),
        Some(key_state) if key_state.outcome == PolicyOutcome::Allow => Ok(()),
        Some(key_state) => Err(JsonlError::Validation(format!(
            "ledger.append_signed preflight: current-use contributor `{APPEND_SIGNED_KEY_STATE_CURRENT_USE_RULE_ID}` returned {:?}; ADR 0023 forbids historical-only or revoked signing keys at the trusted ledger root",
            key_state.outcome,
        ))),
    }
}
/// Test fixture: composes a policy decision in which all three rule ids of
/// the unsigned append surface contribute `Allow`.
///
/// # Panics
///
/// Panics if one of the static contributions is rejected by
/// `PolicyContribution::new`.
#[must_use]
pub fn append_policy_decision_test_allow() -> PolicyDecision {
    use cortex_core::{compose_policy_outcomes, PolicyContribution};
    let allow = |rule_id: &'static str, reason: &'static str| {
        PolicyContribution::new(rule_id, PolicyOutcome::Allow, reason)
            .expect("static test contribution is valid")
    };
    let contributions = vec![
        allow(
            APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
            "test fixture: event source tier gate satisfied",
        ),
        allow(
            APPEND_ATTESTATION_REQUIRED_RULE_ID,
            "test fixture: attestation requirement satisfied",
        ),
        allow(
            APPEND_RUNTIME_MODE_RULE_ID,
            "test fixture: runtime mode permits unsigned append",
        ),
    ];
    compose_policy_outcomes(contributions, None)
}
/// Test fixture: composes a policy decision in which all three rule ids of
/// the schema-migration surface contribute `Allow`.
///
/// # Panics
///
/// Panics if one of the static contributions is rejected by
/// `PolicyContribution::new`.
#[must_use]
pub fn schema_migration_v1_to_v2_policy_decision_test_allow() -> PolicyDecision {
    use cortex_core::{compose_policy_outcomes, PolicyContribution};
    let allow = |rule_id: &'static str, reason: &'static str| {
        PolicyContribution::new(rule_id, PolicyOutcome::Allow, reason)
            .expect("static test contribution is valid")
    };
    let contributions = vec![
        allow(
            SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID,
            "test fixture: operator authority class satisfied",
        ),
        allow(
            SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID,
            "test fixture: operator attestation present",
        ),
        allow(
            SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID,
            "test fixture: signing key state is current-use",
        ),
    ];
    compose_policy_outcomes(contributions, None)
}
/// Test fixture: composes a policy decision in which both rule ids of the
/// signed append surface contribute `Allow`.
///
/// # Panics
///
/// Panics if one of the static contributions is rejected by
/// `PolicyContribution::new`.
#[must_use]
pub fn append_signed_policy_decision_test_allow() -> PolicyDecision {
    use cortex_core::{compose_policy_outcomes, PolicyContribution};
    let allow = |rule_id: &'static str, reason: &'static str| {
        PolicyContribution::new(rule_id, PolicyOutcome::Allow, reason)
            .expect("static test contribution is valid")
    };
    let contributions = vec![
        allow(
            APPEND_SIGNED_KEY_STATE_CURRENT_USE_RULE_ID,
            "test fixture: signing key state is current-use",
        ),
        allow(
            APPEND_SIGNED_TRUST_TIER_MINIMUM_RULE_ID,
            "test fixture: signing principal trust tier satisfies minimum",
        ),
    ];
    compose_policy_outcomes(contributions, None)
}
/// Iterator over the events stored in a JSONL ledger file; each item is the
/// `event` of one decoded `SignedRow`, or the error hit while reading it.
pub struct JsonlIter {
// File being read; only used for error reporting.
path: PathBuf,
// Buffered, type-erased reader over the file contents.
reader: BufReader<Box<dyn Read>>,
// Number of physical lines consumed so far (blank lines included).
line: usize,
}
/// Debug output shows the path and the current line counter; the reader is
/// omitted.
impl std::fmt::Debug for JsonlIter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            path,
            line,
            reader: _,
        } = self;
        let mut out = f.debug_struct("JsonlIter");
        out.field("path", path);
        out.field("line", line);
        out.finish()
    }
}
/// Yields one `Event` per non-blank line of the file.
///
/// Blank lines are skipped but still counted, so `Decode` errors report the
/// physical 1-based line number. A read failure is yielded as
/// `JsonlError::Io`; end of file ends the iteration.
impl Iterator for JsonlIter {
    type Item = Result<Event, JsonlError>;

    fn next(&mut self) -> Option<Self::Item> {
        let mut buf = String::new();
        loop {
            buf.clear();
            match self.reader.read_line(&mut buf) {
                Err(source) => {
                    return Some(Err(JsonlError::Io {
                        path: self.path.clone(),
                        source,
                    }));
                }
                // Zero bytes read means end of file.
                Ok(0) => return None,
                Ok(_) => {}
            }
            self.line += 1;

            let row_text = buf.trim();
            if row_text.is_empty() {
                continue;
            }
            let decoded = serde_json::from_str::<SignedRow>(row_text);
            return Some(match decoded {
                Ok(row) => Ok(row.event),
                Err(source) => Err(JsonlError::Decode {
                    path: self.path.clone(),
                    line: self.line,
                    source,
                }),
            });
        }
    }
}
/// Iterator over the full rows stored in a JSONL ledger file; each item is
/// one decoded `SignedRow`, or the error hit while reading it.
pub struct SignedJsonlIter {
// File being read; only used for error reporting.
path: PathBuf,
// Buffered, type-erased reader over the file contents.
reader: BufReader<Box<dyn Read>>,
// Number of physical lines consumed so far (blank lines included).
line: usize,
}
/// Debug output shows the path and the current line counter; the reader is
/// omitted.
impl std::fmt::Debug for SignedJsonlIter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            path,
            line,
            reader: _,
        } = self;
        let mut out = f.debug_struct("SignedJsonlIter");
        out.field("path", path);
        out.field("line", line);
        out.finish()
    }
}
/// Yields one `SignedRow` per non-blank line of the file.
///
/// Blank lines are skipped but still counted, so `Decode` errors report the
/// physical 1-based line number. A read failure is yielded as
/// `JsonlError::Io`; end of file ends the iteration.
impl Iterator for SignedJsonlIter {
    type Item = Result<SignedRow, JsonlError>;

    fn next(&mut self) -> Option<Self::Item> {
        let mut buf = String::new();
        loop {
            buf.clear();
            match self.reader.read_line(&mut buf) {
                Err(source) => {
                    return Some(Err(JsonlError::Io {
                        path: self.path.clone(),
                        source,
                    }));
                }
                // Zero bytes read means end of file.
                Ok(0) => return None,
                Ok(_) => {}
            }
            self.line += 1;

            let row_text = buf.trim();
            if row_text.is_empty() {
                continue;
            }
            let decoded = serde_json::from_str::<SignedRow>(row_text);
            return Some(decoded.map_err(|source| JsonlError::Decode {
                path: self.path.clone(),
                line: self.line,
                source,
            }));
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::TimeZone;
use cortex_core::{
compose_policy_outcomes, Event, EventId, EventSource, EventType, PolicyContribution,
SchemaMigrationV1ToV2Payload, SCHEMA_MIGRATION_V1_TO_V2_EVENT_KIND,
SCHEMA_MIGRATION_V1_TO_V2_TARGET, SCHEMA_VERSION,
};
use tempfile::tempdir;
/// Builds an unsealed `UserMessage` event from `EventSource::User` whose
/// payload carries `seq`; hashes are left empty for the log to fill in.
fn fixture_event(seq: u64) -> Event {
    // Fixed timestamps on 2026-01-01 12:00, one second apart.
    let at_second = |sec: u32| chrono::Utc.with_ymd_and_hms(2026, 1, 1, 12, 0, sec).unwrap();
    let payload = serde_json::json!({"seq": seq, "text": format!("event {seq}")});
    Event {
        id: EventId::new(),
        schema_version: SCHEMA_VERSION,
        observed_at: at_second(0),
        recorded_at: at_second(1),
        source: EventSource::User,
        event_type: EventType::UserMessage,
        trace_id: None,
        session_id: None,
        domain_tags: vec![],
        payload,
        payload_hash: String::new(),
        prev_event_hash: None,
        event_hash: String::new(),
    }
}
// Shorthand for the all-`Allow` policy decision of the unsigned append surface.
fn allow_policy() -> PolicyDecision {
append_policy_decision_test_allow()
}
// Shorthand for the all-`Allow` policy decision of the schema-migration surface.
fn migration_allow_policy() -> PolicyDecision {
schema_migration_v1_to_v2_policy_decision_test_allow()
}
// Appends 25 events, reopens the file, and checks length, head, chain
// verification and the back-links seen through `iter`.
#[test]
fn append_n_reopen_and_verify() {
    let tmp = tempdir().unwrap();
    let ledger_path = tmp.path().join("events.jsonl");
    let mut writer = JsonlLog::open(&ledger_path).unwrap();
    let policy = allow_policy();

    let heads: Vec<String> = (0..25u64)
        .map(|seq| writer.append(fixture_event(seq), &policy).unwrap())
        .collect();
    let last_head = heads.last().unwrap().as_str();
    assert_eq!(writer.len(), 25);
    assert_eq!(writer.head(), Some(last_head));

    let reopened = JsonlLog::open(&ledger_path).unwrap();
    assert_eq!(reopened.len(), 25);
    assert_eq!(reopened.head(), Some(last_head));
    reopened.verify_chain().expect("chain verifies after reopen");

    let mut expected_prev: Option<String> = None;
    let mut seen = 0;
    for item in reopened.iter().unwrap() {
        let event = item.unwrap();
        assert_eq!(event.prev_event_hash, expected_prev);
        expected_prev = Some(event.event_hash.clone());
        seen += 1;
    }
    assert_eq!(seen, 25);
}
// A freshly created log has no rows, no head, and a valid (empty) chain.
#[test]
fn empty_log_verifies() {
    let tmp = tempdir().unwrap();
    let fresh = JsonlLog::open(tmp.path().join("empty.jsonl")).unwrap();
    assert_eq!(fresh.len(), 0);
    assert!(fresh.head().is_none());
    fresh.verify_chain().expect("empty chain is valid");
}
// Rows appended through one handle are visible to a handle opened after the
// first one was dropped.
#[test]
fn append_persists_after_drop() {
    let tmp = tempdir().unwrap();
    let ledger_path = tmp.path().join("persist.jsonl");
    {
        let mut writer = JsonlLog::open(&ledger_path).unwrap();
        let policy = allow_policy();
        for seq in 0..2u64 {
            writer.append(fixture_event(seq), &policy).unwrap();
        }
    }
    let reopened = JsonlLog::open(&ledger_path).unwrap();
    assert_eq!(reopened.len(), 2);
}
// Rewrites the middle row with a tampered payload (hashes untouched) and
// expects `verify_chain` to report a broken chain.
#[test]
fn corrupted_payload_fails_verify() {
    let tmp = tempdir().unwrap();
    let ledger_path = tmp.path().join("corrupt.jsonl");
    let mut writer = JsonlLog::open(&ledger_path).unwrap();
    let policy = allow_policy();
    for seq in 0..3u64 {
        writer.append(fixture_event(seq), &policy).unwrap();
    }

    let original = std::fs::read_to_string(&ledger_path).unwrap();
    let lines: Vec<String> = original.lines().map(str::to_owned).collect();
    let mut tampered: Event = serde_json::from_str(&lines[1]).unwrap();
    tampered.payload = serde_json::json!({"tampered": true});
    let rewritten = format!(
        "{}\n{}\n{}\n",
        lines[0],
        serde_json::to_string(&tampered).unwrap(),
        lines[2],
    );
    std::fs::write(&ledger_path, rewritten).unwrap();

    let reopened = JsonlLog::open(&ledger_path).unwrap();
    let err = reopened.verify_chain().unwrap_err();
    assert!(matches!(err, JsonlError::ChainBroken(_)));
}
// A reopened log picks up the previous head and keeps the chain valid when
// another event is appended.
#[test]
fn append_after_reopen_continues_chain() {
    let tmp = tempdir().unwrap();
    let ledger_path = tmp.path().join("continue.jsonl");
    let head_before = {
        let mut writer = JsonlLog::open(&ledger_path).unwrap();
        let policy = allow_policy();
        writer.append(fixture_event(0), &policy).unwrap();
        writer.append(fixture_event(1), &policy).unwrap()
    };

    let mut reopened = JsonlLog::open(&ledger_path).unwrap();
    assert_eq!(reopened.head(), Some(head_before.as_str()));
    let head_after = reopened
        .append(fixture_event(2), &allow_policy())
        .unwrap();
    assert_ne!(head_after, head_before);
    reopened
        .verify_chain()
        .expect("continued chain verifies");
}
// The boundary event is appended on top of the v1 head, keeps the chain
// valid, and carries the expected schema version, type, source and payload.
#[test]
fn schema_migration_v1_to_v2_event_emitted_after_v1_head() {
    let tmp = tempdir().unwrap();
    let mut log = JsonlLog::open(tmp.path().join("schema-boundary.jsonl")).unwrap();
    let v1_head = log.append(fixture_event(0), &allow_policy()).unwrap();

    let payload = SchemaMigrationV1ToV2Payload::new(
        v1_head.clone(),
        "script-digest",
        None,
        "fixture-digest",
    );
    let boundary_head = log
        .append_schema_migration_v1_to_v2(payload, &migration_allow_policy())
        .expect("boundary event appends");

    assert_eq!(log.len(), 2);
    assert_eq!(log.head(), Some(boundary_head.as_str()));
    log.verify_chain().expect("boundary chain verifies");

    let events = log.iter().unwrap().collect::<Result<Vec<_>, _>>().unwrap();
    let boundary = events.last().expect("boundary row exists");
    assert_eq!(boundary.schema_version, SCHEMA_MIGRATION_V1_TO_V2_TARGET);
    assert_eq!(boundary.event_type, EventType::SystemNote);
    assert_eq!(boundary.source, EventSource::Runtime);
    assert_eq!(boundary.prev_event_hash.as_deref(), Some(v1_head.as_str()));
    assert_eq!(
        boundary.payload["kind"],
        SCHEMA_MIGRATION_V1_TO_V2_EVENT_KIND
    );
    assert_eq!(
        boundary.payload["payload"]["previous_v1_head_hash"],
        v1_head
    );
}
// A payload naming a head other than the current one is refused and leaves
// the log untouched.
#[test]
fn schema_migration_v1_to_v2_event_rejects_wrong_previous_head() {
    let tmp = tempdir().unwrap();
    let mut log = JsonlLog::open(tmp.path().join("schema-boundary-mismatch.jsonl")).unwrap();
    log.append(fixture_event(0), &allow_policy()).unwrap();

    let stale_payload =
        SchemaMigrationV1ToV2Payload::new("not-current-head", "script-digest", None, "fixture");
    let err = log
        .append_schema_migration_v1_to_v2(stale_payload, &migration_allow_policy())
        .expect_err("wrong previous head must fail");

    assert!(matches!(err, JsonlError::Validation(_)));
    assert_eq!(log.len(), 1);
    log.verify_chain()
        .expect("rejected boundary append must not corrupt chain");
}
// A decision without the authority-class contribution is refused and leaves
// the log untouched.
#[test]
fn schema_migration_v1_to_v2_refuses_missing_authority_class_contributor() {
    let tmp = tempdir().unwrap();
    let mut log = JsonlLog::open(tmp.path().join("schema-boundary-missing-auth.jsonl")).unwrap();
    let v1_head = log.append(fixture_event(0), &allow_policy()).unwrap();
    let payload =
        SchemaMigrationV1ToV2Payload::new(v1_head, "script-digest", None, "fixture-digest");

    let contribution = |rule_id: &'static str, outcome: PolicyOutcome, reason: &'static str| {
        PolicyContribution::new(rule_id, outcome, reason).unwrap()
    };
    let policy = compose_policy_outcomes(
        vec![
            contribution(
                SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID,
                PolicyOutcome::Allow,
                "fixture: attestation present",
            ),
            contribution(
                SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID,
                PolicyOutcome::Allow,
                "fixture: current use",
            ),
        ],
        None,
    );

    let err = log
        .append_schema_migration_v1_to_v2(payload, &policy)
        .expect_err("missing authority-class contributor must fail");
    assert!(matches!(err, JsonlError::Validation(_)));
    assert_eq!(log.len(), 1);
    log.verify_chain()
        .expect("rejected boundary append must not corrupt chain");
}
// A decision that composes to `Reject` blocks the boundary append.
#[test]
fn schema_migration_v1_to_v2_refuses_reject_outcome() {
    let tmp = tempdir().unwrap();
    let mut log = JsonlLog::open(tmp.path().join("schema-boundary-reject.jsonl")).unwrap();
    let v1_head = log.append(fixture_event(0), &allow_policy()).unwrap();
    let payload =
        SchemaMigrationV1ToV2Payload::new(v1_head, "script-digest", None, "fixture-digest");

    let contribution = |rule_id: &'static str, outcome: PolicyOutcome, reason: &'static str| {
        PolicyContribution::new(rule_id, outcome, reason).unwrap()
    };
    let policy = compose_policy_outcomes(
        vec![
            contribution(
                SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID,
                PolicyOutcome::Reject,
                "fixture: non-operator authority class",
            ),
            contribution(
                SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID,
                PolicyOutcome::Allow,
                "fixture: attestation present",
            ),
            contribution(
                SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID,
                PolicyOutcome::Allow,
                "fixture: current use",
            ),
        ],
        None,
    );
    assert_eq!(policy.final_outcome, PolicyOutcome::Reject);

    let err = log
        .append_schema_migration_v1_to_v2(payload, &policy)
        .expect_err("reject outcome must fail closed");
    assert!(matches!(err, JsonlError::Validation(_)));
    assert_eq!(log.len(), 1);
}
// A decision that composes to `Quarantine` blocks the boundary append.
#[test]
fn schema_migration_v1_to_v2_refuses_quarantine_outcome() {
    let tmp = tempdir().unwrap();
    let mut log = JsonlLog::open(tmp.path().join("schema-boundary-quarantine.jsonl")).unwrap();
    let v1_head = log.append(fixture_event(0), &allow_policy()).unwrap();
    let payload =
        SchemaMigrationV1ToV2Payload::new(v1_head, "script-digest", None, "fixture-digest");

    let contribution = |rule_id: &'static str, outcome: PolicyOutcome, reason: &'static str| {
        PolicyContribution::new(rule_id, outcome, reason).unwrap()
    };
    let policy = compose_policy_outcomes(
        vec![
            contribution(
                SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID,
                PolicyOutcome::Quarantine,
                "fixture: under-trust authority class",
            ),
            contribution(
                SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID,
                PolicyOutcome::Allow,
                "fixture: attestation present",
            ),
            contribution(
                SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID,
                PolicyOutcome::Allow,
                "fixture: current use",
            ),
        ],
        None,
    );
    assert_eq!(policy.final_outcome, PolicyOutcome::Quarantine);

    let err = log
        .append_schema_migration_v1_to_v2(payload, &policy)
        .expect_err("quarantine outcome must fail closed");
    assert!(matches!(err, JsonlError::Validation(_)));
    assert_eq!(log.len(), 1);
}
// A `BreakGlass` outcome on the attestation contribution is refused even
// though the other contributions allow.
#[test]
fn schema_migration_v1_to_v2_refuses_attestation_break_glass() {
    let tmp = tempdir().unwrap();
    let mut log = JsonlLog::open(tmp.path().join("schema-boundary-break-glass.jsonl")).unwrap();
    let v1_head = log.append(fixture_event(0), &allow_policy()).unwrap();
    let payload =
        SchemaMigrationV1ToV2Payload::new(v1_head, "script-digest", None, "fixture-digest");

    let contribution = |rule_id: &'static str, outcome: PolicyOutcome, reason: &'static str| {
        PolicyContribution::new(rule_id, outcome, reason).unwrap()
    };
    let policy = compose_policy_outcomes(
        vec![
            contribution(
                SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID,
                PolicyOutcome::Allow,
                "fixture: operator authority class",
            ),
            contribution(
                SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID,
                PolicyOutcome::BreakGlass,
                "fixture: operator attempted break-glass on attestation",
            ),
            contribution(
                SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID,
                PolicyOutcome::Allow,
                "fixture: current use",
            ),
        ],
        None,
    );

    let err = log
        .append_schema_migration_v1_to_v2(payload, &policy)
        .expect_err("BreakGlass on attestation must fail");
    assert!(matches!(err, JsonlError::Validation(_)));
    assert_eq!(log.len(), 1);
}
// A `Reject` outcome on the current-use contribution blocks the boundary
// append.
#[test]
fn schema_migration_v1_to_v2_refuses_historical_key_current_use() {
    let tmp = tempdir().unwrap();
    let mut log =
        JsonlLog::open(tmp.path().join("schema-boundary-historical-key.jsonl")).unwrap();
    let v1_head = log.append(fixture_event(0), &allow_policy()).unwrap();
    let payload =
        SchemaMigrationV1ToV2Payload::new(v1_head, "script-digest", None, "fixture-digest");

    let contribution = |rule_id: &'static str, outcome: PolicyOutcome, reason: &'static str| {
        PolicyContribution::new(rule_id, outcome, reason).unwrap()
    };
    let policy = compose_policy_outcomes(
        vec![
            contribution(
                SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID,
                PolicyOutcome::Allow,
                "fixture: operator authority class",
            ),
            contribution(
                SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID,
                PolicyOutcome::Allow,
                "fixture: attestation present",
            ),
            contribution(
                SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID,
                PolicyOutcome::Reject,
                "fixture: signing key retired before attestation time",
            ),
        ],
        None,
    );

    let err = log
        .append_schema_migration_v1_to_v2(payload, &policy)
        .expect_err("historical-only signing key must fail");
    assert!(matches!(err, JsonlError::Validation(_)));
    assert_eq!(log.len(), 1);
}
// A decision carrying only the tier-gate contribution is refused and
// nothing is appended.
#[test]
fn append_refuses_policy_decision_missing_contributors() {
    let tmp = tempdir().unwrap();
    let mut log = JsonlLog::open(tmp.path().join("missing-contributor.jsonl")).unwrap();

    let tier_gate_only = PolicyContribution::new(
        APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
        PolicyOutcome::Allow,
        "fixture: tier gate only",
    )
    .unwrap();
    let policy = compose_policy_outcomes(vec![tier_gate_only], None);

    let err = log
        .append(fixture_event(0), &policy)
        .expect_err("missing contributor must fail");
    assert!(matches!(err, JsonlError::Validation(_)));
    assert_eq!(log.len(), 0);
}
// A decision that composes to `Reject` blocks the unsigned append.
#[test]
fn append_refuses_reject_outcome() {
    let tmp = tempdir().unwrap();
    let mut log = JsonlLog::open(tmp.path().join("reject-outcome.jsonl")).unwrap();

    let contribution = |rule_id: &'static str, outcome: PolicyOutcome, reason: &'static str| {
        PolicyContribution::new(rule_id, outcome, reason).unwrap()
    };
    let policy = compose_policy_outcomes(
        vec![
            contribution(
                APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
                PolicyOutcome::Reject,
                "fixture: tier gate refuses",
            ),
            contribution(
                APPEND_ATTESTATION_REQUIRED_RULE_ID,
                PolicyOutcome::Allow,
                "fixture: attestation present",
            ),
            contribution(
                APPEND_RUNTIME_MODE_RULE_ID,
                PolicyOutcome::Allow,
                "fixture: runtime mode permits unsigned",
            ),
        ],
        None,
    );
    assert_eq!(policy.final_outcome, PolicyOutcome::Reject);

    let err = log
        .append(fixture_event(0), &policy)
        .expect_err("reject outcome must fail closed");
    assert!(matches!(err, JsonlError::Validation(_)));
    assert_eq!(log.len(), 0);
}
// A user-sourced event is refused when the attestation contribution is
// `Warn` rather than `Allow`.
#[test]
fn append_refuses_user_event_when_attestation_contributor_not_allow() {
    let tmp = tempdir().unwrap();
    let mut log = JsonlLog::open(tmp.path().join("user-attestation.jsonl")).unwrap();

    let contribution = |rule_id: &'static str, outcome: PolicyOutcome, reason: &'static str| {
        PolicyContribution::new(rule_id, outcome, reason).unwrap()
    };
    let policy = compose_policy_outcomes(
        vec![
            contribution(
                APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
                PolicyOutcome::Allow,
                "fixture: tier gate allows",
            ),
            contribution(
                APPEND_ATTESTATION_REQUIRED_RULE_ID,
                PolicyOutcome::Warn,
                "fixture: attestation warning",
            ),
            contribution(
                APPEND_RUNTIME_MODE_RULE_ID,
                PolicyOutcome::Allow,
                "fixture: runtime mode permits unsigned",
            ),
        ],
        None,
    );

    let err = log
        .append(fixture_event(0), &policy)
        .expect_err("User event without Allow attestation must fail");
    assert!(matches!(err, JsonlError::Validation(_)));
    assert_eq!(log.len(), 0);
}
// A decision that composes to `Warn` (from the runtime-mode contribution)
// still lets the event through.
#[test]
fn append_allows_warn_outcome() {
    let tmp = tempdir().unwrap();
    let mut log = JsonlLog::open(tmp.path().join("warn-outcome.jsonl")).unwrap();

    let contribution = |rule_id: &'static str, outcome: PolicyOutcome, reason: &'static str| {
        PolicyContribution::new(rule_id, outcome, reason).unwrap()
    };
    let policy = compose_policy_outcomes(
        vec![
            contribution(
                APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
                PolicyOutcome::Allow,
                "fixture: tier gate allows",
            ),
            contribution(
                APPEND_ATTESTATION_REQUIRED_RULE_ID,
                PolicyOutcome::Allow,
                "fixture: attestation present",
            ),
            contribution(
                APPEND_RUNTIME_MODE_RULE_ID,
                PolicyOutcome::Warn,
                "fixture: runtime mode is local-development",
            ),
        ],
        None,
    );
    assert_eq!(policy.final_outcome, PolicyOutcome::Warn);

    log.append(fixture_event(0), &policy)
        .expect("warn outcome must still append");
    assert_eq!(log.len(), 1);
}
}