use std::fmt;
use std::fs::File;
use std::io::{BufRead, BufReader, Read};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use chrono::{DateTime, SecondsFormat, Utc};
use thiserror::Error;
use crate::hash::{event_hash, payload_hash, HEX_HASH_LEN};
use crate::jsonl::JsonlError;
use crate::signed_row::SignedRow;
/// First line of every version-1 ledger anchor record.
///
/// [`parse_anchor`] requires the header line to equal this string exactly; any
/// other header is rejected with `AnchorParseError::UnknownFormatHeader`.
pub const ANCHOR_FORMAT_HEADER_V1: &str = "# cortex-ledger-anchor-format: 1";
/// A commitment to a ledger prefix: how many events it contains and the
/// `event_hash` of the last event in that prefix, plus a timestamp.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LedgerAnchor {
/// Timestamp recorded with the anchor (supplied by whoever builds it, e.g. the
/// caller of `current_anchor`). Serialized at whole-second precision.
pub timestamp: DateTime<Utc>,
/// Number of events covered; also the 1-based position of the anchored event.
pub event_count: u64,
/// `event_hash` of the event at position `event_count`. `LedgerAnchor::new` and
/// `parse_anchor` require `HEX_HASH_LEN` lowercase hex characters; direct struct
/// construction bypasses that check.
pub chain_head_hash: String,
}
impl LedgerAnchor {
    /// Creates an anchor after checking that `chain_head_hash` consists of exactly
    /// `HEX_HASH_LEN` lowercase hexadecimal characters.
    ///
    /// # Errors
    /// Returns [`AnchorParseError::InvalidChainHeadHash`] when the hash is malformed.
    pub fn new(
        timestamp: DateTime<Utc>,
        event_count: u64,
        chain_head_hash: impl Into<String>,
    ) -> Result<Self, AnchorParseError> {
        let hash: String = chain_head_hash.into();
        validate_chain_head_hash(&hash).map(|()| Self {
            timestamp,
            event_count,
            chain_head_hash: hash,
        })
    }

    /// Renders the v1 text form: the format header line, then
    /// `<timestamp> <event_count> <chain_head_hash>`, each ending in `\n`.
    ///
    /// The timestamp is written as RFC 3339 in UTC with a `Z` suffix at
    /// whole-second precision, so any sub-second component is not preserved.
    #[must_use]
    pub fn to_anchor_text(&self) -> String {
        let stamp = self.timestamp.to_rfc3339_opts(SecondsFormat::Secs, true);
        let count = self.event_count;
        let hash = &self.chain_head_hash;
        format!("{ANCHOR_FORMAT_HEADER_V1}\n{stamp} {count} {hash}\n")
    }
}
impl fmt::Display for LedgerAnchor {
    /// Writes the canonical v1 anchor text, identical to
    /// [`LedgerAnchor::to_anchor_text`]. Width, fill, and alignment flags are not applied.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let rendered = self.to_anchor_text();
        formatter.write_str(rendered.as_str())
    }
}
impl FromStr for LedgerAnchor {
    type Err = AnchorParseError;

