Skip to main content

fsqlite_core/
ecs_replication.rs

1//! ECS-native replication architecture (§3.4.7, bd-1hi.19).
2//!
3//! High-level replication framework: roles, modes, anti-entropy convergence,
4//! quorum durability, consistent-hash symbol routing, and authenticated symbols.
5
6use std::collections::{BTreeSet, HashMap, HashSet};
7
8use fsqlite_error::{FrankenError, Result};
9use tracing::{debug, error, info, warn};
10
11// ---------------------------------------------------------------------------
12// Constants
13// ---------------------------------------------------------------------------
14
15const BEAD_ID: &str = "bd-1hi.19";
16
17// ---------------------------------------------------------------------------
18// Replication roles and modes
19// ---------------------------------------------------------------------------
20
21/// Replication role for a node.
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
23pub enum ReplicationRole {
24    /// Publishes authoritative commit-marker stream. Accepts MVCC writes.
25    Leader,
26    /// Replicates objects + markers, serves reads.
27    Follower,
28}
29
30/// Replication mode (§3.4.7 spec).
31#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
32pub enum ReplicationMode {
33    /// One leader publishes markers. V1 default.
34    #[default]
35    LeaderCommitClock,
36    /// Multiple nodes publish capsules. Experimental, not V1 default.
37    MultiWriter,
38}
39
40// ---------------------------------------------------------------------------
41// Replicated object types
42// ---------------------------------------------------------------------------
43
44/// Object ID — 16-byte content-addressed identifier.
45#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
46pub struct ObjectId([u8; 16]);
47
48impl ObjectId {
49    #[must_use]
50    pub const fn from_bytes(b: [u8; 16]) -> Self {
51        Self(b)
52    }
53
54    #[must_use]
55    pub const fn as_bytes(&self) -> &[u8; 16] {
56        &self.0
57    }
58}
59
60/// Categories of ECS objects that are replicated.
61#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
62pub enum ReplicatedObjectKind {
63    CommitCapsule,
64    CommitMarker,
65    IndexSegment,
66    ReadWitness,
67    WriteWitness,
68    WitnessDelta,
69    WitnessIndexSegment,
70    DependencyEdge,
71    CommitProof,
72    AbortWitness,
73    MergeWitness,
74    CheckpointChunk,
75    SnapshotManifest,
76    DecodeProof,
77}
78
79/// A commit marker record — the commit clock.
80#[derive(Debug, Clone, PartialEq, Eq)]
81pub struct CommitMarker {
82    pub commit_seq: u64,
83    pub capsule_id: ObjectId,
84    pub timestamp_ns: u64,
85}
86
87/// Idempotency key for commit-level replication deduplication.
88#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
89pub struct IdempotencyKey([u8; 16]);
90
91impl IdempotencyKey {
92    /// Derive an idempotency key from commit identity fields.
93    #[must_use]
94    pub fn from_marker(marker: &CommitMarker) -> Self {
95        let mut hasher = blake3::Hasher::new();
96        hasher.update(b"fsqlite:repl:idempotency:v1");
97        hasher.update(&marker.commit_seq.to_le_bytes());
98        hasher.update(marker.capsule_id.as_bytes());
99        let hash = hasher.finalize();
100        let mut out = [0_u8; 16];
101        out.copy_from_slice(&hash.as_bytes()[..16]);
102        Self(out)
103    }
104}
105
106/// Tracks replicated commits and suppresses duplicates by idempotency key.
107#[derive(Debug, Default)]
108pub struct CommitDeduplicator {
109    seen: HashSet<IdempotencyKey>,
110}
111
112impl CommitDeduplicator {
113    /// Returns true if the marker is new and should be replicated/applied.
114    pub fn should_accept(&mut self, marker: &CommitMarker) -> bool {
115        self.seen.insert(IdempotencyKey::from_marker(marker))
116    }
117
118    /// Number of unique commits seen.
119    #[must_use]
120    pub fn seen_count(&self) -> usize {
121        self.seen.len()
122    }
123}
124
125// ---------------------------------------------------------------------------
126// Anti-entropy protocol
127// ---------------------------------------------------------------------------
128
129/// Tip information exchanged between replicas.
130#[derive(Debug, Clone, PartialEq, Eq)]
131pub struct ReplicaTip {
132    /// Latest root manifest object ID.
133    pub root_manifest_id: ObjectId,
134    /// Latest marker stream position (commit sequence number).
135    pub marker_position: u64,
136    /// Optional index segment tips.
137    pub index_segment_tips: Vec<ObjectId>,
138}
139
140/// Result of computing missing objects between two replicas.
141#[derive(Debug, Clone, PartialEq, Eq)]
142pub struct MissingObjects {
143    /// Objects present in remote but not local.
144    pub needed: BTreeSet<ObjectId>,
145    /// Objects present locally but not remote.
146    pub to_offer: BTreeSet<ObjectId>,
147}
148
149/// Anti-entropy convergence protocol state.
150#[derive(Debug, Clone, Copy, PartialEq, Eq)]
151pub enum AntiEntropyPhase {
152    /// Step 1: Exchange tips.
153    ExchangeTips,
154    /// Step 2: Compute missing objects.
155    ComputeMissing,
156    /// Step 3: Request symbols for missing objects.
157    RequestSymbols,
158    /// Step 4: Stream symbols until decode.
159    StreamUntilDecode,
160    /// Step 5: Persist and update.
161    PersistAndUpdate,
162    /// Converged.
163    Complete,
164}
165
166/// Anti-entropy session between two replicas.
167#[derive(Debug)]
168pub struct AntiEntropySession {
169    phase: AntiEntropyPhase,
170    local_tip: Option<ReplicaTip>,
171    remote_tip: Option<ReplicaTip>,
172    missing: Option<MissingObjects>,
173    decoded_objects: HashSet<ObjectId>,
174}
175
176impl AntiEntropySession {
177    /// Create a new anti-entropy session.
178    #[must_use]
179    pub fn new() -> Self {
180        debug!(bead_id = BEAD_ID, "starting anti-entropy session");
181        Self {
182            phase: AntiEntropyPhase::ExchangeTips,
183            local_tip: None,
184            remote_tip: None,
185            missing: None,
186            decoded_objects: HashSet::new(),
187        }
188    }
189
190    /// Current phase.
191    #[must_use]
192    pub const fn phase(&self) -> AntiEntropyPhase {
193        self.phase
194    }
195
196    /// Step 1: Set local and remote tips.
197    pub fn exchange_tips(&mut self, local: ReplicaTip, remote: ReplicaTip) -> Result<()> {
198        if self.phase != AntiEntropyPhase::ExchangeTips {
199            return Err(FrankenError::Internal(format!(
200                "anti-entropy: expected ExchangeTips, got {:?}",
201                self.phase
202            )));
203        }
204        debug!(
205            bead_id = BEAD_ID,
206            local_pos = local.marker_position,
207            remote_pos = remote.marker_position,
208            "exchanged tips"
209        );
210        self.local_tip = Some(local);
211        self.remote_tip = Some(remote);
212        self.phase = AntiEntropyPhase::ComputeMissing;
213        Ok(())
214    }
215
216    /// Step 2: Compute missing objects from local and remote object sets.
217    pub fn compute_missing(
218        &mut self,
219        local_objects: &BTreeSet<ObjectId>,
220        remote_objects: &BTreeSet<ObjectId>,
221    ) -> Result<&MissingObjects> {
222        if self.phase != AntiEntropyPhase::ComputeMissing {
223            return Err(FrankenError::Internal(format!(
224                "anti-entropy: expected ComputeMissing, got {:?}",
225                self.phase
226            )));
227        }
228
229        let needed: BTreeSet<ObjectId> =
230            remote_objects.difference(local_objects).copied().collect();
231        let to_offer: BTreeSet<ObjectId> =
232            local_objects.difference(remote_objects).copied().collect();
233
234        debug!(
235            bead_id = BEAD_ID,
236            needed_count = needed.len(),
237            to_offer_count = to_offer.len(),
238            "computed missing objects"
239        );
240
241        self.missing = Some(MissingObjects { needed, to_offer });
242        self.phase = AntiEntropyPhase::RequestSymbols;
243        Ok(self.missing.as_ref().expect("just set"))
244    }
245
246    /// Step 3: Return the set of object IDs we need symbols for.
247    #[must_use]
248    pub fn objects_to_request(&self) -> Option<&BTreeSet<ObjectId>> {
249        self.missing.as_ref().map(|m| &m.needed)
250    }
251
252    /// Step 4: Record that we received enough symbols and decoded an object.
253    pub fn record_decoded(&mut self, object_id: ObjectId) -> Result<()> {
254        if self.phase != AntiEntropyPhase::RequestSymbols
255            && self.phase != AntiEntropyPhase::StreamUntilDecode
256        {
257            return Err(FrankenError::Internal(format!(
258                "anti-entropy: expected RequestSymbols/StreamUntilDecode, got {:?}",
259                self.phase
260            )));
261        }
262        self.phase = AntiEntropyPhase::StreamUntilDecode;
263        self.decoded_objects.insert(object_id);
264
265        // Check if all needed objects are decoded.
266        if let Some(missing) = &self.missing {
267            if missing
268                .needed
269                .iter()
270                .all(|id| self.decoded_objects.contains(id))
271            {
272                debug!(
273                    bead_id = BEAD_ID,
274                    decoded_count = self.decoded_objects.len(),
275                    "all missing objects decoded"
276                );
277                self.phase = AntiEntropyPhase::PersistAndUpdate;
278            }
279        }
280        Ok(())
281    }
282
283    /// Step 5: Finalize — persist and update local state.
284    pub fn finalize(&mut self) -> Result<()> {
285        if self.phase != AntiEntropyPhase::PersistAndUpdate {
286            return Err(FrankenError::Internal(format!(
287                "anti-entropy: expected PersistAndUpdate, got {:?}",
288                self.phase
289            )));
290        }
291        info!(
292            bead_id = BEAD_ID,
293            decoded_count = self.decoded_objects.len(),
294            "anti-entropy session complete — persisted"
295        );
296        self.phase = AntiEntropyPhase::Complete;
297        Ok(())
298    }
299
300    /// Check if the session has converged.
301    #[must_use]
302    pub const fn is_converged(&self) -> bool {
303        matches!(self.phase, AntiEntropyPhase::Complete)
304    }
305}
306
307impl Default for AntiEntropySession {
308    fn default() -> Self {
309        Self::new()
310    }
311}
312
313// ---------------------------------------------------------------------------
314// Quorum durability
315// ---------------------------------------------------------------------------
316
317/// Quorum durability policy.
318#[derive(Debug, Clone, PartialEq, Eq)]
319pub struct QuorumPolicy {
320    /// Minimum stores that must accept symbols before commit is durable.
321    pub required: u32,
322    /// Total number of stores in the quorum set.
323    pub total: u32,
324}
325
326impl QuorumPolicy {
327    /// Create a local-only policy: quorum(1, 1).
328    #[must_use]
329    pub const fn local_only() -> Self {
330        Self {
331            required: 1,
332            total: 1,
333        }
334    }
335
336    /// Create a 2-of-3 policy.
337    #[must_use]
338    pub const fn two_of_three() -> Self {
339        Self {
340            required: 2,
341            total: 3,
342        }
343    }
344
345    /// Create a custom quorum policy.
346    pub fn new(required: u32, total: u32) -> Result<Self> {
347        if required == 0 || required > total {
348            return Err(FrankenError::Internal(format!(
349                "invalid quorum: required={required}, total={total}"
350            )));
351        }
352        Ok(Self { required, total })
353    }
354}
355
356/// Tracks store acknowledgements for quorum satisfaction.
357#[derive(Debug)]
358pub struct QuorumTracker {
359    policy: QuorumPolicy,
360    accepted: HashSet<u32>,
361}
362
363impl QuorumTracker {
364    /// Create a new tracker for the given policy.
365    #[must_use]
366    pub fn new(policy: QuorumPolicy) -> Self {
367        Self {
368            policy,
369            accepted: HashSet::new(),
370        }
371    }
372
373    /// Record that store `store_id` has accepted sufficient symbols.
374    pub fn record_acceptance(&mut self, store_id: u32) {
375        self.accepted.insert(store_id);
376        debug!(
377            bead_id = BEAD_ID,
378            store_id,
379            accepted = self.accepted.len(),
380            required = self.policy.required,
381            "store accepted symbols"
382        );
383    }
384
385    /// Check if quorum is satisfied.
386    #[must_use]
387    #[allow(clippy::cast_possible_truncation)]
388    pub fn is_satisfied(&self) -> bool {
389        self.accepted.len() as u32 >= self.policy.required
390    }
391
392    /// Number of stores that have accepted.
393    #[must_use]
394    pub fn accepted_count(&self) -> usize {
395        self.accepted.len()
396    }
397
398    /// Policy reference.
399    #[must_use]
400    pub const fn policy(&self) -> &QuorumPolicy {
401        &self.policy
402    }
403}
404
405// ---------------------------------------------------------------------------
406// Consistent-hash symbol routing
407// ---------------------------------------------------------------------------
408
409/// Consistent hash ring for symbol routing.
410#[derive(Debug, Clone)]
411pub struct ConsistentHashRing {
412    /// (hash_value, node_id) sorted by hash_value.
413    ring: Vec<(u64, u32)>,
414    /// Number of virtual nodes per physical node.
415    vnodes: u32,
416}
417
418impl ConsistentHashRing {
419    /// Create a ring with the given node IDs and virtual node count.
420    #[must_use]
421    pub fn new(node_ids: &[u32], vnodes: u32) -> Self {
422        let mut ring = Vec::with_capacity(node_ids.len() * vnodes as usize);
423        for &nid in node_ids {
424            for v in 0..vnodes {
425                let hash = Self::hash_vnode(nid, v);
426                ring.push((hash, nid));
427            }
428        }
429        ring.sort_unstable_by_key(|&(h, _)| h);
430        Self { ring, vnodes }
431    }
432
433    /// Route a symbol (identified by `object_id` + `esi`) to a node.
434    #[must_use]
435    pub fn route(&self, object_id: &ObjectId, esi: u32) -> Option<u32> {
436        if self.ring.is_empty() {
437            return None;
438        }
439        let key = Self::hash_symbol(object_id, esi);
440        // Binary search for the first ring entry >= key.
441        let idx = self.ring.partition_point(|&(h, _)| h < key);
442        let idx = if idx >= self.ring.len() { 0 } else { idx };
443        Some(self.ring[idx].1)
444    }
445
446    /// Add a node to the ring. Returns the set of symbols that need to be re-routed.
447    #[must_use]
448    pub fn add_node(&mut self, node_id: u32) -> Self {
449        let mut node_ids: BTreeSet<u32> = self.ring.iter().map(|&(_, n)| n).collect();
450        node_ids.insert(node_id);
451        let ids: Vec<u32> = node_ids.into_iter().collect();
452        Self::new(&ids, self.vnodes)
453    }
454
455    /// Number of distinct physical nodes in the ring.
456    #[must_use]
457    pub fn node_count(&self) -> usize {
458        let nodes: HashSet<u32> = self.ring.iter().map(|&(_, n)| n).collect();
459        nodes.len()
460    }
461
462    fn hash_vnode(node_id: u32, vnode: u32) -> u64 {
463        let mut buf = [0u8; 8];
464        buf[..4].copy_from_slice(&node_id.to_le_bytes());
465        buf[4..8].copy_from_slice(&vnode.to_le_bytes());
466        xxhash_rust::xxh3::xxh3_64(&buf)
467    }
468
469    fn hash_symbol(object_id: &ObjectId, esi: u32) -> u64 {
470        let mut buf = [0u8; 20];
471        buf[..16].copy_from_slice(object_id.as_bytes());
472        buf[16..20].copy_from_slice(&esi.to_le_bytes());
473        xxhash_rust::xxh3::xxh3_64(&buf)
474    }
475}
476
477// ---------------------------------------------------------------------------
478// Authenticated symbols
479// ---------------------------------------------------------------------------
480
481/// An authenticated symbol with an auth tag.
482#[derive(Debug, Clone, PartialEq, Eq)]
483pub struct AuthenticatedSymbol {
484    pub object_id: ObjectId,
485    pub esi: u32,
486    pub data: Vec<u8>,
487    /// Auth tag for integrity verification.
488    pub auth_tag: [u8; 16],
489}
490
491impl AuthenticatedSymbol {
492    /// Compute expected auth tag for the given data.
493    #[must_use]
494    pub fn compute_auth_tag(object_id: &ObjectId, esi: u32, data: &[u8]) -> [u8; 16] {
495        let mut hasher = blake3::Hasher::new();
496        hasher.update(b"fsqlite:repl:auth:v1");
497        hasher.update(object_id.as_bytes());
498        hasher.update(&esi.to_le_bytes());
499        hasher.update(data);
500        let hash = hasher.finalize();
501        let mut tag = [0u8; 16];
502        tag.copy_from_slice(&hash.as_bytes()[..16]);
503        tag
504    }
505
506    /// Verify that this symbol's auth tag is valid.
507    #[must_use]
508    pub fn verify(&self) -> bool {
509        let expected = Self::compute_auth_tag(&self.object_id, self.esi, &self.data);
510        self.auth_tag == expected
511    }
512
513    /// Create a new authenticated symbol with a correct auth tag.
514    #[must_use]
515    pub fn new(object_id: ObjectId, esi: u32, data: Vec<u8>) -> Self {
516        let auth_tag = Self::compute_auth_tag(&object_id, esi, &data);
517        Self {
518            object_id,
519            esi,
520            data,
521            auth_tag,
522        }
523    }
524}
525
526// ---------------------------------------------------------------------------
527// Replication configuration
528// ---------------------------------------------------------------------------
529
530/// Configuration for the replication subsystem.
531#[derive(Debug, Clone)]
532pub struct ReplicationConfig {
533    pub role: ReplicationRole,
534    pub mode: ReplicationMode,
535    pub quorum: QuorumPolicy,
536    pub security_enabled: bool,
537    pub multi_writer_explicit: bool,
538}
539
540impl Default for ReplicationConfig {
541    fn default() -> Self {
542        Self {
543            role: ReplicationRole::Leader,
544            mode: ReplicationMode::LeaderCommitClock,
545            quorum: QuorumPolicy::local_only(),
546            security_enabled: false,
547            multi_writer_explicit: false,
548        }
549    }
550}
551
552/// Validate replication config. Multi-writer requires explicit opt-in.
553pub fn validate_config(config: &ReplicationConfig) -> Result<()> {
554    if config.mode == ReplicationMode::MultiWriter && !config.multi_writer_explicit {
555        error!(
556            bead_id = BEAD_ID,
557            "multi-writer mode requires explicit configuration"
558        );
559        return Err(FrankenError::Internal(
560            "multi-writer replication mode requires explicit opt-in via multi_writer_explicit=true"
561                .into(),
562        ));
563    }
564    info!(
565        bead_id = BEAD_ID,
566        role = ?config.role,
567        mode = ?config.mode,
568        quorum_required = config.quorum.required,
569        quorum_total = config.quorum.total,
570        security = config.security_enabled,
571        "replication config validated"
572    );
573    Ok(())
574}
575
576// ---------------------------------------------------------------------------
577// Commit publication gate
578// ---------------------------------------------------------------------------
579
580/// Manages the commit-publication gate: markers are not published until
581/// the durability quorum is satisfied.
582#[derive(Debug)]
583pub struct CommitPublicationGate {
584    tracker: QuorumTracker,
585    marker: Option<CommitMarker>,
586    published: bool,
587}
588
589impl CommitPublicationGate {
590    /// Create a gate for the given marker and quorum policy.
591    #[must_use]
592    pub fn new(marker: CommitMarker, policy: QuorumPolicy) -> Self {
593        Self {
594            tracker: QuorumTracker::new(policy),
595            marker: Some(marker),
596            published: false,
597        }
598    }
599
600    /// Record that a store accepted the commit's symbols.
601    pub fn record_store_acceptance(&mut self, store_id: u32) {
602        self.tracker.record_acceptance(store_id);
603    }
604
605    /// Try to publish the marker. Returns the marker if quorum is met,
606    /// None if not yet satisfied.
607    pub fn try_publish(&mut self) -> Option<&CommitMarker> {
608        if self.published {
609            return self.marker.as_ref();
610        }
611        if self.tracker.is_satisfied() {
612            self.published = true;
613            info!(
614                bead_id = BEAD_ID,
615                accepted = self.tracker.accepted_count(),
616                required = self.tracker.policy().required,
617                "quorum satisfied — marker published"
618            );
619            self.marker.as_ref()
620        } else {
621            warn!(
622                bead_id = BEAD_ID,
623                accepted = self.tracker.accepted_count(),
624                required = self.tracker.policy().required,
625                "quorum not yet satisfied — marker withheld"
626            );
627            None
628        }
629    }
630
631    /// Is the marker published?
632    #[must_use]
633    pub const fn is_published(&self) -> bool {
634        self.published
635    }
636}
637
638// ---------------------------------------------------------------------------
639// Symbol filter for security
640// ---------------------------------------------------------------------------
641
642/// Filter authenticated symbols, rejecting invalid ones.
643pub fn filter_authenticated_symbols(
644    symbols: &[AuthenticatedSymbol],
645) -> (Vec<&AuthenticatedSymbol>, Vec<&AuthenticatedSymbol>) {
646    let mut accepted = Vec::new();
647    let mut rejected = Vec::new();
648
649    for sym in symbols {
650        if sym.verify() {
651            accepted.push(sym);
652        } else {
653            debug!(
654                bead_id = BEAD_ID,
655                esi = sym.esi,
656                "rejected unauthenticated symbol"
657            );
658            rejected.push(sym);
659        }
660    }
661
662    if !rejected.is_empty() {
663        warn!(
664            bead_id = BEAD_ID,
665            rejected_count = rejected.len(),
666            accepted_count = accepted.len(),
667            "filtered out unauthenticated symbols"
668        );
669    }
670
671    (accepted, rejected)
672}
673
674// ---------------------------------------------------------------------------
675// Sheaf consistency check (simplified)
676// ---------------------------------------------------------------------------
677
678/// A trace event for consistency checking.
679#[derive(Debug, Clone, PartialEq, Eq)]
680pub struct TraceEvent {
681    pub node_id: u32,
682    pub commit_seq: u64,
683    pub object_id: ObjectId,
684    pub event_type: TraceEventType,
685}
686
687/// Type of trace event.
688#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
689pub enum TraceEventType {
690    Published,
691    Received,
692    Applied,
693}
694
695/// Result of a sheaf consistency check.
696#[derive(Debug, Clone, PartialEq, Eq)]
697pub struct SheafCheckResult {
698    pub is_consistent: bool,
699    pub anomalies: Vec<SheafAnomaly>,
700}
701
702/// A consistency anomaly detected by sheaf check.
703#[derive(Debug, Clone, PartialEq, Eq)]
704pub struct SheafAnomaly {
705    pub description: String,
706    pub commit_seq: u64,
707    pub involved_nodes: Vec<u32>,
708}
709
710/// Run a sheaf consistency check on a set of trace events.
711///
712/// Detects phantom commits: commits seen by no single node end-to-end
713/// (Published + Applied on the same node).
714#[must_use]
715pub fn sheaf_consistency_check(events: &[TraceEvent]) -> SheafCheckResult {
716    // Group events by (commit_seq, node_id).
717    let mut node_events: HashMap<(u64, u32), HashSet<TraceEventType>> = HashMap::new();
718    for ev in events {
719        node_events
720            .entry((ev.commit_seq, ev.node_id))
721            .or_default()
722            .insert(ev.event_type);
723    }
724
725    // Find all commit sequences.
726    let commit_seqs: BTreeSet<u64> = events.iter().map(|e| e.commit_seq).collect();
727    let all_nodes: BTreeSet<u32> = events.iter().map(|e| e.node_id).collect();
728
729    let mut anomalies = Vec::new();
730
731    for &seq in &commit_seqs {
732        // Check if any single node has both Published and Applied.
733        let has_end_to_end = all_nodes.iter().any(|&nid| {
734            let key = (seq, nid);
735            if let Some(types) = node_events.get(&key) {
736                types.contains(&TraceEventType::Published)
737                    && types.contains(&TraceEventType::Applied)
738            } else {
739                false
740            }
741        });
742
743        if !has_end_to_end {
744            // Phantom commit: no single node witnessed end-to-end.
745            let involved: Vec<u32> = all_nodes
746                .iter()
747                .filter(|&&nid| node_events.contains_key(&(seq, nid)))
748                .copied()
749                .collect();
750
751            if !involved.is_empty() {
752                anomalies.push(SheafAnomaly {
753                    description: format!(
754                        "phantom commit at seq {seq}: no single node has both Published and Applied"
755                    ),
756                    commit_seq: seq,
757                    involved_nodes: involved,
758                });
759            }
760        }
761    }
762
763    let is_consistent = anomalies.is_empty();
764
765    if is_consistent {
766        debug!(
767            bead_id = BEAD_ID,
768            commit_count = commit_seqs.len(),
769            "sheaf consistency check passed"
770        );
771    } else {
772        warn!(
773            bead_id = BEAD_ID,
774            anomaly_count = anomalies.len(),
775            "sheaf consistency check found anomalies"
776        );
777    }
778
779    SheafCheckResult {
780        is_consistent,
781        anomalies,
782    }
783}
784
785// ---------------------------------------------------------------------------
786// TLA+ trace export (simplified)
787// ---------------------------------------------------------------------------
788
789/// Export trace events as TLA+ behavior specification.
790#[must_use]
791pub fn export_tla_trace(events: &[TraceEvent]) -> String {
792    use std::fmt::Write;
793    let mut out = String::new();
794    let _ = writeln!(out, "---- MODULE ReplicationTrace ----");
795    let _ = writeln!(out, "EXTENDS Integers, Sequences, FiniteSets");
796    let _ = writeln!(out);
797    let _ = writeln!(out, "VARIABLES committed, applied");
798    let _ = writeln!(out);
799    let _ = writeln!(out, "Init ==");
800    let _ = writeln!(out, "  /\\ committed = {{}}");
801    let _ = writeln!(out, "  /\\ applied = {{}}");
802    let _ = writeln!(out);
803
804    for (i, ev) in events.iter().enumerate() {
805        let _ = writeln!(
806            out,
807            "\\* Step {i}: node={}, seq={}",
808            ev.node_id, ev.commit_seq
809        );
810        match ev.event_type {
811            TraceEventType::Published => {
812                let _ = writeln!(
813                    out,
814                    "Step{i} == committed' = committed \\cup {{{}}}",
815                    ev.commit_seq
816                );
817            }
818            TraceEventType::Applied => {
819                let _ = writeln!(
820                    out,
821                    "Step{i} == applied' = applied \\cup {{{}}}",
822                    ev.commit_seq
823                );
824            }
825            TraceEventType::Received => {
826                let _ = writeln!(out, "\\* Received event (no state change in this model)");
827            }
828        }
829        let _ = writeln!(out);
830    }
831
832    let _ = writeln!(out, "====");
833    out
834}
835
836// ---------------------------------------------------------------------------
837// Tests
838// ---------------------------------------------------------------------------
839
840#[cfg(test)]
841#[allow(clippy::too_many_lines)]
842mod tests {
843    use super::*;
844
845    fn make_oid(seed: u8) -> ObjectId {
846        let mut b = [0u8; 16];
847        b[0] = seed;
848        ObjectId::from_bytes(b)
849    }
850
851    // -- Compliance gates --
852
853    #[test]
854    fn test_bd_1hi_19_unit_compliance_gate() {
855        assert_eq!(BEAD_ID, "bd-1hi.19");
856        // Verify all required types exist.
857        let _ = ReplicationRole::Leader;
858        let _ = ReplicationRole::Follower;
859        let _ = ReplicationMode::LeaderCommitClock;
860        let _ = ReplicationMode::MultiWriter;
861        let _ = AntiEntropyPhase::ExchangeTips;
862        let _ = QuorumPolicy::local_only();
863    }
864
865    #[test]
866    fn prop_bd_1hi_19_structure_compliance() {
867        // Property: anti-entropy session progresses through all phases.
868        let mut session = AntiEntropySession::new();
869        assert_eq!(session.phase(), AntiEntropyPhase::ExchangeTips);
870
871        let local = ReplicaTip {
872            root_manifest_id: make_oid(1),
873            marker_position: 10,
874            index_segment_tips: vec![],
875        };
876        let remote = ReplicaTip {
877            root_manifest_id: make_oid(2),
878            marker_position: 12,
879            index_segment_tips: vec![],
880        };
881        session.exchange_tips(local, remote).unwrap();
882        assert_eq!(session.phase(), AntiEntropyPhase::ComputeMissing);
883    }
884
885    #[test]
886    fn test_e2e_bd_1hi_19_compliance() {
887        // E2E: full anti-entropy cycle with quorum gate.
888        let config = ReplicationConfig::default();
889        validate_config(&config).unwrap();
890
891        let mut session = AntiEntropySession::new();
892        let local = ReplicaTip {
893            root_manifest_id: make_oid(1),
894            marker_position: 5,
895            index_segment_tips: vec![],
896        };
897        let remote = ReplicaTip {
898            root_manifest_id: make_oid(2),
899            marker_position: 7,
900            index_segment_tips: vec![],
901        };
902        session.exchange_tips(local, remote).unwrap();
903
904        let local_objects: BTreeSet<ObjectId> = [make_oid(10), make_oid(20)].into();
905        let remote_objects: BTreeSet<ObjectId> = [make_oid(20), make_oid(30)].into();
906        let missing = session
907            .compute_missing(&local_objects, &remote_objects)
908            .unwrap();
909        assert!(missing.needed.contains(&make_oid(30)));
910
911        session.record_decoded(make_oid(30)).unwrap();
912        assert_eq!(session.phase(), AntiEntropyPhase::PersistAndUpdate);
913        session.finalize().unwrap();
914        assert!(session.is_converged());
915    }
916
917    // -- Leader-follower replication --
918
919    #[test]
920    fn test_leader_follower_replication() {
921        let config = ReplicationConfig {
922            role: ReplicationRole::Leader,
923            mode: ReplicationMode::LeaderCommitClock,
924            ..Default::default()
925        };
926        validate_config(&config).unwrap();
927
928        let follower_config = ReplicationConfig {
929            role: ReplicationRole::Follower,
930            mode: ReplicationMode::LeaderCommitClock,
931            ..Default::default()
932        };
933        validate_config(&follower_config).unwrap();
934    }
935
936    // -- Anti-entropy tests --
937
938    #[test]
939    fn test_anti_entropy_exchange_tips() {
940        let mut session = AntiEntropySession::new();
941        let local = ReplicaTip {
942            root_manifest_id: make_oid(1),
943            marker_position: 10,
944            index_segment_tips: vec![make_oid(100)],
945        };
946        let remote = ReplicaTip {
947            root_manifest_id: make_oid(2),
948            marker_position: 15,
949            index_segment_tips: vec![make_oid(200)],
950        };
951        session.exchange_tips(local, remote).unwrap();
952        assert_eq!(session.phase(), AntiEntropyPhase::ComputeMissing);
953    }
954
955    #[test]
956    fn test_anti_entropy_compute_missing() {
957        let mut session = AntiEntropySession::new();
958        session
959            .exchange_tips(
960                ReplicaTip {
961                    root_manifest_id: make_oid(1),
962                    marker_position: 0,
963                    index_segment_tips: vec![],
964                },
965                ReplicaTip {
966                    root_manifest_id: make_oid(2),
967                    marker_position: 0,
968                    index_segment_tips: vec![],
969                },
970            )
971            .unwrap();
972
973        let local: BTreeSet<ObjectId> = [make_oid(1), make_oid(2), make_oid(3)].into();
974        let remote: BTreeSet<ObjectId> = [make_oid(2), make_oid(3), make_oid(4)].into();
975
976        let missing = session.compute_missing(&local, &remote).unwrap();
977        assert_eq!(missing.needed, [make_oid(4)].into());
978        assert_eq!(missing.to_offer, [make_oid(1)].into());
979    }
980
981    #[test]
982    fn test_anti_entropy_stream_until_decode() {
983        let mut session = AntiEntropySession::new();
984        session
985            .exchange_tips(
986                ReplicaTip {
987                    root_manifest_id: make_oid(1),
988                    marker_position: 0,
989                    index_segment_tips: vec![],
990                },
991                ReplicaTip {
992                    root_manifest_id: make_oid(2),
993                    marker_position: 0,
994                    index_segment_tips: vec![],
995                },
996            )
997            .unwrap();
998
999        let local: BTreeSet<ObjectId> = [make_oid(1)].into();
1000        let remote: BTreeSet<ObjectId> = [make_oid(1), make_oid(2), make_oid(3)].into();
1001        session.compute_missing(&local, &remote).unwrap();
1002
1003        // Decode objects one by one.
1004        session.record_decoded(make_oid(2)).unwrap();
1005        assert_eq!(session.phase(), AntiEntropyPhase::StreamUntilDecode);
1006        session.record_decoded(make_oid(3)).unwrap();
1007        assert_eq!(session.phase(), AntiEntropyPhase::PersistAndUpdate);
1008    }
1009
1010    #[test]
1011    fn test_anti_entropy_convergence() {
1012        let mut session = AntiEntropySession::new();
1013        session
1014            .exchange_tips(
1015                ReplicaTip {
1016                    root_manifest_id: make_oid(1),
1017                    marker_position: 0,
1018                    index_segment_tips: vec![],
1019                },
1020                ReplicaTip {
1021                    root_manifest_id: make_oid(2),
1022                    marker_position: 0,
1023                    index_segment_tips: vec![],
1024                },
1025            )
1026            .unwrap();
1027
1028        let local: BTreeSet<ObjectId> = [make_oid(1), make_oid(2)].into();
1029        let remote: BTreeSet<ObjectId> = [make_oid(2), make_oid(3)].into();
1030        session.compute_missing(&local, &remote).unwrap();
1031        session.record_decoded(make_oid(3)).unwrap();
1032        session.finalize().unwrap();
1033        assert!(session.is_converged());
1034    }
1035
1036    // -- Quorum tests --
1037
1038    #[test]
1039    fn test_quorum_local_only() {
1040        let policy = QuorumPolicy::local_only();
1041        let mut tracker = QuorumTracker::new(policy);
1042        assert!(!tracker.is_satisfied());
1043        tracker.record_acceptance(0);
1044        assert!(tracker.is_satisfied());
1045    }
1046
1047    #[test]
1048    fn test_quorum_2_of_3() {
1049        let policy = QuorumPolicy::two_of_three();
1050        let mut tracker = QuorumTracker::new(policy);
1051        assert!(!tracker.is_satisfied());
1052        tracker.record_acceptance(0);
1053        assert!(!tracker.is_satisfied()); // 1 of 3.
1054        tracker.record_acceptance(1);
1055        assert!(tracker.is_satisfied()); // 2 of 3.
1056        tracker.record_acceptance(2);
1057        assert!(tracker.is_satisfied()); // 3 of 3 — still satisfied.
1058    }
1059
1060    #[test]
1061    fn test_quorum_blocks_marker_publication() {
1062        let marker = CommitMarker {
1063            commit_seq: 42,
1064            capsule_id: make_oid(10),
1065            timestamp_ns: 1_000_000,
1066        };
1067        let policy = QuorumPolicy::two_of_three();
1068        let mut gate = CommitPublicationGate::new(marker, policy);
1069
1070        // Marker not published before quorum.
1071        assert!(!gate.is_published());
1072        assert!(gate.try_publish().is_none());
1073
1074        gate.record_store_acceptance(0);
1075        assert!(gate.try_publish().is_none()); // 1 of 2 needed.
1076
1077        gate.record_store_acceptance(1);
1078        let published = gate.try_publish();
1079        assert!(published.is_some());
1080        assert_eq!(published.unwrap().commit_seq, 42);
1081        assert!(gate.is_published());
1082    }
1083
1084    // -- Symbol routing tests --
1085
1086    #[test]
1087    fn test_symbol_routing_consistent_hash() {
1088        let ring = ConsistentHashRing::new(&[1, 2, 3], 100);
1089        assert_eq!(ring.node_count(), 3);
1090
1091        let oid = make_oid(42);
1092        let node = ring.route(&oid, 0).unwrap();
1093        assert!([1, 2, 3].contains(&node));
1094
1095        // Deterministic: same input → same output.
1096        let node2 = ring.route(&oid, 0).unwrap();
1097        assert_eq!(node, node2);
1098    }
1099
1100    #[test]
1101    fn test_symbol_routing_add_node_minimal_reroute() {
1102        let mut ring3 = ConsistentHashRing::new(&[1, 2, 3], 100);
1103        let ring4 = ring3.add_node(4);
1104        assert_eq!(ring4.node_count(), 4);
1105
1106        // Most symbols should stay on same node. Count reroutes.
1107        let oid = make_oid(1);
1108        let mut rerouted = 0_u32;
1109        for esi in 0..1000 {
1110            let n3 = ring3.route(&oid, esi).unwrap();
1111            let n4 = ring4.route(&oid, esi).unwrap();
1112            if n3 != n4 {
1113                rerouted += 1;
1114            }
1115        }
1116        // Adding 1 of 4 nodes should reroute roughly 25% (with consistent hashing).
1117        // Allow wide margin due to hash distribution.
1118        assert!(rerouted < 500, "too many reroutes: {rerouted}/1000");
1119    }
1120
1121    // -- Authenticated symbols tests --
1122
1123    #[test]
1124    fn test_authenticated_symbols_verified() {
1125        let sym = AuthenticatedSymbol::new(make_oid(1), 0, vec![1, 2, 3]);
1126        assert!(sym.verify());
1127
1128        // Tampered data.
1129        let mut bad = sym.clone();
1130        bad.data[0] = 99;
1131        assert!(!bad.verify());
1132
1133        // Tampered auth_tag.
1134        let mut bad2 = sym;
1135        bad2.auth_tag[0] ^= 0xFF;
1136        assert!(!bad2.verify());
1137    }
1138
1139    #[test]
1140    fn test_unauthenticated_fallback() {
1141        let good1 = AuthenticatedSymbol::new(make_oid(1), 0, vec![10, 20]);
1142        let good2 = AuthenticatedSymbol::new(make_oid(1), 1, vec![30, 40]);
1143        let mut bad = AuthenticatedSymbol::new(make_oid(1), 2, vec![50, 60]);
1144        bad.auth_tag[0] ^= 0xFF; // Corrupt.
1145
1146        let all = [good1, good2, bad];
1147        let (accepted, rejected) = filter_authenticated_symbols(&all);
1148        assert_eq!(accepted.len(), 2);
1149        assert_eq!(rejected.len(), 1);
1150        assert_eq!(rejected[0].esi, 2);
1151    }
1152
1153    // -- Sheaf consistency check --
1154
1155    #[test]
1156    fn test_sheaf_consistency_check_clean() {
1157        let events = vec![
1158            TraceEvent {
1159                node_id: 1,
1160                commit_seq: 1,
1161                object_id: make_oid(10),
1162                event_type: TraceEventType::Published,
1163            },
1164            TraceEvent {
1165                node_id: 1,
1166                commit_seq: 1,
1167                object_id: make_oid(10),
1168                event_type: TraceEventType::Applied,
1169            },
1170        ];
1171        let result = sheaf_consistency_check(&events);
1172        assert!(result.is_consistent);
1173        assert!(result.anomalies.is_empty());
1174    }
1175
1176    #[test]
1177    fn test_sheaf_consistency_check_phantom() {
1178        // Phantom commit: node 1 Published, node 2 Applied, no single node has both.
1179        let events = vec![
1180            TraceEvent {
1181                node_id: 1,
1182                commit_seq: 1,
1183                object_id: make_oid(10),
1184                event_type: TraceEventType::Published,
1185            },
1186            TraceEvent {
1187                node_id: 2,
1188                commit_seq: 1,
1189                object_id: make_oid(10),
1190                event_type: TraceEventType::Applied,
1191            },
1192        ];
1193        let result = sheaf_consistency_check(&events);
1194        assert!(!result.is_consistent);
1195        assert_eq!(result.anomalies.len(), 1);
1196        assert_eq!(result.anomalies[0].commit_seq, 1);
1197    }
1198
1199    // -- TLA+ export --
1200
1201    #[test]
1202    fn test_tla_export() {
1203        let events = vec![
1204            TraceEvent {
1205                node_id: 1,
1206                commit_seq: 1,
1207                object_id: make_oid(10),
1208                event_type: TraceEventType::Published,
1209            },
1210            TraceEvent {
1211                node_id: 2,
1212                commit_seq: 1,
1213                object_id: make_oid(10),
1214                event_type: TraceEventType::Applied,
1215            },
1216        ];
1217        let tla = export_tla_trace(&events);
1218        assert!(tla.contains("MODULE ReplicationTrace"));
1219        assert!(tla.contains("committed"));
1220        assert!(tla.contains("applied"));
1221        assert!(tla.contains("===="));
1222    }
1223
1224    // -- Multi-writer gated --
1225
1226    #[test]
1227    fn test_multiwriter_not_default() {
1228        let config = ReplicationConfig {
1229            mode: ReplicationMode::MultiWriter,
1230            multi_writer_explicit: false,
1231            ..Default::default()
1232        };
1233        let result = validate_config(&config);
1234        assert!(result.is_err());
1235    }
1236
1237    #[test]
1238    fn test_multiwriter_explicit_ok() {
1239        let config = ReplicationConfig {
1240            mode: ReplicationMode::MultiWriter,
1241            multi_writer_explicit: true,
1242            ..Default::default()
1243        };
1244        validate_config(&config).unwrap();
1245    }
1246
1247    // -- Property tests --
1248
1249    #[test]
1250    fn prop_anti_entropy_convergence() {
1251        // For various random-ish object sets, anti-entropy always converges.
1252        for seed in 0..20_u8 {
1253            let local: BTreeSet<ObjectId> = (0..seed).map(|i| make_oid(i * 2)).collect();
1254            let remote: BTreeSet<ObjectId> = (0..seed).map(|i| make_oid(i * 2 + 1)).collect();
1255
1256            let mut session = AntiEntropySession::new();
1257            session
1258                .exchange_tips(
1259                    ReplicaTip {
1260                        root_manifest_id: make_oid(100),
1261                        marker_position: 0,
1262                        index_segment_tips: vec![],
1263                    },
1264                    ReplicaTip {
1265                        root_manifest_id: make_oid(200),
1266                        marker_position: 0,
1267                        index_segment_tips: vec![],
1268                    },
1269                )
1270                .unwrap();
1271            let missing = session.compute_missing(&local, &remote).unwrap();
1272            for &oid in &missing.needed.clone() {
1273                session.record_decoded(oid).unwrap();
1274            }
1275            if session.phase() == AntiEntropyPhase::PersistAndUpdate {
1276                session.finalize().unwrap();
1277            }
1278            // Either converged (had missing objects) or still at RequestSymbols (nothing missing).
1279            assert!(
1280                session.is_converged() || session.phase() == AntiEntropyPhase::RequestSymbols,
1281                "failed to converge for seed={seed}"
1282            );
1283        }
1284    }
1285
1286    #[test]
1287    fn prop_quorum_safety() {
1288        // For various M, N, quorum only reports satisfied when >= M accepts.
1289        for m in 1..=5_u32 {
1290            for n in m..=5 {
1291                let policy = QuorumPolicy::new(m, n).unwrap();
1292                let mut tracker = QuorumTracker::new(policy);
1293                for i in 0..m - 1 {
1294                    tracker.record_acceptance(i);
1295                    assert!(
1296                        !tracker.is_satisfied(),
1297                        "should not be satisfied with {} of {} (need {})",
1298                        i + 1,
1299                        n,
1300                        m
1301                    );
1302                }
1303                tracker.record_acceptance(m - 1);
1304                assert!(
1305                    tracker.is_satisfied(),
1306                    "should be satisfied with {m} of {n}"
1307                );
1308            }
1309        }
1310    }
1311
1312    // -- ECS replication ordering --
1313
1314    #[test]
1315    fn test_ecs_replication_ordering() {
1316        // Commit markers applied in commit_seq order.
1317        let markers = [
1318            CommitMarker {
1319                commit_seq: 1,
1320                capsule_id: make_oid(1),
1321                timestamp_ns: 100,
1322            },
1323            CommitMarker {
1324                commit_seq: 2,
1325                capsule_id: make_oid(2),
1326                timestamp_ns: 200,
1327            },
1328            CommitMarker {
1329                commit_seq: 3,
1330                capsule_id: make_oid(3),
1331                timestamp_ns: 300,
1332            },
1333        ];
1334
1335        // Verify ordering invariant.
1336        for w in markers.windows(2) {
1337            assert!(w[0].commit_seq < w[1].commit_seq);
1338        }
1339    }
1340
1341    #[test]
1342    fn test_ecs_replication_commit_capsules() {
1343        // Commit capsules replicate as ECS objects and appear in missing-set diff.
1344        let local_capsules: BTreeSet<ObjectId> = [make_oid(1), make_oid(2)].into();
1345        let remote_capsules: BTreeSet<ObjectId> = [make_oid(1), make_oid(2), make_oid(3)].into();
1346
1347        let mut session = AntiEntropySession::new();
1348        session
1349            .exchange_tips(
1350                ReplicaTip {
1351                    root_manifest_id: make_oid(10),
1352                    marker_position: 1,
1353                    index_segment_tips: vec![],
1354                },
1355                ReplicaTip {
1356                    root_manifest_id: make_oid(11),
1357                    marker_position: 2,
1358                    index_segment_tips: vec![],
1359                },
1360            )
1361            .unwrap();
1362
1363        let missing = session
1364            .compute_missing(&local_capsules, &remote_capsules)
1365            .unwrap();
1366        assert_eq!(missing.needed, [make_oid(3)].into());
1367    }
1368
1369    #[test]
1370    fn test_ecs_replication_dedup() {
1371        // Duplicate commit markers are suppressed by idempotency key.
1372        let marker = CommitMarker {
1373            commit_seq: 77,
1374            capsule_id: make_oid(9),
1375            timestamp_ns: 1_234,
1376        };
1377        let mut dedup = CommitDeduplicator::default();
1378
1379        assert!(dedup.should_accept(&marker));
1380        assert!(!dedup.should_accept(&marker));
1381        assert_eq!(dedup.seen_count(), 1);
1382    }
1383
1384    // -- E2E tests --
1385
1386    #[test]
1387    fn test_e2e_3_node_replication() {
1388        // Simulate 1 leader + 2 followers. Leader commits, followers converge.
1389        let mut leader_objects: BTreeSet<ObjectId> = BTreeSet::new();
1390
1391        // Leader commits 10 transactions.
1392        for i in 0..10_u8 {
1393            leader_objects.insert(make_oid(i));
1394        }
1395
1396        // Follower 1 starts empty.
1397        let follower1_objects: BTreeSet<ObjectId> = BTreeSet::new();
1398
1399        // Anti-entropy: follower 1 syncs with leader.
1400        let mut session = AntiEntropySession::new();
1401        session
1402            .exchange_tips(
1403                ReplicaTip {
1404                    root_manifest_id: make_oid(0),
1405                    marker_position: 0,
1406                    index_segment_tips: vec![],
1407                },
1408                ReplicaTip {
1409                    root_manifest_id: make_oid(9),
1410                    marker_position: 10,
1411                    index_segment_tips: vec![],
1412                },
1413            )
1414            .unwrap();
1415        let missing = session
1416            .compute_missing(&follower1_objects, &leader_objects)
1417            .unwrap();
1418        assert_eq!(missing.needed.len(), 10);
1419
1420        for &oid in &missing.needed.clone() {
1421            session.record_decoded(oid).unwrap();
1422        }
1423        session.finalize().unwrap();
1424        assert!(session.is_converged());
1425    }
1426
1427    #[test]
1428    fn test_e2e_node_failure_recovery() {
1429        // 3 stores, quorum 2 of 3. Kill store B. Leader still commits. Restart B.
1430        let policy = QuorumPolicy::two_of_three();
1431        let marker = CommitMarker {
1432            commit_seq: 1,
1433            capsule_id: make_oid(1),
1434            timestamp_ns: 1000,
1435        };
1436        let mut gate = CommitPublicationGate::new(marker, policy);
1437
1438        // Store A accepts.
1439        gate.record_store_acceptance(0);
1440        // Store B down — no acceptance.
1441        // Store C accepts.
1442        gate.record_store_acceptance(2);
1443
1444        // Quorum satisfied (A + C = 2 of 3).
1445        assert!(gate.try_publish().is_some());
1446    }
1447
1448    #[test]
1449    fn test_e2e_lossy_replication_convergence() {
1450        // Deterministic 10% lossy delivery across anti-entropy rounds converges.
1451        fn delivered_with_loss(oid: &ObjectId, round: u32, loss_per_mille: u64) -> bool {
1452            let mut material = [0_u8; 20];
1453            material[..16].copy_from_slice(oid.as_bytes());
1454            material[16..].copy_from_slice(&round.to_le_bytes());
1455            xxhash_rust::xxh3::xxh3_64(&material) % 1000 >= loss_per_mille
1456        }
1457
1458        let leader_objects: BTreeSet<ObjectId> = (0_u8..100).map(make_oid).collect();
1459        let mut follower_objects: BTreeSet<ObjectId> = BTreeSet::new();
1460
1461        for round in 0_u32..32 {
1462            if follower_objects == leader_objects {
1463                break;
1464            }
1465
1466            let mut session = AntiEntropySession::new();
1467            session
1468                .exchange_tips(
1469                    ReplicaTip {
1470                        root_manifest_id: make_oid(1),
1471                        marker_position: follower_objects.len() as u64,
1472                        index_segment_tips: vec![],
1473                    },
1474                    ReplicaTip {
1475                        root_manifest_id: make_oid(2),
1476                        marker_position: leader_objects.len() as u64,
1477                        index_segment_tips: vec![],
1478                    },
1479                )
1480                .unwrap();
1481
1482            let missing = session
1483                .compute_missing(&follower_objects, &leader_objects)
1484                .unwrap()
1485                .needed
1486                .clone();
1487
1488            for oid in missing {
1489                if delivered_with_loss(&oid, round, 100) {
1490                    session.record_decoded(oid).unwrap();
1491                    follower_objects.insert(oid);
1492                }
1493            }
1494
1495            if session.phase() == AntiEntropyPhase::PersistAndUpdate {
1496                session.finalize().unwrap();
1497            }
1498        }
1499
1500        assert_eq!(follower_objects, leader_objects);
1501    }
1502}