1use std::collections::{BTreeSet, HashMap, HashSet};
7
8use fsqlite_error::{FrankenError, Result};
9use tracing::{debug, error, info, warn};
10
// Stable bead identifier attached to every tracing event emitted by this
// module, so log lines can be correlated back to this component.
const BEAD_ID: &str = "bd-1hi.19";
16
/// Role a node plays in the replication topology.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ReplicationRole {
    /// Node that drives the commit clock and publishes commit markers.
    Leader,
    /// Node that replicates objects from the leader.
    Follower,
}
29
/// How commits are ordered across the cluster.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum ReplicationMode {
    /// Single leader assigns a monotonically increasing commit clock (the default).
    #[default]
    LeaderCommitClock,
    /// Multiple concurrent writers; requires explicit opt-in
    /// (enforced by `validate_config`).
    MultiWriter,
}
39
/// Opaque 16-byte identifier for a replicated object.
///
/// Ordering and hashing operate on the raw bytes, so ids can be used as
/// keys in both ordered (`BTreeSet`) and hashed (`HashSet`) collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ObjectId([u8; 16]);

impl ObjectId {
    /// Borrows the raw 16-byte identifier.
    #[must_use]
    pub const fn as_bytes(&self) -> &[u8; 16] {
        &self.0
    }

    /// Wraps a raw 16-byte value as an `ObjectId`.
    #[must_use]
    pub const fn from_bytes(bytes: [u8; 16]) -> Self {
        Self(bytes)
    }
}
59
/// Kinds of objects that flow through the replication pipeline.
///
/// NOTE(review): variant semantics are defined by the wider ECS design;
/// the names are taken at face value here — confirm against the capsule
/// format specification before relying on any particular meaning.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ReplicatedObjectKind {
    CommitCapsule,
    CommitMarker,
    IndexSegment,
    ReadWitness,
    WriteWitness,
    WitnessDelta,
    WitnessIndexSegment,
    DependencyEdge,
    CommitProof,
    AbortWitness,
    MergeWitness,
    CheckpointChunk,
    SnapshotManifest,
    DecodeProof,
}
78
/// Marker announcing that a commit capsule exists at a given sequence.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommitMarker {
    /// Monotonic commit sequence number.
    pub commit_seq: u64,
    /// Identifier of the commit capsule this marker refers to.
    pub capsule_id: ObjectId,
    /// Timestamp in nanoseconds.
    pub timestamp_ns: u64,
}
86
/// Deduplication key for a commit marker: the first 16 bytes of a
/// domain-separated BLAKE3 hash (see [`IdempotencyKey::from_marker`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct IdempotencyKey([u8; 16]);
90
91impl IdempotencyKey {
92 #[must_use]
94 pub fn from_marker(marker: &CommitMarker) -> Self {
95 let mut hasher = blake3::Hasher::new();
96 hasher.update(b"fsqlite:repl:idempotency:v1");
97 hasher.update(&marker.commit_seq.to_le_bytes());
98 hasher.update(marker.capsule_id.as_bytes());
99 let hash = hasher.finalize();
100 let mut out = [0_u8; 16];
101 out.copy_from_slice(&hash.as_bytes()[..16]);
102 Self(out)
103 }
104}
105
/// Tracks idempotency keys of markers already seen, so redelivered
/// markers can be ignored.
#[derive(Debug, Default)]
pub struct CommitDeduplicator {
    // Keys of every marker accepted so far.
    seen: HashSet<IdempotencyKey>,
}
111
112impl CommitDeduplicator {
113 pub fn should_accept(&mut self, marker: &CommitMarker) -> bool {
115 self.seen.insert(IdempotencyKey::from_marker(marker))
116 }
117
118 #[must_use]
120 pub fn seen_count(&self) -> usize {
121 self.seen.len()
122 }
123}
124
/// Snapshot of a replica's replication frontier, exchanged during
/// anti-entropy.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicaTip {
    /// Root manifest object the replica currently holds.
    pub root_manifest_id: ObjectId,
    /// Position in the commit-marker stream.
    pub marker_position: u64,
    /// Tips of the replica's index segments.
    pub index_segment_tips: Vec<ObjectId>,
}
139
/// Result of diffing two replicas' object sets.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MissingObjects {
    /// Objects the remote has that we lack (we must fetch these).
    pub needed: BTreeSet<ObjectId>,
    /// Objects we have that the remote lacks (we can offer these).
    pub to_offer: BTreeSet<ObjectId>,
}
148
/// Phases of the anti-entropy state machine, in protocol order.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AntiEntropyPhase {
    /// Exchange replica tips with the peer.
    ExchangeTips,
    /// Diff object sets to compute what each side is missing.
    ComputeMissing,
    /// Request symbols for the needed objects.
    RequestSymbols,
    /// Consume symbols until every needed object decodes.
    StreamUntilDecode,
    /// Persist decoded objects and update local tips.
    PersistAndUpdate,
    /// Session finished; this round has converged.
    Complete,
}
165
/// One run of the anti-entropy protocol against a single peer.
///
/// Methods enforce the phase ordering of [`AntiEntropyPhase`]; calling a
/// step out of order returns an error.
#[derive(Debug)]
pub struct AntiEntropySession {
    // Current phase of the state machine.
    phase: AntiEntropyPhase,
    // Tips captured during ExchangeTips.
    local_tip: Option<ReplicaTip>,
    remote_tip: Option<ReplicaTip>,
    // Diff computed during ComputeMissing.
    missing: Option<MissingObjects>,
    // Objects successfully decoded so far.
    decoded_objects: HashSet<ObjectId>,
}
175
impl AntiEntropySession {
    /// Creates a session in the initial `ExchangeTips` phase.
    #[must_use]
    pub fn new() -> Self {
        debug!(bead_id = BEAD_ID, "starting anti-entropy session");
        Self {
            phase: AntiEntropyPhase::ExchangeTips,
            local_tip: None,
            remote_tip: None,
            missing: None,
            decoded_objects: HashSet::new(),
        }
    }

