1use mdcs_core::lattice::Lattice;
24use serde::{Deserialize, Serialize};
25use std::collections::{BTreeMap, VecDeque};
26
/// Sequence number attached to each delta stored in a [`DeltaBuffer`].
pub type SeqNo = u64;
29
/// Identifier of a replica; also the key under which peer acknowledgments are tracked.
pub type ReplicaId = String;
32
/// A delta paired with the sequence number under which it is buffered.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct TaggedDelta<D> {
    /// Sequence number attached to `delta`.
    pub seq: SeqNo,
    /// The delta payload.
    pub delta: D,
}
39
/// Queue of sequence-tagged deltas with a soft upper bound on its length.
#[derive(Debug, Clone)]
pub struct DeltaBuffer<D: Lattice> {
    /// Sequence number given to the most recently pushed delta (0 before any push).
    current_seq: SeqNo,
    /// Buffered deltas; new entries are appended at the back.
    deltas: VecDeque<TaggedDelta<D>>,
    /// Length above which `push` merges the oldest entry into its successor.
    max_buffer_size: usize,
}
50
51impl<D: Lattice> DeltaBuffer<D> {
52 pub fn new(max_buffer_size: usize) -> Self {
53 Self {
54 current_seq: 0,
55 deltas: VecDeque::new(),
56 max_buffer_size,
57 }
58 }
59
60 pub fn push(&mut self, delta: D) {
62 self.current_seq += 1;
63 self.deltas.push_back(TaggedDelta {
64 seq: self.current_seq,
65 delta,
66 });
67
68 if self.deltas.len() > self.max_buffer_size {
70 self.compact_oldest();
71 }
72 }
73
74 pub fn deltas_since(&self, acked_seq: SeqNo) -> Vec<&TaggedDelta<D>> {
76 self.deltas.iter().filter(|td| td.seq > acked_seq).collect()
77 }
78
79 pub fn delta_group_since(&self, acked_seq: SeqNo) -> Option<D> {
81 let deltas: Vec<_> = self.deltas_since(acked_seq);
82 if deltas.is_empty() {
83 return None;
84 }
85
86 let mut group = D::bottom();
87 for td in deltas {
88 group.join_assign(&td.delta);
89 }
90 Some(group)
91 }
92
93 pub fn ack(&mut self, acked_seq: SeqNo) -> usize {
96 let initial_len = self.deltas.len();
97 self.deltas.retain(|td| td.seq > acked_seq);
98 initial_len - self.deltas.len()
99 }
100
101 pub fn current_seq(&self) -> SeqNo {
103 self.current_seq
104 }
105
106 pub fn len(&self) -> usize {
108 self.deltas.len()
109 }
110
111 pub fn is_empty(&self) -> bool {
113 self.deltas.is_empty()
114 }
115
116 pub fn clear(&mut self) {
118 self.deltas.clear();
119 }
120
121 fn compact_oldest(&mut self) {
123 if self.deltas.len() < 2 {
124 return;
125 }
126
127 let oldest = self.deltas.pop_front().unwrap();
129 if let Some(second) = self.deltas.front_mut() {
130 second.delta = oldest.delta.join(&second.delta);
131 }
132 }
133}
134
/// Tracks, per registered peer, the highest sequence number that peer has acknowledged.
#[derive(Debug, Clone)]
pub struct AckTracker {
    /// Acknowledged sequence number for each registered peer (0 until an ack arrives).
    acked: BTreeMap<ReplicaId, SeqNo>,
}
141
142impl AckTracker {
143 pub fn new() -> Self {
144 Self {
145 acked: BTreeMap::new(),
146 }
147 }
148
149 pub fn register_peer(&mut self, peer_id: ReplicaId) {
151 self.acked.entry(peer_id).or_insert(0);
152 }
153
154 pub fn update_ack(&mut self, peer_id: &str, seq: SeqNo) {
156 if let Some(acked) = self.acked.get_mut(peer_id) {
157 *acked = (*acked).max(seq);
158 }
159 }
160
161 pub fn get_ack(&self, peer_id: &str) -> SeqNo {
163 self.acked.get(peer_id).copied().unwrap_or(0)
164 }
165
166 pub fn min_acked(&self) -> SeqNo {
168 self.acked.values().copied().min().unwrap_or(0)
169 }
170
171 pub fn peers(&self) -> impl Iterator<Item = &ReplicaId> {
173 self.acked.keys()
174 }
175}
176
177impl Default for AckTracker {
178 fn default() -> Self {
179 Self::new()
180 }
181}
182
/// A replica holding a lattice state `S` together with a buffer of deltas of type `D`
/// and per-peer acknowledgment tracking.
///
/// `D` defaults to `S`.
#[derive(Debug, Clone)]
pub struct DeltaReplica<S: Lattice, D: Lattice = S> {
    /// Identifier of this replica.
    pub id: ReplicaId,
    /// Current state of this replica.
    state: S,
    /// Sequence-tagged deltas buffered by this replica.
    buffer: DeltaBuffer<D>,
    /// Acknowledged sequence numbers of registered peers.
    acks: AckTracker,
    /// Zero-sized marker for the delta type `D`.
    _phantom: std::marker::PhantomData<D>,
}
197
198impl<S: Lattice, D: Lattice> DeltaReplica<S, D> {
199 pub fn new(id: impl Into<ReplicaId>) -> Self {
201 Self::with_buffer_size(id, 100)
202 }
203
204 pub fn with_buffer_size(id: impl Into<ReplicaId>, buffer_size: usize) -> Self {
206 Self {
207 id: id.into(),
208 state: S::bottom(),
209 buffer: DeltaBuffer::new(buffer_size),
210 acks: AckTracker::new(),
211 _phantom: std::marker::PhantomData,
212 }
213 }
214
215 pub fn state(&self) -> &S {
217 &self.state
218 }
219
220 pub fn buffer(&self) -> &DeltaBuffer<D> {
222 &self.buffer
223 }
224
225 pub fn register_peer(&mut self, peer_id: ReplicaId) {
227 self.acks.register_peer(peer_id);
228 }
229
230 pub fn current_seq(&self) -> SeqNo {
232 self.buffer.current_seq()
233 }
234}
235
236impl<S: Lattice + Clone> DeltaReplica<S, S> {
238 pub fn mutate<F>(&mut self, mutator: F) -> S
241 where
242 F: FnOnce(&S) -> S,
243 {
244 let delta = mutator(&self.state);
246
247 self.state.join_assign(&delta);
249
250 self.buffer.push(delta.clone());
252
253 delta
254 }
255
256 pub fn prepare_sync(&self, peer_id: &str) -> Option<(S, SeqNo)> {
258 let acked = self.acks.get_ack(peer_id);
259 self.buffer
260 .delta_group_since(acked)
261 .map(|d| (d, self.buffer.current_seq()))
262 }
263
264 pub fn receive_delta(&mut self, delta: &S) {
266 self.state.join_assign(delta);
268 }
269
270 pub fn process_ack(&mut self, peer_id: &str, seq: SeqNo) {
272 self.acks.update_ack(peer_id, seq);
273
274 let min_acked = self.acks.min_acked();
276 self.buffer.ack(min_acked);
277 }
278
279 pub fn full_state(&self) -> &S {
281 &self.state
282 }
283
284 pub fn sync_with(&mut self, other: &mut DeltaReplica<S, S>) {
286 let my_state = self.state.clone();
288 let their_state = other.state.clone();
289
290 self.receive_delta(&their_state);
291 other.receive_delta(&my_state);
292 }
293}
294
#[cfg(test)]
mod tests {
    use super::*;
    use mdcs_core::gset::GSet;

