// mdcs_delta/causal.rs

1//! Causal Consistency Mode for δ-CRDTs (Algorithm 2)
2//!
3//! This module implements the delta-interval anti-entropy algorithm that provides
4//! **causal consistency** guarantees, extending Algorithm 1's convergence mode.
5//!
6//! # Algorithm 2: δ-CRDT Anti-Entropy with Causal Delivery
7//!
8//! Unlike Algorithm 1 which only guarantees eventual convergence, Algorithm 2
9//! ensures that deltas are applied in causal order. This prevents seeing effects
10//! before their causes.
11//!
12//! ## State Components
13//!
14//! Each replica i maintains:
15//! - **Durable state** `(Xᵢ, cᵢ)`:
16//!   - `Xᵢ`: The current CRDT state
17//!   - `cᵢ`: A durable counter (sequence number) that survives crashes
18//!
19//! - **Volatile state** `(Dᵢ, Aᵢ)`:
20//!   - `Dᵢ[j]`: Delta-interval buffer for each peer j (deltas to send)
21//!   - `Aᵢ[j]`: Acknowledgment map - last seq acked by peer j
22//!
23//! ## Protocol
24//!
25//! 1. **On local mutation m**:
26//!    ```text
27//!    cᵢ := cᵢ + 1
28//!    d := mδ(Xᵢ)
29//!    Xᵢ := Xᵢ ⊔ d
30//!    ∀j: Dᵢ[j] := Dᵢ[j] ⊔ d   // add delta to all peer buffers
31//!    ```
32//!
33//! 2. **On send to peer j** (periodic or on-demand):
34//!    ```text
35//!    if Dᵢ[j] ≠ ⊥ then
36//!        send ⟨Dᵢ[j], Aᵢ[j]+1, cᵢ⟩ to j   // delta-interval with seq range
37//!    ```
38//!
//! 3. **On receive `⟨d, n, m⟩` from peer j** (`n` = exclusive lower bound,
//!    `m` = inclusive upper bound of the interval):
//!    ```text
//!    if n = Aᵢ[j] then            // causally ready: starts right after our last ack
//!        Xᵢ := Xᵢ ⊔ d
//!        Aᵢ[j] := m
//!        send ack(m) to j
//!    else
//!        discard (or buffer for later)
//!    ```
48//!
49//! 4. **On receive ack(m) from peer j**:
50//!    ```text
51//!    Dᵢ[j] := ⊥                   // clear delta buffer for j
52//!    ```
53//!
54//! ## Garbage Collection
55//!
56//! Deltas can be safely garbage collected when ALL tracked peers have acknowledged them.
57//! This ensures no peer will ever need those deltas again.
58//!
59//! ## Crash Recovery
60//!
61//! On restart:
62//! - `Xᵢ` and `cᵢ` are restored from durable storage
63//! - `Dᵢ` and `Aᵢ` start fresh (volatile state lost)
64//! - Peers will detect the gap and request retransmission
65
66use crate::buffer::{ReplicaId, SeqNo};
67use mdcs_core::lattice::Lattice;
68use serde::{Deserialize, Serialize};
69use std::collections::{HashMap, VecDeque};
70
71/// A delta-interval message for causal delivery
72///
73/// Contains: `⟨delta, from_seq, to_seq⟩`
74/// - `delta`: The joined delta-group to apply
75/// - `from_seq`: Starting sequence number (exclusive)
76/// - `to_seq`: Ending sequence number (inclusive)
77///
78/// The receiver should only accept if `from_seq == last_acked_from_this_sender`
79#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
80pub struct DeltaInterval<D> {
81    /// The source replica that generated this interval
82    pub from: ReplicaId,
83    /// The destination replica
84    pub to: ReplicaId,
85    /// The joined delta-group
86    pub delta: D,
87    /// Sequence number just before this interval (exclusive lower bound)
88    pub from_seq: SeqNo,
89    /// Sequence number at the end of this interval (inclusive upper bound)
90    pub to_seq: SeqNo,
91}
92
93/// Acknowledgment for a delta-interval
94#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
95pub struct IntervalAck {
96    pub from: ReplicaId,
97    pub to: ReplicaId,
98    /// The sequence number being acknowledged
99    pub acked_seq: SeqNo,
100}
101
102/// Messages for the causal anti-entropy protocol
103#[derive(Debug, Clone)]
104pub enum CausalMessage<D> {
105    /// Delta-interval with causal ordering information
106    DeltaInterval(DeltaInterval<D>),
107    /// Acknowledgment of received interval
108    Ack(IntervalAck),
109    /// Request for state snapshot (for bootstrapping new replicas)
110    SnapshotRequest { from: ReplicaId, to: ReplicaId },
111    /// Full state snapshot response
112    Snapshot {
113        from: ReplicaId,
114        to: ReplicaId,
115        state: D,
116        seq: SeqNo,
117    },
118}
119
120/// Durable state that survives crashes
121///
122/// This must be persisted to stable storage before acknowledging
123/// any mutation or received delta.
124#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct DurableState<S> {
126    /// The replica's unique identifier
127    pub replica_id: ReplicaId,
128    /// The current CRDT state
129    pub state: S,
130    /// The durable counter (last generated sequence number)
131    pub counter: SeqNo,
132}
133
134impl<S: Lattice> DurableState<S> {
135    pub fn new(replica_id: impl Into<ReplicaId>) -> Self {
136        Self {
137            replica_id: replica_id.into(),
138            state: S::bottom(),
139            counter: 0,
140        }
141    }
142}
143
144/// Per-peer delta buffer for causal mode
145///
146/// Stores deltas that need to be sent to a specific peer,
147/// along with the sequence range they cover.
148#[derive(Debug, Clone)]
149pub struct PeerDeltaBuffer<D: Lattice> {
150    /// The accumulated delta to send
151    delta: Option<D>,
152    /// Sequence number before the first delta in buffer
153    from_seq: SeqNo,
154    /// Sequence number of the last delta in buffer
155    to_seq: SeqNo,
156}
157
158impl<D: Lattice> PeerDeltaBuffer<D> {
159    pub fn new() -> Self {
160        Self {
161            delta: None,
162            from_seq: 0,
163            to_seq: 0,
164        }
165    }
166
167    /// Start tracking from a specific sequence number
168    pub fn start_from(seq: SeqNo) -> Self {
169        Self {
170            delta: None,
171            from_seq: seq,
172            to_seq: seq,
173        }
174    }
175
176    /// Add a delta to this buffer
177    pub fn push(&mut self, delta: D, seq: SeqNo) {
178        match &mut self.delta {
179            Some(existing) => {
180                existing.join_assign(&delta);
181            }
182            None => {
183                self.delta = Some(delta);
184            }
185        }
186        self.to_seq = seq;
187    }
188
189    /// Check if buffer has pending deltas
190    pub fn has_pending(&self) -> bool {
191        self.delta.is_some()
192    }
193
194    /// Take the delta, clearing the buffer
195    pub fn take(&mut self) -> Option<(D, SeqNo, SeqNo)> {
196        self.delta.take().map(|d| {
197            let from = self.from_seq;
198            let to = self.to_seq;
199            self.from_seq = to;
200            (d, from, to)
201        })
202    }
203
204    /// Clear the buffer (on successful ack)
205    pub fn clear(&mut self) {
206        self.delta = None;
207        self.from_seq = self.to_seq;
208    }
209
210    /// Reset the buffer from a new sequence (after peer reconnect)
211    pub fn reset_from(&mut self, seq: SeqNo) {
212        self.delta = None;
213        self.from_seq = seq;
214        self.to_seq = seq;
215    }
216}
217
218impl<D: Lattice> Default for PeerDeltaBuffer<D> {
219    fn default() -> Self {
220        Self::new()
221    }
222}
223
224/// Volatile state for causal anti-entropy (lost on crash)
225#[derive(Debug, Clone)]
226pub struct VolatileState<D: Lattice> {
227    /// Per-peer delta buffers: Dᵢ[j]
228    pub delta_buffers: HashMap<ReplicaId, PeerDeltaBuffer<D>>,
229    /// Per-peer acknowledgment tracking: Aᵢ[j]
230    /// Stores the last sequence number we've received from each peer
231    pub peer_acks: HashMap<ReplicaId, SeqNo>,
232}
233
234impl<D: Lattice> VolatileState<D> {
235    pub fn new() -> Self {
236        Self {
237            delta_buffers: HashMap::new(),
238            peer_acks: HashMap::new(),
239        }
240    }
241
242    /// Register a peer
243    pub fn register_peer(&mut self, peer_id: ReplicaId) {
244        self.delta_buffers
245            .entry(peer_id.clone())
246            .or_default();
247        self.peer_acks.entry(peer_id).or_insert(0);
248    }
249
250    /// Get last acked sequence from a peer
251    pub fn get_peer_ack(&self, peer_id: &str) -> SeqNo {
252        self.peer_acks.get(peer_id).copied().unwrap_or(0)
253    }
254
255    /// Update the ack for a peer
256    pub fn update_peer_ack(&mut self, peer_id: &str, seq: SeqNo) {
257        if let Some(ack) = self.peer_acks.get_mut(peer_id) {
258            *ack = (*ack).max(seq);
259        }
260    }
261}
262
263impl<D: Lattice> Default for VolatileState<D> {
264    fn default() -> Self {
265        Self::new()
266    }
267}
268
269/// A causal δ-CRDT replica implementing Algorithm 2
270///
271/// Provides causal consistency guarantees by:
272/// 1. Tracking per-peer delta intervals
273/// 2. Only accepting deltas in causal order
274/// 3. Supporting crash recovery via durable state
275#[derive(Debug, Clone)]
276pub struct CausalReplica<S: Lattice + Clone> {
277    /// Durable state (survives crashes)
278    durable: DurableState<S>,
279    /// Volatile state (lost on crash)
280    volatile: VolatileState<S>,
281    /// Pending deltas waiting for causal predecessors
282    pending: HashMap<ReplicaId, VecDeque<DeltaInterval<S>>>,
283}
284
285impl<S: Lattice + Clone> CausalReplica<S> {
286    /// Create a new causal replica
287    pub fn new(id: impl Into<ReplicaId>) -> Self {
288        Self {
289            durable: DurableState::new(id),
290            volatile: VolatileState::new(),
291            pending: HashMap::new(),
292        }
293    }
294
295    /// Restore from durable state (after crash)
296    pub fn restore(durable: DurableState<S>) -> Self {
297        Self {
298            durable,
299            volatile: VolatileState::new(),
300            pending: HashMap::new(),
301        }
302    }
303
304    /// Get the replica ID
305    pub fn id(&self) -> &ReplicaId {
306        &self.durable.replica_id
307    }
308
309    /// Get current state (read-only)
310    pub fn state(&self) -> &S {
311        &self.durable.state
312    }
313
314    /// Get the durable counter (sequence number)
315    pub fn counter(&self) -> SeqNo {
316        self.durable.counter
317    }
318
319    /// Get durable state for persistence
320    pub fn durable_state(&self) -> &DurableState<S> {
321        &self.durable
322    }
323
324    /// Register a peer for causal anti-entropy
325    pub fn register_peer(&mut self, peer_id: ReplicaId) {
326        self.volatile.register_peer(peer_id.clone());
327        self.pending.entry(peer_id).or_default();
328    }
329
330    /// Apply a local mutation
331    ///
332    /// Algorithm 2, step 1:
333    /// ```text
334    /// cᵢ := cᵢ + 1
335    /// d := mδ(Xᵢ)
336    /// Xᵢ := Xᵢ ⊔ d
337    /// ∀j: Dᵢ[j] := Dᵢ[j] ⊔ d
338    /// ```
339    ///
340    /// Returns the computed delta
341    pub fn mutate<F>(&mut self, mutator: F) -> S
342    where
343        F: FnOnce(&S) -> S,
344    {
345        // Increment durable counter
346        self.durable.counter += 1;
347        let seq = self.durable.counter;
348
349        // Compute delta: d = mδ(X)
350        let delta = mutator(&self.durable.state);
351
352        // Apply to state: X = X ⊔ d
353        self.durable.state.join_assign(&delta);
354
355        // Add to all peer buffers: ∀j: Dᵢ[j] := Dᵢ[j] ⊔ d
356        for buffer in self.volatile.delta_buffers.values_mut() {
357            buffer.push(delta.clone(), seq);
358        }
359
360        delta
361    }
362
363    /// Prepare a delta-interval to send to a peer
364    ///
365    /// Returns `Some(DeltaInterval)` if there are pending deltas for this peer,
366    /// or `None` if the buffer is empty.
367    pub fn prepare_interval(&mut self, peer_id: &str) -> Option<DeltaInterval<S>> {
368        let buffer = self.volatile.delta_buffers.get_mut(peer_id)?;
369
370        buffer
371            .take()
372            .map(|(delta, from_seq, to_seq)| DeltaInterval {
373                from: self.durable.replica_id.clone(),
374                to: peer_id.to_string(),
375                delta,
376                from_seq,
377                to_seq,
378            })
379    }
380
381    /// Check if a delta-interval is causally ready
382    ///
383    /// A delta-interval is ready if its from_seq matches our last acked seq from that peer
384    fn is_causally_ready(&self, interval: &DeltaInterval<S>) -> bool {
385        let last_acked = self.volatile.get_peer_ack(&interval.from);
386        interval.from_seq == last_acked
387    }
388
389    /// Receive a delta-interval from a peer
390    ///
391    /// Algorithm 2, step 3:
392    /// ```text
393    /// if n = Aᵢ[j] + 1 then        // causally ready
394    ///     Xᵢ := Xᵢ ⊔ d
395    ///     Aᵢ[j] := m
396    ///     send ack(m) to j
397    /// else
398    ///     buffer for later
399    /// ```
400    ///
401    /// Returns `Some(IntervalAck)` if the interval was applied (causally ready),
402    /// or `None` if it was buffered for later.
403    pub fn receive_interval(&mut self, interval: DeltaInterval<S>) -> Option<IntervalAck> {
404        // Register the peer if not known
405        if !self.volatile.peer_acks.contains_key(&interval.from) {
406            self.register_peer(interval.from.clone());
407        }
408
409        if self.is_causally_ready(&interval) {
410            // Apply the delta
411            self.durable.state.join_assign(&interval.delta);
412
413            // Update our ack for this peer
414            self.volatile
415                .update_peer_ack(&interval.from, interval.to_seq);
416
417            let ack = IntervalAck {
418                from: self.durable.replica_id.clone(),
419                to: interval.from.clone(),
420                acked_seq: interval.to_seq,
421            };
422
423            // Try to apply any pending intervals that are now ready
424            self.try_apply_pending(&interval.from);
425
426            Some(ack)
427        } else {
428            // Buffer for later
429            let pending = self
430                .pending
431                .entry(interval.from.clone())
432                .or_default();
433
434            // Insert in sorted order by from_seq
435            let pos = pending.iter().position(|p| p.from_seq > interval.from_seq);
436            match pos {
437                Some(i) => pending.insert(i, interval),
438                None => pending.push_back(interval),
439            }
440
441            None
442        }
443    }
444
445    /// Try to apply pending intervals that are now causally ready
446    fn try_apply_pending(&mut self, peer_id: &str) -> Vec<IntervalAck> {
447        let mut acks = Vec::new();
448
449        if let Some(pending) = self.pending.get_mut(peer_id) {
450            while let Some(interval) = pending.front() {
451                let last_acked = self.volatile.get_peer_ack(peer_id);
452                if interval.from_seq == last_acked {
453                    let interval = pending.pop_front().unwrap();
454
455                    // Apply the delta
456                    self.durable.state.join_assign(&interval.delta);
457
458                    // Update our ack
459                    self.volatile.update_peer_ack(peer_id, interval.to_seq);
460
461                    acks.push(IntervalAck {
462                        from: self.durable.replica_id.clone(),
463                        to: interval.from.clone(),
464                        acked_seq: interval.to_seq,
465                    });
466                } else {
467                    break;
468                }
469            }
470        }
471
472        acks
473    }
474
475    /// Process an acknowledgment from a peer
476    ///
477    /// Algorithm 2, step 4:
478    /// ```text
479    /// Dᵢ[j] := ⊥   // clear delta buffer for j
480    /// ```
481    pub fn receive_ack(&mut self, ack: &IntervalAck) {
482        if let Some(buffer) = self.volatile.delta_buffers.get_mut(&ack.from) {
483            buffer.clear();
484        }
485    }
486
487    /// Get a full state snapshot for bootstrapping
488    pub fn snapshot(&self) -> (S, SeqNo) {
489        (self.durable.state.clone(), self.durable.counter)
490    }
491
492    /// Apply a snapshot from another replica (for bootstrapping)
493    pub fn apply_snapshot(&mut self, state: S, seq: SeqNo, from: &str) {
494        self.durable.state.join_assign(&state);
495        self.volatile.update_peer_ack(from, seq);
496    }
497
498    /// Get all registered peer IDs
499    pub fn peers(&self) -> impl Iterator<Item = &ReplicaId> {
500        self.volatile.peer_acks.keys()
501    }
502
503    /// Check if we have pending deltas for any peer
504    pub fn has_pending_deltas(&self) -> bool {
505        self.volatile
506            .delta_buffers
507            .values()
508            .any(|b| b.has_pending())
509    }
510
511    /// Count of pending out-of-order intervals
512    pub fn pending_count(&self) -> usize {
513        self.pending.values().map(|v| v.len()).sum()
514    }
515}
516
/// Trait for durable storage backends
///
/// Implement this trait to persist `DurableState` across crashes.
/// Per the crash-recovery model in the module docs, the durable pair
/// `(state, counter)` must reach stable storage before a mutation or
/// received delta is acknowledged.
pub trait DurableStorage<S: Lattice> {
    /// Persist the durable state
    ///
    /// NOTE(review): implementations presumably overwrite any previous
    /// entry for the same replica id (the in-memory backend does) —
    /// confirm for other backends.
    fn persist(&mut self, state: &DurableState<S>) -> Result<(), StorageError>;

