use std::collections::HashMap;
use xxhash_rust::xxh3::xxh3_64;
/// Per-origin high-water marks of observed sequence numbers, plus a local
/// logical clock.
///
/// Each tracked origin (identified by a `u64` hash) maps to the largest
/// sequence number seen from it. The logical clock is advanced by
/// `ObservedHorizon::observe` when a mark moves forward and by
/// `ObservedHorizon::merge` (set to `max(local, remote) + 1`, saturating).
#[derive(Clone, Debug, Default)]
pub struct ObservedHorizon {
    /// Origin hash -> highest sequence number recorded for that origin.
    entries: HashMap<u64, u64>,
    /// Logical clock value; never wraps (all updates saturate at `u64::MAX`).
    logical_time: u64,
}
impl ObservedHorizon {
    /// Returns an empty horizon: no origins tracked and a logical time of 0.
    pub fn new() -> Self {
        Default::default()
    }

    /// Records that `sequence` has been seen from `origin_hash`.
    ///
    /// The stored mark only ever moves forward: a `sequence` at or below the
    /// current mark is ignored. The logical time is bumped by one
    /// (saturating at `u64::MAX`) only when the mark actually advances.
    ///
    /// The origin is registered (with mark 0) even when `sequence` is 0, so
    /// `contains_origin` becomes `true` without the logical time moving.
    pub fn observe(&mut self, origin_hash: u64, sequence: u64) {
        let high_water = self.entries.entry(origin_hash).or_default();
        if sequence <= *high_water {
            return;
        }
        *high_water = sequence;
        self.logical_time = self.logical_time.saturating_add(1);
    }

    /// Returns the highest sequence recorded for `origin_hash`, or `None` if
    /// that origin has never been registered.
    pub fn get(&self, origin_hash: u64) -> Option<u64> {
        let highest = self.entries.get(&origin_hash)?;
        Some(*highest)
    }

    /// Returns `true` when `origin_hash` is tracked and its recorded mark is
    /// at least `sequence` (inclusive comparison).
    pub fn has_observed(&self, origin_hash: u64, sequence: u64) -> bool {
        matches!(self.entries.get(&origin_hash), Some(&highest) if highest >= sequence)
    }

    /// Returns `true` if `origin_hash` has an entry, whatever its mark
    /// (including 0).
    #[inline]
    pub fn contains_origin(&self, origin_hash: u64) -> bool {
        self.entries.contains_key(&origin_hash)
    }

    /// Number of distinct origins currently tracked.
    pub fn entity_count(&self) -> usize {
        self.entries.len()
    }

    /// Current value of the logical clock.
    pub fn logical_time(&self) -> u64 {
        self.logical_time
    }

    /// Iterates over `(origin_hash, highest_sequence)` pairs in the
    /// underlying hash map's (unspecified) order.
    pub fn iter(&self) -> impl Iterator<Item = (&u64, &u64)> {
        self.entries.iter()
    }

    /// Folds `other` into `self`.
    ///
    /// Every origin ends up with the larger of the two marks. Origins only
    /// `other` knows about are added. The logical time becomes
    /// `max(self, other) + 1` (saturating). That increment happens even
    /// when `other` contributes nothing new.
    pub fn merge(&mut self, other: &ObservedHorizon) {
        for (origin, &theirs) in other.entries.iter() {
            let ours = self.entries.entry(*origin).or_default();
            *ours = std::cmp::max(*ours, theirs);
        }
        let latest = std::cmp::max(self.logical_time, other.logical_time);
        self.logical_time = latest.saturating_add(1);
    }

    /// Compresses this horizon into its 64-bit Bloom-filter form via
    /// `HorizonEncoder::encode`.
    pub fn encode(&self) -> u64 {
        HorizonEncoder::encode(self)
    }
}
/// Stateless helper that compresses an [`ObservedHorizon`] into a single
/// `u64` Bloom filter over its origin hashes, and queries such filters.
///
/// Only origin membership is encoded; sequence numbers are discarded. For
/// filters produced by [`HorizonEncoder::encode`] a membership query can
/// return false positives but never false negatives.
pub struct HorizonEncoder;

impl HorizonEncoder {
    /// Returns the bit mask contributed by `origin_hash` (always 3 bits).
    ///
    /// The origin is re-hashed with xxh3, and the digest is split into two
    /// 32-bit halves. These drive Kirsch–Mitzenmacher double hashing: probe `i`
    /// lands on bit `(base + i * step) mod 64` for `i` in `0..3`.
    ///
    /// Forcing `step` odd makes both `step` and `2 * step` non-zero mod 64,
    /// so the three probes are always distinct bits. Wrapping `u32`
    /// arithmetic does not disturb the result because 2^32 is a multiple
    /// of 64.
    #[inline]
    fn bloom_bits_for_origin(origin_hash: u64) -> u64 {
        let digest = xxh3_64(&origin_hash.to_le_bytes());
        let base = digest as u32;
        let step = ((digest >> 32) as u32) | 1;
        (0..3u32).fold(0u64, |mask, probe| {
            let bit = base.wrapping_add(step.wrapping_mul(probe)) % 64;
            mask | (1u64 << bit)
        })
    }