    /// Current phase of the state machine.
    #[must_use]
    pub const fn phase(&self) -> AntiEntropyPhase {
        self.phase
    }

    /// Step 1: record both replicas' tips and advance to `ComputeMissing`.
    ///
    /// # Errors
    /// Fails unless the session is in the `ExchangeTips` phase.
    pub fn exchange_tips(&mut self, local: ReplicaTip, remote: ReplicaTip) -> Result<()> {
        if self.phase != AntiEntropyPhase::ExchangeTips {
            return Err(FrankenError::Internal(format!(
                "anti-entropy: expected ExchangeTips, got {:?}",
                self.phase
            )));
        }
        debug!(
            bead_id = BEAD_ID,
            local_pos = local.marker_position,
            remote_pos = remote.marker_position,
            "exchanged tips"
        );
        self.local_tip = Some(local);
        self.remote_tip = Some(remote);
        self.phase = AntiEntropyPhase::ComputeMissing;
        Ok(())
    }

    /// Step 2: diff the two object sets and advance to `RequestSymbols`.
    ///
    /// `needed` = remote − local (objects to fetch); `to_offer` =
    /// local − remote (objects to send). The diff is also retained on the
    /// session for later phases.
    ///
    /// # Errors
    /// Fails unless the session is in the `ComputeMissing` phase.
    pub fn compute_missing(
        &mut self,
        local_objects: &BTreeSet<ObjectId>,
        remote_objects: &BTreeSet<ObjectId>,
    ) -> Result<&MissingObjects> {
        if self.phase != AntiEntropyPhase::ComputeMissing {
            return Err(FrankenError::Internal(format!(
                "anti-entropy: expected ComputeMissing, got {:?}",
                self.phase
            )));
        }

        let needed: BTreeSet<ObjectId> =
            remote_objects.difference(local_objects).copied().collect();
        let to_offer: BTreeSet<ObjectId> =
            local_objects.difference(remote_objects).copied().collect();

        debug!(
            bead_id = BEAD_ID,
            needed_count = needed.len(),
            to_offer_count = to_offer.len(),
            "computed missing objects"
        );

        self.missing = Some(MissingObjects { needed, to_offer });
        self.phase = AntiEntropyPhase::RequestSymbols;
        Ok(self.missing.as_ref().expect("just set"))
    }

    /// Objects the peer should stream symbols for; `None` before step 2.
    #[must_use]
    pub fn objects_to_request(&self) -> Option<&BTreeSet<ObjectId>> {
        self.missing.as_ref().map(|m| &m.needed)
    }

    /// Steps 3–4: record one decoded object. The first decode moves the
    /// session into `StreamUntilDecode`; once every `needed` object has
    /// decoded, the session advances to `PersistAndUpdate`.
    ///
    /// NOTE(review): if `needed` is empty this method is never called and
    /// the session stays parked in `RequestSymbols` — callers tolerate
    /// that today (see `prop_anti_entropy_convergence`), but it is a
    /// dead-end worth confirming is intentional.
    ///
    /// # Errors
    /// Fails unless the session is in `RequestSymbols` or
    /// `StreamUntilDecode`.
    pub fn record_decoded(&mut self, object_id: ObjectId) -> Result<()> {
        if self.phase != AntiEntropyPhase::RequestSymbols
            && self.phase != AntiEntropyPhase::StreamUntilDecode
        {
            return Err(FrankenError::Internal(format!(
                "anti-entropy: expected RequestSymbols/StreamUntilDecode, got {:?}",
                self.phase
            )));
        }
        self.phase = AntiEntropyPhase::StreamUntilDecode;
        self.decoded_objects.insert(object_id);

        // Advance as soon as the decoded set covers everything we needed.
        if let Some(missing) = &self.missing {
            if missing
                .needed
                .iter()
                .all(|id| self.decoded_objects.contains(id))
            {
                debug!(
                    bead_id = BEAD_ID,
                    decoded_count = self.decoded_objects.len(),
                    "all missing objects decoded"
                );
                self.phase = AntiEntropyPhase::PersistAndUpdate;
            }
        }
        Ok(())
    }

    /// Step 5: mark the decoded objects persisted and complete the session.
    ///
    /// # Errors
    /// Fails unless the session is in the `PersistAndUpdate` phase.
    pub fn finalize(&mut self) -> Result<()> {
        if self.phase != AntiEntropyPhase::PersistAndUpdate {
            return Err(FrankenError::Internal(format!(
                "anti-entropy: expected PersistAndUpdate, got {:?}",
                self.phase
            )));
        }
        info!(
            bead_id = BEAD_ID,
            decoded_count = self.decoded_objects.len(),
            "anti-entropy session complete — persisted"
        );
        self.phase = AntiEntropyPhase::Complete;
        Ok(())
    }