    /// Load the durable state
    ///
    /// Returns `Ok(None)` when nothing has been persisted for `replica_id`.
    fn load(&self, replica_id: &str) -> Result<Option<DurableState<S>>, StorageError>;

    /// Force sync to stable storage
    fn sync(&mut self) -> Result<(), StorageError>;
}
530
/// Errors reported by durable storage backends.
#[derive(Debug, Clone)]
pub enum StorageError {
    /// Underlying I/O failure, with a human-readable description.
    IoError(String),
    /// Failure while encoding/decoding persisted state.
    SerializationError(String),
    /// No persisted state exists for the requested replica.
    NotFound,
}

impl std::fmt::Display for StorageError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::IoError(msg) => write!(f, "IO error: {}", msg),
            Self::SerializationError(msg) => write!(f, "Serialization error: {}", msg),
            Self::NotFound => f.write_str("State not found"),
        }
    }
}

impl std::error::Error for StorageError {}
550
551/// In-memory storage for testing (simulates durable storage)
552#[derive(Debug, Default)]
553pub struct MemoryStorage<S> {
554    states: HashMap<ReplicaId, DurableState<S>>,
555}
556
557impl<S: Clone> MemoryStorage<S> {
558    pub fn new() -> Self {
559        Self {
560            states: HashMap::new(),
561        }
562    }
563}
564
565impl<S: Lattice + Clone + Serialize + for<'de> Deserialize<'de>> DurableStorage<S>
566    for MemoryStorage<S>
567{
568    fn persist(&mut self, state: &DurableState<S>) -> Result<(), StorageError> {
569        self.states.insert(state.replica_id.clone(), state.clone());
570        Ok(())
571    }
572
573    fn load(&self, replica_id: &str) -> Result<Option<DurableState<S>>, StorageError> {
574        Ok(self.states.get(replica_id).cloned())
575    }
576
577    fn sync(&mut self) -> Result<(), StorageError> {
578        Ok(())
579    }
580}
581
582/// Network simulator for causal anti-entropy
583#[derive(Debug)]
584pub struct CausalNetworkSimulator<D> {
585    /// Messages in flight
586    in_flight: VecDeque<CausalMessage<D>>,
587    /// Messages that were "lost"
588    lost: Vec<CausalMessage<D>>,
589    /// Loss rate (0.0 - 1.0)
590    loss_rate: f64,
591    /// Random state
592    rng_state: u64,
593}
594
595impl<D: Clone> CausalNetworkSimulator<D> {
596    pub fn new(loss_rate: f64) -> Self {
597        Self {
598            in_flight: VecDeque::new(),
599            lost: Vec::new(),
600            loss_rate,
601            rng_state: 42,
602        }
603    }
604
605    /// Simple random number generator
606    fn next_random(&mut self) -> f64 {
607        self.rng_state = self.rng_state.wrapping_mul(1103515245).wrapping_add(12345);
608        ((self.rng_state >> 16) & 0x7fff) as f64 / 32768.0
609    }
610
611    /// Send a message
612    pub fn send(&mut self, msg: CausalMessage<D>) {
613        if self.next_random() < self.loss_rate {
614            self.lost.push(msg);
615        } else {
616            self.in_flight.push_back(msg);
617        }
618    }
619
620    /// Receive the next message
621    pub fn receive(&mut self) -> Option<CausalMessage<D>> {
622        self.in_flight.pop_front()
623    }
624
625    /// Retransmit lost messages
626    pub fn retransmit_lost(&mut self) {
627        for msg in self.lost.drain(..) {
628            self.in_flight.push_back(msg);
629        }
630    }
631
632    /// Check if empty
633    pub fn is_empty(&self) -> bool {
634        self.in_flight.is_empty()
635    }
636
637    /// Messages in flight
638    pub fn in_flight_count(&self) -> usize {
639        self.in_flight.len()
640    }
641
642    /// Lost messages
643    pub fn lost_count(&self) -> usize {
644        self.lost.len()
645    }
646}
647
648/// Cluster coordinator for causal anti-entropy
649#[derive(Debug)]
650pub struct CausalCluster<S: Lattice + Clone> {
651    /// All replicas
652    replicas: Vec<CausalReplica<S>>,
653    /// Network simulator
654    network: CausalNetworkSimulator<S>,
655}
656
657impl<S: Lattice + Clone> CausalCluster<S> {
658    /// Create a new cluster with n replicas
659    pub fn new(n: usize, loss_rate: f64) -> Self {
660        let mut replicas = Vec::with_capacity(n);
661
662        // Create replicas
663        for i in 0..n {
664            let mut replica = CausalReplica::new(format!("causal_{}", i));
665            // Register all other peers
666            for j in 0..n {
667                if i != j {
668                    replica.register_peer(format!("causal_{}", j));
669                }
670            }
671            replicas.push(replica);
672        }
673
674        Self {
675            replicas,
676            network: CausalNetworkSimulator::new(loss_rate),
677        }
678    }
679
680    /// Get replica by index
681    pub fn replica(&self, idx: usize) -> &CausalReplica<S> {
682        &self.replicas[idx]
683    }
684
685    /// Get mutable replica
686    pub fn replica_mut(&mut self, idx: usize) -> &mut CausalReplica<S> {
687        &mut self.replicas[idx]
688    }
689
690    /// Perform a mutation
691    pub fn mutate<F>(&mut self, replica_idx: usize, mutator: F) -> S
692    where
693        F: FnOnce(&S) -> S,
694    {
695        self.replicas[replica_idx].mutate(mutator)
696    }
697
698    /// Initiate sync from one replica to all its peers
699    pub fn broadcast_intervals(&mut self, from_idx: usize) {
700        let replica = &mut self.replicas[from_idx];
701        let peer_ids: Vec<_> = replica.peers().cloned().collect();
702
703        for peer_id in peer_ids {
704            if let Some(interval) = replica.prepare_interval(&peer_id) {
705                self.network.send(CausalMessage::DeltaInterval(interval));
706            }
707        }
708    }
709
710    /// Process one network message
711    pub fn process_one(&mut self) -> bool {
712        if let Some(msg) = self.network.receive() {
713            match msg {
714                CausalMessage::DeltaInterval(interval) => {
715                    // Find recipient
716                    for replica in &mut self.replicas {
717                        if replica.id() == &interval.to {
718                            if let Some(ack) = replica.receive_interval(interval.clone()) {
719                                self.network.send(CausalMessage::Ack(ack));
720                            }
721                            break;
722                        }
723                    }
724                }
725                CausalMessage::Ack(ack) => {
726                    // Find recipient
727                    for replica in &mut self.replicas {
728                        if replica.id() == &ack.to {
729                            replica.receive_ack(&ack);
730                            break;
731                        }
732                    }
733                }
734                CausalMessage::SnapshotRequest { from, to } => {
735                    // Find source and send snapshot
736                    for replica in &self.replicas {
737                        if replica.id() == &to {
738                            let (state, seq) = replica.snapshot();
739                            self.network.send(CausalMessage::Snapshot {
740                                from: to,
741                                to: from,
742                                state,
743                                seq,
744                            });
745                            break;
746                        }
747                    }
748                }
749                CausalMessage::Snapshot {
750                    from,
751                    to,
752                    state,
753                    seq,
754                } => {
755                    // Find recipient and apply
756                    for replica in &mut self.replicas {
757                        if replica.id() == &to {
758                            replica.apply_snapshot(state, seq, &from);
759                            break;
760                        }
761                    }
762                }
763            }
764            true
765        } else {
766            false
767        }
768    }
769
770    /// Drain all messages
771    pub fn drain_network(&mut self) {
772        while self.process_one() {}
773    }
774
775    /// Full sync round
776    pub fn full_sync_round(&mut self) {
777        let n = self.replicas.len();
778        for i in 0..n {
779            self.broadcast_intervals(i);
780        }
781        self.drain_network();
782    }
783
784    /// Check if converged
785    pub fn is_converged(&self) -> bool {
786        if self.replicas.len() < 2 {
787            return true;
788        }
789
790        let first = self.replicas[0].state();
791        self.replicas.iter().skip(1).all(|r| r.state() == first)
792    }
793
794    /// Retransmit and process
795    pub fn retransmit_and_process(&mut self) {
796        self.network.retransmit_lost();
797        self.drain_network();
798    }
799
800    /// Number of replicas
801    pub fn len(&self) -> usize {
802        self.replicas.len()
803    }
804
805    /// Check if empty
806    pub fn is_empty(&self) -> bool {
807        self.replicas.is_empty()
808    }
809
810    /// Simulate a crash and recovery for a replica
811    pub fn crash_and_recover(&mut self, idx: usize) {
812        let durable = self.replicas[idx].durable_state().clone();
813
814        // Restore from durable state (volatile state is lost)
815        let mut recovered = CausalReplica::restore(durable);
816
817        // Re-register peers
818        let n = self.replicas.len();
819        for j in 0..n {
820            if idx != j {
821                recovered.register_peer(format!("causal_{}", j));
822            }
823        }
824
825        self.replicas[idx] = recovered;
826    }
827
828    /// Get total pending out-of-order intervals across all replicas
829    pub fn total_pending(&self) -> usize {
830        self.replicas.iter().map(|r| r.pending_count()).sum()
831    }
832}
833
834#[cfg(test)]
835mod tests {
836    use super::*;
837    use mdcs_core::gset::GSet;
838    use mdcs_core::pncounter::PNCounter;
839
840    #[test]
841    fn test_causal_replica_basic() {
842        let mut replica: CausalReplica<GSet<i32>> = CausalReplica::new("test1");
843
844        replica.mutate(|_| {
845            let mut d = GSet::new();
846            d.insert(42);
847            d
848        });
849
850        assert!(replica.state().contains(&42));
851        assert_eq!(replica.counter(), 1);
852    }
853
854    #[test]
855    fn test_causal_interval_generation() {
856        let mut replica: CausalReplica<GSet<i32>> = CausalReplica::new("test1");
857        replica.register_peer("peer1".to_string());
858
859        replica.mutate(|_| {
860            let mut d = GSet::new();
861            d.insert(1);
862            d
863        });
864
865        replica.mutate(|_| {
866            let mut d = GSet::new();
867            d.insert(2);
868            d
869        });
870
871        let interval = replica.prepare_interval("peer1").unwrap();
872        assert_eq!(interval.from_seq, 0);
873        assert_eq!(interval.to_seq, 2);
874        assert!(interval.delta.contains(&1));
875        assert!(interval.delta.contains(&2));
876    }
877
878    #[test]
879    fn test_causal_delivery() {
880        let mut r1: CausalReplica<GSet<i32>> = CausalReplica::new("r1");
881        let mut r2: CausalReplica<GSet<i32>> = CausalReplica::new("r2");
882
883        r1.register_peer("r2".to_string());
884        r2.register_peer("r1".to_string());
885
886        // r1 creates two mutations
887        r1.mutate(|_| {
888            let mut d = GSet::new();
889            d.insert(1);
890            d
891        });
892        r1.mutate(|_| {
893            let mut d = GSet::new();
894            d.insert(2);
895            d
896        });
897
898        // Get interval
899        let interval = r1.prepare_interval("r2").unwrap();
900        assert_eq!(interval.from_seq, 0);
901        assert_eq!(interval.to_seq, 2);
902
903        // r2 receives it
904        let ack = r2.receive_interval(interval).unwrap();
905        assert_eq!(ack.acked_seq, 2);
906
907        // r2 now has both elements
908        assert!(r2.state().contains(&1));
909        assert!(r2.state().contains(&2));
910    }
911
912    #[test]
913    fn test_out_of_order_buffering() {
914        let mut replica: CausalReplica<GSet<i32>> = CausalReplica::new("r1");
915        replica.register_peer("peer".to_string());
916
917        // Create an interval that's NOT causally ready (from_seq = 5, but we've acked 0)
918        let out_of_order = DeltaInterval {
919            from: "peer".to_string(),
920            to: "r1".to_string(),
921            delta: {
922                let mut d = GSet::new();
923                d.insert(999);
924                d
925            },
926            from_seq: 5, // Not ready - we haven't seen 1-5
927            to_seq: 6,
928        };
929
930        // Should be buffered, not applied
931        let result = replica.receive_interval(out_of_order);
932        assert!(result.is_none());
933        assert_eq!(replica.pending_count(), 1);
934        assert!(!replica.state().contains(&999));
935    }
936
937    #[test]
938    fn test_cluster_convergence() {
939        let mut cluster: CausalCluster<GSet<i32>> = CausalCluster::new(3, 0.0);
940
941        // Each replica adds different element
942        for i in 0..3 {
943            let val = (i + 1) as i32;
944            cluster.mutate(i, move |_| {
945                let mut d = GSet::new();
946                d.insert(val);
947                d
948            });
949        }
950
951        // Not converged yet
952        assert!(!cluster.is_converged());
953
954        // Sync
955        cluster.full_sync_round();
956
957        // Should converge
958        assert!(cluster.is_converged());
959
960        // All replicas should have all elements
961        for i in 0..3 {
962            for val in 1..=3 {
963                assert!(cluster.replica(i).state().contains(&val));
964            }
965        }
966    }
967
968    #[test]
969    fn test_cluster_with_loss() {
970        let mut cluster: CausalCluster<GSet<i32>> = CausalCluster::new(3, 0.3);
971
972        for i in 0..3 {
973            let val = (i + 1) as i32;
974            cluster.mutate(i, move |_| {
975                let mut d = GSet::new();
976                d.insert(val);
977                d
978            });
979        }
980
981        // Multiple rounds with retransmission
982        for _ in 0..10 {
983            cluster.full_sync_round();
984            cluster.retransmit_and_process();
985        }
986
987        // Should eventually converge
988        assert!(cluster.is_converged());
989    }
990
991    #[test]
992    fn test_crash_recovery() {
993        let mut cluster: CausalCluster<GSet<i32>> = CausalCluster::new(2, 0.0);
994
995        // r0 adds element
996        cluster.mutate(0, |_| {
997            let mut d = GSet::new();
998            d.insert(1);
999            d
1000        });
1001
1002        // Sync
1003        cluster.full_sync_round();
1004        assert!(cluster.is_converged());
1005
1006        // r0 adds another element
1007        cluster.mutate(0, |_| {
1008            let mut d = GSet::new();
1009            d.insert(2);
1010            d
1011        });
1012
1013        // r0 crashes before syncing
1014        let counter_before = cluster.replica(0).counter();
1015        cluster.crash_and_recover(0);
1016
1017        // Durable state should be preserved
1018        assert_eq!(cluster.replica(0).counter(), counter_before);
1019        assert!(cluster.replica(0).state().contains(&1));
1020        assert!(cluster.replica(0).state().contains(&2));
1021
1022        // But volatile state (delta buffers) is lost
1023        // r0 needs to re-sync
1024        assert!(!cluster.replica(0).has_pending_deltas());
1025    }
1026
1027    #[test]
1028    fn test_pncounter_causal() {
1029        let mut cluster: CausalCluster<PNCounter<String>> = CausalCluster::new(2, 0.0);
1030
1031        // r0 increments
1032        cluster.mutate(0, |_s| {
1033            let mut delta = PNCounter::new();
1034            delta.increment("r0".to_string(), 1);
1035            delta
1036        });
1037
1038        // r1 decrements
1039        cluster.mutate(1, |_s| {
1040            let mut delta = PNCounter::new();
1041            delta.decrement("r1".to_string(), 1);
1042            delta
1043        });
1044
1045        // Sync
1046        cluster.full_sync_round();
1047
1048        // Both should have value 0 (1 - 1)
1049        assert!(cluster.is_converged());
1050        assert_eq!(cluster.replica(0).state().value(), 0);
1051    }
1052
1053    #[test]
1054    fn test_causal_ordering_preserved() {
1055        // This test verifies that causal ordering is respected
1056        let mut r1: CausalReplica<GSet<i32>> = CausalReplica::new("r1");
1057        let mut r2: CausalReplica<GSet<i32>> = CausalReplica::new("r2");
1058
1059        r1.register_peer("r2".to_string());
1060        r2.register_peer("r1".to_string());
1061
1062        // r1 creates three sequential mutations
1063        for i in 1..=3 {
1064            r1.mutate(move |_| {
1065                let mut d = GSet::new();
1066                d.insert(i);
1067                d
1068            });
1069        }
1070
1071        // Create intervals for each mutation
1072        // Simulate them arriving out of order by creating separate intervals
1073
1074        // We need to manually create intervals to test out-of-order delivery
1075        let interval_1_3 = DeltaInterval {
1076            from: "r1".to_string(),
1077            to: "r2".to_string(),
1078            delta: {
1079                let mut d = GSet::new();
1080                d.insert(3);
1081                d
1082            },
1083            from_seq: 2, // This requires seq 1-2 to be acked first
1084            to_seq: 3,
1085        };
1086
1087        let interval_0_2 = DeltaInterval {
1088            from: "r1".to_string(),
1089            to: "r2".to_string(),
1090            delta: {
1091                let mut d = GSet::new();
1092                d.insert(1);
1093                d.insert(2);
1094                d
1095            },
1096            from_seq: 0,
1097            to_seq: 2,
1098        };
1099
1100        // Send interval 2-3 first (out of order)
1101        let result = r2.receive_interval(interval_1_3.clone());
1102        assert!(result.is_none()); // Should be buffered
1103        assert!(!r2.state().contains(&3)); // Not yet applied
1104
1105        // Now send interval 0-2
1106        let result = r2.receive_interval(interval_0_2);
1107        assert!(result.is_some()); // Should be applied
1108        assert!(r2.state().contains(&1));
1109        assert!(r2.state().contains(&2));
1110
1111        // And the pending interval should now be applied too!
1112        assert!(r2.state().contains(&3));
1113        assert_eq!(r2.pending_count(), 0);
1114    }
1115
1116    #[test]
1117    fn test_durable_storage() {
1118        let mut storage: MemoryStorage<GSet<i32>> = MemoryStorage::new();
1119
1120        let mut replica: CausalReplica<GSet<i32>> = CausalReplica::new("test");
1121        replica.mutate(|_| {
1122            let mut d = GSet::new();
1123            d.insert(42);
1124            d
1125        });
1126
1127        // Persist
1128        storage.persist(replica.durable_state()).unwrap();
1129
1130        // Load
1131        let loaded = storage.load("test").unwrap().unwrap();
1132        assert_eq!(loaded.counter, 1);
1133        assert!(loaded.state.contains(&42));
1134
1135        // Restore
1136        let recovered = CausalReplica::restore(loaded);
1137        assert!(recovered.state().contains(&42));
1138    }
1139}