    /// Builds the Bloom filter for `horizon` by OR-ing together the masks of
    /// every origin it tracks. An empty horizon encodes to `0`.
    pub fn encode(horizon: &ObservedHorizon) -> u64 {
        horizon
            .entries
            .keys()
            .map(|&origin| Self::bloom_bits_for_origin(origin))
            .fold(0, |filter, mask| filter | mask)
    }

    /// Returns `true` if every bit of `origin_hash`'s mask is set in
    /// `encoded`.
    ///
    /// `false` means the origin was definitely not encoded into `encoded`;
    /// `true` may be a false positive.
    pub fn might_contain(encoded: u64, origin_hash: u64) -> bool {
        let mask = Self::bloom_bits_for_origin(origin_hash);
        (mask & !encoded) == 0
    }

    /// Returns `true` when neither encoded horizon appears to contain the
    /// other side's origin.
    ///
    /// Note the argument order: `horizon_a` is probed for `origin_b`, then
    /// `horizon_b` is probed for `origin_a`. The second probe is skipped if
    /// the first already hits.
    ///
    /// Because only origin membership is encoded, this is a coarse
    /// heuristic. A Bloom false positive makes the result `false`.
    pub fn potentially_concurrent(
        horizon_a: u64,
        origin_b: u64,
        horizon_b: u64,
        origin_a: u64,
    ) -> bool {
        let a_may_have_seen_b = Self::might_contain(horizon_a, origin_b);
        !(a_may_have_seen_b || Self::might_contain(horizon_b, origin_a))
    }
}
#[cfg(test)]
mod tests {
// Unit tests for `ObservedHorizon` (per-origin high-water marks plus a
// logical clock) and `HorizonEncoder` (the 64-bit Bloom-filter encoding).
use super::*;
// An empty horizon tracks no origins and encodes to an all-zero filter.
#[test]
fn test_empty_horizon() {
let h = ObservedHorizon::new();
assert_eq!(h.entity_count(), 0);
assert_eq!(h.encode(), 0);
}
// `observe` records per-origin high-water marks; `has_observed` is an
// inclusive (`>=`) comparison against the stored mark.
#[test]
fn test_observe() {
let mut h = ObservedHorizon::new();
h.observe(0xAAAA, 10);
h.observe(0xBBBB, 20);
assert_eq!(h.get(0xAAAA), Some(10));
assert_eq!(h.get(0xBBBB), Some(20));
assert_eq!(h.get(0xCCCC), None);
assert!(h.has_observed(0xAAAA, 5));
assert!(h.has_observed(0xAAAA, 10));
assert!(!h.has_observed(0xAAAA, 11));
}
// Writes the private `logical_time` field directly (possible because this is
// a child module) to check that `observe` saturates rather than wrapping.
#[test]
fn observe_saturates_logical_time_at_u64_max() {
let mut h = ObservedHorizon::new();
h.logical_time = u64::MAX;
h.observe(0xAAAA, 1);
assert_eq!(
h.logical_time(),
u64::MAX,
"saturating_add must clamp at u64::MAX, not wrap to 0"
);
assert_eq!(h.get(0xAAAA), Some(1));
}
// A lower sequence never regresses the stored mark; a higher one replaces it.
#[test]
fn test_observe_max_only() {
let mut h = ObservedHorizon::new();
h.observe(0xAAAA, 10);
h.observe(0xAAAA, 5); assert_eq!(h.get(0xAAAA), Some(10));
h.observe(0xAAAA, 15); assert_eq!(h.get(0xAAAA), Some(15));
}
// `merge` keeps the per-origin maximum (0xAAAA stays at 10, not 8), preserves
// origins only `self` knows (0xBBBB) and adds origins only `other` knows
// (0xCCCC).
#[test]
fn test_merge() {
let mut h1 = ObservedHorizon::new();
h1.observe(0xAAAA, 10);
h1.observe(0xBBBB, 5);
let mut h2 = ObservedHorizon::new();
h2.observe(0xAAAA, 8);
h2.observe(0xCCCC, 20);
h1.merge(&h2);
assert_eq!(h1.get(0xAAAA), Some(10)); assert_eq!(h1.get(0xBBBB), Some(5));
assert_eq!(h1.get(0xCCCC), Some(20));
}
// A single tracked origin sets at least one bit in the encoded filter.
#[test]
fn test_encode_nonzero() {
let mut h = ObservedHorizon::new();
h.observe(0xAAAA, 42);
let encoded = h.encode();
assert_ne!(encoded, 0);
}
// Inserted origins are reported present; nothing is reported present in an
// all-zero filter.
#[test]
fn test_might_contain() {
let mut h = ObservedHorizon::new();
h.observe(0xAAAA, 10);
h.observe(0xBBBB, 20);
let encoded = h.encode();
assert!(HorizonEncoder::might_contain(encoded, 0xAAAA));
assert!(HorizonEncoder::might_contain(encoded, 0xBBBB));
assert!(!HorizonEncoder::might_contain(0, 0xAAAA));
}
// Two single-origin horizons that have not seen each other are reported as
// potentially concurrent. Once A has also observed B's origin, they are not.
// The first assertion relies on 0xAAAA and 0xBBBB not colliding in the
// filter for the concrete xxh3 hash.
#[test]
fn test_potentially_concurrent() {
let mut h_a = ObservedHorizon::new();
h_a.observe(0xAAAA, 10); let enc_a = h_a.encode();
let mut h_b = ObservedHorizon::new();
h_b.observe(0xBBBB, 10); let enc_b = h_b.encode();
assert!(HorizonEncoder::potentially_concurrent(
enc_a, 0xBBBB, enc_b, 0xAAAA
));
h_a.observe(0xBBBB, 10);
let enc_a2 = h_a.encode();
assert!(!HorizonEncoder::potentially_concurrent(
enc_a2, 0xBBBB, enc_b, 0xAAAA
));
}
// Bloom invariant: every inserted origin must be reported present. Checked
// for 50 pseudo-random origins, which saturates much of the 64-bit filter.
#[test]
fn might_contain_has_no_false_negatives_for_inserted_origins() {
let mut h = ObservedHorizon::new();
let origins: Vec<u64> = (0..50u64)
.map(|i| 0xDEAD_0000_u64 ^ i.wrapping_mul(0x9E37_79B1))
.collect();
for &o in &origins {
h.observe(o, 1);
}
let encoded = h.encode();
for &o in &origins {
assert!(
HorizonEncoder::might_contain(encoded, o),
"every inserted origin (here 0x{o:016X}) must report `true` — bloom invariant",
);
}
}
// Empirical false-positive rate with 8 inserted origins over 1000 probes,
// with an acceptance threshold of 25 %. Probes that coincide with an
// inserted origin are skipped but still counted in the `trials` denominator.
// NOTE(review): assuming uniformly distributed probe bits, m = 64 and k = 3,
// a rough estimate is (1 - (61/64)^8)^3 ≈ 3 %. The failure message quotes
// "~13 %"; confirm which figure is intended.
#[test]
fn might_contain_fpr_at_n_8_is_well_below_pre_fix_saturation() {
let mut h = ObservedHorizon::new();
let inserted: Vec<u64> = (0..8u64)
.map(|i| 0x1000_0000_u64.wrapping_add(i.wrapping_mul(0xC0FF_EEAB)))
.collect();
for &o in &inserted {
h.observe(o, 1);
}
let encoded = h.encode();
let mut false_positives = 0;
let trials = 1000;
for i in 0..trials {
let probe = 0xBEEF_0000_u64.wrapping_add((i as u64).wrapping_mul(0x9E37_79B9));
if inserted.contains(&probe) {
continue;
}
if HorizonEncoder::might_contain(encoded, probe) {
false_positives += 1;
}
}
let fpr = (false_positives as f64) / (trials as f64);
assert!(
fpr < 0.25,
"FPR at n=8 should be well under the pre-fix 16-bit-bloom \
saturation (~57 % at n=8); observed {:.1} % over {} trials, \
expected analytically ~13 %",
fpr * 100.0,
trials,
);
}
// A single insert sets between 1 and 3 bits. The odd step in
// `bloom_bits_for_origin` makes the three probes distinct, so exactly 3 bits
// are expected; the range check here is deliberately looser.
#[test]
fn bloom_sets_at_least_one_bit_per_insert() {
let mut h = ObservedHorizon::new();
h.observe(0xAAAA_BBBB, 1);
let encoded = h.encode();
assert_ne!(encoded, 0, "single insert must set at least one bit");
let bits = encoded.count_ones();
assert!(
(1..=3).contains(&bits),
"single insert sets between 1 and 3 bits (got {bits})",
);
}
// Same empirical FPR check as above, but with 16 inserted origins and a 60 %
// threshold.
// NOTE(review): under the same rough estimate, (1 - (61/64)^16)^3 ≈ 15 %.
// The failure message quotes "analytical ~44 %"; confirm which figure is
// intended.
#[test]
fn might_contain_fpr_at_documented_ceiling_stays_under_60_percent() {
let mut h = ObservedHorizon::new();
let inserted: Vec<u64> = (0..16u64)
.map(|i| 0x2000_0000_u64.wrapping_add(i.wrapping_mul(0x9E37_79B9)))
.collect();
for &o in &inserted {
h.observe(o, 1);
}
let encoded = h.encode();
let mut false_positives = 0;
let trials = 1000;
for i in 0..trials {
let probe = 0xFEED_0000_u64.wrapping_add((i as u64).wrapping_mul(0xC0FF_EEAB));
if inserted.contains(&probe) {
continue;
}
if HorizonEncoder::might_contain(encoded, probe) {
false_positives += 1;
}
}
let fpr = (false_positives as f64) / (trials as f64);
assert!(
fpr < 0.60,
"FPR at the documented n=16 ceiling should stay under 60 % \
(analytical ~44 %); observed {:.1} % over {} trials",
fpr * 100.0,
trials,
);
}
// Builds three independent 4-vs-4 splits of pseudo-random origins. Only the
// first 24 of the 32 candidates are used. For each split:
// - neither side may falsely claim more than one of the other side's origins;
// - at least one index-aligned cross pair must report `potentially_concurrent`.
#[test]
fn disjoint_small_horizons_recognize_concurrency() {
let candidates: Vec<u64> = (0..32u64)
.map(|i| 0x3000_0000_u64.wrapping_add(i.wrapping_mul(0xDEAD_BEEF)))
.collect();
for split in 0..3 {
let base = split * 8;
let left_origins = &candidates[base..base + 4];
let right_origins = &candidates[base + 4..base + 8];
let mut left = ObservedHorizon::new();
for &o in left_origins {
left.observe(o, 1);
}
let mut right = ObservedHorizon::new();
for &o in right_origins {
right.observe(o, 1);
}
let enc_l = left.encode();
let enc_r = right.encode();
let mut left_false_positives = 0;
let mut right_false_positives = 0;
for (i, &o) in right_origins.iter().enumerate() {
if HorizonEncoder::might_contain(enc_l, o) {
left_false_positives += 1;
}
let l = left_origins[i];
if HorizonEncoder::might_contain(enc_r, l) {
right_false_positives += 1;
}
}
assert!(
left_false_positives <= 1 && right_false_positives <= 1,
"split {split}: at n=4 each, expected ≤1 false-positive \
cross-origin probe in either direction, got \
left={left_false_positives} right={right_false_positives}",
);
let any_concurrent =
left_origins
.iter()
.zip(right_origins.iter())
.any(|(&origin_a, &origin_b)| {
HorizonEncoder::potentially_concurrent(enc_l, origin_b, enc_r, origin_a)
});
assert!(
any_concurrent,
"split {split}: at least one cross-origin pair must report \
potentially_concurrent for disjoint small horizons",
);
}
}
// Checks that the encoded filter survives serialization through `CausalLink`
// and still recognises every inserted origin. `CausalLink` is defined
// elsewhere in the crate. This test only relies on `genesis`, `to_bytes`,
// `from_bytes` (returning something `unwrap`-able) and the
// `horizon_encoded` field, as used here.
#[test]
fn encoded_horizon_round_trips_through_causal_link_wire_format() {
use crate::adapter::net::state::causal::CausalLink;
let mut h = ObservedHorizon::new();
for i in 0..8u64 {
h.observe(
0x4000_0000_u64.wrapping_add(i.wrapping_mul(0xCAFE_F00D)),
i + 1,
);
}
let encoded = h.encode();
assert_ne!(encoded, 0);
let bits = encoded.count_ones();
assert!((1..64).contains(&bits));
let link = CausalLink::genesis(0xDEAD_BEEF, encoded);
let bytes = link.to_bytes();
let parsed = CausalLink::from_bytes(&bytes).unwrap();
assert_eq!(
parsed.horizon_encoded, encoded,
"horizon_encoded must round-trip exactly through CausalLink wire format",
);
for (&origin, _) in h.iter() {
assert!(
HorizonEncoder::might_contain(parsed.horizon_encoded, origin),
"round-tripped horizon must still recognize inserted origin 0x{origin:016X}",
);
}
}
// Regression guard for the odd-`h2` step in `bloom_bits_for_origin`. With
// that invariant intact no origin encodes to a single bit; the threshold
// tolerates at most one.
#[test]
fn bloom_does_not_collapse_to_one_bit_for_typical_origins() {
let mut single_bit_count = 0;
for i in 0..256u64 {
let mut h = ObservedHorizon::new();
h.observe(i, 1);
let encoded = h.encode();
if encoded.count_ones() == 1 {
single_bit_count += 1;
}
}
assert!(
single_bit_count <= 1,
"{single_bit_count} of 256 origins encoded to a single-bit \
bloom — Kirsch-Mitzenmacher odd-h2 invariant likely broken",
);
}
}