    /// Whether this session has run to completion.
    #[must_use]
    pub const fn is_converged(&self) -> bool {
        matches!(self.phase, AntiEntropyPhase::Complete)
    }
}
306
impl Default for AntiEntropySession {
    /// Equivalent to [`AntiEntropySession::new`].
    fn default() -> Self {
        Self::new()
    }
}
312
/// Write quorum: how many of the participating stores must accept
/// before a commit marker may be published.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QuorumPolicy {
    /// Number of distinct stores that must accept.
    pub required: u32,
    /// Total number of participating stores.
    pub total: u32,
}
325
326impl QuorumPolicy {
327 #[must_use]
329 pub const fn local_only() -> Self {
330 Self {
331 required: 1,
332 total: 1,
333 }
334 }
335
336 #[must_use]
338 pub const fn two_of_three() -> Self {
339 Self {
340 required: 2,
341 total: 3,
342 }
343 }
344
345 pub fn new(required: u32, total: u32) -> Result<Self> {
347 if required == 0 || required > total {
348 return Err(FrankenError::Internal(format!(
349 "invalid quorum: required={required}, total={total}"
350 )));
351 }
352 Ok(Self { required, total })
353 }
354}
355
/// Tracks which stores have accepted a write, against a [`QuorumPolicy`].
#[derive(Debug)]
pub struct QuorumTracker {
    // Policy this tracker enforces.
    policy: QuorumPolicy,
    // Distinct ids of stores that have accepted so far.
    accepted: HashSet<u32>,
}
362
363impl QuorumTracker {
364 #[must_use]
366 pub fn new(policy: QuorumPolicy) -> Self {
367 Self {
368 policy,
369 accepted: HashSet::new(),
370 }
371 }
372
373 pub fn record_acceptance(&mut self, store_id: u32) {
375 self.accepted.insert(store_id);
376 debug!(
377 bead_id = BEAD_ID,
378 store_id,
379 accepted = self.accepted.len(),
380 required = self.policy.required,
381 "store accepted symbols"
382 );
383 }
384
385 #[must_use]
387 #[allow(clippy::cast_possible_truncation)]
388 pub fn is_satisfied(&self) -> bool {
389 self.accepted.len() as u32 >= self.policy.required
390 }
391
392 #[must_use]
394 pub fn accepted_count(&self) -> usize {
395 self.accepted.len()
396 }
397
398 #[must_use]
400 pub const fn policy(&self) -> &QuorumPolicy {
401 &self.policy
402 }
403}
404
/// Consistent-hash ring mapping (object, symbol-index) pairs to node ids.
#[derive(Debug, Clone)]
pub struct ConsistentHashRing {
    // (hash, node_id) points sorted by hash; each node appears `vnodes` times.
    ring: Vec<(u64, u32)>,
    // Virtual points per physical node (smooths the key distribution).
    vnodes: u32,
}
417
418impl ConsistentHashRing {
419 #[must_use]
421 pub fn new(node_ids: &[u32], vnodes: u32) -> Self {
422 let mut ring = Vec::with_capacity(node_ids.len() * vnodes as usize);
423 for &nid in node_ids {
424 for v in 0..vnodes {
425 let hash = Self::hash_vnode(nid, v);
426 ring.push((hash, nid));
427 }
428 }
429 ring.sort_unstable_by_key(|&(h, _)| h);
430 Self { ring, vnodes }
431 }
432
433 #[must_use]
435 pub fn route(&self, object_id: &ObjectId, esi: u32) -> Option<u32> {
436 if self.ring.is_empty() {
437 return None;
438 }
439 let key = Self::hash_symbol(object_id, esi);
440 let idx = self.ring.partition_point(|&(h, _)| h < key);
442 let idx = if idx >= self.ring.len() { 0 } else { idx };
443 Some(self.ring[idx].1)
444 }
445
446 #[must_use]
448 pub fn add_node(&mut self, node_id: u32) -> Self {
449 let mut node_ids: BTreeSet<u32> = self.ring.iter().map(|&(_, n)| n).collect();
450 node_ids.insert(node_id);
451 let ids: Vec<u32> = node_ids.into_iter().collect();
452 Self::new(&ids, self.vnodes)
453 }
454
455 #[must_use]
457 pub fn node_count(&self) -> usize {
458 let nodes: HashSet<u32> = self.ring.iter().map(|&(_, n)| n).collect();
459 nodes.len()
460 }
461
462 fn hash_vnode(node_id: u32, vnode: u32) -> u64 {
463 let mut buf = [0u8; 8];
464 buf[..4].copy_from_slice(&node_id.to_le_bytes());
465 buf[4..8].copy_from_slice(&vnode.to_le_bytes());
466 xxhash_rust::xxh3::xxh3_64(&buf)
467 }
468
469 fn hash_symbol(object_id: &ObjectId, esi: u32) -> u64 {
470 let mut buf = [0u8; 20];
471 buf[..16].copy_from_slice(object_id.as_bytes());
472 buf[16..20].copy_from_slice(&esi.to_le_bytes());
473 xxhash_rust::xxh3::xxh3_64(&buf)
474 }
475}
476
/// Encoded symbol carrying a 16-byte authentication tag over its
/// identity and payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuthenticatedSymbol {
    /// Object this symbol belongs to.
    pub object_id: ObjectId,
    /// Encoded symbol index.
    pub esi: u32,
    /// Symbol payload bytes.
    pub data: Vec<u8>,
    /// Truncated BLAKE3 tag binding `object_id`, `esi`, and `data`.
    pub auth_tag: [u8; 16],
}
490
491impl AuthenticatedSymbol {
492 #[must_use]
494 pub fn compute_auth_tag(object_id: &ObjectId, esi: u32, data: &[u8]) -> [u8; 16] {
495 let mut hasher = blake3::Hasher::new();
496 hasher.update(b"fsqlite:repl:auth:v1");
497 hasher.update(object_id.as_bytes());
498 hasher.update(&esi.to_le_bytes());
499 hasher.update(data);
500 let hash = hasher.finalize();
501 let mut tag = [0u8; 16];
502 tag.copy_from_slice(&hash.as_bytes()[..16]);
503 tag
504 }
505
506 #[must_use]
508 pub fn verify(&self) -> bool {
509 let expected = Self::compute_auth_tag(&self.object_id, self.esi, &self.data);
510 self.auth_tag == expected
511 }
512
513 #[must_use]
515 pub fn new(object_id: ObjectId, esi: u32, data: Vec<u8>) -> Self {
516 let auth_tag = Self::compute_auth_tag(&object_id, esi, &data);
517 Self {
518 object_id,
519 esi,
520 data,
521 auth_tag,
522 }
523 }
524}
525
/// Top-level replication configuration; validated by [`validate_config`].
#[derive(Debug, Clone)]
pub struct ReplicationConfig {
    /// This node's role in the topology.
    pub role: ReplicationRole,
    /// Commit-ordering mode.
    pub mode: ReplicationMode,
    /// Quorum required before markers may publish.
    pub quorum: QuorumPolicy,
    /// Whether symbol authentication is enabled.
    pub security_enabled: bool,
    /// Explicit opt-in flag required for `ReplicationMode::MultiWriter`.
    pub multi_writer_explicit: bool,
}
539
impl Default for ReplicationConfig {
    /// Safe single-node defaults: leader role, leader commit clock,
    /// local-only quorum, security off, multi-writer NOT opted in.
    fn default() -> Self {
        Self {
            role: ReplicationRole::Leader,
            mode: ReplicationMode::LeaderCommitClock,
            quorum: QuorumPolicy::local_only(),
            security_enabled: false,
            multi_writer_explicit: false,
        }
    }
}
551
552pub fn validate_config(config: &ReplicationConfig) -> Result<()> {
554 if config.mode == ReplicationMode::MultiWriter && !config.multi_writer_explicit {
555 error!(
556 bead_id = BEAD_ID,
557 "multi-writer mode requires explicit configuration"
558 );
559 return Err(FrankenError::Internal(
560 "multi-writer replication mode requires explicit opt-in via multi_writer_explicit=true"
561 .into(),
562 ));
563 }
564 info!(
565 bead_id = BEAD_ID,
566 role = ?config.role,
567 mode = ?config.mode,
568 quorum_required = config.quorum.required,
569 quorum_total = config.quorum.total,
570 security = config.security_enabled,
571 "replication config validated"
572 );
573 Ok(())
574}
575
/// Withholds a commit marker until its quorum is satisfied.
#[derive(Debug)]
pub struct CommitPublicationGate {
    // Quorum progress for this marker.
    tracker: QuorumTracker,
    // Marker awaiting publication (always Some after construction).
    marker: Option<CommitMarker>,
    // Set once the marker has been released by `try_publish`.
    published: bool,
}
588
589impl CommitPublicationGate {
590 #[must_use]
592 pub fn new(marker: CommitMarker, policy: QuorumPolicy) -> Self {
593 Self {
594 tracker: QuorumTracker::new(policy),
595 marker: Some(marker),
596 published: false,
597 }
598 }
599
600 pub fn record_store_acceptance(&mut self, store_id: u32) {
602 self.tracker.record_acceptance(store_id);
603 }
604
605 pub fn try_publish(&mut self) -> Option<&CommitMarker> {
608 if self.published {
609 return self.marker.as_ref();
610 }
611 if self.tracker.is_satisfied() {
612 self.published = true;
613 info!(
614 bead_id = BEAD_ID,
615 accepted = self.tracker.accepted_count(),
616 required = self.tracker.policy().required,
617 "quorum satisfied — marker published"
618 );
619 self.marker.as_ref()
620 } else {
621 warn!(
622 bead_id = BEAD_ID,
623 accepted = self.tracker.accepted_count(),
624 required = self.tracker.policy().required,
625 "quorum not yet satisfied — marker withheld"
626 );
627 None
628 }
629 }
630
631 #[must_use]
633 pub const fn is_published(&self) -> bool {
634 self.published
635 }
636}
637
638pub fn filter_authenticated_symbols(
644 symbols: &[AuthenticatedSymbol],
645) -> (Vec<&AuthenticatedSymbol>, Vec<&AuthenticatedSymbol>) {
646 let mut accepted = Vec::new();
647 let mut rejected = Vec::new();
648
649 for sym in symbols {
650 if sym.verify() {
651 accepted.push(sym);
652 } else {
653 debug!(
654 bead_id = BEAD_ID,
655 esi = sym.esi,
656 "rejected unauthenticated symbol"
657 );
658 rejected.push(sym);
659 }
660 }
661
662 if !rejected.is_empty() {
663 warn!(
664 bead_id = BEAD_ID,
665 rejected_count = rejected.len(),
666 accepted_count = accepted.len(),
667 "filtered out unauthenticated symbols"
668 );
669 }
670
671 (accepted, rejected)
672}
673
/// One observed replication event, for offline trace analysis.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TraceEvent {
    /// Node that observed the event.
    pub node_id: u32,
    /// Commit sequence the event concerns.
    pub commit_seq: u64,
    /// Object involved in the event.
    pub object_id: ObjectId,
    /// What happened.
    pub event_type: TraceEventType,
}
686
/// Kind of replication event recorded in a trace.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TraceEventType {
    /// The commit was published.
    Published,
    /// The commit's objects were received.
    Received,
    /// The commit was applied locally.
    Applied,
}
694
/// Outcome of [`sheaf_consistency_check`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SheafCheckResult {
    /// True when no anomalies were found.
    pub is_consistent: bool,
    /// One entry per inconsistent commit sequence.
    pub anomalies: Vec<SheafAnomaly>,
}
701
/// A single inconsistency found in a replication trace.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SheafAnomaly {
    /// Human-readable description of the anomaly.
    pub description: String,
    /// Commit sequence the anomaly concerns.
    pub commit_seq: u64,
    /// Nodes that recorded any event for this sequence.
    pub involved_nodes: Vec<u32>,
}
709
710#[must_use]
715pub fn sheaf_consistency_check(events: &[TraceEvent]) -> SheafCheckResult {
716 let mut node_events: HashMap<(u64, u32), HashSet<TraceEventType>> = HashMap::new();
718 for ev in events {
719 node_events
720 .entry((ev.commit_seq, ev.node_id))
721 .or_default()
722 .insert(ev.event_type);
723 }
724
725 let commit_seqs: BTreeSet<u64> = events.iter().map(|e| e.commit_seq).collect();
727 let all_nodes: BTreeSet<u32> = events.iter().map(|e| e.node_id).collect();
728
729 let mut anomalies = Vec::new();
730
731 for &seq in &commit_seqs {
732 let has_end_to_end = all_nodes.iter().any(|&nid| {
734 let key = (seq, nid);
735 if let Some(types) = node_events.get(&key) {
736 types.contains(&TraceEventType::Published)
737 && types.contains(&TraceEventType::Applied)
738 } else {
739 false
740 }
741 });
742
743 if !has_end_to_end {
744 let involved: Vec<u32> = all_nodes
746 .iter()
747 .filter(|&&nid| node_events.contains_key(&(seq, nid)))
748 .copied()
749 .collect();
750
751 if !involved.is_empty() {
752 anomalies.push(SheafAnomaly {
753 description: format!(
754 "phantom commit at seq {seq}: no single node has both Published and Applied"
755 ),
756 commit_seq: seq,
757 involved_nodes: involved,
758 });
759 }
760 }
761 }
762
763 let is_consistent = anomalies.is_empty();
764
765 if is_consistent {
766 debug!(
767 bead_id = BEAD_ID,
768 commit_count = commit_seqs.len(),
769 "sheaf consistency check passed"
770 );
771 } else {
772 warn!(
773 bead_id = BEAD_ID,
774 anomaly_count = anomalies.len(),
775 "sheaf consistency check found anomalies"
776 );
777 }
778
779 SheafCheckResult {
780 is_consistent,
781 anomalies,
782 }
783}
784
785#[must_use]
791pub fn export_tla_trace(events: &[TraceEvent]) -> String {
792 use std::fmt::Write;
793 let mut out = String::new();
794 let _ = writeln!(out, "---- MODULE ReplicationTrace ----");
795 let _ = writeln!(out, "EXTENDS Integers, Sequences, FiniteSets");
796 let _ = writeln!(out);
797 let _ = writeln!(out, "VARIABLES committed, applied");
798 let _ = writeln!(out);
799 let _ = writeln!(out, "Init ==");
800 let _ = writeln!(out, " /\\ committed = {{}}");
801 let _ = writeln!(out, " /\\ applied = {{}}");
802 let _ = writeln!(out);
803
804 for (i, ev) in events.iter().enumerate() {
805 let _ = writeln!(
806 out,
807 "\\* Step {i}: node={}, seq={}",
808 ev.node_id, ev.commit_seq
809 );
810 match ev.event_type {
811 TraceEventType::Published => {
812 let _ = writeln!(
813 out,
814 "Step{i} == committed' = committed \\cup {{{}}}",
815 ev.commit_seq
816 );
817 }
818 TraceEventType::Applied => {
819 let _ = writeln!(
820 out,
821 "Step{i} == applied' = applied \\cup {{{}}}",
822 ev.commit_seq
823 );
824 }
825 TraceEventType::Received => {
826 let _ = writeln!(out, "\\* Received event (no state change in this model)");
827 }
828 }
829 let _ = writeln!(out);
830 }
831
832 let _ = writeln!(out, "====");
833 out
834}
835
#[cfg(test)]
#[allow(clippy::too_many_lines)]
mod tests {
    use super::*;

