Skip to main content

mdcs_delta/
buffer.rs

//! Delta buffer for grouping and batching deltas
//! Implements Algorithm 1 from the δ-CRDT paper (convergence mode)
//!
//! # Algorithm 1: δ-CRDT Anti-Entropy (Convergence Mode)
//!
//! The algorithm maintains:
//! - A local state X
//! - A delta buffer D
//! - Sequence numbers for causal ordering
//!
//! On local mutation m:
//!   d = mδ(X)          // compute delta
//!   X = X ⊔ d          // apply to state
//!   D = D ⊔ d          // buffer delta
//!
//! On send to peer j:
//!   send D[acked[j]..] to j
//!
//! On receive delta d from peer i:
//!   X = X ⊔ d          // apply (idempotent!)
//!   ack to i
22
23use mdcs_core::lattice::Lattice;
24use serde::{Deserialize, Serialize};
25use std::collections::{BTreeMap, VecDeque};
26
/// Sequence number tagging a buffered delta; the buffer hands these out in increasing order
pub type SeqNo = u64;

/// Replica identifier; also the key under which a peer's acknowledgments are tracked
pub type ReplicaId = String;
32
33/// A delta tagged with sequence information for causal ordering
34#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
35pub struct TaggedDelta<D> {
36    pub seq: SeqNo,
37    pub delta: D,
38}
39
40/// Buffer for outgoing deltas with grouping support
41#[derive(Debug, Clone)]
42pub struct DeltaBuffer<D: Lattice> {
43    /// Current sequence number
44    current_seq: SeqNo,
45    /// Buffered deltas awaiting acknowledgment
46    deltas: VecDeque<TaggedDelta<D>>,
47    /// Maximum deltas to buffer before forcing group-join
48    max_buffer_size: usize,
49}
50
51impl<D: Lattice> DeltaBuffer<D> {
52    pub fn new(max_buffer_size: usize) -> Self {
53        Self {
54            current_seq: 0,
55            deltas: VecDeque::new(),
56            max_buffer_size,
57        }
58    }
59
60    /// Add a new delta to the buffer
61    pub fn push(&mut self, delta: D) {
62        self.current_seq += 1;
63        self.deltas.push_back(TaggedDelta {
64            seq: self.current_seq,
65            delta,
66        });
67
68        // If buffer is full, compact by joining older deltas
69        if self.deltas.len() > self.max_buffer_size {
70            self.compact_oldest();
71        }
72    }
73
74    /// Get deltas for sending to a peer that has acked up to `acked_seq`
75    pub fn deltas_since(&self, acked_seq: SeqNo) -> Vec<&TaggedDelta<D>> {
76        self.deltas.iter().filter(|td| td.seq > acked_seq).collect()
77    }
78
79    /// Create a delta-group (joined deltas) for a peer
80    pub fn delta_group_since(&self, acked_seq: SeqNo) -> Option<D> {
81        let deltas: Vec<_> = self.deltas_since(acked_seq);
82        if deltas.is_empty() {
83            return None;
84        }
85
86        let mut group = D::bottom();
87        for td in deltas {
88            group.join_assign(&td.delta);
89        }
90        Some(group)
91    }
92
93    /// Acknowledge that a peer has received up to `seq`
94    /// Deltas before this can be GC'd if all peers have acked
95    pub fn ack(&mut self, acked_seq: SeqNo) -> usize {
96        let initial_len = self.deltas.len();
97        self.deltas.retain(|td| td.seq > acked_seq);
98        initial_len - self.deltas.len()
99    }
100
101    /// Current sequence number
102    pub fn current_seq(&self) -> SeqNo {
103        self.current_seq
104    }
105
106    /// Number of buffered deltas
107    pub fn len(&self) -> usize {
108        self.deltas.len()
109    }
110
111    /// Check if buffer is empty
112    pub fn is_empty(&self) -> bool {
113        self.deltas.is_empty()
114    }
115
116    /// Clear all buffered deltas
117    pub fn clear(&mut self) {
118        self.deltas.clear();
119    }
120
121    /// Compact oldest deltas by joining them
122    fn compact_oldest(&mut self) {
123        if self.deltas.len() < 2 {
124            return;
125        }
126
127        // Join the two oldest deltas
128        let oldest = self.deltas.pop_front().unwrap();
129        if let Some(second) = self.deltas.front_mut() {
130            second.delta = oldest.delta.join(&second.delta);
131        }
132    }
133}
134
135/// Tracks acknowledgments from peers for garbage collection
136#[derive(Debug, Clone)]
137pub struct AckTracker {
138    /// Maps peer_id -> last acked sequence number
139    acked: BTreeMap<ReplicaId, SeqNo>,
140}
141
142impl AckTracker {
143    pub fn new() -> Self {
144        Self {
145            acked: BTreeMap::new(),
146        }
147    }
148
149    /// Register a peer (initializes ack to 0)
150    pub fn register_peer(&mut self, peer_id: ReplicaId) {
151        self.acked.entry(peer_id).or_insert(0);
152    }
153
154    /// Update the ack for a peer
155    pub fn update_ack(&mut self, peer_id: &str, seq: SeqNo) {
156        if let Some(acked) = self.acked.get_mut(peer_id) {
157            *acked = (*acked).max(seq);
158        }
159    }
160
161    /// Get the ack for a peer
162    pub fn get_ack(&self, peer_id: &str) -> SeqNo {
163        self.acked.get(peer_id).copied().unwrap_or(0)
164    }
165
166    /// Get minimum acked sequence across all peers (safe to GC before this)
167    pub fn min_acked(&self) -> SeqNo {
168        self.acked.values().copied().min().unwrap_or(0)
169    }
170
171    /// Get all registered peers
172    pub fn peers(&self) -> impl Iterator<Item = &ReplicaId> {
173        self.acked.keys()
174    }
175}
176
177impl Default for AckTracker {
178    fn default() -> Self {
179        Self::new()
180    }
181}
182
183/// A delta-CRDT replica implementing Algorithm 1
184#[derive(Debug, Clone)]
185pub struct DeltaReplica<S: Lattice, D: Lattice = S> {
186    /// Replica identifier
187    pub id: ReplicaId,
188    /// Current state
189    state: S,
190    /// Delta buffer for outgoing deltas
191    buffer: DeltaBuffer<D>,
192    /// Ack tracker for peers
193    acks: AckTracker,
194    /// Function to convert state delta to buffer delta (usually identity or subset)
195    _phantom: std::marker::PhantomData<D>,
196}
197
198impl<S: Lattice, D: Lattice> DeltaReplica<S, D> {
199    /// Create a new replica with default buffer size
200    pub fn new(id: impl Into<ReplicaId>) -> Self {
201        Self::with_buffer_size(id, 100)
202    }
203
204    /// Create a new replica with specified buffer size
205    pub fn with_buffer_size(id: impl Into<ReplicaId>, buffer_size: usize) -> Self {
206        Self {
207            id: id.into(),
208            state: S::bottom(),
209            buffer: DeltaBuffer::new(buffer_size),
210            acks: AckTracker::new(),
211            _phantom: std::marker::PhantomData,
212        }
213    }
214
215    /// Get current state (read-only)
216    pub fn state(&self) -> &S {
217        &self.state
218    }
219
220    /// Get mutable access to buffer
221    pub fn buffer(&self) -> &DeltaBuffer<D> {
222        &self.buffer
223    }
224
225    /// Register a peer for anti-entropy
226    pub fn register_peer(&mut self, peer_id: ReplicaId) {
227        self.acks.register_peer(peer_id);
228    }
229
230    /// Current sequence number
231    pub fn current_seq(&self) -> SeqNo {
232        self.buffer.current_seq()
233    }
234}
235
236/// Delta-CRDT replica where state and delta are the same type
237impl<S: Lattice + Clone> DeltaReplica<S, S> {
238    /// Apply a delta-mutator: computes delta, applies to state, buffers delta
239    /// Returns the computed delta
240    pub fn mutate<F>(&mut self, mutator: F) -> S
241    where
242        F: FnOnce(&S) -> S,
243    {
244        // Compute delta: d = mδ(X)
245        let delta = mutator(&self.state);
246
247        // Apply to state: X = X ⊔ d
248        self.state.join_assign(&delta);
249
250        // Buffer delta: D = D ⊔ d
251        self.buffer.push(delta.clone());
252
253        delta
254    }
255
256    /// Get delta-group to send to a peer
257    pub fn prepare_sync(&self, peer_id: &str) -> Option<(S, SeqNo)> {
258        let acked = self.acks.get_ack(peer_id);
259        self.buffer
260            .delta_group_since(acked)
261            .map(|d| (d, self.buffer.current_seq()))
262    }
263
264    /// Receive and apply a delta from a peer (idempotent!)
265    pub fn receive_delta(&mut self, delta: &S) {
266        // X = X ⊔ d (idempotent merge)
267        self.state.join_assign(delta);
268    }
269
270    /// Process an ack from a peer
271    pub fn process_ack(&mut self, peer_id: &str, seq: SeqNo) {
272        self.acks.update_ack(peer_id, seq);
273
274        // GC: remove deltas that all peers have acked
275        let min_acked = self.acks.min_acked();
276        self.buffer.ack(min_acked);
277    }
278
279    /// Full state (for initial sync or recovery)
280    pub fn full_state(&self) -> &S {
281        &self.state
282    }
283
284    /// Sync with another replica directly (for testing/simulation)
285    pub fn sync_with(&mut self, other: &mut DeltaReplica<S, S>) {
286        // Exchange full states (simulates delta exchange converging to full state)
287        let my_state = self.state.clone();
288        let their_state = other.state.clone();
289
290        self.receive_delta(&their_state);
291        other.receive_delta(&my_state);
292    }
293}
294
#[cfg(test)]
mod tests {
    use super::*;
    use mdcs_core::gset::GSet;

