use crate::event::StoredEvent;
use crate::id::{EntityIdType, EventId};
use crate::store::index::IndexEntry;
use crate::store::Store;
use std::collections::HashSet;
#[cfg(feature = "payload-encryption")]
use crate::event::Event;
mod by_hash;
/// Why an ancestry walk stopped.
///
/// Produced by `collect_ancestors` alongside the prefix of events it managed
/// to gather before stopping.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum AncestryBoundary {
/// The walk reached an event whose step reported no parent link (genesis).
ReachedGenesis,
/// The number of collected events reached the caller-supplied limit.
LimitReached,
/// The last collected event names a parent that could not be resolved.
MissingParent {
/// Id of the last collected event, i.e. the one whose parent is missing.
child: EventId,
},
/// A step failed to produce the event; the walk returns what it had so far.
ReadFailure {
/// Id reported by the failing step.
event_id: EventId,
},
/// A step produced an event that the walk had already visited.
Cycle {
/// Id of the event that was encountered a second time.
event_id: EventId,
},
/// The walk was started without a cursor to begin from.
NoAnchor,
}
/// Result of an ancestry walk: the events gathered plus the reason it stopped.
#[derive(Clone, Debug)]
pub struct AncestorWalk {
/// Events collected by the walk, in the order they were visited.
pub ancestors: Vec<StoredEvent<serde_json::Value>>,
/// The condition that ended the walk.
pub boundary: AncestryBoundary,
/// Ids of visited events whose encrypted payload was reported as shredded.
///
/// Such events are still present in `ancestors`, carrying a JSON `null`
/// payload in place of the original one.
#[cfg(feature = "payload-encryption")]
#[cfg_attr(
all(docsrs, not(batpak_stable_docs)),
doc(cfg(feature = "payload-encryption"))
)]
pub shredded: Vec<EventId>,
}
impl AncestorWalk {
    /// Returns `true` when the walk stopped because it reached genesis.
    #[must_use]
    pub fn reached_genesis(&self) -> bool {
        self.boundary == AncestryBoundary::ReachedGenesis
    }

    /// Returns the id of the event whose parent could not be resolved.
    ///
    /// Only a [`AncestryBoundary::MissingParent`] boundary yields `Some`;
    /// every other boundary yields `None`.
    #[must_use]
    pub fn truncated_at(&self) -> Option<EventId> {
        // No wildcard arm: adding a boundary variant forces this match to be
        // revisited.
        match self.boundary {
            AncestryBoundary::NoAnchor
            | AncestryBoundary::Cycle { .. }
            | AncestryBoundary::ReadFailure { .. }
            | AncestryBoundary::LimitReached
            | AncestryBoundary::ReachedGenesis => None,
            AncestryBoundary::MissingParent { child: orphan } => Some(orphan),
        }
    }