    /// Builds a deterministic `ObjectId` whose first byte is `seed` and
    /// whose remaining 15 bytes are zero.
    fn make_oid(seed: u8) -> ObjectId {
        let mut b = [0u8; 16];
        b[0] = seed;
        ObjectId::from_bytes(b)
    }

    // --- compliance gates -------------------------------------------------

    // The bead id and the core public surface exist and are constructible.
    #[test]
    fn test_bd_1hi_19_unit_compliance_gate() {
        assert_eq!(BEAD_ID, "bd-1hi.19");
        let _ = ReplicationRole::Leader;
        let _ = ReplicationRole::Follower;
        let _ = ReplicationMode::LeaderCommitClock;
        let _ = ReplicationMode::MultiWriter;
        let _ = AntiEntropyPhase::ExchangeTips;
        let _ = QuorumPolicy::local_only();
    }

    // A fresh session starts in ExchangeTips and advances after tips.
    #[test]
    fn prop_bd_1hi_19_structure_compliance() {
        let mut session = AntiEntropySession::new();
        assert_eq!(session.phase(), AntiEntropyPhase::ExchangeTips);

        let local = ReplicaTip {
            root_manifest_id: make_oid(1),
            marker_position: 10,
            index_segment_tips: vec![],
        };
        let remote = ReplicaTip {
            root_manifest_id: make_oid(2),
            marker_position: 12,
            index_segment_tips: vec![],
        };
        session.exchange_tips(local, remote).unwrap();
        assert_eq!(session.phase(), AntiEntropyPhase::ComputeMissing);
    }