    /// Parses a single v1 anchor record; identical to calling [`parse_anchor`].
    fn from_str(text: &str) -> Result<Self, Self::Err> {
        parse_anchor(text)
    }
}
pub fn parse_anchor(input: &str) -> Result<LedgerAnchor, AnchorParseError> {
let mut lines = input.lines();
let Some(header) = lines.next() else {
return Err(AnchorParseError::MissingHeader);
};
if header != ANCHOR_FORMAT_HEADER_V1 {
return Err(AnchorParseError::UnknownFormatHeader {
observed: header.to_string(),
});
}
let Some(body) = lines.next() else {
return Err(AnchorParseError::MissingBody);
};
if body.trim() != body {
return Err(AnchorParseError::MalformedBody {
reason: "body line must not have leading or trailing whitespace".to_string(),
});
}
if lines.next().is_some() {
return Err(AnchorParseError::TrailingContent);
}
let parts: Vec<&str> = body.split(' ').collect();
if parts.len() != 3 || parts.iter().any(|part| part.is_empty()) {
return Err(AnchorParseError::MalformedBody {
reason: "expected exactly: <timestamp> <event_count> <chain_head_hash>".to_string(),
});
}
let timestamp = DateTime::parse_from_rfc3339(parts[0])
.map_err(|source| AnchorParseError::InvalidTimestamp {
value: parts[0].to_string(),
message: source.to_string(),
})?
.with_timezone(&Utc);
let event_count =
parts[1]
.parse::<u64>()
.map_err(|source| AnchorParseError::InvalidEventCount {
value: parts[1].to_string(),
message: source.to_string(),
})?;
let chain_head_hash = parts[2].to_string();
validate_chain_head_hash(&chain_head_hash)?;
Ok(LedgerAnchor {
timestamp,
event_count,
chain_head_hash,
})
}
/// Parses an anchor history: v1 anchor records written back to back.
///
/// Lines are consumed in (header, body) pairs and each pair is validated with
/// [`parse_anchor`]. A blank line is read as a header and therefore rejected.
///
/// # Errors
/// - [`AnchorParseError::MissingHeader`] when the input holds no records at all.
/// - [`AnchorParseError::MissingBody`] when the last record has a header but no body.
/// - Any error [`parse_anchor`] reports for an individual record.
pub fn parse_anchor_history(input: &str) -> Result<Vec<LedgerAnchor>, AnchorParseError> {
    let mut remaining = input.lines();
    let mut parsed = Vec::new();
    while let Some(header) = remaining.next() {
        let body = remaining.next().ok_or(AnchorParseError::MissingBody)?;
        let record = format!("{header}\n{body}\n");
        parsed.push(parse_anchor(&record)?);
    }
    if parsed.is_empty() {
        Err(AnchorParseError::MissingHeader)
    } else {
        Ok(parsed)
    }
}
/// Verifies the full hash chain of the ledger at `path` and checks `anchor` against it.
///
/// Every event's `payload_hash`, `event_hash`, and `prev_event_hash` link is
/// recomputed. The anchor passes when the ledger holds at least
/// `anchor.event_count` events and the event at that 1-based position has
/// `event_hash == anchor.chain_head_hash`. Events appended after the anchored
/// position are allowed.
///
/// # Errors
/// - [`AnchorVerifyError::Jsonl`] if the ledger cannot be opened, read, or decoded.
/// - [`AnchorVerifyError::ChainBroken`] on the first hash or linkage mismatch.
/// - [`AnchorVerifyError::Truncated`] if the ledger has fewer events than the anchor.
/// - [`AnchorVerifyError::MissingPosition`] if `anchor.event_count` is 0.
/// - [`AnchorVerifyError::PositionHashMismatch`] if the hash at the anchored position differs.
pub fn verify_anchor(
    path: impl AsRef<Path>,
    anchor: &LedgerAnchor,
) -> Result<AnchorVerification, AnchorVerifyError> {
    let mut hash_at_anchor_position: Option<String> = None;
    let scan = scan_verified_chain(path.as_ref().to_path_buf(), |position, event_hash| {
        if position == anchor.event_count {
            hash_at_anchor_position = Some(event_hash.to_owned());
        }
    })?;
    check_anchor_position(&scan, anchor, hash_at_anchor_position)?;
    let VerifiedChain {
        path,
        db_count,
        db_head_hash,
    } = scan;
    Ok(AnchorVerification {
        path,
        db_count,
        db_head_hash,
        anchor: anchor.clone(),
    })
}

/// Verifies the ledger at `path` and returns an anchor for its current head,
/// stamped with the caller-supplied `timestamp`.
///
/// # Errors
/// - [`AnchorVerifyError::Jsonl`] / [`AnchorVerifyError::ChainBroken`] as for [`verify_anchor`].
/// - [`AnchorVerifyError::EmptyLedger`] if the ledger contains no events.
/// - [`AnchorVerifyError::InternalAnchorBuild`] if the head hash is not valid anchor hex.
pub fn current_anchor(
    path: impl AsRef<Path>,
    timestamp: DateTime<Utc>,
) -> Result<LedgerAnchor, AnchorVerifyError> {
    let VerifiedChain {
        path,
        db_count,
        db_head_hash,
    } = scan_verified_chain(path.as_ref().to_path_buf(), |_, _| {})?;
    let Some(chain_head_hash) = db_head_hash else {
        return Err(AnchorVerifyError::EmptyLedger { path });
    };
    LedgerAnchor::new(timestamp, db_count, chain_head_hash)
        .map_err(|source| AnchorVerifyError::InternalAnchorBuild { path, source })
}

