use bytes::{Buf, BufMut, Bytes, BytesMut};
use xxhash_rust::xxh3::Xxh3;
/// Subprotocol identifier for causal traffic.
/// NOTE(review): presumably tags frames carrying causally linked events — confirm against the dispatcher.
pub const SUBPROTOCOL_CAUSAL: u16 = 0x0400;
/// Subprotocol identifier for snapshot traffic.
/// NOTE(review): not referenced in the visible part of this file — confirm where it is consumed.
pub const SUBPROTOCOL_SNAPSHOT: u16 = 0x0401;
/// Serialized size of a `CausalLink` in bytes: four `u64` fields written back to back.
pub const CAUSAL_LINK_SIZE: usize = 32;
/// Fixed-size header that ties one event to its predecessor in a chain.
///
/// All four fields are plain `u64`s, so the value is cheap to copy and compare.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CausalLink {
    /// Identifier of the chain this link belongs to; successive links keep the same value.
    pub origin_hash: u64,
    /// Opaque encoded horizon supplied by the caller for each link.
    /// NOTE(review): the encoding is not visible in this file — confirm with the producer.
    pub horizon_encoded: u64,
    /// Position in the chain: 0 for the genesis link, then incremented by one per link.
    pub sequence: u64,
    /// Hash of the previous link and the previous payload; 0 for the genesis link.
    pub parent_hash: u64,
}
impl CausalLink {
    /// Creates the first link of a chain: sequence 0 and no parent hash.
    pub fn genesis(origin_hash: u64, horizon_encoded: u64) -> Self {
        Self {
            sequence: 0,
            parent_hash: 0,
            origin_hash,
            horizon_encoded,
        }
    }

    /// Derives the link that follows `self`.
    ///
    /// `payload` is the payload that travelled with `self` (it is hashed together
    /// with `self` to form the new `parent_hash`); `horizon_encoded` is stored on
    /// the new link as-is. Returns `None` when the sequence counter would overflow.
    #[inline]
    pub fn next(&self, payload: &[u8], horizon_encoded: u64) -> Option<Self> {
        // The hash is only computed once we know a successor sequence exists.
        self.sequence.checked_add(1).map(|sequence| Self {
            origin_hash: self.origin_hash,
            horizon_encoded,
            sequence,
            parent_hash: compute_parent_hash(self, payload),
        })
    }

    /// Serializes the link as four little-endian `u64` words in the order
    /// `origin_hash`, `horizon_encoded`, `sequence`, `parent_hash`.
    #[inline]
    pub fn to_bytes(&self) -> [u8; CAUSAL_LINK_SIZE] {
        let words = [
            self.origin_hash,
            self.horizon_encoded,
            self.sequence,
            self.parent_hash,
        ];
        let mut out = [0u8; CAUSAL_LINK_SIZE];
        for (slot, word) in out.chunks_exact_mut(8).zip(words.iter()) {
            slot.copy_from_slice(&word.to_le_bytes());
        }
        out
    }

    /// Parses a link from the first `CAUSAL_LINK_SIZE` bytes of `data`.
    ///
    /// Returns `None` when `data` is shorter than that; any bytes beyond the
    /// header are ignored.
    #[inline]
    pub fn from_bytes(data: &[u8]) -> Option<Self> {
        let header = data.get(..CAUSAL_LINK_SIZE)?;
        // Reads the `index`-th little-endian u64 word of the header.
        let word_at = |index: usize| {
            let mut raw = [0u8; 8];
            raw.copy_from_slice(&header[index * 8..(index + 1) * 8]);
            u64::from_le_bytes(raw)
        };
        Some(Self {
            origin_hash: word_at(0),
            horizon_encoded: word_at(1),
            sequence: word_at(2),
            parent_hash: word_at(3),
        })
    }