    /// Returns `true` when `event_id` was recorded as shredded by this walk.
    #[cfg(feature = "payload-encryption")]
    #[cfg_attr(
        all(docsrs, not(batpak_stable_docs)),
        doc(cfg(feature = "payload-encryption"))
    )]
    #[must_use]
    pub fn is_shredded(&self, event_id: EventId) -> bool {
        self.shredded.iter().any(|recorded| *recorded == event_id)
    }

    /// Returns the ids recorded as shredded, in the order they were recorded.
    #[cfg(feature = "payload-encryption")]
    #[cfg_attr(
        all(docsrs, not(batpak_stable_docs)),
        doc(cfg(feature = "payload-encryption"))
    )]
    #[must_use]
    pub fn shredded_ancestors(&self) -> &[EventId] {
        self.shredded.as_slice()
    }
}
/// What a single walk step says about where to go after the current event.
pub(super) enum NextLink<Cursor> {
/// Keep walking from the given cursor (the current event's parent).
Continue(Cursor),
/// The current event has no parent; the walk is complete.
Genesis,
/// The current event names a parent that could not be resolved.
MissingParent,
}
/// Result of one walk step: the event at the cursor plus the link to follow
/// next, or the id of the event the step failed on.
pub(super) type StepOutcome<Cursor> =
Result<(StoredEvent<serde_json::Value>, NextLink<Cursor>), EventId>;
/// Drives an ancestry walk, calling `step` once per event until a boundary is hit.
///
/// Starting from `cursor`, each iteration asks `step` for the event at the
/// current position and the link to follow next. Events are appended to the
/// returned vector in visit order. The walk ends with:
///
/// * `LimitReached` once `limit` events have been collected (checked before
///   anything else, so a `limit` of zero never calls `step`);
/// * `NoAnchor` when there is no cursor to start from;
/// * `ReadFailure` when `step` returns `Err`;
/// * `Cycle` when `step` yields an event id that was already visited (the
///   repeated event is not appended);
/// * `ReachedGenesis` / `MissingParent` according to the link `step` reports.
pub(super) fn collect_ancestors<State: crate::store::StoreState, Cursor, Step>(
    store: &Store<State>,
    mut cursor: Option<Cursor>,
    limit: usize,
    mut step: Step,
) -> (Vec<StoredEvent<serde_json::Value>>, AncestryBoundary)
where
    Step: FnMut(&Store<State>, Cursor) -> StepOutcome<Cursor>,
{
    let mut collected = Vec::new();
    let mut seen_ids: HashSet<u128> = HashSet::new();

    while collected.len() < limit {
        let Some(position) = cursor.take() else {
            return (collected, AncestryBoundary::NoAnchor);
        };

        let (stored, link) = match step(store, position) {
            Ok(pair) => pair,
            Err(event_id) => return (collected, AncestryBoundary::ReadFailure { event_id }),
        };

        let id = stored.event.event_id();
        let first_visit = seen_ids.insert(id.as_u128());
        if !first_visit {
            tracing::error!(
                cycle_at = %format!("{:#034x}", id.as_u128()),
                "ancestry walk hit a cycle — store corruption; returning prefix"
            );
            return (collected, AncestryBoundary::Cycle { event_id: id });
        }

        collected.push(stored);

        match link {
            NextLink::Continue(parent) => cursor = Some(parent),
            NextLink::Genesis => return (collected, AncestryBoundary::ReachedGenesis),
            NextLink::MissingParent => {
                return (collected, AncestryBoundary::MissingParent { child: id });
            }
        }
    }

    (collected, AncestryBoundary::LimitReached)
}
/// Looks up `event_id` in the index and reads the matching event from disk.
///
/// Returns `None` when the id is not in the index, or when the index has an
/// entry but the reader fails to load it. The latter case is logged as an
/// error before returning.
pub(super) fn read_entry_and_event<State: crate::store::StoreState>(
    store: &Store<State>,
    event_id: u128,
) -> Option<(IndexEntry, StoredEvent<serde_json::Value>)> {
    let entry = store.index.get_by_id(event_id)?;
    match store.reader.read_entry(&entry.disk_pos) {
        Ok(stored) => Some((entry, stored)),
        Err(error) => {
            tracing::error!(
                event_id = %format!("{event_id:#034x}"),
                %error,
                "ancestry walk failed to read an index-proven event — store corruption; returning truncated prefix"
            );
            None
        }
    }
}
/// Decides where the walk goes after `entry`.
///
/// An all-zero `prev_hash` means genesis. Otherwise the parent is looked up by
/// hash in `entity_stream`; a hit continues the walk from the parent's event
/// id and a miss reports a missing parent.
pub(super) fn resolve_next_link(
    entry: &IndexEntry,
    entity_stream: &[IndexEntry],
) -> NextLink<u128> {
    let parent_hash = entry.hash_chain.prev_hash;
    if parent_hash == [0_u8; 32] {
        return NextLink::Genesis;
    }
    parent_event_id_by_hash(entity_stream, parent_hash)
        .map_or(NextLink::MissingParent, NextLink::Continue)
}
/// Walk step that understands encrypted payloads.
///
/// Reads the raw event for `current_id`, resolves its parent link against
/// `entity_stream`, and then produces the JSON-valued event:
///
/// * no encryption metadata: the stored bytes are decoded directly;
/// * encrypted and readable: the decrypted bytes are decoded;
/// * encrypted and shredded: the event id is appended to `shredded` and the
///   event is returned with a `null` payload so the walk can continue;
/// * index miss, read failure, decrypt failure or decode failure: `Err` with
///   the event id.
#[cfg(feature = "payload-encryption")]
pub(super) fn step_ancestor_key_aware<State: crate::store::StoreState>(
    store: &Store<State>,
    current_id: u128,
    entity_stream: &[IndexEntry],
    shredded: &mut Vec<EventId>,
) -> StepOutcome<u128> {
    use crate::store::read_api::PayloadPlaintext;

    let entry = match store.index.get_by_id(current_id) {
        Some(found) => found,
        None => return Err(EventId::from(current_id)),
    };

    let raw = store
        .reader
        .read_entry_raw(&entry.disk_pos)
        .map_err(|error| {
            tracing::error!(
                event_id = %format!("{current_id:#034x}"),
                %error,
                "ancestry walk failed to read an index-proven event — store corruption; returning truncated prefix"
            );
            EventId::from(current_id)
        })?;

    let link = resolve_next_link(&entry, entity_stream);

    let StoredEvent {
        coordinate,
        event:
            Event {
                header,
                payload: stored_bytes,
                hash_chain,
            },
    } = raw;

    // The metadata is cloned because `header` is moved into the result below.
    let Some(meta) = header.payload_encryption.clone() else {
        return finish_value(coordinate, header, hash_chain, &stored_bytes, link);
    };

    let event_id = header.event_id;
    let opened = store.open_encrypted_payload_bytes(
        &coordinate,
        header.event_kind,
        event_id,
        &meta,
        &stored_bytes,
    );

    match opened {
        Ok(PayloadPlaintext::Plaintext(plaintext)) => {
            finish_value(coordinate, header, hash_chain, &plaintext, link)
        }
        Ok(PayloadPlaintext::Shredded) => {
            shredded.push(event_id);
            tracing::debug!(
                target: "batpak::ancestry",
                event_id = %format!("{:#034x}", event_id.as_u128()),
                "ancestry walk reached a crypto-shredded ancestor; including it (Null placeholder) \
                 and continuing to its parent — the chain links are intact, only the payload is gone"
            );
            let placeholder = Event {
                header,
                payload: serde_json::Value::Null,
                hash_chain,
            };
            Ok((
                StoredEvent {
                    coordinate,
                    event: placeholder,
                },
                link,
            ))
        }
        Err(error) => {
            tracing::error!(
                event_id = %format!("{:#034x}", event_id.as_u128()),
                %error,
                "ancestry walk failed to decrypt an index-proven encrypted event; returning truncated prefix"
            );
            Err(event_id)
        }
    }
}
/// Decodes `bytes` as a JSON value and assembles the step result around it.
///
/// On success the event is rebuilt from `coordinate`, `header`, the decoded
/// payload and `hash_chain`, and paired with `next`. On a decode failure the
/// error is logged and the event id from `header` is returned as `Err`.
#[cfg(feature = "payload-encryption")]
fn finish_value(
    coordinate: crate::coordinate::Coordinate,
    header: crate::event::EventHeader,
    hash_chain: Option<crate::event::HashChain>,
    bytes: &[u8],
    next: NextLink<u128>,
) -> StepOutcome<u128> {
    // Copied out first: `header` is moved into the success value.
    let event_id = header.event_id;
    crate::encoding::from_bytes::<serde_json::Value>(bytes)
        .map(|payload| {
            let event = Event {
                header,
                payload,
                hash_chain,
            };
            (StoredEvent { coordinate, event }, next)
        })
        .map_err(|error| {
            tracing::error!(
                event_id = %format!("{:#034x}", event_id.as_u128()),
                %error,
                "ancestry walk failed to decode a plaintext payload; returning truncated prefix"
            );
            event_id
        })
}
/// Finds the event in `entity_stream` whose own hash equals `parent_hash`.
///
/// Scans the slice front to back and returns the event id of the first match,
/// or `None` when no entry carries that hash.
pub(crate) fn parent_event_id_by_hash(
    entity_stream: &[IndexEntry],
    parent_hash: [u8; 32],
) -> Option<u128> {
    entity_stream.iter().find_map(|entry| {
        let is_parent = entry.hash_chain.event_hash == parent_hash;
        is_parent.then_some(entry.event_id)
    })
}
/// Walks the ancestry of `event_id`, collecting at most `limit` events.
///
/// Delegates to the hash-based walker and packages its result together with
/// the ids it recorded as shredded along the way.
#[cfg(feature = "payload-encryption")]
pub(crate) fn walk_ancestors_outcome<State: crate::store::StoreState>(
    store: &Store<State>,
    event_id: u128,
    limit: usize,
) -> AncestorWalk {
    let mut shredded_ids = Vec::<EventId>::new();
    let (chain, stop_reason) =
        by_hash::walk_ancestors_outcome_by_hash(store, event_id, limit, &mut shredded_ids);
    AncestorWalk {
        ancestors: chain,
        boundary: stop_reason,
        shredded: shredded_ids,
    }
}
/// Walks the ancestry of `event_id`, collecting at most `limit` events.
///
/// Delegates to the hash-based walker and packages its result. This is the
/// variant compiled when payload encryption is disabled.
#[cfg(not(feature = "payload-encryption"))]
pub(crate) fn walk_ancestors_outcome<State: crate::store::StoreState>(
    store: &Store<State>,
    event_id: u128,
    limit: usize,
) -> AncestorWalk {
    let (chain, stop_reason) = by_hash::walk_ancestors_outcome_by_hash(store, event_id, limit);
    AncestorWalk {
        ancestors: chain,
        boundary: stop_reason,
    }
}
#[cfg(all(test, feature = "payload-encryption"))]
mod shredded_disposition_tests {
    use super::{AncestorWalk, AncestryBoundary};
    use crate::id::EventId;