    /// Build a grow-only set delta containing exactly one element.
    fn singleton(value: i32) -> GSet<i32> {
        let mut delta = GSet::new();
        delta.insert(value);
        delta
    }

    /// Build a buffer of the given capacity holding the singleton deltas `1..=count`.
    fn filled_buffer(capacity: usize, count: i32) -> DeltaBuffer<GSet<i32>> {
        let mut buffer = DeltaBuffer::new(capacity);
        for i in 1..=count {
            buffer.push(singleton(i));
        }
        buffer
    }

    #[test]
    fn test_delta_buffer_basic() {
        let mut buffer: DeltaBuffer<GSet<i32>> = DeltaBuffer::new(10);

        buffer.push(singleton(1));
        assert_eq!(buffer.current_seq(), 1);
        assert_eq!(buffer.len(), 1);

        buffer.push(singleton(2));
        assert_eq!(buffer.current_seq(), 2);
        assert_eq!(buffer.len(), 2);
    }

    #[test]
    fn test_delta_buffer_group() {
        let buffer = filled_buffer(10, 5);

        // Get group from seq 2 onwards
        let group = buffer.delta_group_since(2).unwrap();
        assert!(!group.contains(&1));
        assert!(!group.contains(&2));
        assert!(group.contains(&3));
        assert!(group.contains(&4));
        assert!(group.contains(&5));
    }