    /// Reports whether this link looks like a chain start (sequence 0 and parent hash 0).
    #[inline]
    pub fn is_genesis(&self) -> bool {
        matches!((self.sequence, self.parent_hash), (0, 0))
    }
}
/// Computes the hash a successor link must carry in `parent_hash`.
///
/// The digest is taken over the serialized form of `prev_link` followed by
/// `prev_payload`, fed to a streaming `Xxh3` hasher in that order.
#[inline]
pub fn compute_parent_hash(prev_link: &CausalLink, prev_payload: &[u8]) -> u64 {
    let mut state = Xxh3::new();
    // Header first, payload second: the order is part of the chain format.
    state.update(&prev_link.to_bytes());
    state.update(prev_payload);
    state.digest()
}
/// A payload together with the chain link that positions it.
#[derive(Clone, Debug)]
pub struct CausalEvent {
    /// Chain header for this event.
    pub link: CausalLink,
    /// Event body, stored as-is.
    pub payload: Bytes,
    /// Value of `current_timestamp()` when the event was built or decoded locally.
    /// NOTE(review): unit and epoch are defined by `current_timestamp` — confirm there.
    pub received_at: u64,
}
/// Produces successive `CausalEvent`s for a single chain.
///
/// The builder remembers the most recent link and its payload because both are
/// needed to compute the next link's parent hash.
pub struct CausalChainBuilder {
    /// Origin identifier captured when the builder was created.
    origin_hash: u64,
    /// Most recently produced link (the genesis link for a fresh builder).
    head: CausalLink,
    /// Payload that belongs to `head` (empty for a fresh builder).
    head_payload: Bytes,
}
impl CausalChainBuilder {
    /// Starts a new chain for `origin_hash`, headed by a genesis link with
    /// horizon 0 and an empty payload.
    pub fn new(origin_hash: u64) -> Self {
        Self::from_head(CausalLink::genesis(origin_hash, 0), Bytes::new())
    }

    /// Resumes a chain from an existing head link and the payload that goes
    /// with it. The origin is taken from `head`.
    pub fn from_head(head: CausalLink, head_payload: Bytes) -> Self {
        let origin_hash = head.origin_hash;
        Self {
            origin_hash,
            head,
            head_payload,
        }
    }

    /// Appends `payload` to the chain and returns the resulting event.
    ///
    /// The new link's parent hash covers the current head and its payload.
    /// Returns `None`, leaving the builder untouched, when the sequence counter
    /// cannot be advanced.
    pub fn append(&mut self, payload: Bytes, horizon_encoded: u64) -> Option<CausalEvent> {
        let link = self.head.next(&self.head_payload, horizon_encoded)?;
        // Stamp the event before the builder state moves forward.
        let received_at = current_timestamp();
        self.head = link;
        self.head_payload = payload.clone();
        Some(CausalEvent {
            link,
            payload,
            received_at,
        })
    }

    /// Borrows the current head link.
    #[inline]
    pub fn head(&self) -> &CausalLink {
        &self.head
    }

    /// Sequence number of the current head link.
    #[inline]
    pub fn sequence(&self) -> u64 {
        self.head.sequence
    }