    /// Builds a walk with no ancestors that carries only the given shredded ids.
    fn walk_with_shredded(shredded: Vec<EventId>) -> AncestorWalk {
        let ancestors = Vec::new();
        let boundary = AncestryBoundary::ReachedGenesis;
        AncestorWalk {
            ancestors,
            boundary,
            shredded,
        }
    }

    /// `is_shredded` is true for a recorded id and false for any other id.
    #[test]
    fn is_shredded_reports_exact_membership() {
        let recorded = EventId::from(0x5151_u128);
        let unrecorded = EventId::from(0x9999_u128);
        let walk = walk_with_shredded(vec![recorded]);

        let hit = walk.is_shredded(recorded);
        let miss = walk.is_shredded(unrecorded);

        assert!(
            hit,
            "a shredded ancestor id must report is_shredded == true"
        );
        assert!(
            !miss,
            "an id absent from the shredded set must report is_shredded == false"
        );
    }

    /// `shredded_ancestors` exposes the recorded ids unchanged, and is empty
    /// when nothing was recorded.
    #[test]
    fn shredded_ancestors_returns_the_exact_recorded_slice() {
        let expected = [EventId::from(0x1111_u128), EventId::from(0x2222_u128)];
        let walk = walk_with_shredded(expected.to_vec());
        assert_eq!(
            walk.shredded_ancestors(),
            expected.as_slice(),
            "shredded_ancestors must return exactly the recorded ids, not an empty or default slice"
        );

        let intact = walk_with_shredded(Vec::new());
        assert!(
            intact.shredded_ancestors().is_empty(),
            "an intact chain reports an empty shredded slice"
        );
    }
}