Skip to main content

logicaffeine_data/crdt/causal/
context.rs

//! Dot Context - Tracks seen events for OR-Set semantics.
//!
//! DotContext combines a VClock with a "cloud" of non-contiguous dots.
4
5use super::dot::Dot;
6use super::vclock::VClock;
7use super::super::replica::ReplicaId;
8use serde::{Deserialize, Serialize};
9use std::collections::HashSet;
10
11/// Tracks which events have been seen in the system.
12///
13/// The DotContext maintains a VClock for contiguous sequences and a "cloud"
14/// set for dots that arrive out of order. When gaps are filled, dots are
15/// compacted from the cloud into the clock.
16#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
17pub struct DotContext {
18    /// Contiguous event counters per replica
19    clock: VClock,
20    /// Non-contiguous dots (arrived out of order)
21    cloud: HashSet<Dot>,
22}
23
24impl DotContext {
25    /// Create a new empty context.
26    pub fn new() -> Self {
27        Self::default()
28    }
29
30    /// Generate the next dot for a replica and mark it as seen.
31    pub fn next(&mut self, replica: ReplicaId) -> Dot {
32        let counter = self.clock.increment(replica);
33        Dot::new(replica, counter)
34    }
35
36    /// Check if a dot has been seen (either in clock or cloud).
37    pub fn has_seen(&self, dot: &Dot) -> bool {
38        self.clock.get(dot.replica) >= dot.counter || self.cloud.contains(dot)
39    }
40
41    /// Add a dot to the context.
42    ///
43    /// If the dot is contiguous with the clock, increment the clock.
44    /// Otherwise, add it to the cloud. Then attempt to compact.
45    pub fn add(&mut self, dot: Dot) {
46        if dot.counter == self.clock.get(dot.replica) + 1 {
47            // Contiguous - add to clock
48            self.clock.increment(dot.replica);
49            // Compact: pull any waiting dots from cloud
50            self.compact_replica(dot.replica);
51        } else if dot.counter > self.clock.get(dot.replica) {
52            // Out of order - add to cloud
53            self.cloud.insert(dot);
54        }
55        // If dot.counter <= clock.get(dot.replica), we've already seen it
56    }
57
58    /// Compact the cloud for a specific replica.
59    fn compact_replica(&mut self, replica: ReplicaId) {
60        loop {
61            let next_counter = self.clock.get(replica) + 1;
62            let next_dot = Dot::new(replica, next_counter);
63            if self.cloud.remove(&next_dot) {
64                self.clock.increment(replica);
65            } else {
66                break;
67            }
68        }
69    }
70
71    /// Compact all replicas in the cloud.
72    fn compact(&mut self) {
73        let replicas: Vec<ReplicaId> = self.cloud.iter().map(|d| d.replica).collect();
74        for replica in replicas {
75            self.compact_replica(replica);
76        }
77    }
78
79    /// Merge another context into this one.
80    pub fn merge(&mut self, other: &Self) {
81        self.clock.merge_vclock(&other.clock);
82        for &dot in &other.cloud {
83            if !self.has_seen(&dot) {
84                self.cloud.insert(dot);
85            }
86        }
87        self.compact();
88    }
89
90    /// Get the underlying vector clock.
91    pub fn clock(&self) -> &VClock {
92        &self.clock
93    }
94
95    /// Get the version as a VClock clone (for DeltaCrdt).
96    pub fn version(&self) -> VClock {
97        self.clock.clone()
98    }
99}
100
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created context has not seen any dot.
    #[test]
    fn test_context_new() {
        let empty = DotContext::new();
        let probe = Dot::new(1, 1);
        assert!(!empty.has_seen(&probe));
    }

    /// `next` hands out counters 1, 2, ... and records each dot as seen.
    #[test]
    fn test_context_next() {
        let mut context = DotContext::new();
        let first = context.next(42);
        let second = context.next(42);

        assert_eq!(first.counter, 1);
        assert_eq!(second.counter, 2);
        assert!(context.has_seen(&first));
        assert!(context.has_seen(&second));
    }

    /// Dots added in order are all seen; the following counter is not.
    #[test]
    fn test_context_add_contiguous() {
        let mut context = DotContext::new();
        for counter in 1..=3 {
            context.add(Dot::new(1, counter));
        }

        for counter in 1..=3 {
            assert!(context.has_seen(&Dot::new(1, counter)));
        }
        assert!(!context.has_seen(&Dot::new(1, 4)));
    }

    /// Dots added out of order end up compacted into the clock.
    #[test]
    fn test_context_add_out_of_order() {
        let mut context = DotContext::new();
        // 3 goes to the cloud, 1 to the clock, 2 closes the gap and triggers compaction.
        for counter in [3, 1, 2] {
            context.add(Dot::new(1, counter));
        }

        for counter in 1..=3 {
            assert!(context.has_seen(&Dot::new(1, counter)));
        }
        // Nothing may be left waiting once the gap is closed.
        assert!(context.cloud.is_empty());
    }

    /// Merging makes the receiver aware of both sides' dots.
    #[test]
    fn test_context_merge() {
        let mut left = DotContext::new();
        let mut right = DotContext::new();

        left.next(1);
        left.next(1);
        right.next(2);

        left.merge(&right);

        let expected = [Dot::new(1, 1), Dot::new(1, 2), Dot::new(2, 1)];
        for dot in expected.iter() {
            assert!(left.has_seen(dot));
        }
    }
}