    /// Origin identifier this builder was created with.
    #[inline]
    pub fn origin_hash(&self) -> u64 {
        self.origin_hash
    }
}
/// Checks that `new_link` is the direct successor of `prev_link`.
///
/// The checks run in this order, and the first failure is returned:
/// 1. both links share the same `origin_hash` (`ChainError::OriginMismatch`);
/// 2. `new_link.sequence` is exactly `prev_link.sequence + 1`
///    (`ChainError::SequenceGap`; if the previous sequence is already
///    `u64::MAX` the error reports `u64::MAX` as the expected value);
/// 3. `new_link.parent_hash` equals the hash of `prev_link` and `prev_payload`
///    (`ChainError::ParentHashMismatch`).
pub fn validate_chain_link(
    prev_link: &CausalLink,
    prev_payload: &[u8],
    new_link: &CausalLink,
) -> Result<(), ChainError> {
    if prev_link.origin_hash != new_link.origin_hash {
        return Err(ChainError::OriginMismatch {
            expected: prev_link.origin_hash,
            got: new_link.origin_hash,
        });
    }

    let got_seq = new_link.sequence;
    match prev_link.sequence.checked_add(1) {
        Some(expected_seq) if expected_seq == got_seq => {}
        Some(expected_seq) => {
            return Err(ChainError::SequenceGap {
                expected: expected_seq,
                got: got_seq,
            });
        }
        // No successor sequence exists once the counter is exhausted.
        None => {
            return Err(ChainError::SequenceGap {
                expected: u64::MAX,
                got: got_seq,
            });
        }
    }

    let expected_parent = compute_parent_hash(prev_link, prev_payload);
    if new_link.parent_hash == expected_parent {
        Ok(())
    } else {
        Err(ChainError::ParentHashMismatch {
            expected: expected_parent,
            got: new_link.parent_hash,
        })
    }
}
/// Outcome of a `write_causal_events` call.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct WriteCausalEventsResult {
    /// Number of bytes appended to the output buffer by the call.
    pub bytes_written: usize,
    /// Number of events actually framed into the buffer.
    pub events_written: usize,
    /// Number of events left out because their frame length does not fit in a `u32`.
    pub events_skipped: usize,
}
/// Appends `events` to `buf` as length-prefixed frames.
///
/// Each frame is a little-endian `u32` length (link plus payload), followed by
/// the serialized link and the payload bytes. An event whose frame length does
/// not fit in a `u32` is logged and skipped rather than written with a
/// truncated length.
///
/// Returns the number of bytes appended and how many events were written or
/// skipped. Because events can be skipped, the frame count handed to a reader
/// must be `events_written`, not `events.len()`; the result is `#[must_use]`
/// so that dropping it is flagged at compile time.
#[must_use = "oversized events are skipped; use `events_written` as the frame count"]
pub fn write_causal_events(events: &[CausalEvent], buf: &mut BytesMut) -> WriteCausalEventsResult {
    let start = buf.len();
    let mut events_written = 0usize;
    let mut events_skipped = 0usize;
    for event in events {
        // `checked_add` keeps an overflowing sum from wrapping into a small,
        // valid-looking length prefix.
        let frame_len = CAUSAL_LINK_SIZE
            .checked_add(event.payload.len())
            .and_then(|total_len| u32::try_from(total_len).ok());
        let total_len_u32 = match frame_len {
            Some(n) => n,
            None => {
                tracing::warn!(
                    payload_len = event.payload.len(),
                    "write_causal_events: skipping event whose serialized \
                     size exceeds u32 — caller MUST use \
                     `events_written` as framing count, not events.len()",
                );
                events_skipped += 1;
                continue;
            }
        };
        buf.put_u32_le(total_len_u32);
        buf.put_slice(&event.link.to_bytes());
        buf.put_slice(&event.payload);
        events_written += 1;
    }
    WriteCausalEventsResult {
        bytes_written: buf.len() - start,
        events_written,
        events_skipped,
    }
}
/// Decodes up to `count` length-prefixed frames from `data`.
///
/// Each frame is a little-endian `u32` length followed by a serialized link and
/// the payload. Decoding stops at the first frame that is truncated or whose
/// declared length is smaller than a link; the events decoded up to that point
/// are returned and a warning is logged. Bytes left over after `count` frames
/// are ignored. Every decoded event is stamped with `current_timestamp()`.
pub fn read_causal_events(data: Bytes, count: u16) -> Vec<CausalEvent> {
    const LEN_PREFIX_SIZE: usize = 4;

    // Never pre-allocate for more frames than the input could possibly hold.
    let capacity_hint =
        (data.len() / (LEN_PREFIX_SIZE + CAUSAL_LINK_SIZE)).min(usize::from(count));
    let mut decoded = Vec::with_capacity(capacity_hint);
    let mut input = data;
    let mut malformed: u64 = 0;

    for _ in 0..count {
        if input.len() < LEN_PREFIX_SIZE {
            malformed += 1;
            break;
        }
        // Reads the prefix and moves past it in one step.
        let frame_len = input.get_u32_le() as usize;
        if frame_len < CAUSAL_LINK_SIZE || input.len() < frame_len {
            malformed += 1;
            break;
        }

        // Carve the whole frame off the input, then peel the link off its front;
        // what remains of the frame is the payload.
        let mut frame = input.split_to(frame_len);
        let link = match CausalLink::from_bytes(&frame[..CAUSAL_LINK_SIZE]) {
            Some(parsed) => parsed,
            None => {
                malformed += 1;
                break;
            }
        };
        frame.advance(CAUSAL_LINK_SIZE);

        decoded.push(CausalEvent {
            link,
            payload: frame,
            received_at: current_timestamp(),
        });
    }

    if malformed > 0 {
        tracing::warn!(
            parse_errors = malformed,
            expected = count,
            parsed = decoded.len(),
            "read_causal_events dropped malformed events"
        );
    }
    decoded
}
/// Reasons a link can fail validation against its predecessor.
///
/// In every variant `expected` is the value derived from the previous link and
/// `got` is the value carried by the link under test.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ChainError {
    /// The two links carry different origin identifiers.
    OriginMismatch { expected: u64, got: u64 },
    /// The sequence number is not exactly one past the previous link's.
    SequenceGap { expected: u64, got: u64 },
    /// The parent hash does not match the hash of the previous link and payload.
    ParentHashMismatch { expected: u64, got: u64 },
}
/// Human-readable messages: hashes are printed in `0x`-prefixed hex, sequence
/// numbers in decimal.
impl std::fmt::Display for ChainError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::OriginMismatch { expected, got } => write!(
                f,
                "origin mismatch: expected {:#x}, got {:#x}",
                expected, got
            ),
            Self::SequenceGap { expected, got } => {
                write!(f, "sequence gap: expected {}, got {}", expected, got)
            }
            Self::ParentHashMismatch { expected, got } => write!(
                f,
                "parent hash mismatch: expected {:#x}, got {:#x}",
                expected, got
            ),
        }
    }
}
// Marks `ChainError` as a standard error type; all trait methods keep their defaults.
impl std::error::Error for ChainError {}
use crate::adapter::net::current_timestamp;
// Unit tests for link serialization, chain construction, link validation,
// event framing and parent-hash behaviour.
#[cfg(test)]
mod tests {
use super::*;
// A link survives a to_bytes / from_bytes round trip unchanged.
#[test]
fn test_causal_link_roundtrip() {
let link = CausalLink {
origin_hash: 0xDEADBEEF,
sequence: 42,
parent_hash: 0x1234567890ABCDEF,
horizon_encoded: 0xCAFE,
};
let bytes = link.to_bytes();
assert_eq!(bytes.len(), CAUSAL_LINK_SIZE);
let parsed = CausalLink::from_bytes(&bytes).unwrap();
assert_eq!(parsed, link);
}
// A genesis link has sequence 0, parent hash 0 and reports is_genesis().
#[test]
fn test_genesis() {
let link = CausalLink::genesis(0xABCD, 0);
assert!(link.is_genesis());
assert_eq!(link.sequence, 0);
assert_eq!(link.parent_hash, 0);
}
// next() bumps the sequence, keeps the origin and fills in a parent hash.
#[test]
fn test_chain_next() {
let genesis = CausalLink::genesis(0xABCD, 0);
let payload = b"hello";
let next = genesis.next(payload, 0).unwrap();
assert_eq!(next.sequence, 1);
assert_eq!(next.origin_hash, 0xABCD);
assert_ne!(next.parent_hash, 0); }
// The builder advances its sequence per append and links each event to the
// previous event's link and payload.
#[test]
fn test_chain_builder() {
let mut builder = CausalChainBuilder::new(0xABCD);
assert_eq!(builder.sequence(), 0);
let e1 = builder.append(Bytes::from_static(b"event1"), 0).unwrap();
assert_eq!(e1.link.sequence, 1);
assert_eq!(builder.sequence(), 1);
let e2 = builder.append(Bytes::from_static(b"event2"), 0).unwrap();
assert_eq!(e2.link.sequence, 2);
assert_eq!(
e2.link.parent_hash,
compute_parent_hash(&e1.link, &e1.payload)
);
}
// Two consecutive builder events validate against each other.
#[test]
fn test_validate_chain_link() {
let mut builder = CausalChainBuilder::new(0xABCD);
let e1 = builder.append(Bytes::from_static(b"event1"), 0).unwrap();
let e2 = builder.append(Bytes::from_static(b"event2"), 0).unwrap();
assert!(validate_chain_link(&e1.link, &e1.payload, &e2.link).is_ok());
}
// Tampering with the origin is rejected as OriginMismatch.
#[test]
fn test_validate_rejects_origin_mismatch() {
let link1 = CausalLink::genesis(0xAAAA, 0);
let mut link2 = link1.next(b"data", 0).unwrap();
link2.origin_hash = 0xBBBB;
assert!(matches!(
validate_chain_link(&link1, b"data", &link2),
Err(ChainError::OriginMismatch { .. })
));
}
// A non-consecutive sequence number is rejected as SequenceGap.
#[test]
fn test_validate_rejects_sequence_gap() {
let link1 = CausalLink::genesis(0xAAAA, 0);
let mut link2 = link1.next(b"data", 0).unwrap();
link2.sequence = 5;
assert!(matches!(
validate_chain_link(&link1, b"data", &link2),
Err(ChainError::SequenceGap { .. })
));
}
// A wrong parent hash is rejected as ParentHashMismatch.
#[test]
fn test_validate_rejects_bad_parent_hash() {
let link1 = CausalLink::genesis(0xAAAA, 0);
let mut link2 = link1.next(b"data", 0).unwrap();
link2.parent_hash = 0xBADBADBAD;
assert!(matches!(
validate_chain_link(&link1, b"data", &link2),
Err(ChainError::ParentHashMismatch { .. })
));
}
// Events written with write_causal_events are read back identically
// (link and payload) by read_causal_events.
#[test]
fn test_causal_event_framing_roundtrip() {
let mut builder = CausalChainBuilder::new(0xABCD);
let events: Vec<CausalEvent> = (0..3)
.map(|i| {
builder
.append(Bytes::from(format!("event-{}", i)), 0)
.unwrap()
})
.collect();
let mut buf = BytesMut::new();
let result = write_causal_events(&events, &mut buf);
assert!(result.bytes_written > 0);
assert_eq!(result.events_written, events.len());
assert_eq!(result.events_skipped, 0);
let parsed = read_causal_events(buf.freeze(), 3);
assert_eq!(parsed.len(), 3);
for (orig, parsed) in events.iter().zip(parsed.iter()) {
assert_eq!(parsed.link, orig.link);
assert_eq!(parsed.payload, orig.payload);
}
}
// Hashing the same link and payload twice gives the same value.
#[test]
fn test_parent_hash_deterministic() {
let link = CausalLink::genesis(0x1234, 0);
let payload = b"test payload";
let h1 = compute_parent_hash(&link, payload);
let h2 = compute_parent_hash(&link, payload);
assert_eq!(h1, h2);
}
// Different payloads under the same link give different hashes.
#[test]
fn test_parent_hash_differs_on_payload_change() {
let link = CausalLink::genesis(0x1234, 0);
let h1 = compute_parent_hash(&link, b"payload a");
let h2 = compute_parent_hash(&link, b"payload b");
assert_ne!(h1, h2);
}
// The streaming hash must equal one-shot xxh3_64 over link bytes followed by
// the payload, checked across a range of payload sizes.
#[test]
fn compute_parent_hash_streaming_matches_concatenated_oneshot() {
use xxhash_rust::xxh3::xxh3_64;
let link = CausalLink::genesis(0xDEAD_BEEF, 42);
let link_bytes = link.to_bytes();
for size in [0usize, 1, 31, 32, 33, 63, 64, 65, 1024, 8 * 1024] {
let payload: Vec<u8> = (0..size).map(|i| (i as u8).wrapping_mul(31)).collect();
let streaming = compute_parent_hash(&link, &payload);
let mut combined = Vec::with_capacity(CAUSAL_LINK_SIZE + payload.len());
combined.extend_from_slice(&link_bytes);
combined.extend_from_slice(&payload);
let oneshot = xxh3_64(&combined);
assert_eq!(
streaming, oneshot,
"streaming and one-shot xxh3 diverge at payload size {size}"
);
}
}
// Every adjacent pair in a 100-event chain validates.
#[test]
fn test_long_chain_integrity() {
let mut builder = CausalChainBuilder::new(0xFACE);
let mut events = Vec::new();
for i in 0..100 {
let event = builder
.append(Bytes::from(format!("data-{}", i)), 0)
.unwrap();
events.push(event);
}
for i in 1..events.len() {
assert!(
validate_chain_link(&events[i - 1].link, &events[i - 1].payload, &events[i].link)
.is_ok(),
"chain broken at event {}",
i
);
}
}
// Regression guard: the serialized link is exactly 32 bytes and matches
// CAUSAL_LINK_SIZE.
#[test]
fn test_regression_causal_link_wire_size_is_32() {
let link = CausalLink::genesis(0xDEADBEEF, 0xCAFE);
let bytes = link.to_bytes();
assert_eq!(
bytes.len(),
32,
"CausalLink wire size must be exactly 32 bytes"
);
assert_eq!(bytes.len(), CAUSAL_LINK_SIZE);
let parsed = CausalLink::from_bytes(&bytes).unwrap();
assert_eq!(parsed, link);
}
}