use crate::adapter::net::continuity::discontinuity::{fork_entity, ForkRecord};
use crate::adapter::net::state::causal::{validate_chain_link, CausalEvent, ChainError};
use crate::adapter::net::state::log::EntityLog;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Side {
Ours,
Theirs,
}
#[derive(Debug, Clone)]
pub enum ConflictResolution {
Winner {
winning_side: Side,
fork_record: ForkRecord,
},
}
#[derive(Debug, Clone)]
pub enum ReconcileOutcome {
AlreadyConverged,
Catchup {
origin_hash: u64,
missing_events: Vec<CausalEvent>,
behind_side: Side,
},
Conflict {
origin_hash: u64,
diverge_seq: u64,
resolution: ConflictResolution,
},
}
pub fn reconcile_entity(
our_log: &EntityLog,
their_events: &[CausalEvent],
split_seq: u64,
) -> Result<ReconcileOutcome, ChainError> {
verify_remote_chain(our_log.origin_hash(), their_events, Some(split_seq))?;
let our_events = our_log.after(split_seq);
if our_events.is_empty() && their_events.is_empty() {
return Ok(ReconcileOutcome::AlreadyConverged);
}
if our_events.is_empty() {
return Ok(ReconcileOutcome::Catchup {
origin_hash: our_log.origin_hash(),
missing_events: their_events.to_vec(),
behind_side: Side::Ours,
});
}
if their_events.is_empty() {
return Ok(ReconcileOutcome::Catchup {
origin_hash: our_log.origin_hash(),
missing_events: our_events.into_iter().cloned().collect(),
behind_side: Side::Theirs,
});
}
let mut diverge_idx = None;
let min_len = our_events.len().min(their_events.len());
for i in 0..min_len {
if our_events[i].link.parent_hash != their_events[i].link.parent_hash
|| our_events[i].link.sequence != their_events[i].link.sequence
{
diverge_idx = Some(i);
break;
}
if our_events[i].payload != their_events[i].payload {
diverge_idx = Some(i);
break;
}
}
Ok(match diverge_idx {
None if our_events.len() == their_events.len() => {
ReconcileOutcome::AlreadyConverged
}
None if our_events.len() > their_events.len() => {
let missing: Vec<CausalEvent> = our_events[their_events.len()..]
.iter()
.map(|e| (*e).clone())
.collect();
ReconcileOutcome::Catchup {
origin_hash: our_log.origin_hash(),
missing_events: missing,
behind_side: Side::Theirs,
}
}
None => {
let missing: Vec<CausalEvent> = their_events[our_events.len()..].to_vec();
ReconcileOutcome::Catchup {
origin_hash: our_log.origin_hash(),
missing_events: missing,
behind_side: Side::Ours,
}
}
Some(idx) => {
let diverge_seq = if idx < our_events.len() {
our_events[idx].link.sequence
} else {
their_events[idx].link.sequence
};
let our_len = our_events.len() - idx;
let their_len = their_events.len() - idx;
let winning_side = if our_len > their_len {
Side::Ours
} else if their_len > our_len {
Side::Theirs
} else {
let our_payload = &our_events[idx].payload;
let their_payload = &their_events[idx].payload;
let our_hash = xxhash_rust::xxh3::xxh3_64(our_payload);
let their_hash = xxhash_rust::xxh3::xxh3_64(their_payload);
let our_link = &our_events[idx].link;
let their_link = &their_events[idx].link;
use std::cmp::Ordering::{Equal, Greater, Less};
match our_hash
.cmp(&their_hash)
.then_with(|| our_payload.as_ref().cmp(their_payload.as_ref()))
.then_with(|| our_link.parent_hash.cmp(&their_link.parent_hash))
.then_with(|| our_link.sequence.cmp(&their_link.sequence))
{
Less => Side::Ours,
Greater => Side::Theirs,
Equal => {
tracing::warn!(
idx,
origin_hash = our_log.origin_hash(),
"reconcile_entity: divergence detected but events \
are byte-identical across payload + link metadata. \
Selecting Side::Ours arbitrarily; investigate \
the divergence detector."
);
Side::Ours
}
}
};
let origin_hash = our_log.origin_hash();
let (_, fork_record, _) = fork_entity(origin_hash, diverge_seq, None);
ReconcileOutcome::Conflict {
origin_hash,
diverge_seq,
resolution: ConflictResolution::Winner {
winning_side,
fork_record,
},
}
}
})
}
pub fn verify_remote_chain(
expected_origin: u64,
events: &[CausalEvent],
expected_first_seq: Option<u64>,
) -> Result<(), ChainError> {
if let Some(first) = events.first() {
if first.link.origin_hash != expected_origin {
return Err(ChainError::OriginMismatch {
expected: expected_origin,
got: first.link.origin_hash,
});
}
if let Some(split_seq) = expected_first_seq {
let expected = split_seq.checked_add(1).ok_or(ChainError::SequenceGap {
expected: u64::MAX,
got: first.link.sequence,
})?;
if first.link.sequence != expected {
return Err(ChainError::SequenceGap {
expected,
got: first.link.sequence,
});
}
}
}
for i in 1..events.len() {
validate_chain_link(&events[i - 1].link, &events[i - 1].payload, &events[i].link)?;
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::adapter::net::identity::EntityKeypair;
use crate::adapter::net::state::causal::CausalChainBuilder;
use bytes::Bytes;
fn build_divergent_logs(
shared_events: usize,
our_extra: usize,
their_extra: usize,
) -> (EntityLog, Vec<CausalEvent>, u64) {
let kp = EntityKeypair::generate();
let origin = kp.origin_hash();
let mut log = EntityLog::new(kp.entity_id().clone());
let mut builder = CausalChainBuilder::new(origin);
for i in 0..shared_events {
let event = builder
.append(Bytes::from(format!("shared-{}", i)), 0)
.unwrap();
log.append(event).unwrap();
}
let split_seq = builder.sequence();
let mut our_builder = CausalChainBuilder::from_head(
*builder.head(),
Bytes::from(format!("shared-{}", shared_events - 1)),
);
for i in 0..our_extra {
let event = our_builder
.append(Bytes::from(format!("ours-{}", i)), 0)
.unwrap();
log.append(event).unwrap();
}
let mut their_builder = CausalChainBuilder::from_head(
*builder.head(),
Bytes::from(format!("shared-{}", shared_events - 1)),
);
let mut their_events = Vec::new();
for i in 0..their_extra {
let event = their_builder
.append(Bytes::from(format!("theirs-{}", i)), 0)
.unwrap();
their_events.push(event);
}
(log, their_events, split_seq)
}
#[test]
fn test_already_converged() {
let kp = EntityKeypair::generate();
let origin = kp.origin_hash();
let mut log = EntityLog::new(kp.entity_id().clone());
let mut builder = CausalChainBuilder::new(origin);
for i in 0..5 {
let event = builder.append(Bytes::from(format!("e{}", i)), 0).unwrap();
log.append(event).unwrap();
}
let result = reconcile_entity(&log, &[], 5).unwrap();
assert!(matches!(result, ReconcileOutcome::AlreadyConverged));
}
#[test]
fn test_catchup_we_are_behind() {
let kp = EntityKeypair::generate();
let origin = kp.origin_hash();
let log = EntityLog::new(kp.entity_id().clone());
let mut builder = CausalChainBuilder::new(origin);
let their_events: Vec<CausalEvent> = (0..3)
.map(|i| {
builder
.append(Bytes::from(format!("theirs-{}", i)), 0)
.unwrap()
})
.collect();
let result = reconcile_entity(&log, &their_events, 0).unwrap();
match result {
ReconcileOutcome::Catchup {
behind_side,
missing_events,
..
} => {
assert_eq!(behind_side, Side::Ours);
assert_eq!(missing_events.len(), 3);
}
other => panic!("expected Catchup, got {:?}", other),
}
}
#[test]
fn test_catchup_they_are_behind() {
let (log, _, split_seq) = build_divergent_logs(3, 2, 0);
let result = reconcile_entity(&log, &[], split_seq).unwrap();
match result {
ReconcileOutcome::Catchup {
behind_side,
missing_events,
..
} => {
assert_eq!(behind_side, Side::Theirs);
assert_eq!(missing_events.len(), 2);
}
other => panic!("expected Catchup, got {:?}", other),
}
}
#[test]
fn test_conflict_longest_wins() {
let (log, their_events, split_seq) = build_divergent_logs(3, 5, 2);
let result = reconcile_entity(&log, &their_events, split_seq).unwrap();
match result {
ReconcileOutcome::Conflict {
resolution:
ConflictResolution::Winner {
winning_side,
fork_record,
},
..
} => {
assert_eq!(winning_side, Side::Ours); assert!(fork_record.verify());
}
other => panic!("expected Conflict, got {:?}", other),
}
}
#[test]
fn test_conflict_they_win() {
let (log, their_events, split_seq) = build_divergent_logs(3, 1, 4);
let result = reconcile_entity(&log, &their_events, split_seq).unwrap();
match result {
ReconcileOutcome::Conflict {
resolution: ConflictResolution::Winner { winning_side, .. },
..
} => {
assert_eq!(winning_side, Side::Theirs); }
other => panic!("expected Conflict, got {:?}", other),
}
}
#[test]
fn test_conflict_tiebreak_deterministic() {
let (log, their_events, split_seq) = build_divergent_logs(3, 2, 2);
let result = reconcile_entity(&log, &their_events, split_seq).unwrap();
assert!(matches!(
result,
ReconcileOutcome::Conflict {
resolution: ConflictResolution::Winner { .. },
..
}
));
}
#[test]
fn test_verify_remote_chain_valid() {
let kp = EntityKeypair::generate();
let mut builder = CausalChainBuilder::new(kp.origin_hash());
let events: Vec<CausalEvent> = (0..5)
.map(|i| builder.append(Bytes::from(format!("e{}", i)), 0).unwrap())
.collect();
assert!(verify_remote_chain(kp.origin_hash(), &events, None).is_ok());
}
#[test]
fn test_verify_remote_chain_broken() {
let kp = EntityKeypair::generate();
let mut builder = CausalChainBuilder::new(kp.origin_hash());
let mut events: Vec<CausalEvent> = (0..3)
.map(|i| builder.append(Bytes::from(format!("e{}", i)), 0).unwrap())
.collect();
events[1].link.parent_hash = 0xBADBADBAD;
assert!(verify_remote_chain(kp.origin_hash(), &events, None).is_err());
}
#[test]
fn test_verify_remote_chain_empty_is_ok() {
assert!(verify_remote_chain(0xDEADBEEF, &[], None).is_ok());
}
#[test]
fn test_regression_verify_remote_chain_rejects_wrong_start_sequence() {
let kp = EntityKeypair::generate();
let mut builder = CausalChainBuilder::new(kp.origin_hash());
let events: Vec<CausalEvent> = (0..10)
.map(|i| builder.append(Bytes::from(format!("e{i}")), 0).unwrap())
.collect();
let tail: Vec<CausalEvent> = events[4..].to_vec();
let rejected = verify_remote_chain(kp.origin_hash(), &tail, Some(0));
assert!(
matches!(rejected, Err(ChainError::SequenceGap { .. })),
"remote chain starting at seq 5 must be rejected when split_seq = 0",
);
assert!(
verify_remote_chain(kp.origin_hash(), &tail, Some(4)).is_ok(),
"remote chain starting at seq 5 must be accepted when split_seq = 4",
);
}
#[test]
fn test_regression_tiebreak_perspective_independent() {
let (log, their_events, split_seq) = build_divergent_logs(3, 2, 2);
let result = reconcile_entity(&log, &their_events, split_seq).unwrap();
let winning_side = match &result {
ReconcileOutcome::Conflict {
resolution: ConflictResolution::Winner { winning_side, .. },
..
} => *winning_side,
other => panic!("expected Conflict, got {:?}", other),
};
let our_post_split: Vec<CausalEvent> =
log.after(split_seq).iter().map(|e| (*e).clone()).collect();
let _kp = EntityKeypair::from_bytes([0x42u8; 32]); let origin = log.origin_hash();
let mut their_log = EntityLog::new(log.entity_id().clone());
let mut builder = CausalChainBuilder::new(origin);
for i in 0..3 {
let event = builder
.append(Bytes::from(format!("shared-{}", i)), 0)
.unwrap();
their_log.append(event).unwrap();
}
let _their_builder =
CausalChainBuilder::from_head(*builder.head(), Bytes::from("shared-2".to_string()));
for event in &their_events {
their_log.append(event.clone()).unwrap();
}
let other_result = reconcile_entity(&their_log, &our_post_split, split_seq).unwrap();
let other_winning_side = match &other_result {
ReconcileOutcome::Conflict {
resolution: ConflictResolution::Winner { winning_side, .. },
..
} => *winning_side,
other => panic!("expected Conflict from other side, got {:?}", other),
};
assert_ne!(
winning_side, other_winning_side,
"both sides must agree: if we say Ours, they must say Theirs"
);
}
#[test]
fn test_regression_reconcile_rejects_broken_remote_chain() {
let kp = EntityKeypair::generate();
let log = EntityLog::new(kp.entity_id().clone());
let mut builder = CausalChainBuilder::new(kp.origin_hash());
let mut their_events: Vec<CausalEvent> = (0..3)
.map(|i| builder.append(Bytes::from(format!("e{}", i)), 0).unwrap())
.collect();
their_events[2].link.parent_hash = 0xDEADBEEF;
let result = reconcile_entity(&log, &their_events, 0);
assert!(
result.is_err(),
"reconcile_entity must reject events with broken chain linkage"
);
}
#[test]
fn test_regression_verify_remote_chain_rejects_origin_forgery() {
let ours = EntityKeypair::generate();
let theirs = EntityKeypair::generate();
assert_ne!(
ours.origin_hash(),
theirs.origin_hash(),
"precondition: distinct origin hashes"
);
let mut builder = CausalChainBuilder::new(theirs.origin_hash());
let forged: Vec<CausalEvent> = (0..5)
.map(|i| {
builder
.append(Bytes::from(format!("forged-{}", i)), 0)
.unwrap()
})
.collect();
assert!(verify_remote_chain(theirs.origin_hash(), &forged, None).is_ok());
let rejected = verify_remote_chain(ours.origin_hash(), &forged, None);
assert!(
matches!(rejected, Err(ChainError::OriginMismatch { .. })),
"verify_remote_chain must reject a chain whose first event's origin \
doesn't match the expected origin, got {:?}",
rejected
);
}
#[test]
fn test_regression_reconcile_rejects_foreign_origin_chain() {
let ours = EntityKeypair::generate();
let theirs = EntityKeypair::generate();
let our_log = EntityLog::new(ours.entity_id().clone());
let mut foreign_builder = CausalChainBuilder::new(theirs.origin_hash());
let foreign_events: Vec<CausalEvent> = (0..10)
.map(|i| {
foreign_builder
.append(Bytes::from(format!("foreign-{}", i)), 0)
.unwrap()
})
.collect();
let result = reconcile_entity(&our_log, &foreign_events, 0);
assert!(
matches!(result, Err(ChainError::OriginMismatch { .. })),
"reconcile_entity must reject chains anchored to a foreign origin, got {:?}",
result
);
}
}