/// Verifies every record of the anchor history at `history_path` against the
/// ledger at `ledger_path`.
///
/// Records must have non-decreasing `event_count`s. The ledger is then read and
/// chain-verified once, capturing the hash at every anchored position, and the
/// records are checked in file order.
///
/// # Errors
/// - [`AnchorHistoryVerifyError::ReadHistory`] / [`AnchorHistoryVerifyError::Parse`] for an
///   unreadable or malformed history file.
/// - [`AnchorHistoryVerifyError::NonMonotonic`] if a record's `event_count` decreases.
/// - [`AnchorHistoryVerifyError::Anchor`] carrying the 1-based index of the first failing
///   record. A ledger that cannot be read or whose chain is broken is reported against
///   record 1.
pub fn verify_anchor_history(
    ledger_path: impl AsRef<Path>,
    history_path: impl AsRef<Path>,
) -> Result<AnchorHistoryVerification, AnchorHistoryVerifyError> {
    let history_path = history_path.as_ref().to_path_buf();
    let mut text = String::new();
    File::open(&history_path)
        .map_err(|source| AnchorHistoryVerifyError::ReadHistory {
            path: history_path.clone(),
            source,
        })?
        .read_to_string(&mut text)
        .map_err(|source| AnchorHistoryVerifyError::ReadHistory {
            path: history_path.clone(),
            source,
        })?;
    let anchors =
        parse_anchor_history(&text).map_err(|source| AnchorHistoryVerifyError::Parse {
            path: history_path.clone(),
            source,
        })?;
    let mut previous_event_count = None;
    for (index, anchor) in anchors.iter().enumerate() {
        if let Some(previous_event_count) = previous_event_count {
            if anchor.event_count < previous_event_count {
                return Err(AnchorHistoryVerifyError::NonMonotonic {
                    path: history_path,
                    anchor_index: index + 1,
                    previous_event_count,
                    event_count: anchor.event_count,
                });
            }
        }
        previous_event_count = Some(anchor.event_count);
    }

    // Records are now known to be sorted by event_count, so a single pass over the
    // ledger with a forward-only cursor captures the hash at every anchored
    // position, instead of re-reading and re-hashing the ledger once per record.
    let mut hashes_at_positions: Vec<Option<String>> = vec![None; anchors.len()];
    let mut cursor = 0usize;
    let anchor_failure = |anchor_index: usize, source: AnchorVerifyError| {
        AnchorHistoryVerifyError::Anchor {
            path: history_path.clone(),
            anchor_index,
            source: Box::new(source),
        }
    };
    // An unreadable ledger or a broken chain fails every record; attribute it to
    // the first record, which is where per-record verification would stop.
    let scan = scan_verified_chain(ledger_path.as_ref().to_path_buf(), |position, event_hash| {
        while let Some(anchor) = anchors.get(cursor) {
            if anchor.event_count > position {
                break;
            }
            // Records with event_count 0 fall behind position 1 and stay `None`.
            if anchor.event_count == position {
                hashes_at_positions[cursor] = Some(event_hash.to_owned());
            }
            cursor += 1;
        }
    })
    .map_err(|source| anchor_failure(1, source))?;
    for (index, (anchor, hash_at_position)) in
        anchors.iter().zip(hashes_at_positions).enumerate()
    {
        check_anchor_position(&scan, anchor, hash_at_position)
            .map_err(|source| anchor_failure(index + 1, source))?;
    }

    let latest_anchor = anchors
        .last()
        .cloned()
        .expect("non-empty anchor history was parsed");
    let VerifiedChain {
        path,
        db_count,
        db_head_hash,
    } = scan;
    Ok(AnchorHistoryVerification {
        path,
        history_path,
        db_count,
        db_head_hash,
        anchors_verified: anchors.len(),
        latest_anchor,
    })
}

/// Outcome of a complete, successful chain scan of a ledger file.
struct VerifiedChain {
    /// Ledger file that was scanned.
    path: PathBuf,
    /// Number of events (non-blank rows) in the ledger.
    db_count: u64,
    /// `event_hash` of the last event, or `None` if the ledger has no events.
    db_head_hash: Option<String>,
}

/// Streams the ledger at `path` row by row, verifying each event's
/// `payload_hash`, `event_hash`, and `prev_event_hash` link, and calls
/// `on_event(position, event_hash)` for every event that passes.
///
/// `position` is 1-based and counts only non-blank rows; `line` numbers in
/// errors count every physical line.
///
/// # Errors
/// - [`AnchorVerifyError::Jsonl`] if the file cannot be opened/read or a row fails to decode.
/// - [`AnchorVerifyError::ChainBroken`] on the first hash or linkage mismatch.
fn scan_verified_chain(
    path: PathBuf,
    mut on_event: impl FnMut(u64, &str),
) -> Result<VerifiedChain, AnchorVerifyError> {
    let file = File::open(&path).map_err(|source| JsonlError::Io {
        path: path.clone(),
        source,
    })?;
    let mut prev_event_hash: Option<String> = None;
    let mut db_count = 0u64;
    for (i, line_result) in BufReader::new(file).lines().enumerate() {
        let line = i + 1;
        let line_text = line_result.map_err(|source| JsonlError::Io {
            path: path.clone(),
            source,
        })?;
        let trimmed = line_text.trim();
        if trimmed.is_empty() {
            continue;
        }
        let row: SignedRow =
            serde_json::from_str(trimmed).map_err(|source| JsonlError::Decode {
                path: path.clone(),
                line,
                source,
            })?;
        let event = row.event;
        db_count += 1;
        let expected_payload_hash = payload_hash(&event.payload);
        if event.payload_hash != expected_payload_hash {
            return Err(AnchorVerifyError::ChainBroken {
                path,
                line,
                reason: format!(
                    "payload_hash mismatch: observed {}, expected {expected_payload_hash}",
                    event.payload_hash
                ),
            });
        }
        let expected_event_hash = event_hash(event.prev_event_hash.as_deref(), &event.payload_hash);
        if event.event_hash != expected_event_hash {
            return Err(AnchorVerifyError::ChainBroken {
                path,
                line,
                reason: format!(
                    "event_hash mismatch: observed {}, expected {expected_event_hash}",
                    event.event_hash
                ),
            });
        }
        if event.prev_event_hash != prev_event_hash {
            return Err(AnchorVerifyError::ChainBroken {
                path,
                line,
                reason: format!(
                    "prev_event_hash mismatch: observed {:?}, expected {:?}",
                    event.prev_event_hash, prev_event_hash
                ),
            });
        }
        on_event(db_count, &event.event_hash);
        prev_event_hash = Some(event.event_hash);
    }
    Ok(VerifiedChain {
        path,
        db_count,
        db_head_hash: prev_event_hash,
    })
}