    #[test]
    fn test_delta_buffer_ack() {
        let mut buffer = filled_buffer(10, 5);
        assert_eq!(buffer.len(), 5);

        // Ack up to seq 3
        let removed = buffer.ack(3);
        assert_eq!(removed, 3);
        assert_eq!(buffer.len(), 2);
    }

    #[test]
    fn test_delta_buffer_compaction() {
        let buffer = filled_buffer(3, 5);

        // Should have compacted to stay within bounds
        assert!(buffer.len() <= 3);

        // But all elements should still be reachable via group
        let group = buffer.delta_group_since(0).unwrap();
        for i in 1..=5 {
            assert!(group.contains(&i));
        }
    }

    #[test]
    fn test_ack_tracker() {
        let mut tracker = AckTracker::new();

        tracker.register_peer("peer1".to_string());
        tracker.register_peer("peer2".to_string());

        assert_eq!(tracker.get_ack("peer1"), 0);
        assert_eq!(tracker.get_ack("peer2"), 0);

        tracker.update_ack("peer1", 5);
        assert_eq!(tracker.get_ack("peer1"), 5);
        assert_eq!(tracker.min_acked(), 0); // peer2 still at 0

        tracker.update_ack("peer2", 3);
        assert_eq!(tracker.min_acked(), 3);

        tracker.update_ack("peer2", 7);
        assert_eq!(tracker.min_acked(), 5);
    }

    #[test]
    fn test_delta_replica_basic() {
        let mut replica: DeltaReplica<GSet<i32>> = DeltaReplica::new("replica1");

        // Mutate using delta-mutator
        replica.mutate(|_state| singleton(42));

        assert!(replica.state().contains(&42));
        assert_eq!(replica.current_seq(), 1);
    }

    #[test]
    fn test_delta_replica_prepare_sync_and_ack() {
        let mut replica: DeltaReplica<GSet<i32>> = DeltaReplica::new("r1");
        replica.register_peer("r2".to_string());

        replica.mutate(|_| singleton(1));

        // The pending group carries the new element and the current sequence number
        let (group, seq) = replica.prepare_sync("r2").unwrap();
        assert!(group.contains(&1));
        assert_eq!(seq, 1);

        // Once the only peer acks, the buffer is GC'd and nothing is left to send
        replica.process_ack("r2", seq);
        assert!(replica.buffer().is_empty());
        assert!(replica.prepare_sync("r2").is_none());
    }

    #[test]
    fn test_delta_replica_sync() {
        let mut replica1: DeltaReplica<GSet<i32>> = DeltaReplica::new("r1");
        let mut replica2: DeltaReplica<GSet<i32>> = DeltaReplica::new("r2");

        replica1.mutate(|_| singleton(1));
        replica2.mutate(|_| singleton(2));

        // Before sync
        assert!(replica1.state().contains(&1));
        assert!(!replica1.state().contains(&2));
        assert!(!replica2.state().contains(&1));
        assert!(replica2.state().contains(&2));

        // Sync
        replica1.sync_with(&mut replica2);

        // After sync - both should have both elements
        assert!(replica1.state().contains(&1));
        assert!(replica1.state().contains(&2));
        assert!(replica2.state().contains(&1));
        assert!(replica2.state().contains(&2));
    }
}