    // Full happy-path walk of config validation plus the anti-entropy
    // state machine from tips through finalize.
    #[test]
    fn test_e2e_bd_1hi_19_compliance() {
        let config = ReplicationConfig::default();
        validate_config(&config).unwrap();

        let mut session = AntiEntropySession::new();
        let local = ReplicaTip {
            root_manifest_id: make_oid(1),
            marker_position: 5,
            index_segment_tips: vec![],
        };
        let remote = ReplicaTip {
            root_manifest_id: make_oid(2),
            marker_position: 7,
            index_segment_tips: vec![],
        };
        session.exchange_tips(local, remote).unwrap();

        let local_objects: BTreeSet<ObjectId> = [make_oid(10), make_oid(20)].into();
        let remote_objects: BTreeSet<ObjectId> = [make_oid(20), make_oid(30)].into();
        let missing = session
            .compute_missing(&local_objects, &remote_objects)
            .unwrap();
        assert!(missing.needed.contains(&make_oid(30)));

        session.record_decoded(make_oid(30)).unwrap();
        assert_eq!(session.phase(), AntiEntropyPhase::PersistAndUpdate);
        session.finalize().unwrap();
        assert!(session.is_converged());
    }

    // Both roles validate under the default LeaderCommitClock mode.
    #[test]
    fn test_leader_follower_replication() {
        let config = ReplicationConfig {
            role: ReplicationRole::Leader,
            mode: ReplicationMode::LeaderCommitClock,
            ..Default::default()
        };
        validate_config(&config).unwrap();

        let follower_config = ReplicationConfig {
            role: ReplicationRole::Follower,
            mode: ReplicationMode::LeaderCommitClock,
            ..Default::default()
        };
        validate_config(&follower_config).unwrap();
    }