/// Checks `anchor` against a completed scan.
///
/// `hash_at_position` is the `event_hash` observed at 1-based position
/// `anchor.event_count`, or `None` if no event had that position.
fn check_anchor_position(
    scan: &VerifiedChain,
    anchor: &LedgerAnchor,
    hash_at_position: Option<String>,
) -> Result<(), AnchorVerifyError> {
    if scan.db_count < anchor.event_count {
        return Err(AnchorVerifyError::Truncated {
            path: scan.path.clone(),
            db_count: scan.db_count,
            anchor_event_count: anchor.event_count,
        });
    }
    // Only reachable for event_count 0: positions start at 1, and counts beyond
    // the end of the ledger were already rejected as Truncated.
    let observed = hash_at_position.ok_or_else(|| AnchorVerifyError::MissingPosition {
        path: scan.path.clone(),
        anchor_event_count: anchor.event_count,
    })?;
    if observed != anchor.chain_head_hash {
        return Err(AnchorVerifyError::PositionHashMismatch {
            path: scan.path.clone(),
            event_count: anchor.event_count,
            observed,
            expected: anchor.chain_head_hash.clone(),
        });
    }
    Ok(())
}
/// Successful result of [`verify_anchor`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AnchorVerification {
/// Ledger file that was scanned.
pub path: PathBuf,
/// Number of events in the ledger; blank lines are skipped and not counted.
pub db_count: u64,
/// `event_hash` of the last event in the ledger (its current chain head).
/// Verification only succeeds after at least one event, so this is `Some` on success.
pub db_head_hash: Option<String>,
/// The anchor that was checked.
pub anchor: LedgerAnchor,
}
/// Successful result of [`verify_anchor_history`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AnchorHistoryVerification {
/// Ledger file that was scanned.
pub path: PathBuf,
/// Anchor history file that was read.
pub history_path: PathBuf,
/// Number of events in the ledger.
pub db_count: u64,
/// `event_hash` of the last event in the ledger.
pub db_head_hash: Option<String>,
/// Number of history records that were checked (all of them, on success).
pub anchors_verified: usize,
/// The last record in the history file.
pub latest_anchor: LedgerAnchor,
}
/// Reasons a ledger anchor record (or anchor history) failed to parse.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum AnchorParseError {
/// The input was empty (for a history: contained no records).
#[error("missing ledger anchor format header")]
MissingHeader,
/// The first line of a record was not `ANCHOR_FORMAT_HEADER_V1`.
#[error("unknown ledger anchor format header: {observed}")]
UnknownFormatHeader {
/// The header line that was found.
observed: String,
},
/// A header line was not followed by a body line.
#[error("missing ledger anchor body")]
MissingBody,
/// The body line was padded or did not hold exactly three single-space-separated fields.
#[error("malformed ledger anchor body: {reason}")]
MalformedBody {
/// Human-readable description of the layout problem.
reason: String,
},
/// A single-record input had more lines after the body.
#[error("ledger anchor has trailing content")]
TrailingContent,
/// The timestamp field was not valid RFC 3339.
#[error("invalid ledger anchor timestamp {value}: {message}")]
InvalidTimestamp {
/// The rejected field text.
value: String,
/// The underlying chrono parse error, rendered as text.
message: String,
},
/// The event count field was not a valid `u64`.
#[error("invalid ledger anchor event_count {value}: {message}")]
InvalidEventCount {
/// The rejected field text.
value: String,
/// The underlying integer parse error, rendered as text.
message: String,
},
/// The hash was not exactly `HEX_HASH_LEN` lowercase hex characters.
#[error("invalid ledger anchor chain_head_hash: {value}")]
InvalidChainHeadHash {
/// The rejected hash text.
value: String,
},
}
/// Reasons [`verify_anchor_history`] rejected an anchor history.
#[derive(Debug, Error)]
pub enum AnchorHistoryVerifyError {
/// The history file could not be opened or read into a string.
#[error("failed to read anchor history {path:?}: {source}")]
ReadHistory {
/// Path of the history file.
path: PathBuf,
source: std::io::Error,
},
/// The history file's contents did not parse as anchor records.
#[error("invalid anchor history {path:?}: {source}")]
Parse {
/// Path of the history file.
path: PathBuf,
source: AnchorParseError,
},
/// A record's `event_count` is smaller than the one before it (equal counts are allowed).
#[error(
"anchor history is non-monotonic at record {anchor_index}: event_count {event_count} follows {previous_event_count}"
)]
NonMonotonic {
/// Path of the history file.
path: PathBuf,
/// 1-based index of the offending record.
anchor_index: usize,
previous_event_count: u64,
event_count: u64,
},
/// A record failed verification against the ledger.
#[error("anchor history record {anchor_index} failed verification in {path:?}: {source}")]
Anchor {
/// Path of the history file (the ledger path is inside `source`).
path: PathBuf,
/// 1-based index of the failing record.
anchor_index: usize,
source: Box<AnchorVerifyError>,
},
}
/// Reasons a ledger failed anchor verification or anchor creation.
#[derive(Debug, Error)]
pub enum AnchorVerifyError {
/// The ledger could not be opened or read, or a row failed to decode.
#[error(transparent)]
Jsonl(#[from] JsonlError),
/// `current_anchor` found no events to anchor.
#[error("cannot anchor empty ledger {path:?}")]
EmptyLedger {
path: PathBuf,
},
/// The scanned head hash was rejected by `LedgerAnchor::new`.
#[error("failed to build current ledger anchor for {path:?}: {source}")]
InternalAnchorBuild {
path: PathBuf,
source: AnchorParseError,
},
/// A row's payload hash, event hash, or `prev_event_hash` link did not check out.
#[error("ledger chain broken at line {line} in {path:?}: {reason}")]
ChainBroken {
path: PathBuf,
/// 1-based physical line number (blank lines included).
line: usize,
reason: String,
},
/// The ledger holds fewer events than the anchor's `event_count`.
#[error(
"ledger is shorter than anchor: db_count {db_count}, anchor_event_count {anchor_event_count}"
)]
Truncated {
path: PathBuf,
db_count: u64,
anchor_event_count: u64,
},
/// No event occupies the anchored position; in practice this means the anchor's
/// `event_count` is 0, because positions start at 1 and longer counts are `Truncated`.
#[error("ledger anchor position {anchor_event_count} has no event hash")]
MissingPosition {
path: PathBuf,
anchor_event_count: u64,
},
/// The event at the anchored position has a different `event_hash` than the anchor.
#[error(
"anchor hash mismatch at event_count {event_count}: observed {observed}, expected {expected}"
)]
PositionHashMismatch {
path: PathBuf,
event_count: u64,
/// Hash found in the ledger.
observed: String,
/// Hash recorded in the anchor.
expected: String,
},
}
/// Accepts only hashes of exactly `HEX_HASH_LEN` characters drawn from `0-9a-f`.
///
/// # Errors
/// Returns [`AnchorParseError::InvalidChainHeadHash`] carrying the rejected text.
fn validate_chain_head_hash(value: &str) -> Result<(), AnchorParseError> {
    let is_lower_hex = |byte: u8| matches!(byte, b'0'..=b'9' | b'a'..=b'f');
    if value.len() == HEX_HASH_LEN && value.bytes().all(is_lower_hex) {
        Ok(())
    } else {
        Err(AnchorParseError::InvalidChainHeadHash {
            value: value.to_string(),
        })
    }
}
#[cfg(test)]
mod tests {
    // Tests for anchor text parsing and for verifying anchors against real JSONL ledgers.
    use super::*;
    use chrono::TimeZone;
    use cortex_core::{Event, EventId, EventSource, EventType, SCHEMA_VERSION};
    use tempfile::tempdir;
    use crate::{JsonlLog, SignedRow};

