Skip to main content

allsource_core/infrastructure/cluster/
crdt.rs

//! CRDT-based Conflict Resolution for Geo-Replicated Events.
//!
//! When events are ingested concurrently at different regions, conflicts are
//! resolved deterministically without coordination using CRDTs (Conflict-free
//! Replicated Data Types).
//!
//! # Strategy
//!
//! Events are immutable append-only facts — they use a **G-Set** (grow-only set)
//! CRDT by nature. The conflict resolution challenge is in **ordering**: when two
//! regions each write events at the same logical time, we need a deterministic
//! total order for consistency.
//!
//! ## Resolution Rules (in priority order)
//!
//! 1. **HLC timestamp**: Higher physical time wins
//! 2. **Logical counter**: If physical times are equal, higher logical counter wins
//! 3. **Node ID**: If both are equal, lower node ID wins (deterministic tiebreak)
//! 4. **Event ID (UUID)**: Final tiebreak on lexicographic UUID comparison
//!
//! This produces a total order that all regions converge to identically,
//! without any coordination.
//!
//! # Version Vectors
//!
//! Each region maintains a version vector tracking the latest HLC timestamp
//! seen from every other region. During replication:
//! - Incoming events are accepted if their HLC > the vector entry for their origin
//! - The vector is updated on receipt
//! - This prevents duplicate delivery and enables efficient delta sync
31use super::hlc::HlcTimestamp;
32use dashmap::DashMap;
33use serde::{Deserialize, Serialize};
34use std::collections::BTreeMap;
35
36/// Version vector tracking causal progress per region.
37///
38/// Maps region_id → latest HLC timestamp seen from that region.
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct VersionVector {
41    /// Region ID → latest HLC timestamp from that region.
42    entries: BTreeMap<String, HlcTimestamp>,
43}
44
45impl VersionVector {
46    /// Create an empty version vector.
47    pub fn new() -> Self {
48        Self {
49            entries: BTreeMap::new(),
50        }
51    }
52
53    /// Update the vector entry for a region if the given timestamp is newer.
54    /// Returns true if the vector was actually updated (i.e., the event is new).
55    pub fn advance(&mut self, region_id: &str, ts: HlcTimestamp) -> bool {
56        match self.entries.get(region_id) {
57            Some(existing) if ts <= *existing => false,
58            _ => {
59                self.entries.insert(region_id.to_string(), ts);
60                true
61            }
62        }
63    }
64
65    /// Check if a timestamp from a region is new (not yet seen).
66    pub fn is_new(&self, region_id: &str, ts: &HlcTimestamp) -> bool {
67        match self.entries.get(region_id) {
68            Some(existing) => ts > existing,
69            None => true,
70        }
71    }
72
73    /// Get the latest timestamp for a region.
74    pub fn get(&self, region_id: &str) -> Option<&HlcTimestamp> {
75        self.entries.get(region_id)
76    }
77
78    /// Merge another version vector into this one (pointwise max).
79    pub fn merge(&mut self, other: &VersionVector) {
80        for (region, ts) in &other.entries {
81            self.advance(region, *ts);
82        }
83    }
84
85    /// Get all entries.
86    pub fn entries(&self) -> &BTreeMap<String, HlcTimestamp> {
87        &self.entries
88    }
89
90    /// Number of regions tracked.
91    pub fn len(&self) -> usize {
92        self.entries.len()
93    }
94
95    /// Check if empty.
96    pub fn is_empty(&self) -> bool {
97        self.entries.is_empty()
98    }
99}
100
101impl Default for VersionVector {
102    fn default() -> Self {
103        Self::new()
104    }
105}
106
107/// A replicated event carrying CRDT metadata for conflict resolution.
108#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct ReplicatedEvent {
110    /// The event ID (UUID as string).
111    pub event_id: String,
112    /// HLC timestamp assigned at the origin region.
113    pub hlc_timestamp: HlcTimestamp,
114    /// Region ID where the event was originally ingested.
115    pub origin_region: String,
116    /// Serialized event data (the full Event struct as JSON).
117    pub event_data: serde_json::Value,
118}
119
/// Verdict returned by the resolver for an incoming replicated event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConflictResolution {
    /// The incoming event is new and should be applied.
    Accept,
    /// The incoming event is a duplicate or has been superseded; do not apply it.
    Skip,
}
128
129/// CRDT-based conflict resolver for geo-replicated events.
130///
131/// Thread-safe: uses DashMap internally for the version vector.
132pub struct CrdtResolver {
133    /// Per-region version vectors.
134    version_vectors: DashMap<String, VersionVector>,
135    /// Set of event IDs already seen (deduplication).
136    seen_events: DashMap<String, ()>,
137}
138
139impl CrdtResolver {
140    /// Create a new CRDT resolver.
141    pub fn new() -> Self {
142        Self {
143            version_vectors: DashMap::new(),
144            seen_events: DashMap::new(),
145        }
146    }
147
148    /// Resolve whether an incoming replicated event should be accepted.
149    ///
150    /// Returns `Accept` if the event is new, `Skip` if it's a duplicate.
151    pub fn resolve(&self, event: &ReplicatedEvent) -> ConflictResolution {
152        // Dedup by event ID
153        if self.seen_events.contains_key(&event.event_id) {
154            return ConflictResolution::Skip;
155        }
156
157        // Check version vector for this region
158        let is_new = self
159            .version_vectors
160            .get(&event.origin_region)
161            .map(|vv| vv.is_new(&event.origin_region, &event.hlc_timestamp))
162            .unwrap_or(true);
163
164        if !is_new {
165            return ConflictResolution::Skip;
166        }
167
168        ConflictResolution::Accept
169    }
170
171    /// Mark an event as accepted: update version vector and dedup set.
172    pub fn accept(&self, event: &ReplicatedEvent) {
173        self.seen_events.insert(event.event_id.clone(), ());
174
175        let mut vv = self
176            .version_vectors
177            .entry(event.origin_region.clone())
178            .or_default();
179        vv.advance(&event.origin_region, event.hlc_timestamp);
180    }
181
182    /// Resolve and accept in one step. Returns the resolution.
183    pub fn resolve_and_accept(&self, event: &ReplicatedEvent) -> ConflictResolution {
184        let resolution = self.resolve(event);
185        if resolution == ConflictResolution::Accept {
186            self.accept(event);
187        }
188        resolution
189    }
190
191    /// Get the version vector for a specific region.
192    pub fn version_vector_for(&self, region_id: &str) -> Option<VersionVector> {
193        self.version_vectors.get(region_id).map(|vv| vv.clone())
194    }
195
196    /// Get all version vectors (for status/debug).
197    pub fn all_version_vectors(&self) -> BTreeMap<String, VersionVector> {
198        self.version_vectors
199            .iter()
200            .map(|entry| (entry.key().clone(), entry.value().clone()))
201            .collect()
202    }
203
204    /// Merge a remote version vector into our local state.
205    pub fn merge_version_vector(&self, region_id: &str, remote_vv: &VersionVector) {
206        let mut vv = self
207            .version_vectors
208            .entry(region_id.to_string())
209            .or_default();
210        vv.merge(remote_vv);
211    }
212
213    /// Number of unique events seen.
214    pub fn seen_count(&self) -> usize {
215        self.seen_events.len()
216    }
217}
218
219impl Default for CrdtResolver {
220    fn default() -> Self {
221        Self::new()
222    }
223}
224
225/// Sort replicated events into deterministic total order.
226///
227/// This produces the same ordering on every node, regardless of arrival order.
228pub fn deterministic_order(events: &mut [ReplicatedEvent]) {
229    events.sort_by(|a, b| {
230        a.hlc_timestamp
231            .cmp(&b.hlc_timestamp)
232            .then_with(|| a.event_id.cmp(&b.event_id))
233    });
234}
235
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `ReplicatedEvent` with a fixed JSON payload for use in tests.
    fn make_event(
        id: &str,
        region: &str,
        physical_ms: u64,
        logical: u32,
        node_id: u32,
    ) -> ReplicatedEvent {
        let hlc_timestamp = HlcTimestamp::new(physical_ms, logical, node_id);
        ReplicatedEvent {
            event_id: id.to_owned(),
            hlc_timestamp,
            origin_region: region.to_owned(),
            event_data: serde_json::json!({"type": "test"}),
        }
    }

    /// Collect the event IDs of `events` in slice order.
    fn ids(events: &[ReplicatedEvent]) -> Vec<&str> {
        events.iter().map(|e| e.event_id.as_str()).collect()
    }

    #[test]
    fn test_version_vector_advance() {
        let mut vector = VersionVector::new();
        let first = HlcTimestamp::new(100, 0, 1);
        let second = HlcTimestamp::new(200, 0, 1);
        let stale = HlcTimestamp::new(50, 0, 1);

        // First write and a newer write both move the vector forward.
        assert!(vector.advance("us-east", first));
        assert!(vector.advance("us-east", second));
        // An older timestamp must be rejected.
        assert!(!vector.advance("us-east", stale));

        let latest = vector.get("us-east").unwrap();
        assert_eq!(latest.physical_ms, 200);
    }

    #[test]
    fn test_version_vector_is_new() {
        let mut vector = VersionVector::new();
        vector.advance("us-east", HlcTimestamp::new(100, 0, 1));

        let older = HlcTimestamp::new(50, 0, 1);
        let same = HlcTimestamp::new(100, 0, 1);
        let newer = HlcTimestamp::new(101, 0, 1);
        let other_region = HlcTimestamp::new(1, 0, 1);

        assert!(!vector.is_new("us-east", &older));
        assert!(!vector.is_new("us-east", &same));
        assert!(vector.is_new("us-east", &newer));
        // A region with no entry treats every timestamp as new.
        assert!(vector.is_new("eu-west", &other_region));
    }

    #[test]
    fn test_version_vector_merge() {
        let mut local = VersionVector::new();
        local.advance("us-east", HlcTimestamp::new(100, 0, 1));
        local.advance("eu-west", HlcTimestamp::new(50, 0, 2));

        let mut remote = VersionVector::new();
        remote.advance("us-east", HlcTimestamp::new(80, 0, 1));
        remote.advance("eu-west", HlcTimestamp::new(120, 0, 2));
        remote.advance("ap-east", HlcTimestamp::new(90, 0, 3));

        local.merge(&remote);

        // Local value was higher and is kept.
        assert_eq!(local.get("us-east").unwrap().physical_ms, 100);
        // Remote value was higher and is taken.
        assert_eq!(local.get("eu-west").unwrap().physical_ms, 120);
        // Region known only to the remote is added.
        assert_eq!(local.get("ap-east").unwrap().physical_ms, 90);
        assert_eq!(local.len(), 3);
    }

    #[test]
    fn test_crdt_resolver_accept_new_event() {
        let resolver = CrdtResolver::new();
        let incoming = make_event("evt-1", "us-east", 100, 0, 1);

        let verdict = resolver.resolve(&incoming);
        assert_eq!(verdict, ConflictResolution::Accept);

        resolver.accept(&incoming);
        assert_eq!(resolver.seen_count(), 1);
    }

    #[test]
    fn test_crdt_resolver_skip_duplicate() {
        let resolver = CrdtResolver::new();
        let incoming = make_event("evt-1", "us-east", 100, 0, 1);

        let first_delivery = resolver.resolve_and_accept(&incoming);
        assert_eq!(first_delivery, ConflictResolution::Accept);

        let second_delivery = resolver.resolve(&incoming);
        assert_eq!(second_delivery, ConflictResolution::Skip);
    }

    #[test]
    fn test_crdt_resolver_skip_old_version() {
        let resolver = CrdtResolver::new();
        let newer = make_event("evt-2", "us-east", 200, 0, 1);
        let older = make_event("evt-1", "us-east", 100, 0, 1);

        let _ = resolver.resolve_and_accept(&newer);

        // `older` carries a lower HLC than what has been recorded for us-east.
        assert_eq!(resolver.resolve(&older), ConflictResolution::Skip);
    }

    #[test]
    fn test_crdt_resolver_different_regions_independent() {
        let resolver = CrdtResolver::new();
        let from_us = make_event("evt-1", "us-east", 100, 0, 1);
        let from_eu = make_event("evt-2", "eu-west", 50, 0, 2);

        let _ = resolver.resolve_and_accept(&from_us);

        // A lower timestamp from a different region is still accepted.
        assert_eq!(resolver.resolve(&from_eu), ConflictResolution::Accept);
    }

    #[test]
    fn test_deterministic_order() {
        let mut events = vec![
            make_event("evt-3", "ap-east", 100, 0, 3),
            make_event("evt-1", "us-east", 100, 0, 1),
            make_event("evt-2", "eu-west", 100, 0, 2),
        ];

        deterministic_order(&mut events);

        // All three share physical time and logical counter; the expected
        // order follows node IDs 1, 2, 3 (the event IDs sort the same way).
        assert_eq!(ids(&events), vec!["evt-1", "evt-2", "evt-3"]);
    }

    #[test]
    fn test_deterministic_order_by_hlc() {
        let mut events = vec![
            make_event("evt-1", "us-east", 300, 0, 1),
            make_event("evt-2", "eu-west", 100, 0, 2),
            make_event("evt-3", "ap-east", 200, 0, 3),
        ];

        deterministic_order(&mut events);

        // Ascending physical time: 100ms, 200ms, 300ms.
        assert_eq!(ids(&events), vec!["evt-2", "evt-3", "evt-1"]);
    }

    #[test]
    fn test_replicated_event_serialization() {
        let original = make_event("evt-1", "us-east", 1000, 5, 1);

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ReplicatedEvent = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.event_id, "evt-1");
        assert_eq!(decoded.origin_region, "us-east");
        assert_eq!(decoded.hlc_timestamp.physical_ms, 1000);
    }
}