    // --- anti-entropy state machine ---------------------------------------

    #[test]
    fn test_anti_entropy_exchange_tips() {
        let mut session = AntiEntropySession::new();
        let local = ReplicaTip {
            root_manifest_id: make_oid(1),
            marker_position: 10,
            index_segment_tips: vec![make_oid(100)],
        };
        let remote = ReplicaTip {
            root_manifest_id: make_oid(2),
            marker_position: 15,
            index_segment_tips: vec![make_oid(200)],
        };
        session.exchange_tips(local, remote).unwrap();
        assert_eq!(session.phase(), AntiEntropyPhase::ComputeMissing);
    }

    // The diff is symmetric: needed = remote − local, to_offer = local − remote.
    #[test]
    fn test_anti_entropy_compute_missing() {
        let mut session = AntiEntropySession::new();
        session
            .exchange_tips(
                ReplicaTip {
                    root_manifest_id: make_oid(1),
                    marker_position: 0,
                    index_segment_tips: vec![],
                },
                ReplicaTip {
                    root_manifest_id: make_oid(2),
                    marker_position: 0,
                    index_segment_tips: vec![],
                },
            )
            .unwrap();

        let local: BTreeSet<ObjectId> = [make_oid(1), make_oid(2), make_oid(3)].into();
        let remote: BTreeSet<ObjectId> = [make_oid(2), make_oid(3), make_oid(4)].into();

        let missing = session.compute_missing(&local, &remote).unwrap();
        assert_eq!(missing.needed, [make_oid(4)].into());
        assert_eq!(missing.to_offer, [make_oid(1)].into());
    }

    // Partial decode keeps the session in StreamUntilDecode; the final
    // decode advances it to PersistAndUpdate.
    #[test]
    fn test_anti_entropy_stream_until_decode() {
        let mut session = AntiEntropySession::new();
        session
            .exchange_tips(
                ReplicaTip {
                    root_manifest_id: make_oid(1),
                    marker_position: 0,
                    index_segment_tips: vec![],
                },
                ReplicaTip {
                    root_manifest_id: make_oid(2),
                    marker_position: 0,
                    index_segment_tips: vec![],
                },
            )
            .unwrap();

        let local: BTreeSet<ObjectId> = [make_oid(1)].into();
        let remote: BTreeSet<ObjectId> = [make_oid(1), make_oid(2), make_oid(3)].into();
        session.compute_missing(&local, &remote).unwrap();

        session.record_decoded(make_oid(2)).unwrap();
        assert_eq!(session.phase(), AntiEntropyPhase::StreamUntilDecode);
        session.record_decoded(make_oid(3)).unwrap();
        assert_eq!(session.phase(), AntiEntropyPhase::PersistAndUpdate);
    }

    #[test]
    fn test_anti_entropy_convergence() {
        let mut session = AntiEntropySession::new();
        session
            .exchange_tips(
                ReplicaTip {
                    root_manifest_id: make_oid(1),
                    marker_position: 0,
                    index_segment_tips: vec![],
                },
                ReplicaTip {
                    root_manifest_id: make_oid(2),
                    marker_position: 0,
                    index_segment_tips: vec![],
                },
            )
            .unwrap();

        let local: BTreeSet<ObjectId> = [make_oid(1), make_oid(2)].into();
        let remote: BTreeSet<ObjectId> = [make_oid(2), make_oid(3)].into();
        session.compute_missing(&local, &remote).unwrap();
        session.record_decoded(make_oid(3)).unwrap();
        session.finalize().unwrap();
        assert!(session.is_converged());
    }

    // --- quorum -----------------------------------------------------------

    #[test]
    fn test_quorum_local_only() {
        let policy = QuorumPolicy::local_only();
        let mut tracker = QuorumTracker::new(policy);
        assert!(!tracker.is_satisfied());
        tracker.record_acceptance(0);
        assert!(tracker.is_satisfied());
    }

    #[test]
    fn test_quorum_2_of_3() {
        let policy = QuorumPolicy::two_of_three();
        let mut tracker = QuorumTracker::new(policy);
        assert!(!tracker.is_satisfied());
        tracker.record_acceptance(0);
        assert!(!tracker.is_satisfied()); tracker.record_acceptance(1);
        assert!(tracker.is_satisfied()); tracker.record_acceptance(2);
        assert!(tracker.is_satisfied()); }

    // The gate withholds the marker until quorum, then releases it.
    #[test]
    fn test_quorum_blocks_marker_publication() {
        let marker = CommitMarker {
            commit_seq: 42,
            capsule_id: make_oid(10),
            timestamp_ns: 1_000_000,
        };
        let policy = QuorumPolicy::two_of_three();
        let mut gate = CommitPublicationGate::new(marker, policy);

        assert!(!gate.is_published());
        assert!(gate.try_publish().is_none());

        gate.record_store_acceptance(0);
        assert!(gate.try_publish().is_none()); gate.record_store_acceptance(1);
        let published = gate.try_publish();
        assert!(published.is_some());
        assert_eq!(published.unwrap().commit_seq, 42);
        assert!(gate.is_published());
    }

    // --- consistent-hash routing ------------------------------------------

    // Routing is deterministic and always lands on a member node.
    #[test]
    fn test_symbol_routing_consistent_hash() {
        let ring = ConsistentHashRing::new(&[1, 2, 3], 100);
        assert_eq!(ring.node_count(), 3);

        let oid = make_oid(42);
        let node = ring.route(&oid, 0).unwrap();
        assert!([1, 2, 3].contains(&node));

        let node2 = ring.route(&oid, 0).unwrap();
        assert_eq!(node, node2);
    }