    /// Builds a user-message event with payload `{"seq": seq}`. The hash fields
    /// start empty and are populated by `JsonlLog::append` (the verification tests
    /// below rely on the appended rows forming a valid chain).
    fn fixture_event(seq: u64) -> Event {
        Event {
            id: EventId::new(),
            schema_version: SCHEMA_VERSION,
            observed_at: Utc.with_ymd_and_hms(2026, 5, 5, 12, 0, 0).unwrap(),
            recorded_at: Utc.with_ymd_and_hms(2026, 5, 5, 12, 0, 1).unwrap(),
            source: EventSource::User,
            event_type: EventType::UserMessage,
            trace_id: None,
            session_id: Some("s-anchor".into()),
            domain_tags: vec![],
            payload: serde_json::json!({"seq": seq}),
            payload_hash: String::new(),
            prev_event_hash: None,
            event_hash: String::new(),
        }
    }

    /// Appends `count` fixture events to a fresh temp ledger. Returns the temp dir
    /// (keep it alive for the test's duration), the ledger path, and the value
    /// returned by each append (`heads[i]` is the chain head after event `i + 1`).
    fn write_fixture_log(count: u64) -> (tempfile::TempDir, std::path::PathBuf, Vec<String>) {
        let dir = tempdir().unwrap();
        let path = dir.path().join("anchor.jsonl");
        let mut log = JsonlLog::open(&path).unwrap();
        let policy = crate::jsonl::append_policy_decision_test_allow();
        let mut heads = Vec::new();
        for seq in 0..count {
            heads.push(log.append(fixture_event(seq), &policy).unwrap());
        }
        (dir, path, heads)
    }

