use crate::store::Store;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
/// Schema version written into the `schema_version` field of every [`ChainWalkReportBody`].
pub const CHAIN_WALK_REPORT_SCHEMA_VERSION: u16 = 1;
/// 32-byte hash value used for event hashes, parent links, walk digests and report body hashes.
pub type ChainWalkHash = [u8; 32];
/// Traversal mode requested for a chain walk and echoed back in the report body.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ChainWalkMode {
/// The only mode currently defined; the walker copies it into the report unchanged.
Linear,
}
/// Identifies the entry a chain walk starts from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ChainWalkStartRef {
/// Start at the event with this id, with no expectation about its hash.
EventId(u128),
/// Start at `event_id` and additionally compare `content_hash` with the event hash the
/// index records for that entry; a difference is reported as
/// [`ChainWalkFinding::StartHashMismatch`].
Receipt {
/// Id of the event the receipt refers to.
event_id: u128,
/// Hash the caller expects the start entry to have.
content_hash: ChainWalkHash,
},
}
/// Parameters of a single chain walk.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ChainWalkRequest {
/// Entry the walk starts from.
pub start: ChainWalkStartRef,
/// Optional event id at which the walk stops once that entry has been checked.
/// When set but never visited, a walk without other findings reports
/// [`ChainWalkFinding::EndNotReached`].
pub end_event_id: Option<u128>,
/// Maximum number of entries the walk may check; `0` is rejected with
/// [`ChainWalkFinding::InvalidLimit`].
pub limit: usize,
/// Traversal mode, copied into the report.
pub mode: ChainWalkMode,
}
impl ChainWalkRequest {
    /// Creates a [`ChainWalkMode::Linear`] request that starts at `start`, checks at most
    /// `limit` entries and has no explicit end event.
    #[must_use]
    pub fn linear(start: ChainWalkStartRef, limit: usize) -> Self {
        let mode = ChainWalkMode::Linear;
        let end_event_id = None;
        Self {
            start,
            end_event_id,
            limit,
            mode,
        }
    }
}
/// A single observation made while validating a request or walking a chain.
///
/// The derived `Ord` compares variants in declaration order first and then their fields in
/// declaration order, so reordering variants or fields changes how findings sort.
/// NOTE(review): `crate::evidence::sort_findings` presumably relies on this ordering — confirm
/// before reordering anything here.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ChainWalkFinding {
/// The request's `limit` was rejected (the walker rejects `0`); no entries were checked.
InvalidLimit {
limit: usize,
},
/// The start event id was not present in the index.
MissingStart {
event_id: u128,
},
/// An event id reached during the walk could not be looked up in the index.
MissingChainEntry {
event_id: u128,
},
/// The receipt's content hash differs from the event hash the index records for the start
/// entry. The walk continues after recording this.
StartHashMismatch {
event_id: u128,
/// Hash supplied by the caller's receipt.
expected: ChainWalkHash,
/// Event hash recorded in the index.
observed: ChainWalkHash,
},
/// The hash recomputed from the stored entry differs from the event hash recorded in the
/// index. The entry is still counted as checked, and the walk stops.
EntryHashMismatch {
event_id: u128,
/// Event hash recorded in the index.
expected: ChainWalkHash,
/// Hash recomputed from the stored entry.
observed: ChainWalkHash,
},
/// No entry in the entity stream has an event hash equal to the child's `prev_hash` and a
/// lower global sequence than the child.
MissingParentLink {
child_event_id: u128,
expected_parent_hash: ChainWalkHash,
},
/// More than one entry matched the child's `prev_hash`; the match with the highest global
/// sequence was selected and the walk continued with it.
ParentHashAmbiguous {
child_event_id: u128,
expected_parent_hash: ChainWalkHash,
selected_parent_event_id: u128,
matching_parent_count: u64,
},
/// The resolved parent's global sequence is not lower than the child's; the walk stops.
OrderingRegression {
child_event_id: u128,
parent_event_id: u128,
child_sequence: u64,
parent_sequence: u64,
},
/// The walk reached an event id it had already visited.
CycleDetected {
event_id: u128,
},
/// Reading the raw entry from storage failed; `reason` is the read error rendered as text.
StoppedEarlyReadFailure {
event_id: u128,
reason: String,
},
/// The walk stopped after checking `limit` entries; `next_parent_hash` is the parent hash
/// that was pending when it stopped.
TruncatedByLimit {
limit: usize,
next_parent_hash: ChainWalkHash,
},
/// The walk finished without any other finding but never checked the requested end event.
EndNotReached {
expected_end_event_id: u128,
},
}
/// The hashed portion of a chain walk report.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ChainWalkReportBody {
/// Set to [`CHAIN_WALK_REPORT_SCHEMA_VERSION`] when the report is built.
pub schema_version: u16,
/// Mode copied from the request.
pub mode: ChainWalkMode,
/// Number of entries the walk checked.
pub checked_count: u64,
/// Event id of the first entry checked (the start of the walk), if any.
pub first_ref: Option<u128>,
/// Event id of the last entry checked, if any.
pub last_ref: Option<u128>,
/// Content hash of the canonical encoding of the checked `(event id, event hash)` pairs,
/// in walk order.
pub walk_digest: ChainWalkHash,
/// Findings, ordered by `crate::evidence::sort_findings` when the report is built.
pub findings: Vec<ChainWalkFinding>,
}
/// A chain walk report: the hashed body plus metadata that lives outside the body.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ChainWalkEvidenceReport {
/// The hashed report content.
pub body: ChainWalkReportBody,
/// Hash of `body`, computed through `crate::evidence::report_body_hash`.
pub body_hash: ChainWalkHash,
/// Optional generation timestamp; the report builder in this file leaves it `None`.
/// NOTE(review): unit is presumably Unix epoch milliseconds, per the field name — confirm.
pub generated_at_unix_ms: Option<u64>,
/// Optional producer version string; the report builder in this file leaves it `None`.
pub batpak_version: Option<String>,
/// Free-form diagnostic messages; the report builder in this file leaves it empty.
pub diagnostics: Vec<String>,
}
/// Errors that prevent a chain walk report from being produced at all.
///
/// Problems with the chain itself are not errors; they are reported as
/// [`ChainWalkFinding`]s inside the report.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum ChainWalkReportError {
/// Encoding the checked entries or the report body for hashing failed.
BodyEncoding {
/// Text of the underlying encoding error.
message: String,
},
}
impl std::fmt::Display for ChainWalkReportError {
    /// Writes a fixed description of the failure followed by the underlying message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::BodyEncoding { message } => {
                f.write_str("chain walk report body encoding failed: ")?;
                f.write_str(message)
            }
        }
    }
}
// Empty impl: only the default `std::error::Error` methods are used (no `source` override).
impl std::error::Error for ChainWalkReportError {}
impl<State> Store<State> {
    /// Walks a hash chain backwards from `request.start` and returns an evidence report.
    ///
    /// Each visited entry is looked up in the index and read back from storage, and its
    /// recomputed content hash is compared with the event hash recorded in the index. The
    /// walk then follows `prev_hash` to the parent, which is resolved within the entity
    /// stream of the start entry. It stops when
    ///
    /// * `request.end_event_id` has been checked,
    /// * an entry whose `prev_hash` is all zeroes has been checked (no parent to follow),
    /// * `request.limit` entries have been checked, or
    /// * a finding that ends the walk has been recorded.
    ///
    /// Problems with the request or the chain are reported as [`ChainWalkFinding`]s inside
    /// the returned report, not as errors. A `limit` of `0` or a start event missing from the
    /// index yields a report with no checked entries and a single finding.
    ///
    /// [`ChainWalkFinding::TruncatedByLimit`] is reported only when a parent had been queued
    /// but could not be visited because of the limit, and no other finding was recorded. A
    /// walk that ends naturally on exactly its `limit`-th entry is not truncated.
    ///
    /// # Errors
    ///
    /// Returns [`ChainWalkReportError::BodyEncoding`] when the checked entries or the report
    /// body cannot be encoded for hashing.
    pub fn chain_walk_evidence(
        &self,
        request: &ChainWalkRequest,
    ) -> Result<ChainWalkEvidenceReport, ChainWalkReportError> {
        let (start_event_id, expected_start_hash) = match request.start {
            ChainWalkStartRef::EventId(event_id) => (event_id, None),
            ChainWalkStartRef::Receipt {
                event_id,
                content_hash,
            } => (event_id, Some(content_hash)),
        };
        if request.limit == 0 {
            return build_report(
                request.mode,
                &[],
                vec![ChainWalkFinding::InvalidLimit { limit: 0 }],
            );
        }
        let Some(start_entry) = self.index.get_by_id(start_event_id) else {
            return build_report(
                request.mode,
                &[],
                vec![ChainWalkFinding::MissingStart {
                    event_id: start_event_id,
                }],
            );
        };
        // Parents are only ever resolved inside the entity stream of the start entry.
        let entity_stream = self.index.stream(start_entry.coord.entity());
        let mut findings = Vec::new();
        // `(event id, index-recorded event hash)` of every checked entry, in walk order.
        let mut checked = Vec::new();
        let mut visited = HashSet::new();
        // Next event id to visit; emptied by `take()` at the top of every iteration and only
        // refilled once a parent has been resolved and accepted.
        let mut cursor = Some(start_event_id);
        let mut reached_end = false;
        let mut pending_parent_hash_for_limit: Option<ChainWalkHash> = None;
        while checked.len() < request.limit {
            let Some(current_id) = cursor.take() else {
                break;
            };
            if !visited.insert(current_id) {
                findings.push(ChainWalkFinding::CycleDetected {
                    event_id: current_id,
                });
                break;
            }
            let Some(entry) = self.index.get_by_id(current_id) else {
                findings.push(ChainWalkFinding::MissingChainEntry {
                    event_id: current_id,
                });
                break;
            };
            let stored = match self.reader.read_entry_raw(&entry.disk_pos) {
                Ok(stored) => stored,
                Err(error) => {
                    findings.push(ChainWalkFinding::StoppedEarlyReadFailure {
                        event_id: current_id,
                        reason: error.to_string(),
                    });
                    break;
                }
            };
            // Only the first entry is compared with the caller's receipt hash; a mismatch is
            // recorded but does not stop the walk.
            if checked.is_empty() {
                if let Some(expected_hash) = expected_start_hash {
                    if expected_hash != entry.hash_chain.event_hash {
                        findings.push(ChainWalkFinding::StartHashMismatch {
                            event_id: current_id,
                            expected: expected_hash,
                            observed: entry.hash_chain.event_hash,
                        });
                    }
                }
            }
            let computed_hash = observed_content_hash(&stored);
            if computed_hash != entry.hash_chain.event_hash {
                findings.push(ChainWalkFinding::EntryHashMismatch {
                    event_id: current_id,
                    expected: entry.hash_chain.event_hash,
                    observed: computed_hash,
                });
                // The mismatching entry still counts as checked, but nothing behind it is
                // followed.
                checked.push((current_id, entry.hash_chain.event_hash));
                break;
            }
            checked.push((current_id, entry.hash_chain.event_hash));
            if request.end_event_id == Some(current_id) {
                reached_end = true;
                break;
            }
            // An all-zero `prev_hash` means there is no parent to follow.
            if entry.hash_chain.prev_hash == [0_u8; 32] {
                break;
            }
            pending_parent_hash_for_limit = Some(entry.hash_chain.prev_hash);
            let Some(parent_id) = resolve_parent_event_id_by_hash(
                &entity_stream,
                entry.hash_chain.prev_hash,
                entry.global_sequence,
            ) else {
                findings.push(ChainWalkFinding::MissingParentLink {
                    child_event_id: current_id,
                    expected_parent_hash: entry.hash_chain.prev_hash,
                });
                break;
            };
            // Ambiguity is recorded, but the walk continues with the selected parent.
            if parent_id.matching_parent_count > 1 {
                findings.push(ChainWalkFinding::ParentHashAmbiguous {
                    child_event_id: current_id,
                    expected_parent_hash: entry.hash_chain.prev_hash,
                    selected_parent_event_id: parent_id.event_id,
                    matching_parent_count: parent_id.matching_parent_count,
                });
            }
            if let Some(parent_entry) = self.index.get_by_id(parent_id.event_id) {
                if parent_entry.global_sequence >= entry.global_sequence {
                    findings.push(ChainWalkFinding::OrderingRegression {
                        child_event_id: current_id,
                        parent_event_id: parent_id.event_id,
                        child_sequence: entry.global_sequence,
                        parent_sequence: parent_entry.global_sequence,
                    });
                    break;
                }
            }
            cursor = Some(parent_id.event_id);
        }
        // `cursor` is still populated only when the loop ended on its limit check with a
        // parent queued: every `break` above runs after `cursor.take()` and before the
        // cursor is refilled. Testing `checked.len() == request.limit` alone would also fire
        // when the walk ended naturally on its `limit`-th entry, and would then report the
        // stale hash of an entry that had already been checked.
        if findings.is_empty() && cursor.is_some() {
            if let Some(next_parent_hash) = pending_parent_hash_for_limit {
                findings.push(ChainWalkFinding::TruncatedByLimit {
                    limit: request.limit,
                    next_parent_hash,
                });
            }
        }
        if findings.is_empty() && !reached_end {
            if let Some(expected_end_event_id) = request.end_event_id {
                if checked.last().map(|(event_id, _)| *event_id) != Some(expected_end_event_id) {
                    findings.push(ChainWalkFinding::EndNotReached {
                        expected_end_event_id,
                    });
                }
            }
        }
        build_report(request.mode, &checked, findings)
    }
}
/// Outcome of resolving a child's `prev_hash` to a parent entry within an entity stream.
struct ParentHashResolution {
/// Event id of the selected parent: the matching entry with the highest global sequence.
event_id: u128,
/// Number of entries that matched; the walker reports values above 1 as ambiguous.
matching_parent_count: u64,
}
/// Finds the parent of a child entry by hash within `entity_stream`.
///
/// A candidate matches when its event hash equals `parent_hash` and its global sequence is
/// strictly below `child_sequence`. Among the matches, the one with the highest global
/// sequence is selected; if several share that sequence, the one latest in the stream wins.
/// Returns `None` when nothing matches, otherwise the selected event id together with the
/// total number of matches.
fn resolve_parent_event_id_by_hash(
    entity_stream: &[crate::store::IndexEntry],
    parent_hash: ChainWalkHash,
    child_sequence: u64,
) -> Option<ParentHashResolution> {
    let mut matching_parent_count = 0_u64;
    let mut best: Option<&crate::store::IndexEntry> = None;
    for candidate in entity_stream {
        let is_match = candidate.hash_chain.event_hash == parent_hash
            && candidate.global_sequence < child_sequence;
        if !is_match {
            continue;
        }
        matching_parent_count += 1;
        // `>=` lets a later stream entry replace an earlier one with the same sequence.
        let replaces_best = match best {
            Some(current) => candidate.global_sequence >= current.global_sequence,
            None => true,
        };
        if replaces_best {
            best = Some(candidate);
        }
    }
    best.map(|selected| ParentHashResolution {
        event_id: selected.event_id,
        matching_parent_count,
    })
}
/// Assembles the evidence report for a finished (or rejected) walk.
///
/// Findings are put in order with `crate::evidence::sort_findings`, the checked entries are
/// summarised (count, first and last event id, digest), and the resulting body is hashed.
/// The metadata fields outside the body are left unset.
///
/// # Errors
///
/// Returns [`ChainWalkReportError::BodyEncoding`] when either the walk digest or the body
/// hash cannot be computed.
fn build_report(
    mode: ChainWalkMode,
    checked: &[(u128, ChainWalkHash)],
    mut findings: Vec<ChainWalkFinding>,
) -> Result<ChainWalkEvidenceReport, ChainWalkReportError> {
    crate::evidence::sort_findings(&mut findings);
    let body = ChainWalkReportBody {
        schema_version: CHAIN_WALK_REPORT_SCHEMA_VERSION,
        mode,
        checked_count: checked.len() as u64,
        first_ref: checked.first().map(|&(event_id, _)| event_id),
        last_ref: checked.last().map(|&(event_id, _)| event_id),
        walk_digest: walk_digest(checked)?,
        findings,
    };
    let body_hash = report_body_hash(&body)?;
    let report = ChainWalkEvidenceReport {
        body,
        body_hash,
        generated_at_unix_ms: None,
        batpak_version: None,
        diagnostics: Vec::new(),
    };
    Ok(report)
}
/// Hashes the canonical encoding of the checked `(event id, event hash)` pairs.
///
/// # Errors
///
/// Returns [`ChainWalkReportError::BodyEncoding`], carrying the encoder's error text, when
/// `crate::canonical::to_bytes` fails.
fn walk_digest(checked: &[(u128, ChainWalkHash)]) -> Result<ChainWalkHash, ChainWalkReportError> {
    match crate::canonical::to_bytes(checked) {
        Ok(encoded) => Ok(crate::evidence::content_hash(&encoded)),
        Err(failure) => Err(ChainWalkReportError::BodyEncoding {
            message: failure.to_string(),
        }),
    }
}
/// Hashes a report body through `crate::evidence::report_body_hash`, wrapping any failure
/// message it reports in [`ChainWalkReportError::BodyEncoding`].
fn report_body_hash(body: &ChainWalkReportBody) -> Result<ChainWalkHash, ChainWalkReportError> {
    crate::evidence::report_body_hash(body, |detail| {
        ChainWalkReportError::BodyEncoding { message: detail }
    })
}
/// Recomputes the content hash of a stored event from its payload.
///
/// Compiled only with the `blake3` feature; delegates to `crate::event::hash::compute_hash`.
#[cfg(feature = "blake3")]
fn observed_content_hash(stored: &crate::event::StoredEvent<Vec<u8>>) -> ChainWalkHash {
    crate::event::hash::compute_hash(&stored.event.payload)
}

/// Fallback used when the `blake3` feature is disabled: the stored event is ignored and an
/// all-zero hash is returned.
#[cfg(not(feature = "blake3"))]
fn observed_content_hash(_stored: &crate::event::StoredEvent<Vec<u8>>) -> ChainWalkHash {
    [0_u8; 32]
}
#[cfg(test)]
mod tests {
    use super::{build_report, ChainWalkFinding, ChainWalkMode};
    use std::error::Error;

    /// Findings handed to `build_report` out of order must come back sorted.
    #[test]
    fn chain_walk_report_sorts_findings_structurally() -> Result<(), Box<dyn Error>> {
        let unsorted = vec![
            ChainWalkFinding::MissingStart { event_id: 9 },
            ChainWalkFinding::MissingStart { event_id: 3 },
        ];
        let expected = vec![
            ChainWalkFinding::MissingStart { event_id: 3 },
            ChainWalkFinding::MissingStart { event_id: 9 },
        ];

        let report = build_report(ChainWalkMode::Linear, &[], unsorted)?;

        assert_eq!(
            report.body.findings, expected,
            "PROPERTY: chain walk findings must be sorted in deterministic structural order"
        );
        Ok(())
    }
}