    /// Builds a set containing only `value`.
    fn singleton(value: i32) -> GSet<i32> {
        let mut set = GSet::new();
        set.insert(value);
        set
    }

    /// Builds a buffer limited to `max_buffer_size` entries and pushes the
    /// singleton deltas `{1}` through `{5}` into it, in that order.
    fn buffer_one_to_five(max_buffer_size: usize) -> DeltaBuffer<GSet<i32>> {
        let mut buffer = DeltaBuffer::new(max_buffer_size);
        for i in 1..=5 {
            buffer.push(singleton(i));
        }
        buffer
    }

    #[test]
    fn test_delta_buffer_basic() {
        let mut buffer: DeltaBuffer<GSet<i32>> = DeltaBuffer::new(10);

        buffer.push(singleton(1));
        assert_eq!(buffer.current_seq(), 1);
        assert_eq!(buffer.len(), 1);

        buffer.push(singleton(2));
        assert_eq!(buffer.current_seq(), 2);
        assert_eq!(buffer.len(), 2);
    }

    #[test]
    fn test_delta_buffer_group() {
        let buffer = buffer_one_to_five(10);

        let group = buffer.delta_group_since(2).unwrap();

        // Deltas 1 and 2 are at or below the acknowledged sequence number.
        for acked in 1..=2 {
            assert!(!group.contains(&acked));
        }
        // Deltas 3 through 5 are still pending.
        for pending in 3..=5 {
            assert!(group.contains(&pending));
        }
    }