    // Adding one node to a 3-node ring should reroute well under half of
    // the symbol keys (consistent hashing's minimal-disruption property).
    #[test]
    fn test_symbol_routing_add_node_minimal_reroute() {
        let mut ring3 = ConsistentHashRing::new(&[1, 2, 3], 100);
        let ring4 = ring3.add_node(4);
        assert_eq!(ring4.node_count(), 4);

        let oid = make_oid(1);
        let mut rerouted = 0_u32;
        for esi in 0..1000 {
            let n3 = ring3.route(&oid, esi).unwrap();
            let n4 = ring4.route(&oid, esi).unwrap();
            if n3 != n4 {
                rerouted += 1;
            }
        }
        assert!(rerouted < 500, "too many reroutes: {rerouted}/1000");
    }

    // --- authenticated symbols --------------------------------------------

    // Tampering with either the payload or the tag must fail verification.
    #[test]
    fn test_authenticated_symbols_verified() {
        let sym = AuthenticatedSymbol::new(make_oid(1), 0, vec![1, 2, 3]);
        assert!(sym.verify());

        let mut bad = sym.clone();
        bad.data[0] = 99;
        assert!(!bad.verify());

        let mut bad2 = sym;
        bad2.auth_tag[0] ^= 0xFF;
        assert!(!bad2.verify());
    }

    #[test]
    fn test_unauthenticated_fallback() {
        let good1 = AuthenticatedSymbol::new(make_oid(1), 0, vec![10, 20]);
        let good2 = AuthenticatedSymbol::new(make_oid(1), 1, vec![30, 40]);
        let mut bad = AuthenticatedSymbol::new(make_oid(1), 2, vec![50, 60]);
        bad.auth_tag[0] ^= 0xFF; let all = [good1, good2, bad];
        let (accepted, rejected) = filter_authenticated_symbols(&all);
        assert_eq!(accepted.len(), 2);
        assert_eq!(rejected.len(), 1);
        assert_eq!(rejected[0].esi, 2);
    }

    // --- trace analysis ----------------------------------------------------

    // Clean trace: node 1 both publishes and applies seq 1.
    #[test]
    fn test_sheaf_consistency_check_clean() {
        let events = vec![
            TraceEvent {
                node_id: 1,
                commit_seq: 1,
                object_id: make_oid(10),
                event_type: TraceEventType::Published,
            },
            TraceEvent {
                node_id: 1,
                commit_seq: 1,
                object_id: make_oid(10),
                event_type: TraceEventType::Applied,
            },
        ];
        let result = sheaf_consistency_check(&events);
        assert!(result.is_consistent);
        assert!(result.anomalies.is_empty());
    }

    // Phantom: Published on node 1 but Applied only on node 2 — no single
    // node carries the end-to-end pair.
    #[test]
    fn test_sheaf_consistency_check_phantom() {
        let events = vec![
            TraceEvent {
                node_id: 1,
                commit_seq: 1,
                object_id: make_oid(10),
                event_type: TraceEventType::Published,
            },
            TraceEvent {
                node_id: 2,
                commit_seq: 1,
                object_id: make_oid(10),
                event_type: TraceEventType::Applied,
            },
        ];
        let result = sheaf_consistency_check(&events);
        assert!(!result.is_consistent);
        assert_eq!(result.anomalies.len(), 1);
        assert_eq!(result.anomalies[0].commit_seq, 1);
    }

    #[test]
    fn test_tla_export() {
        let events = vec![
            TraceEvent {
                node_id: 1,
                commit_seq: 1,
                object_id: make_oid(10),
                event_type: TraceEventType::Published,
            },
            TraceEvent {
                node_id: 2,
                commit_seq: 1,
                object_id: make_oid(10),
                event_type: TraceEventType::Applied,
            },
        ];
        let tla = export_tla_trace(&events);
        assert!(tla.contains("MODULE ReplicationTrace"));
        assert!(tla.contains("committed"));
        assert!(tla.contains("applied"));
        assert!(tla.contains("===="));
    }

    // --- configuration gating ----------------------------------------------

    // Multi-writer without the explicit flag must be rejected.
    #[test]
    fn test_multiwriter_not_default() {
        let config = ReplicationConfig {
            mode: ReplicationMode::MultiWriter,
            multi_writer_explicit: false,
            ..Default::default()
        };
        let result = validate_config(&config);
        assert!(result.is_err());
    }

    #[test]
    fn test_multiwriter_explicit_ok() {
        let config = ReplicationConfig {
            mode: ReplicationMode::MultiWriter,
            multi_writer_explicit: true,
            ..Default::default()
        };
        validate_config(&config).unwrap();
    }

    // --- property-style sweeps ----------------------------------------------

    // Disjoint object sets of varying sizes: decoding every needed object
    // converges the session. seed=0 produces an empty `needed` set, in
    // which case the session legitimately parks in RequestSymbols.
    #[test]
    fn prop_anti_entropy_convergence() {
        for seed in 0..20_u8 {
            let local: BTreeSet<ObjectId> = (0..seed).map(|i| make_oid(i * 2)).collect();
            let remote: BTreeSet<ObjectId> = (0..seed).map(|i| make_oid(i * 2 + 1)).collect();

            let mut session = AntiEntropySession::new();
            session
                .exchange_tips(
                    ReplicaTip {
                        root_manifest_id: make_oid(100),
                        marker_position: 0,
                        index_segment_tips: vec![],
                    },
                    ReplicaTip {
                        root_manifest_id: make_oid(200),
                        marker_position: 0,
                        index_segment_tips: vec![],
                    },
                )
                .unwrap();
            let missing = session.compute_missing(&local, &remote).unwrap();
            for &oid in &missing.needed.clone() {
                session.record_decoded(oid).unwrap();
            }
            if session.phase() == AntiEntropyPhase::PersistAndUpdate {
                session.finalize().unwrap();
            }
            assert!(
                session.is_converged() || session.phase() == AntiEntropyPhase::RequestSymbols,
                "failed to converge for seed={seed}"
            );
        }
    }