    /// Overwrites the ledger with `rows`, one JSON object per line, bypassing the log API.
    fn rewrite_rows(path: &std::path::Path, rows: &[SignedRow]) {
        let text = rows
            .iter()
            .map(|row| serde_json::to_string(row).unwrap())
            .collect::<Vec<_>>()
            .join("\n");
        std::fs::write(path, format!("{text}\n")).unwrap();
    }

    /// Writes `anchors` to `path` as a concatenation of their v1 text forms.
    fn write_history(path: &std::path::Path, anchors: &[LedgerAnchor]) {
        let mut text = String::new();
        for anchor in anchors {
            text.push_str(&anchor.to_anchor_text());
        }
        std::fs::write(path, text).unwrap();
    }

    /// Reads every ledger line back as a `SignedRow`.
    fn read_rows(path: &std::path::Path) -> Vec<SignedRow> {
        std::fs::read_to_string(path)
            .unwrap()
            .lines()
            .map(|line| serde_json::from_str(line).unwrap())
            .collect()
    }

    #[test]
    fn anchor_format_round_trips() {
        let anchor = LedgerAnchor::new(
            Utc.with_ymd_and_hms(2026, 5, 5, 12, 30, 0).unwrap(),
            7,
            "a".repeat(HEX_HASH_LEN),
        )
        .unwrap();
        let text = anchor.to_anchor_text();
        assert_eq!(
            text,
            format!(
                "{ANCHOR_FORMAT_HEADER_V1}\n2026-05-05T12:30:00Z 7 {}\n",
                "a".repeat(HEX_HASH_LEN)
            )
        );
        assert_eq!(parse_anchor(&text).unwrap(), anchor);
    }

    #[test]
    fn anchor_unknown_format_header_fails_closed() {
        let text = format!(
            "# cortex-ledger-anchor-format: 2\n2026-05-05T12:30:00Z 7 {}\n",
            "a".repeat(HEX_HASH_LEN)
        );
        let err = parse_anchor(&text).unwrap_err();
        assert!(matches!(err, AnchorParseError::UnknownFormatHeader { .. }));
    }

    #[test]
    fn anchor_trailing_content_fails_closed() {
        let text = format!(
            "{ANCHOR_FORMAT_HEADER_V1}\n2026-05-05T12:30:00Z 7 {}\nextra\n",
            "a".repeat(HEX_HASH_LEN)
        );
        let err = parse_anchor(&text).unwrap_err();
        assert_eq!(err, AnchorParseError::TrailingContent);
    }

    #[test]
    fn anchor_extra_body_field_fails_closed() {
        let text = format!(
            "{ANCHOR_FORMAT_HEADER_V1}\n2026-05-05T12:30:00Z 7 {} extra\n",
            "a".repeat(HEX_HASH_LEN)
        );
        let err = parse_anchor(&text).unwrap_err();
        assert!(matches!(err, AnchorParseError::MalformedBody { .. }));
    }

    #[test]
    fn anchor_history_format_round_trips() {
        let anchors = vec![
            LedgerAnchor::new(
                Utc.with_ymd_and_hms(2026, 5, 5, 12, 30, 0).unwrap(),
                1,
                "a".repeat(HEX_HASH_LEN),
            )
            .unwrap(),
            LedgerAnchor::new(
                Utc.with_ymd_and_hms(2026, 5, 5, 12, 31, 0).unwrap(),
                2,
                "b".repeat(HEX_HASH_LEN),
            )
            .unwrap(),
        ];
        let text = anchors
            .iter()
            .map(LedgerAnchor::to_anchor_text)
            .collect::<String>();
        assert_eq!(parse_anchor_history(&text).unwrap(), anchors);
    }

    #[test]
    fn anchor_history_unknown_format_header_fails_closed() {
        let text = format!(
            "# cortex-ledger-anchor-format: 2\n2026-05-05T12:30:00Z 7 {}\n",
            "a".repeat(HEX_HASH_LEN)
        );
        let err = parse_anchor_history(&text).unwrap_err();
        assert!(matches!(err, AnchorParseError::UnknownFormatHeader { .. }));
    }