    #[test]
    fn test_delta_buffer_ack() {
        let mut buffer = buffer_one_to_five(10);
        assert_eq!(buffer.len(), 5);

        let removed = buffer.ack(3);

        assert_eq!(removed, 3);
        assert_eq!(buffer.len(), 2);
    }

    #[test]
    fn test_delta_buffer_compaction() {
        let buffer = buffer_one_to_five(3);

        // The buffer must not grow past its limit.
        assert!(buffer.len() <= 3);

        // Compaction merges entries, so every element is still present.
        let group = buffer.delta_group_since(0).unwrap();
        for i in 1..=5 {
            assert!(group.contains(&i));
        }
    }

    #[test]
    fn test_ack_tracker() {
        let mut tracker = AckTracker::new();

        for peer in ["peer1", "peer2"].iter() {
            tracker.register_peer(peer.to_string());
        }

        assert_eq!(tracker.get_ack("peer1"), 0);
        assert_eq!(tracker.get_ack("peer2"), 0);

        tracker.update_ack("peer1", 5);
        assert_eq!(tracker.get_ack("peer1"), 5);
        // peer2 has not acknowledged anything yet.
        assert_eq!(tracker.min_acked(), 0);

        tracker.update_ack("peer2", 3);
        assert_eq!(tracker.min_acked(), 3);

        tracker.update_ack("peer2", 7);
        assert_eq!(tracker.min_acked(), 5);
    }

    #[test]
    fn test_delta_replica_basic() {
        let mut replica: DeltaReplica<GSet<i32>> = DeltaReplica::new("replica1");

        replica.mutate(|_state| singleton(42));

        assert!(replica.state().contains(&42));
        assert_eq!(replica.current_seq(), 1);
    }

    #[test]
    fn test_delta_replica_sync() {
        let mut replica1: DeltaReplica<GSet<i32>> = DeltaReplica::new("r1");
        let mut replica2: DeltaReplica<GSet<i32>> = DeltaReplica::new("r2");

        replica1.mutate(|_| singleton(1));
        replica2.mutate(|_| singleton(2));

        // Before syncing, each replica only knows its own element.
        assert!(replica1.state().contains(&1));
        assert!(!replica1.state().contains(&2));
        assert!(!replica2.state().contains(&1));
        assert!(replica2.state().contains(&2));

        replica1.sync_with(&mut replica2);

        // After syncing, both replicas hold both elements.
        assert!(replica1.state().contains(&1));
        assert!(replica1.state().contains(&2));
        assert!(replica2.state().contains(&1));
        assert!(replica2.state().contains(&2));
    }
}