use crate::adapter::net::state::causal::{compute_parent_hash, CausalLink};
use crate::adapter::net::state::log::EntityLog;
use crate::adapter::net::state::snapshot::StateSnapshot;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ContinuityStatus {
Continuous {
genesis_hash: u64,
head_seq: u64,
head_hash: u64,
},
Forked {
fork_point: u64,
original_hash: u64,
fork_hash: u64,
},
Unverifiable {
last_verified_seq: u64,
gap_start: u64,
},
Migrated {
migration_seq: u64,
source_node: u64,
target_node: u64,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ContinuityProof {
pub origin_hash: u64,
pub from_seq: u64,
pub to_seq: u64,
pub from_hash: u64,
pub to_hash: u64,
}
pub const CONTINUITY_PROOF_SIZE: usize = 40;
pub const MAX_PROOF_VERIFY_SPAN: u64 = 100_000;
impl ContinuityProof {
#[expect(
clippy::unwrap_used,
reason = "events.is_empty() guard above; .last() on a non-empty slice is infallible"
)]
pub fn from_log(log: &EntityLog) -> Option<Self> {
if log.is_empty() {
return None;
}
let events = log.range(0, u64::MAX);
if events.is_empty() {
return None;
}
let first = &events[0];
let last = events.last().unwrap();
let from_hash = compute_parent_hash(&first.link, &first.payload);
let to_hash = compute_parent_hash(&last.link, &last.payload);
Some(Self {
origin_hash: log.origin_hash(),
from_seq: first.link.sequence,
to_seq: last.link.sequence,
from_hash,
to_hash,
})
}
#[expect(
clippy::unwrap_used,
reason = "events.is_empty() guard above; .last() on a non-empty slice is infallible"
)]
pub fn verify_against(&self, log: &EntityLog) -> Result<(), ProofError> {
if self.origin_hash != log.origin_hash() {
return Err(ProofError::OriginMismatch);
}
if self.from_seq > self.to_seq {
return Err(ProofError::InvalidRange {
from_seq: self.from_seq,
to_seq: self.to_seq,
});
}
let span = self.to_seq.saturating_sub(self.from_seq);
if span >= MAX_PROOF_VERIFY_SPAN {
return Err(ProofError::SpanTooLarge {
from_seq: self.from_seq,
to_seq: self.to_seq,
cap: MAX_PROOF_VERIFY_SPAN,
});
}
let events = log.range(self.from_seq, self.to_seq);
if events.is_empty() {
return Err(ProofError::MissingEvent(self.from_seq));
}
let first = &events[0];
if first.link.sequence != self.from_seq {
return Err(ProofError::MissingEvent(self.from_seq));
}
let from_local = compute_parent_hash(&first.link, &first.payload);
if from_local != self.from_hash {
return Err(ProofError::HashMismatch {
seq: self.from_seq,
expected: self.from_hash,
got: from_local,
});
}
for i in 1..events.len() {
let prev = &events[i - 1];
let curr = &events[i];
if curr.link.sequence != prev.link.sequence + 1 {
return Err(ProofError::MissingEvent(prev.link.sequence + 1));
}
let expected_parent = compute_parent_hash(&prev.link, &prev.payload);
if curr.link.parent_hash != expected_parent {
return Err(ProofError::HashMismatch {
seq: curr.link.sequence,
expected: expected_parent,
got: curr.link.parent_hash,
});
}
}
let last = events.last().unwrap();
if last.link.sequence != self.to_seq {
return Err(ProofError::MissingEvent(self.to_seq));
}
let to_local = compute_parent_hash(&last.link, &last.payload);
if to_local != self.to_hash {
return Err(ProofError::HashMismatch {
seq: self.to_seq,
expected: self.to_hash,
got: to_local,
});
}
Ok(())
}
pub fn to_bytes(&self) -> [u8; CONTINUITY_PROOF_SIZE] {
let mut buf = [0u8; CONTINUITY_PROOF_SIZE];
buf[0..8].copy_from_slice(&self.origin_hash.to_le_bytes());
buf[8..16].copy_from_slice(&self.from_seq.to_le_bytes());
buf[16..24].copy_from_slice(&self.to_seq.to_le_bytes());
buf[24..32].copy_from_slice(&self.from_hash.to_le_bytes());
buf[32..40].copy_from_slice(&self.to_hash.to_le_bytes());
buf
}
#[expect(
clippy::unwrap_used,
reason = "data.len() == CONTINUITY_PROOF_SIZE checked above; fixed-offset slices into the buffer convert infallibly to fixed-size arrays"
)]
pub fn from_bytes(data: &[u8]) -> Option<Self> {
if data.len() != CONTINUITY_PROOF_SIZE {
return None;
}
Some(Self {
origin_hash: u64::from_le_bytes(data[0..8].try_into().unwrap()),
from_seq: u64::from_le_bytes(data[8..16].try_into().unwrap()),
to_seq: u64::from_le_bytes(data[16..24].try_into().unwrap()),
from_hash: u64::from_le_bytes(data[24..32].try_into().unwrap()),
to_hash: u64::from_le_bytes(data[32..40].try_into().unwrap()),
})
}
}
#[expect(
clippy::unwrap_used,
reason = "events.is_empty() guard above; .last() on a non-empty slice is infallible"
)]
pub fn assess_continuity(log: &EntityLog, snapshot: Option<&StateSnapshot>) -> ContinuityStatus {
let events = log.range(0, u64::MAX);
if events.is_empty() {
return ContinuityStatus::Continuous {
genesis_hash: 0,
head_seq: log.head_seq(),
head_hash: 0,
};
}
let first = &events[0];
let first_seq = first.link.sequence;
let expected_anchor_hash = if first_seq == 1 {
Some(compute_parent_hash(
&CausalLink::genesis(log.origin_hash(), 0),
&[],
))
} else if let Some(s) = snapshot {
if s.through_seq.checked_add(1) == Some(first_seq) {
match (s.chain_link.sequence, &s.head_payload) {
(0, Some(payload)) if !payload.is_empty() => None,
(0, Some(_)) | (0, None) => Some(compute_parent_hash(&s.chain_link, &[])),
(_, Some(payload)) => Some(compute_parent_hash(&s.chain_link, payload)),
(_, None) => None, }
} else {
None
}
} else {
None
};
let Some(expected_anchor_hash) = expected_anchor_hash else {
return ContinuityStatus::Unverifiable {
last_verified_seq: 0,
gap_start: 0,
};
};
if first.link.parent_hash != expected_anchor_hash {
return ContinuityStatus::Forked {
fork_point: first_seq,
original_hash: expected_anchor_hash,
fork_hash: first.link.parent_hash,
};
}
for i in 1..events.len() {
let prev = &events[i - 1];
let curr = &events[i];
if curr.link.sequence != prev.link.sequence + 1 {
return ContinuityStatus::Unverifiable {
last_verified_seq: prev.link.sequence,
gap_start: prev.link.sequence + 1,
};
}
let expected_parent = compute_parent_hash(&prev.link, &prev.payload);
if curr.link.parent_hash != expected_parent {
return ContinuityStatus::Forked {
fork_point: curr.link.sequence,
original_hash: expected_parent,
fork_hash: curr.link.parent_hash,
};
}
}
let first = &events[0];
let last = events.last().unwrap();
let genesis_hash = compute_parent_hash(&first.link, &first.payload);
let head_hash = compute_parent_hash(&last.link, &last.payload);
ContinuityStatus::Continuous {
genesis_hash,
head_seq: last.link.sequence,
head_hash,
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProofError {
OriginMismatch,
HashMismatch {
seq: u64,
expected: u64,
got: u64,
},
MissingEvent(u64),
InvalidRange {
from_seq: u64,
to_seq: u64,
},
SpanTooLarge {
from_seq: u64,
to_seq: u64,
cap: u64,
},
}
impl std::fmt::Display for ProofError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::OriginMismatch => write!(f, "origin hash mismatch"),
Self::HashMismatch { seq, expected, got } => {
write!(
f,
"hash mismatch at seq {}: expected {:#x}, got {:#x}",
seq, expected, got
)
}
Self::MissingEvent(seq) => write!(f, "missing event at seq {}", seq),
Self::InvalidRange { from_seq, to_seq } => write!(
f,
"invalid proof range: from_seq ({}) > to_seq ({})",
from_seq, to_seq
),
Self::SpanTooLarge {
from_seq,
to_seq,
cap,
} => write!(
f,
"proof span too large: from_seq={}, to_seq={}, cap={}",
from_seq, to_seq, cap
),
}
}
}
impl std::error::Error for ProofError {}
#[cfg(test)]
mod tests {
use super::*;
use crate::adapter::net::identity::EntityKeypair;
use crate::adapter::net::state::causal::CausalChainBuilder;
use bytes::Bytes;
fn build_log(count: usize) -> (EntityLog, CausalChainBuilder) {
let kp = EntityKeypair::generate();
let origin = kp.origin_hash();
let mut log = EntityLog::new(kp.entity_id().clone());
let mut builder = CausalChainBuilder::new(origin);
for i in 0..count {
let event = builder
.append(Bytes::from(format!("event-{}", i)), 0)
.unwrap();
log.append(event).unwrap();
}
(log, builder)
}
#[test]
fn test_assess_continuous() {
let (log, _) = build_log(10);
let status = assess_continuity(&log, None);
assert!(matches!(
status,
ContinuityStatus::Continuous { head_seq: 10, .. }
));
}
#[test]
fn test_assess_empty_log() {
let kp = EntityKeypair::generate();
let log = EntityLog::new(kp.entity_id().clone());
let status = assess_continuity(&log, None);
assert!(matches!(status, ContinuityStatus::Continuous { .. }));
}
#[test]
fn assess_continuity_unverifiable_when_log_starts_past_genesis_without_snapshot() {
let (mut log, _) = build_log(20);
log.prune_through(10);
assert!(
!log.is_empty(),
"test setup: log must still have events 11..20"
);
let status = assess_continuity(&log, None);
assert!(
matches!(
status,
ContinuityStatus::Unverifiable {
last_verified_seq: 0,
gap_start: 0,
}
),
"pruned log without snapshot must be Unverifiable, got {:?}",
status,
);
}
#[test]
fn assess_continuity_continuous_when_snapshot_bridges_gap() {
use crate::adapter::net::state::horizon::ObservedHorizon;
let (mut log, _) = build_log(20);
let event_at_10 = log.range(10, 10)[0].clone();
log.prune_through(10);
let snapshot = StateSnapshot {
version: 1,
entity_id: log.entity_id().clone(),
through_seq: 10,
chain_link: event_at_10.link,
state: bytes::Bytes::new(),
horizon: ObservedHorizon::default(),
created_at: 0,
bindings_bytes: Vec::new(),
identity_envelope: None,
head_payload: Some(event_at_10.payload.clone()),
};
let status = assess_continuity(&log, Some(&snapshot));
assert!(
matches!(status, ContinuityStatus::Continuous { head_seq: 20, .. }),
"snapshot.through_seq + 1 == first_event.sequence must anchor, got {:?}",
status,
);
}
#[test]
fn assess_continuity_unverifiable_when_snapshot_through_seq_does_not_bridge() {
use crate::adapter::net::state::causal::CausalLink;
use crate::adapter::net::state::horizon::ObservedHorizon;
let (mut log, _) = build_log(20);
log.prune_through(10);
let snapshot = StateSnapshot {
version: 1,
entity_id: log.entity_id().clone(),
through_seq: 5,
chain_link: CausalLink::genesis(log.origin_hash(), 0),
state: bytes::Bytes::new(),
horizon: ObservedHorizon::default(),
created_at: 0,
bindings_bytes: Vec::new(),
identity_envelope: None,
head_payload: None,
};
let status = assess_continuity(&log, Some(&snapshot));
assert!(
matches!(
status,
ContinuityStatus::Unverifiable {
last_verified_seq: 0,
gap_start: 0,
}
),
"mismatched snapshot must not anchor, got {:?}",
status,
);
}
#[test]
fn assess_continuity_forked_when_snapshot_chain_link_mismatches_log_anchor() {
use crate::adapter::net::state::horizon::ObservedHorizon;
let (mut log, _) = build_log(20);
log.prune_through(10);
let snapshot = StateSnapshot {
version: 1,
entity_id: log.entity_id().clone(),
through_seq: 10,
chain_link: CausalLink::genesis(log.origin_hash(), 0),
state: bytes::Bytes::new(),
horizon: ObservedHorizon::default(),
created_at: 0,
bindings_bytes: Vec::new(),
identity_envelope: None,
head_payload: None,
};
let status = assess_continuity(&log, Some(&snapshot));
match status {
ContinuityStatus::Forked { fork_point, .. } => {
assert_eq!(
fork_point, 11,
"snapshot anchor parent_hash mismatch must surface as Forked \
at first_seq (cubic-ai P1), got fork_point={}",
fork_point,
);
}
other => panic!(
"expected Forked at the snapshot anchor — pre-fix this returned \
Continuous because only `through_seq` was checked. Got {:?}",
other
),
}
}
#[test]
fn assess_continuity_unverifiable_when_snapshot_head_payload_is_unpopulated() {
use crate::adapter::net::state::horizon::ObservedHorizon;
let (mut log, _) = build_log(20);
let event_at_10 = log.range(10, 10)[0].clone();
assert!(
!event_at_10.payload.is_empty(),
"test setup: build_log must produce non-empty payloads, \
otherwise empty-vs-real head_payload comparison is moot",
);
log.prune_through(10);
let snapshot = StateSnapshot {
version: 1,
entity_id: log.entity_id().clone(),
through_seq: 10,
chain_link: event_at_10.link,
state: bytes::Bytes::new(),
horizon: ObservedHorizon::default(),
created_at: 0,
bindings_bytes: Vec::new(),
identity_envelope: None,
head_payload: None,
};
let status = assess_continuity(&log, Some(&snapshot));
match status {
ContinuityStatus::Unverifiable {
last_verified_seq,
gap_start,
} => {
assert_eq!(
last_verified_seq, 0,
"anchor unverifiable: nothing has been verified yet"
);
assert_eq!(gap_start, 0, "gap starts at the (un-verifiable) anchor");
}
other => panic!(
"CR-34: empty head_payload for non-genesis snapshot must surface as \
Unverifiable (not Forked); got {:?}",
other
),
}
}
#[test]
fn cr34_genesis_snapshot_with_empty_head_payload_still_validates() {
use crate::adapter::net::state::horizon::ObservedHorizon;
let (log, _) = build_log(5);
let genesis_link = CausalLink::genesis(log.origin_hash(), 0);
let snapshot = StateSnapshot {
version: 1,
entity_id: log.entity_id().clone(),
through_seq: 0,
chain_link: genesis_link,
state: bytes::Bytes::new(),
horizon: ObservedHorizon::default(),
created_at: 0,
bindings_bytes: Vec::new(),
identity_envelope: None,
head_payload: None,
};
let status = assess_continuity(&log, Some(&snapshot));
match status {
ContinuityStatus::Continuous { .. } => {
}
other => panic!(
"CR-34: genesis snapshot (seq=0) with empty head_payload must \
be Continuous (legitimate), got {:?}",
other
),
}
}
#[test]
fn cubic_p2_some_empty_head_payload_validates_as_legitimate() {
use crate::adapter::net::identity::EntityKeypair;
use crate::adapter::net::state::causal::CausalChainBuilder;
use crate::adapter::net::state::horizon::ObservedHorizon;
use crate::adapter::net::state::EntityLog;
let kp = EntityKeypair::generate();
let mut log = EntityLog::new(kp.entity_id().clone());
let mut builder = CausalChainBuilder::new(kp.origin_hash());
for i in 0..20usize {
let payload = if i == 9 {
bytes::Bytes::new()
} else {
bytes::Bytes::from(format!("event-{}", i))
};
let event = builder.append(payload, 0).unwrap();
log.append(event).unwrap();
}
let event_at_10 = log.range(10, 10)[0].clone();
assert!(
event_at_10.payload.is_empty(),
"test setup: event 10 must have an empty payload"
);
log.prune_through(10);
let snapshot = StateSnapshot {
version: 1,
entity_id: log.entity_id().clone(),
through_seq: 10,
chain_link: event_at_10.link,
state: bytes::Bytes::new(),
horizon: ObservedHorizon::default(),
created_at: 0,
bindings_bytes: Vec::new(),
identity_envelope: None,
head_payload: Some(event_at_10.payload.clone()),
};
let status = assess_continuity(&log, Some(&snapshot));
match status {
ContinuityStatus::Continuous { .. } => {
}
other => panic!(
"Cubic P2: Some(Bytes::new()) for a legitimate empty-payload \
head event must be Continuous (NOT Unverifiable). got {:?}",
other
),
}
}
#[test]
fn assess_continuity_unverifiable_when_snapshot_through_seq_is_u64_max() {
use crate::adapter::net::state::horizon::ObservedHorizon;
let (mut log, _) = build_log(20);
log.prune_through(10);
let snapshot = StateSnapshot {
version: 1,
entity_id: log.entity_id().clone(),
through_seq: u64::MAX,
chain_link: CausalLink::genesis(log.origin_hash(), 0),
state: bytes::Bytes::new(),
horizon: ObservedHorizon::default(),
created_at: 0,
bindings_bytes: Vec::new(),
identity_envelope: None,
head_payload: None,
};
let status = assess_continuity(&log, Some(&snapshot));
assert!(
matches!(
status,
ContinuityStatus::Unverifiable {
last_verified_seq: 0,
gap_start: 0,
}
),
"snapshot.through_seq == u64::MAX must never anchor — \
pre-fix saturating_add would have falsely matched a log \
claiming first_seq == u64::MAX. Got: {:?}",
status,
);
}
#[test]
fn assess_continuity_continuous_when_snapshot_chain_link_matches_log_anchor() {
use crate::adapter::net::state::horizon::ObservedHorizon;
let (mut log, _) = build_log(20);
let event_at_10 = log.range(10, 10)[0].clone();
log.prune_through(10);
let snapshot = StateSnapshot {
version: 1,
entity_id: log.entity_id().clone(),
through_seq: 10,
chain_link: event_at_10.link,
state: bytes::Bytes::new(),
horizon: ObservedHorizon::default(),
created_at: 0,
bindings_bytes: Vec::new(),
identity_envelope: None,
head_payload: Some(event_at_10.payload.clone()),
};
let status = assess_continuity(&log, Some(&snapshot));
assert!(
matches!(status, ContinuityStatus::Continuous { head_seq: 20, .. }),
"matching snapshot chain_link + head_payload must anchor cleanly, got {:?}",
status,
);
}
#[test]
fn malformed_genesis_snapshot_with_nonempty_head_payload_is_unverifiable() {
use crate::adapter::net::state::horizon::ObservedHorizon;
let (mut log, _) = build_log(20);
log.prune_through(10);
let genesis_link = CausalLink::genesis(log.origin_hash(), 0);
let snapshot = StateSnapshot {
version: 1,
entity_id: log.entity_id().clone(),
through_seq: 10,
chain_link: genesis_link,
state: bytes::Bytes::new(),
horizon: ObservedHorizon::default(),
created_at: 0,
bindings_bytes: Vec::new(),
identity_envelope: None,
head_payload: Some(bytes::Bytes::from_static(b"not-genesis-payload")),
};
let status = assess_continuity(&log, Some(&snapshot));
match status {
ContinuityStatus::Unverifiable {
last_verified_seq,
gap_start,
} => {
assert_eq!(last_verified_seq, 0);
assert_eq!(gap_start, 0);
}
other => panic!(
"regression: genesis-shaped snapshot (chain_link.sequence=0) \
with non-empty head_payload must surface as Unverifiable \
(not Forked / Continuous). Hashing junk against the genesis \
link and surfacing the mismatch as Forked routes operators \
toward the wrong remediation. got {:?}",
other
),
}
}
#[test]
fn test_proof_roundtrip() {
let (log, _) = build_log(5);
let proof = ContinuityProof::from_log(&log).unwrap();
let bytes = proof.to_bytes();
assert_eq!(bytes.len(), CONTINUITY_PROOF_SIZE);
let parsed = ContinuityProof::from_bytes(&bytes).unwrap();
assert_eq!(parsed, proof);
}
#[test]
fn test_proof_verify_against_same_log() {
let (log, _) = build_log(5);
let proof = ContinuityProof::from_log(&log).unwrap();
assert!(proof.verify_against(&log).is_ok());
}
#[test]
fn test_proof_verify_wrong_origin() {
let (log_a, _) = build_log(5);
let (log_b, _) = build_log(5);
let proof = ContinuityProof::from_log(&log_a).unwrap();
assert_eq!(
proof.verify_against(&log_b).unwrap_err(),
ProofError::OriginMismatch,
);
}
#[test]
fn test_proof_from_empty_log() {
let kp = EntityKeypair::generate();
let log = EntityLog::new(kp.entity_id().clone());
assert!(ContinuityProof::from_log(&log).is_none());
}
#[test]
fn verify_against_rejects_proof_when_middle_events_are_missing() {
let (full_log, _) = build_log(5);
let proof = ContinuityProof::from_log(&full_log).unwrap();
let kp = EntityKeypair::generate();
let mut peer_log = EntityLog::new(full_log.entity_id().clone());
let _ = kp; for ev in full_log.range(1, 5) {
peer_log.append((*ev).clone()).unwrap();
}
peer_log.prune_through(4);
let result = proof.verify_against(&peer_log);
assert!(
matches!(result, Err(ProofError::MissingEvent(1))),
"verify_against must reject with MissingEvent(from_seq=1) when \
from_seq itself is gone (cubic-ai P3), got {:?}",
result,
);
}
#[test]
fn verify_against_rejects_proof_with_reversed_bounds() {
let (log, _) = build_log(5);
let mut proof = ContinuityProof::from_log(&log).unwrap();
std::mem::swap(&mut proof.from_seq, &mut proof.to_seq);
let result = proof.verify_against(&log);
assert!(
matches!(result, Err(ProofError::InvalidRange { .. })),
"verify_against must reject reversed bounds, got {:?}",
result,
);
}
#[test]
fn verify_against_rejects_proof_with_oversized_span() {
let (log, _) = build_log(5);
let mut proof = ContinuityProof::from_log(&log).unwrap();
proof.from_seq = 0;
proof.to_seq = MAX_PROOF_VERIFY_SPAN + 1;
let result = proof.verify_against(&log);
assert!(
matches!(result, Err(ProofError::SpanTooLarge { .. })),
"verify_against must reject spans over MAX_PROOF_VERIFY_SPAN, got {:?}",
result,
);
}
#[test]
fn cr13_max_proof_verify_span_is_capped_at_100k() {
assert_eq!(
MAX_PROOF_VERIFY_SPAN, 100_000,
"CR-13: MAX_PROOF_VERIFY_SPAN must stay at 100_000. \
Raising it requires re-evaluating the per-peer rate-limit \
contract documented on the constant — at ~100ns per event, \
a 1M cap is a 100ms-CPU-per-call attack surface. If you're \
increasing this for legitimate reasons, also document the \
updated per-peer rate-limit requirement and bump the test."
);
}
#[test]
fn verify_against_accepts_intact_chain_with_intermediate_events() {
let (log, _) = build_log(10);
let proof = ContinuityProof::from_log(&log).unwrap();
proof
.verify_against(&log)
.expect("intact chain must verify");
}
}