    #[test]
    fn anchor_history_truncated_record_fails_closed() {
        let err = parse_anchor_history(ANCHOR_FORMAT_HEADER_V1).unwrap_err();
        assert_eq!(err, AnchorParseError::MissingBody);
    }

    #[test]
    fn anchor_correct_head_passes() {
        let (_dir, path, heads) = write_fixture_log(3);
        let anchor = LedgerAnchor::new(
            Utc.with_ymd_and_hms(2026, 5, 5, 12, 30, 0).unwrap(),
            3,
            heads[2].clone(),
        )
        .unwrap();
        let verified = verify_anchor(&path, &anchor).unwrap();
        assert_eq!(verified.db_count, 3);
        assert_eq!(verified.db_head_hash.as_deref(), Some(heads[2].as_str()));
    }

    #[test]
    fn anchor_zero_event_count_has_no_position() {
        let (_dir, path, heads) = write_fixture_log(3);
        let anchor = LedgerAnchor::new(
            Utc.with_ymd_and_hms(2026, 5, 5, 12, 30, 0).unwrap(),
            0,
            heads[0].clone(),
        )
        .unwrap();
        let err = verify_anchor(&path, &anchor).unwrap_err();
        assert!(matches!(
            err,
            AnchorVerifyError::MissingPosition {
                anchor_event_count: 0,
                ..
            }
        ));
    }

    #[test]
    fn current_anchor_scans_clean_chain_head() {
        let (_dir, path, heads) = write_fixture_log(3);
        let timestamp = Utc.with_ymd_and_hms(2026, 5, 5, 12, 45, 0).unwrap();
        let anchor = current_anchor(&path, timestamp).unwrap();
        assert_eq!(anchor.timestamp, timestamp);
        assert_eq!(anchor.event_count, 3);
        assert_eq!(anchor.chain_head_hash, heads[2]);
        verify_anchor(&path, &anchor).unwrap();
    }

