use std::collections::{BTreeSet, HashMap, HashSet};
use fsqlite_error::{FrankenError, Result};
use tracing::{debug, error, info, warn};
const BEAD_ID: &str = "bd-1hi.19";
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ReplicationRole {
Leader,
Follower,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum ReplicationMode {
#[default]
LeaderCommitClock,
MultiWriter,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ObjectId([u8; 16]);
impl ObjectId {
#[must_use]
pub const fn from_bytes(b: [u8; 16]) -> Self {
Self(b)
}
#[must_use]
pub const fn as_bytes(&self) -> &[u8; 16] {
&self.0
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ReplicatedObjectKind {
CommitCapsule,
CommitMarker,
IndexSegment,
ReadWitness,
WriteWitness,
WitnessDelta,
WitnessIndexSegment,
DependencyEdge,
CommitProof,
AbortWitness,
MergeWitness,
CheckpointChunk,
SnapshotManifest,
DecodeProof,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommitMarker {
pub commit_seq: u64,
pub capsule_id: ObjectId,
pub timestamp_ns: u64,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct IdempotencyKey([u8; 16]);
impl IdempotencyKey {
#[must_use]
pub fn from_marker(marker: &CommitMarker) -> Self {
let mut hasher = blake3::Hasher::new();
hasher.update(b"fsqlite:repl:idempotency:v1");
hasher.update(&marker.commit_seq.to_le_bytes());
hasher.update(marker.capsule_id.as_bytes());
let hash = hasher.finalize();
let mut out = [0_u8; 16];
out.copy_from_slice(&hash.as_bytes()[..16]);
Self(out)
}
}
#[derive(Debug, Default)]
pub struct CommitDeduplicator {
seen: HashSet<IdempotencyKey>,
}
impl CommitDeduplicator {
pub fn should_accept(&mut self, marker: &CommitMarker) -> bool {
self.seen.insert(IdempotencyKey::from_marker(marker))
}
#[must_use]
pub fn seen_count(&self) -> usize {
self.seen.len()
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicaTip {
pub root_manifest_id: ObjectId,
pub marker_position: u64,
pub index_segment_tips: Vec<ObjectId>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MissingObjects {
pub needed: BTreeSet<ObjectId>,
pub to_offer: BTreeSet<ObjectId>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AntiEntropyPhase {
ExchangeTips,
ComputeMissing,
RequestSymbols,
StreamUntilDecode,
PersistAndUpdate,
Complete,
}
#[derive(Debug)]
pub struct AntiEntropySession {
phase: AntiEntropyPhase,
local_tip: Option<ReplicaTip>,
remote_tip: Option<ReplicaTip>,
missing: Option<MissingObjects>,
decoded_objects: HashSet<ObjectId>,
}
impl AntiEntropySession {
#[must_use]
pub fn new() -> Self {
debug!(bead_id = BEAD_ID, "starting anti-entropy session");
Self {
phase: AntiEntropyPhase::ExchangeTips,
local_tip: None,
remote_tip: None,
missing: None,
decoded_objects: HashSet::new(),
}
}
#[must_use]
pub const fn phase(&self) -> AntiEntropyPhase {
self.phase
}
pub fn exchange_tips(&mut self, local: ReplicaTip, remote: ReplicaTip) -> Result<()> {
if self.phase != AntiEntropyPhase::ExchangeTips {
return Err(FrankenError::Internal(format!(
"anti-entropy: expected ExchangeTips, got {:?}",
self.phase
)));
}
debug!(
bead_id = BEAD_ID,
local_pos = local.marker_position,
remote_pos = remote.marker_position,
"exchanged tips"
);
self.local_tip = Some(local);
self.remote_tip = Some(remote);
self.phase = AntiEntropyPhase::ComputeMissing;
Ok(())
}
pub fn compute_missing(
&mut self,
local_objects: &BTreeSet<ObjectId>,
remote_objects: &BTreeSet<ObjectId>,
) -> Result<&MissingObjects> {
if self.phase != AntiEntropyPhase::ComputeMissing {
return Err(FrankenError::Internal(format!(
"anti-entropy: expected ComputeMissing, got {:?}",
self.phase
)));
}
let needed: BTreeSet<ObjectId> =
remote_objects.difference(local_objects).copied().collect();
let to_offer: BTreeSet<ObjectId> =
local_objects.difference(remote_objects).copied().collect();
debug!(
bead_id = BEAD_ID,
needed_count = needed.len(),
to_offer_count = to_offer.len(),
"computed missing objects"
);
self.missing = Some(MissingObjects { needed, to_offer });
self.phase = AntiEntropyPhase::RequestSymbols;
Ok(self.missing.as_ref().expect("just set"))
}
#[must_use]
pub fn objects_to_request(&self) -> Option<&BTreeSet<ObjectId>> {
self.missing.as_ref().map(|m| &m.needed)
}
pub fn record_decoded(&mut self, object_id: ObjectId) -> Result<()> {
if self.phase != AntiEntropyPhase::RequestSymbols
&& self.phase != AntiEntropyPhase::StreamUntilDecode
{
return Err(FrankenError::Internal(format!(
"anti-entropy: expected RequestSymbols/StreamUntilDecode, got {:?}",
self.phase
)));
}
self.phase = AntiEntropyPhase::StreamUntilDecode;
self.decoded_objects.insert(object_id);
if let Some(missing) = &self.missing {
if missing
.needed
.iter()
.all(|id| self.decoded_objects.contains(id))
{
debug!(
bead_id = BEAD_ID,
decoded_count = self.decoded_objects.len(),
"all missing objects decoded"
);
self.phase = AntiEntropyPhase::PersistAndUpdate;
}
}
Ok(())
}
pub fn finalize(&mut self) -> Result<()> {
if self.phase != AntiEntropyPhase::PersistAndUpdate {
return Err(FrankenError::Internal(format!(
"anti-entropy: expected PersistAndUpdate, got {:?}",
self.phase
)));
}
info!(
bead_id = BEAD_ID,
decoded_count = self.decoded_objects.len(),
"anti-entropy session complete — persisted"
);
self.phase = AntiEntropyPhase::Complete;
Ok(())
}
#[must_use]
pub const fn is_converged(&self) -> bool {
matches!(self.phase, AntiEntropyPhase::Complete)
}
}
impl Default for AntiEntropySession {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QuorumPolicy {
pub required: u32,
pub total: u32,
}
impl QuorumPolicy {
#[must_use]
pub const fn local_only() -> Self {
Self {
required: 1,
total: 1,
}
}
#[must_use]
pub const fn two_of_three() -> Self {
Self {
required: 2,
total: 3,
}
}
pub fn new(required: u32, total: u32) -> Result<Self> {
if required == 0 || required > total {
return Err(FrankenError::Internal(format!(
"invalid quorum: required={required}, total={total}"
)));
}
Ok(Self { required, total })
}
}
#[derive(Debug)]
pub struct QuorumTracker {
policy: QuorumPolicy,
accepted: HashSet<u32>,
}
impl QuorumTracker {
#[must_use]
pub fn new(policy: QuorumPolicy) -> Self {
Self {
policy,
accepted: HashSet::new(),
}
}
pub fn record_acceptance(&mut self, store_id: u32) {
self.accepted.insert(store_id);
debug!(
bead_id = BEAD_ID,
store_id,
accepted = self.accepted.len(),
required = self.policy.required,
"store accepted symbols"
);
}
#[must_use]
#[allow(clippy::cast_possible_truncation)]
pub fn is_satisfied(&self) -> bool {
self.accepted.len() as u32 >= self.policy.required
}
#[must_use]
pub fn accepted_count(&self) -> usize {
self.accepted.len()
}
#[must_use]
pub const fn policy(&self) -> &QuorumPolicy {
&self.policy
}
}
#[derive(Debug, Clone)]
pub struct ConsistentHashRing {
ring: Vec<(u64, u32)>,
vnodes: u32,
}
impl ConsistentHashRing {
#[must_use]
pub fn new(node_ids: &[u32], vnodes: u32) -> Self {
let mut ring = Vec::with_capacity(node_ids.len() * vnodes as usize);
for &nid in node_ids {
for v in 0..vnodes {
let hash = Self::hash_vnode(nid, v);
ring.push((hash, nid));
}
}
ring.sort_unstable_by_key(|&(h, _)| h);
Self { ring, vnodes }
}
#[must_use]
pub fn route(&self, object_id: &ObjectId, esi: u32) -> Option<u32> {
if self.ring.is_empty() {
return None;
}
let key = Self::hash_symbol(object_id, esi);
let idx = self.ring.partition_point(|&(h, _)| h < key);
let idx = if idx >= self.ring.len() { 0 } else { idx };
Some(self.ring[idx].1)
}
#[must_use]
pub fn add_node(&mut self, node_id: u32) -> Self {
let mut node_ids: BTreeSet<u32> = self.ring.iter().map(|&(_, n)| n).collect();
node_ids.insert(node_id);
let ids: Vec<u32> = node_ids.into_iter().collect();
Self::new(&ids, self.vnodes)
}
#[must_use]
pub fn node_count(&self) -> usize {
let nodes: HashSet<u32> = self.ring.iter().map(|&(_, n)| n).collect();
nodes.len()
}
fn hash_vnode(node_id: u32, vnode: u32) -> u64 {
let mut buf = [0u8; 8];
buf[..4].copy_from_slice(&node_id.to_le_bytes());
buf[4..8].copy_from_slice(&vnode.to_le_bytes());
xxhash_rust::xxh3::xxh3_64(&buf)
}
fn hash_symbol(object_id: &ObjectId, esi: u32) -> u64 {
let mut buf = [0u8; 20];
buf[..16].copy_from_slice(object_id.as_bytes());
buf[16..20].copy_from_slice(&esi.to_le_bytes());
xxhash_rust::xxh3::xxh3_64(&buf)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuthenticatedSymbol {
pub object_id: ObjectId,
pub esi: u32,
pub data: Vec<u8>,
pub auth_tag: [u8; 16],
}
impl AuthenticatedSymbol {
#[must_use]
pub fn compute_auth_tag(object_id: &ObjectId, esi: u32, data: &[u8]) -> [u8; 16] {
let mut hasher = blake3::Hasher::new();
hasher.update(b"fsqlite:repl:auth:v1");
hasher.update(object_id.as_bytes());
hasher.update(&esi.to_le_bytes());
hasher.update(data);
let hash = hasher.finalize();
let mut tag = [0u8; 16];
tag.copy_from_slice(&hash.as_bytes()[..16]);
tag
}
#[must_use]
pub fn verify(&self) -> bool {
let expected = Self::compute_auth_tag(&self.object_id, self.esi, &self.data);
self.auth_tag == expected
}
#[must_use]
pub fn new(object_id: ObjectId, esi: u32, data: Vec<u8>) -> Self {
let auth_tag = Self::compute_auth_tag(&object_id, esi, &data);
Self {
object_id,
esi,
data,
auth_tag,
}
}
}
#[derive(Debug, Clone)]
pub struct ReplicationConfig {
pub role: ReplicationRole,
pub mode: ReplicationMode,
pub quorum: QuorumPolicy,
pub security_enabled: bool,
pub multi_writer_explicit: bool,
}
impl Default for ReplicationConfig {
fn default() -> Self {
Self {
role: ReplicationRole::Leader,
mode: ReplicationMode::LeaderCommitClock,
quorum: QuorumPolicy::local_only(),
security_enabled: false,
multi_writer_explicit: false,
}
}
}
pub fn validate_config(config: &ReplicationConfig) -> Result<()> {
if config.mode == ReplicationMode::MultiWriter && !config.multi_writer_explicit {
error!(
bead_id = BEAD_ID,
"multi-writer mode requires explicit configuration"
);
return Err(FrankenError::Internal(
"multi-writer replication mode requires explicit opt-in via multi_writer_explicit=true"
.into(),
));
}
info!(
bead_id = BEAD_ID,
role = ?config.role,
mode = ?config.mode,
quorum_required = config.quorum.required,
quorum_total = config.quorum.total,
security = config.security_enabled,
"replication config validated"
);
Ok(())
}
#[derive(Debug)]
pub struct CommitPublicationGate {
tracker: QuorumTracker,
marker: Option<CommitMarker>,
published: bool,
}
impl CommitPublicationGate {
#[must_use]
pub fn new(marker: CommitMarker, policy: QuorumPolicy) -> Self {
Self {
tracker: QuorumTracker::new(policy),
marker: Some(marker),
published: false,
}
}
pub fn record_store_acceptance(&mut self, store_id: u32) {
self.tracker.record_acceptance(store_id);
}
pub fn try_publish(&mut self) -> Option<&CommitMarker> {
if self.published {
return self.marker.as_ref();
}
if self.tracker.is_satisfied() {
self.published = true;
info!(
bead_id = BEAD_ID,
accepted = self.tracker.accepted_count(),
required = self.tracker.policy().required,
"quorum satisfied — marker published"
);
self.marker.as_ref()
} else {
warn!(
bead_id = BEAD_ID,
accepted = self.tracker.accepted_count(),
required = self.tracker.policy().required,
"quorum not yet satisfied — marker withheld"
);
None
}
}
#[must_use]
pub const fn is_published(&self) -> bool {
self.published
}
}
pub fn filter_authenticated_symbols(
symbols: &[AuthenticatedSymbol],
) -> (Vec<&AuthenticatedSymbol>, Vec<&AuthenticatedSymbol>) {
let mut accepted = Vec::new();
let mut rejected = Vec::new();
for sym in symbols {
if sym.verify() {
accepted.push(sym);
} else {
debug!(
bead_id = BEAD_ID,
esi = sym.esi,
"rejected unauthenticated symbol"
);
rejected.push(sym);
}
}
if !rejected.is_empty() {
warn!(
bead_id = BEAD_ID,
rejected_count = rejected.len(),
accepted_count = accepted.len(),
"filtered out unauthenticated symbols"
);
}
(accepted, rejected)
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TraceEvent {
pub node_id: u32,
pub commit_seq: u64,
pub object_id: ObjectId,
pub event_type: TraceEventType,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TraceEventType {
Published,
Received,
Applied,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SheafCheckResult {
pub is_consistent: bool,
pub anomalies: Vec<SheafAnomaly>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SheafAnomaly {
pub description: String,
pub commit_seq: u64,
pub involved_nodes: Vec<u32>,
}
#[must_use]
pub fn sheaf_consistency_check(events: &[TraceEvent]) -> SheafCheckResult {
let mut node_events: HashMap<(u64, u32), HashSet<TraceEventType>> = HashMap::new();
for ev in events {
node_events
.entry((ev.commit_seq, ev.node_id))
.or_default()
.insert(ev.event_type);
}
let commit_seqs: BTreeSet<u64> = events.iter().map(|e| e.commit_seq).collect();
let all_nodes: BTreeSet<u32> = events.iter().map(|e| e.node_id).collect();
let mut anomalies = Vec::new();
for &seq in &commit_seqs {
let has_end_to_end = all_nodes.iter().any(|&nid| {
let key = (seq, nid);
if let Some(types) = node_events.get(&key) {
types.contains(&TraceEventType::Published)
&& types.contains(&TraceEventType::Applied)
} else {
false
}
});
if !has_end_to_end {
let involved: Vec<u32> = all_nodes
.iter()
.filter(|&&nid| node_events.contains_key(&(seq, nid)))
.copied()
.collect();
if !involved.is_empty() {
anomalies.push(SheafAnomaly {
description: format!(
"phantom commit at seq {seq}: no single node has both Published and Applied"
),
commit_seq: seq,
involved_nodes: involved,
});
}
}
}
let is_consistent = anomalies.is_empty();
if is_consistent {
debug!(
bead_id = BEAD_ID,
commit_count = commit_seqs.len(),
"sheaf consistency check passed"
);
} else {
warn!(
bead_id = BEAD_ID,
anomaly_count = anomalies.len(),
"sheaf consistency check found anomalies"
);
}
SheafCheckResult {
is_consistent,
anomalies,
}
}
#[must_use]
pub fn export_tla_trace(events: &[TraceEvent]) -> String {
use std::fmt::Write;
let mut out = String::new();
let _ = writeln!(out, "---- MODULE ReplicationTrace ----");
let _ = writeln!(out, "EXTENDS Integers, Sequences, FiniteSets");
let _ = writeln!(out);
let _ = writeln!(out, "VARIABLES committed, applied");
let _ = writeln!(out);
let _ = writeln!(out, "Init ==");
let _ = writeln!(out, " /\\ committed = {{}}");
let _ = writeln!(out, " /\\ applied = {{}}");
let _ = writeln!(out);
for (i, ev) in events.iter().enumerate() {
let _ = writeln!(
out,
"\\* Step {i}: node={}, seq={}",
ev.node_id, ev.commit_seq
);
match ev.event_type {
TraceEventType::Published => {
let _ = writeln!(
out,
"Step{i} == committed' = committed \\cup {{{}}}",
ev.commit_seq
);
}
TraceEventType::Applied => {
let _ = writeln!(
out,
"Step{i} == applied' = applied \\cup {{{}}}",
ev.commit_seq
);
}
TraceEventType::Received => {
let _ = writeln!(out, "\\* Received event (no state change in this model)");
}
}
let _ = writeln!(out);
}
let _ = writeln!(out, "====");
out
}
#[cfg(test)]
#[allow(clippy::too_many_lines)]
mod tests {
use super::*;
fn make_oid(seed: u8) -> ObjectId {
let mut b = [0u8; 16];
b[0] = seed;
ObjectId::from_bytes(b)
}
#[test]
fn test_bd_1hi_19_unit_compliance_gate() {
assert_eq!(BEAD_ID, "bd-1hi.19");
let _ = ReplicationRole::Leader;
let _ = ReplicationRole::Follower;
let _ = ReplicationMode::LeaderCommitClock;
let _ = ReplicationMode::MultiWriter;
let _ = AntiEntropyPhase::ExchangeTips;
let _ = QuorumPolicy::local_only();
}
#[test]
fn prop_bd_1hi_19_structure_compliance() {
let mut session = AntiEntropySession::new();
assert_eq!(session.phase(), AntiEntropyPhase::ExchangeTips);
let local = ReplicaTip {
root_manifest_id: make_oid(1),
marker_position: 10,
index_segment_tips: vec![],
};
let remote = ReplicaTip {
root_manifest_id: make_oid(2),
marker_position: 12,
index_segment_tips: vec![],
};
session.exchange_tips(local, remote).unwrap();
assert_eq!(session.phase(), AntiEntropyPhase::ComputeMissing);
}
#[test]
fn test_e2e_bd_1hi_19_compliance() {
let config = ReplicationConfig::default();
validate_config(&config).unwrap();
let mut session = AntiEntropySession::new();
let local = ReplicaTip {
root_manifest_id: make_oid(1),
marker_position: 5,
index_segment_tips: vec![],
};
let remote = ReplicaTip {
root_manifest_id: make_oid(2),
marker_position: 7,
index_segment_tips: vec![],
};
session.exchange_tips(local, remote).unwrap();
let local_objects: BTreeSet<ObjectId> = [make_oid(10), make_oid(20)].into();
let remote_objects: BTreeSet<ObjectId> = [make_oid(20), make_oid(30)].into();
let missing = session
.compute_missing(&local_objects, &remote_objects)
.unwrap();
assert!(missing.needed.contains(&make_oid(30)));
session.record_decoded(make_oid(30)).unwrap();
assert_eq!(session.phase(), AntiEntropyPhase::PersistAndUpdate);
session.finalize().unwrap();
assert!(session.is_converged());
}
#[test]
fn test_leader_follower_replication() {
let config = ReplicationConfig {
role: ReplicationRole::Leader,
mode: ReplicationMode::LeaderCommitClock,
..Default::default()
};
validate_config(&config).unwrap();
let follower_config = ReplicationConfig {
role: ReplicationRole::Follower,
mode: ReplicationMode::LeaderCommitClock,
..Default::default()
};
validate_config(&follower_config).unwrap();
}
#[test]
fn test_anti_entropy_exchange_tips() {
let mut session = AntiEntropySession::new();
let local = ReplicaTip {
root_manifest_id: make_oid(1),
marker_position: 10,
index_segment_tips: vec![make_oid(100)],
};
let remote = ReplicaTip {
root_manifest_id: make_oid(2),
marker_position: 15,
index_segment_tips: vec![make_oid(200)],
};
session.exchange_tips(local, remote).unwrap();
assert_eq!(session.phase(), AntiEntropyPhase::ComputeMissing);
}
#[test]
fn test_anti_entropy_compute_missing() {
let mut session = AntiEntropySession::new();
session
.exchange_tips(
ReplicaTip {
root_manifest_id: make_oid(1),
marker_position: 0,
index_segment_tips: vec![],
},
ReplicaTip {
root_manifest_id: make_oid(2),
marker_position: 0,
index_segment_tips: vec![],
},
)
.unwrap();
let local: BTreeSet<ObjectId> = [make_oid(1), make_oid(2), make_oid(3)].into();
let remote: BTreeSet<ObjectId> = [make_oid(2), make_oid(3), make_oid(4)].into();
let missing = session.compute_missing(&local, &remote).unwrap();
assert_eq!(missing.needed, [make_oid(4)].into());
assert_eq!(missing.to_offer, [make_oid(1)].into());
}
#[test]
fn test_anti_entropy_stream_until_decode() {
let mut session = AntiEntropySession::new();
session
.exchange_tips(
ReplicaTip {
root_manifest_id: make_oid(1),
marker_position: 0,
index_segment_tips: vec![],
},
ReplicaTip {
root_manifest_id: make_oid(2),
marker_position: 0,
index_segment_tips: vec![],
},
)
.unwrap();
let local: BTreeSet<ObjectId> = [make_oid(1)].into();
let remote: BTreeSet<ObjectId> = [make_oid(1), make_oid(2), make_oid(3)].into();
session.compute_missing(&local, &remote).unwrap();
session.record_decoded(make_oid(2)).unwrap();
assert_eq!(session.phase(), AntiEntropyPhase::StreamUntilDecode);
session.record_decoded(make_oid(3)).unwrap();
assert_eq!(session.phase(), AntiEntropyPhase::PersistAndUpdate);
}
#[test]
fn test_anti_entropy_convergence() {
let mut session = AntiEntropySession::new();
session
.exchange_tips(
ReplicaTip {
root_manifest_id: make_oid(1),
marker_position: 0,
index_segment_tips: vec![],
},
ReplicaTip {
root_manifest_id: make_oid(2),
marker_position: 0,
index_segment_tips: vec![],
},
)
.unwrap();
let local: BTreeSet<ObjectId> = [make_oid(1), make_oid(2)].into();
let remote: BTreeSet<ObjectId> = [make_oid(2), make_oid(3)].into();
session.compute_missing(&local, &remote).unwrap();
session.record_decoded(make_oid(3)).unwrap();
session.finalize().unwrap();
assert!(session.is_converged());
}
#[test]
fn test_quorum_local_only() {
let policy = QuorumPolicy::local_only();
let mut tracker = QuorumTracker::new(policy);
assert!(!tracker.is_satisfied());
tracker.record_acceptance(0);
assert!(tracker.is_satisfied());
}
#[test]
fn test_quorum_2_of_3() {
let policy = QuorumPolicy::two_of_three();
let mut tracker = QuorumTracker::new(policy);
assert!(!tracker.is_satisfied());
tracker.record_acceptance(0);
assert!(!tracker.is_satisfied()); tracker.record_acceptance(1);
assert!(tracker.is_satisfied()); tracker.record_acceptance(2);
assert!(tracker.is_satisfied()); }
#[test]
fn test_quorum_blocks_marker_publication() {
let marker = CommitMarker {
commit_seq: 42,
capsule_id: make_oid(10),
timestamp_ns: 1_000_000,
};
let policy = QuorumPolicy::two_of_three();
let mut gate = CommitPublicationGate::new(marker, policy);
assert!(!gate.is_published());
assert!(gate.try_publish().is_none());
gate.record_store_acceptance(0);
assert!(gate.try_publish().is_none());
gate.record_store_acceptance(1);
let published = gate.try_publish();
assert!(published.is_some());
assert_eq!(published.unwrap().commit_seq, 42);
assert!(gate.is_published());
}
#[test]
fn test_symbol_routing_consistent_hash() {
let ring = ConsistentHashRing::new(&[1, 2, 3], 100);
assert_eq!(ring.node_count(), 3);
let oid = make_oid(42);
let node = ring.route(&oid, 0).unwrap();
assert!([1, 2, 3].contains(&node));
let node2 = ring.route(&oid, 0).unwrap();
assert_eq!(node, node2);
}
#[test]
fn test_symbol_routing_add_node_minimal_reroute() {
let mut ring3 = ConsistentHashRing::new(&[1, 2, 3], 100);
let ring4 = ring3.add_node(4);
assert_eq!(ring4.node_count(), 4);
let oid = make_oid(1);
let mut rerouted = 0_u32;
for esi in 0..1000 {
let n3 = ring3.route(&oid, esi).unwrap();
let n4 = ring4.route(&oid, esi).unwrap();
if n3 != n4 {
rerouted += 1;
}
}
assert!(rerouted < 500, "too many reroutes: {rerouted}/1000");
}
#[test]
fn test_authenticated_symbols_verified() {
let sym = AuthenticatedSymbol::new(make_oid(1), 0, vec![1, 2, 3]);
assert!(sym.verify());
let mut bad = sym.clone();
bad.data[0] = 99;
assert!(!bad.verify());
let mut bad2 = sym;
bad2.auth_tag[0] ^= 0xFF;
assert!(!bad2.verify());
}
#[test]
fn test_unauthenticated_fallback() {
let good1 = AuthenticatedSymbol::new(make_oid(1), 0, vec![10, 20]);
let good2 = AuthenticatedSymbol::new(make_oid(1), 1, vec![30, 40]);
let mut bad = AuthenticatedSymbol::new(make_oid(1), 2, vec![50, 60]);
bad.auth_tag[0] ^= 0xFF;
let all = [good1, good2, bad];
let (accepted, rejected) = filter_authenticated_symbols(&all);
assert_eq!(accepted.len(), 2);
assert_eq!(rejected.len(), 1);
assert_eq!(rejected[0].esi, 2);
}
#[test]
fn test_sheaf_consistency_check_clean() {
let events = vec![
TraceEvent {
node_id: 1,
commit_seq: 1,
object_id: make_oid(10),
event_type: TraceEventType::Published,
},
TraceEvent {
node_id: 1,
commit_seq: 1,
object_id: make_oid(10),
event_type: TraceEventType::Applied,
},
];
let result = sheaf_consistency_check(&events);
assert!(result.is_consistent);
assert!(result.anomalies.is_empty());
}
#[test]
fn test_sheaf_consistency_check_phantom() {
let events = vec![
TraceEvent {
node_id: 1,
commit_seq: 1,
object_id: make_oid(10),
event_type: TraceEventType::Published,
},
TraceEvent {
node_id: 2,
commit_seq: 1,
object_id: make_oid(10),
event_type: TraceEventType::Applied,
},
];
let result = sheaf_consistency_check(&events);
assert!(!result.is_consistent);
assert_eq!(result.anomalies.len(), 1);
assert_eq!(result.anomalies[0].commit_seq, 1);
}
#[test]
fn test_tla_export() {
let events = vec![
TraceEvent {
node_id: 1,
commit_seq: 1,
object_id: make_oid(10),
event_type: TraceEventType::Published,
},
TraceEvent {
node_id: 2,
commit_seq: 1,
object_id: make_oid(10),
event_type: TraceEventType::Applied,
},
];
let tla = export_tla_trace(&events);
assert!(tla.contains("MODULE ReplicationTrace"));
assert!(tla.contains("committed"));
assert!(tla.contains("applied"));
assert!(tla.contains("===="));
}
#[test]
fn test_multiwriter_not_default() {
let config = ReplicationConfig {
mode: ReplicationMode::MultiWriter,
multi_writer_explicit: false,
..Default::default()
};
let result = validate_config(&config);
assert!(result.is_err());
}
#[test]
fn test_multiwriter_explicit_ok() {
let config = ReplicationConfig {
mode: ReplicationMode::MultiWriter,
multi_writer_explicit: true,
..Default::default()
};
validate_config(&config).unwrap();
}
#[test]
fn prop_anti_entropy_convergence() {
for seed in 0..20_u8 {
let local: BTreeSet<ObjectId> = (0..seed).map(|i| make_oid(i * 2)).collect();
let remote: BTreeSet<ObjectId> = (0..seed).map(|i| make_oid(i * 2 + 1)).collect();
let mut session = AntiEntropySession::new();
session
.exchange_tips(
ReplicaTip {
root_manifest_id: make_oid(100),
marker_position: 0,
index_segment_tips: vec![],
},
ReplicaTip {
root_manifest_id: make_oid(200),
marker_position: 0,
index_segment_tips: vec![],
},
)
.unwrap();
let missing = session.compute_missing(&local, &remote).unwrap();
for &oid in &missing.needed.clone() {
session.record_decoded(oid).unwrap();
}
if session.phase() == AntiEntropyPhase::PersistAndUpdate {
session.finalize().unwrap();
}
assert!(
session.is_converged() || session.phase() == AntiEntropyPhase::RequestSymbols,
"failed to converge for seed={seed}"
);
}
}
#[test]
fn prop_quorum_safety() {
for m in 1..=5_u32 {
for n in m..=5 {
let policy = QuorumPolicy::new(m, n).unwrap();
let mut tracker = QuorumTracker::new(policy);
for i in 0..m - 1 {
tracker.record_acceptance(i);
assert!(
!tracker.is_satisfied(),
"should not be satisfied with {} of {} (need {})",
i + 1,
n,
m
);
}
tracker.record_acceptance(m - 1);
assert!(
tracker.is_satisfied(),
"should be satisfied with {m} of {n}"
);
}
}
}
#[test]
fn test_ecs_replication_ordering() {
let markers = [
CommitMarker {
commit_seq: 1,
capsule_id: make_oid(1),
timestamp_ns: 100,
},
CommitMarker {
commit_seq: 2,
capsule_id: make_oid(2),
timestamp_ns: 200,
},
CommitMarker {
commit_seq: 3,
capsule_id: make_oid(3),
timestamp_ns: 300,
},
];
for w in markers.windows(2) {
assert!(w[0].commit_seq < w[1].commit_seq);
}
}
#[test]
fn test_ecs_replication_commit_capsules() {
let local_capsules: BTreeSet<ObjectId> = [make_oid(1), make_oid(2)].into();
let remote_capsules: BTreeSet<ObjectId> = [make_oid(1), make_oid(2), make_oid(3)].into();
let mut session = AntiEntropySession::new();
session
.exchange_tips(
ReplicaTip {
root_manifest_id: make_oid(10),
marker_position: 1,
index_segment_tips: vec![],
},
ReplicaTip {
root_manifest_id: make_oid(11),
marker_position: 2,
index_segment_tips: vec![],
},
)
.unwrap();
let missing = session
.compute_missing(&local_capsules, &remote_capsules)
.unwrap();
assert_eq!(missing.needed, [make_oid(3)].into());
}
#[test]
fn test_ecs_replication_dedup() {
let marker = CommitMarker {
commit_seq: 77,
capsule_id: make_oid(9),
timestamp_ns: 1_234,
};
let mut dedup = CommitDeduplicator::default();
assert!(dedup.should_accept(&marker));
assert!(!dedup.should_accept(&marker));
assert_eq!(dedup.seen_count(), 1);
}
#[test]
fn test_e2e_3_node_replication() {
let mut leader_objects: BTreeSet<ObjectId> = BTreeSet::new();
for i in 0..10_u8 {
leader_objects.insert(make_oid(i));
}
let follower1_objects: BTreeSet<ObjectId> = BTreeSet::new();
let mut session = AntiEntropySession::new();
session
.exchange_tips(
ReplicaTip {
root_manifest_id: make_oid(0),
marker_position: 0,
index_segment_tips: vec![],
},
ReplicaTip {
root_manifest_id: make_oid(9),
marker_position: 10,
index_segment_tips: vec![],
},
)
.unwrap();
let missing = session
.compute_missing(&follower1_objects, &leader_objects)
.unwrap();
assert_eq!(missing.needed.len(), 10);
for &oid in &missing.needed.clone() {
session.record_decoded(oid).unwrap();
}
session.finalize().unwrap();
assert!(session.is_converged());
}
#[test]
fn test_e2e_node_failure_recovery() {
let policy = QuorumPolicy::two_of_three();
let marker = CommitMarker {
commit_seq: 1,
capsule_id: make_oid(1),
timestamp_ns: 1000,
};
let mut gate = CommitPublicationGate::new(marker, policy);
gate.record_store_acceptance(0);
gate.record_store_acceptance(2);
assert!(gate.try_publish().is_some());
}
#[test]
fn test_e2e_lossy_replication_convergence() {
fn delivered_with_loss(oid: &ObjectId, round: u32, loss_per_mille: u64) -> bool {
let mut material = [0_u8; 20];
material[..16].copy_from_slice(oid.as_bytes());
material[16..].copy_from_slice(&round.to_le_bytes());
xxhash_rust::xxh3::xxh3_64(&material) % 1000 >= loss_per_mille
}
let leader_objects: BTreeSet<ObjectId> = (0_u8..100).map(make_oid).collect();
let mut follower_objects: BTreeSet<ObjectId> = BTreeSet::new();
for round in 0_u32..32 {
if follower_objects == leader_objects {
break;
}
let mut session = AntiEntropySession::new();
session
.exchange_tips(
ReplicaTip {
root_manifest_id: make_oid(1),
marker_position: follower_objects.len() as u64,
index_segment_tips: vec![],
},
ReplicaTip {
root_manifest_id: make_oid(2),
marker_position: leader_objects.len() as u64,
index_segment_tips: vec![],
},
)
.unwrap();
let missing = session
.compute_missing(&follower_objects, &leader_objects)
.unwrap()
.needed
.clone();
for oid in missing {
if delivered_with_loss(&oid, round, 100) {
session.record_decoded(oid).unwrap();
follower_objects.insert(oid);
}
}
if session.phase() == AntiEntropyPhase::PersistAndUpdate {
session.finalize().unwrap();
}
}
assert_eq!(follower_objects, leader_objects);
}
}