    // Every m-of-n policy: m−1 acceptances never satisfy, m always does.
    #[test]
    fn prop_quorum_safety() {
        for m in 1..=5_u32 {
            for n in m..=5 {
                let policy = QuorumPolicy::new(m, n).unwrap();
                let mut tracker = QuorumTracker::new(policy);
                for i in 0..m - 1 {
                    tracker.record_acceptance(i);
                    assert!(
                        !tracker.is_satisfied(),
                        "should not be satisfied with {} of {} (need {})",
                        i + 1,
                        n,
                        m
                    );
                }
                tracker.record_acceptance(m - 1);
                assert!(
                    tracker.is_satisfied(),
                    "should be satisfied with {m} of {n}"
                );
            }
        }
    }

    // --- ECS-level scenarios -------------------------------------------------

    #[test]
    fn test_ecs_replication_ordering() {
        let markers = [
            CommitMarker {
                commit_seq: 1,
                capsule_id: make_oid(1),
                timestamp_ns: 100,
            },
            CommitMarker {
                commit_seq: 2,
                capsule_id: make_oid(2),
                timestamp_ns: 200,
            },
            CommitMarker {
                commit_seq: 3,
                capsule_id: make_oid(3),
                timestamp_ns: 300,
            },
        ];

        for w in markers.windows(2) {
            assert!(w[0].commit_seq < w[1].commit_seq);
        }
    }

    #[test]
    fn test_ecs_replication_commit_capsules() {
        let local_capsules: BTreeSet<ObjectId> = [make_oid(1), make_oid(2)].into();
        let remote_capsules: BTreeSet<ObjectId> = [make_oid(1), make_oid(2), make_oid(3)].into();

        let mut session = AntiEntropySession::new();
        session
            .exchange_tips(
                ReplicaTip {
                    root_manifest_id: make_oid(10),
                    marker_position: 1,
                    index_segment_tips: vec![],
                },
                ReplicaTip {
                    root_manifest_id: make_oid(11),
                    marker_position: 2,
                    index_segment_tips: vec![],
                },
            )
            .unwrap();

        let missing = session
            .compute_missing(&local_capsules, &remote_capsules)
            .unwrap();
        assert_eq!(missing.needed, [make_oid(3)].into());
    }

    // Redelivered markers are accepted once and counted once.
    #[test]
    fn test_ecs_replication_dedup() {
        let marker = CommitMarker {
            commit_seq: 77,
            capsule_id: make_oid(9),
            timestamp_ns: 1_234,
        };
        let mut dedup = CommitDeduplicator::default();

        assert!(dedup.should_accept(&marker));
        assert!(!dedup.should_accept(&marker));
        assert_eq!(dedup.seen_count(), 1);
    }

    // --- end-to-end scenarios ------------------------------------------------

    // Empty follower catches up on all 10 leader objects in one session.
    #[test]
    fn test_e2e_3_node_replication() {
        let mut leader_objects: BTreeSet<ObjectId> = BTreeSet::new();

        for i in 0..10_u8 {
            leader_objects.insert(make_oid(i));
        }

        let follower1_objects: BTreeSet<ObjectId> = BTreeSet::new();

        let mut session = AntiEntropySession::new();
        session
            .exchange_tips(
                ReplicaTip {
                    root_manifest_id: make_oid(0),
                    marker_position: 0,
                    index_segment_tips: vec![],
                },
                ReplicaTip {
                    root_manifest_id: make_oid(9),
                    marker_position: 10,
                    index_segment_tips: vec![],
                },
            )
            .unwrap();
        let missing = session
            .compute_missing(&follower1_objects, &leader_objects)
            .unwrap();
        assert_eq!(missing.needed.len(), 10);

        for &oid in &missing.needed.clone() {
            session.record_decoded(oid).unwrap();
        }
        session.finalize().unwrap();
        assert!(session.is_converged());
    }

    // Quorum still publishes with one of three stores down (stores 0 and 2).
    #[test]
    fn test_e2e_node_failure_recovery() {
        let policy = QuorumPolicy::two_of_three();
        let marker = CommitMarker {
            commit_seq: 1,
            capsule_id: make_oid(1),
            timestamp_ns: 1000,
        };
        let mut gate = CommitPublicationGate::new(marker, policy);

        gate.record_store_acceptance(0);
        gate.record_store_acceptance(2);

        assert!(gate.try_publish().is_some());
    }

    // Repeated anti-entropy rounds converge despite ~10% deterministic
    // symbol loss per round.
    #[test]
    fn test_e2e_lossy_replication_convergence() {
        // Deterministic pseudo-loss: hash (oid, round) and drop the
        // bottom `loss_per_mille` fraction of outcomes.
        fn delivered_with_loss(oid: &ObjectId, round: u32, loss_per_mille: u64) -> bool {
            let mut material = [0_u8; 20];
            material[..16].copy_from_slice(oid.as_bytes());
            material[16..].copy_from_slice(&round.to_le_bytes());
            xxhash_rust::xxh3::xxh3_64(&material) % 1000 >= loss_per_mille
        }

        let leader_objects: BTreeSet<ObjectId> = (0_u8..100).map(make_oid).collect();
        let mut follower_objects: BTreeSet<ObjectId> = BTreeSet::new();

        for round in 0_u32..32 {
            if follower_objects == leader_objects {
                break;
            }

            let mut session = AntiEntropySession::new();
            session
                .exchange_tips(
                    ReplicaTip {
                        root_manifest_id: make_oid(1),
                        marker_position: follower_objects.len() as u64,
                        index_segment_tips: vec![],
                    },
                    ReplicaTip {
                        root_manifest_id: make_oid(2),
                        marker_position: leader_objects.len() as u64,
                        index_segment_tips: vec![],
                    },
                )
                .unwrap();

            let missing = session
                .compute_missing(&follower_objects, &leader_objects)
                .unwrap()
                .needed
                .clone();

            for oid in missing {
                if delivered_with_loss(&oid, round, 100) {
                    session.record_decoded(oid).unwrap();
                    follower_objects.insert(oid);
                }
            }

            if session.phase() == AntiEntropyPhase::PersistAndUpdate {
                session.finalize().unwrap();
            }
        }

        assert_eq!(follower_objects, leader_objects);
    }
}