    #[test]
    fn current_anchor_rejects_empty_ledger() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("empty.jsonl");
        std::fs::write(&path, "").unwrap();
        let err = current_anchor(&path, Utc.with_ymd_and_hms(2026, 5, 5, 12, 45, 0).unwrap())
            .unwrap_err();
        assert!(matches!(err, AnchorVerifyError::EmptyLedger { .. }));
    }

    #[test]
    fn anchor_detects_tail_truncation() {
        let (_dir, path, heads) = write_fixture_log(3);
        let anchor = LedgerAnchor::new(
            Utc.with_ymd_and_hms(2026, 5, 5, 12, 30, 0).unwrap(),
            3,
            heads[2].clone(),
        )
        .unwrap();
        let rows = read_rows(&path);
        rewrite_rows(&path, &rows[..2]);
        let err = verify_anchor(&path, &anchor).unwrap_err();
        assert!(matches!(
            err,
            AnchorVerifyError::Truncated {
                db_count: 2,
                anchor_event_count: 3,
                ..
            }
        ));
    }

    #[test]
    fn anchor_wrong_event_count_fails() {
        let (_dir, path, heads) = write_fixture_log(3);
        let anchor = LedgerAnchor::new(
            Utc.with_ymd_and_hms(2026, 5, 5, 12, 30, 0).unwrap(),
            2,
            heads[2].clone(),
        )
        .unwrap();
        let err = verify_anchor(&path, &anchor).unwrap_err();
        assert!(matches!(
            err,
            AnchorVerifyError::PositionHashMismatch { event_count: 2, .. }
        ));
    }

    #[test]
    fn anchor_tampered_line_fails() {
        let (_dir, path, heads) = write_fixture_log(3);
        let anchor = LedgerAnchor::new(
            Utc.with_ymd_and_hms(2026, 5, 5, 12, 30, 0).unwrap(),
            3,
            heads[2].clone(),
        )
        .unwrap();
        let mut rows = read_rows(&path);
        rows[1].event.payload = serde_json::json!({"seq": 99});
        rewrite_rows(&path, &rows);
        let err = verify_anchor(&path, &anchor).unwrap_err();
        assert!(matches!(
            err,
            AnchorVerifyError::ChainBroken { line: 2, .. }
        ));
    }

    #[test]
    fn anchor_history_correct_passes() {
        let (dir, ledger_path, heads) = write_fixture_log(3);
        let history_path = dir.path().join("ANCHOR_HISTORY");
        let anchors = vec![
            LedgerAnchor::new(
                Utc.with_ymd_and_hms(2026, 5, 5, 12, 30, 0).unwrap(),
                1,
                heads[0].clone(),
            )
            .unwrap(),
            LedgerAnchor::new(
                Utc.with_ymd_and_hms(2026, 5, 5, 12, 31, 0).unwrap(),
                3,
                heads[2].clone(),
            )
            .unwrap(),
        ];
        write_history(&history_path, &anchors);
        let verified = verify_anchor_history(&ledger_path, &history_path).unwrap();
        assert_eq!(verified.anchors_verified, 2);
        assert_eq!(verified.latest_anchor.event_count, 3);
        assert_eq!(verified.db_count, 3);
    }

    #[test]
    fn anchor_history_allows_repeated_event_count() {
        let (dir, ledger_path, heads) = write_fixture_log(3);
        let history_path = dir.path().join("ANCHOR_HISTORY");
        let anchors = vec![
            LedgerAnchor::new(
                Utc.with_ymd_and_hms(2026, 5, 5, 12, 30, 0).unwrap(),
                2,
                heads[1].clone(),
            )
            .unwrap(),
            LedgerAnchor::new(
                Utc.with_ymd_and_hms(2026, 5, 5, 12, 31, 0).unwrap(),
                2,
                heads[1].clone(),
            )
            .unwrap(),
        ];
        write_history(&history_path, &anchors);
        let verified = verify_anchor_history(&ledger_path, &history_path).unwrap();
        assert_eq!(verified.anchors_verified, 2);
        assert_eq!(verified.latest_anchor.event_count, 2);
        assert_eq!(verified.db_count, 3);
        assert_eq!(verified.db_head_hash.as_deref(), Some(heads[2].as_str()));
    }

    #[test]
    fn anchor_history_reports_failing_record_index() {
        let (dir, ledger_path, heads) = write_fixture_log(3);
        let history_path = dir.path().join("ANCHOR_HISTORY");
        let anchors = vec![
            LedgerAnchor::new(
                Utc.with_ymd_and_hms(2026, 5, 5, 12, 30, 0).unwrap(),
                1,
                heads[0].clone(),
            )
            .unwrap(),
            // Wrong hash for position 2 (this is the head after event 1).
            LedgerAnchor::new(
                Utc.with_ymd_and_hms(2026, 5, 5, 12, 31, 0).unwrap(),
                2,
                heads[0].clone(),
            )
            .unwrap(),
            LedgerAnchor::new(
                Utc.with_ymd_and_hms(2026, 5, 5, 12, 32, 0).unwrap(),
                3,
                heads[2].clone(),
            )
            .unwrap(),
        ];
        write_history(&history_path, &anchors);
        let err = verify_anchor_history(&ledger_path, &history_path).unwrap_err();
        match err {
            AnchorHistoryVerifyError::Anchor {
                anchor_index,
                source,
                ..
            } => {
                assert_eq!(anchor_index, 2);
                assert!(matches!(
                    source.as_ref(),
                    AnchorVerifyError::PositionHashMismatch { event_count: 2, .. }
                ));
            }
            other => panic!("expected anchor history record failure, got {other:?}"),
        }
    }

    #[test]
    fn anchor_history_non_monotonic_event_count_fails_closed() {
        let (dir, ledger_path, heads) = write_fixture_log(3);
        let history_path = dir.path().join("ANCHOR_HISTORY");
        let anchors = vec![
            LedgerAnchor::new(
                Utc.with_ymd_and_hms(2026, 5, 5, 12, 30, 0).unwrap(),
                3,
                heads[2].clone(),
            )
            .unwrap(),
            LedgerAnchor::new(
                Utc.with_ymd_and_hms(2026, 5, 5, 12, 31, 0).unwrap(),
                2,
                heads[1].clone(),
            )
            .unwrap(),
        ];
        write_history(&history_path, &anchors);
        let err = verify_anchor_history(&ledger_path, &history_path).unwrap_err();
        assert!(matches!(
            err,
            AnchorHistoryVerifyError::NonMonotonic {
                anchor_index: 2,
                previous_event_count: 3,
                event_count: 2,
                ..
            }
        ));
    }

    #[test]
    fn anchor_history_detects_tail_truncation() {
        let (dir, ledger_path, heads) = write_fixture_log(3);
        let history_path = dir.path().join("ANCHOR_HISTORY");
        let anchor = LedgerAnchor::new(
            Utc.with_ymd_and_hms(2026, 5, 5, 12, 30, 0).unwrap(),
            3,
            heads[2].clone(),
        )
        .unwrap();
        write_history(&history_path, &[anchor]);
        let rows = read_rows(&ledger_path);
        rewrite_rows(&ledger_path, &rows[..2]);
        let err = verify_anchor_history(&ledger_path, &history_path).unwrap_err();
        match err {
            AnchorHistoryVerifyError::Anchor { source, .. } => assert!(matches!(
                source.as_ref(),
                AnchorVerifyError::Truncated {
                    db_count: 2,
                    anchor_event_count: 3,
                    ..
                }
            )),
            other => panic!("expected truncated anchor history verification error, got {other:?}"),
        }
    }
}