Skip to main content

allsource_core/infrastructure/cluster/
crdt.rs

//! CRDT-based Conflict Resolution for Geo-Replicated Events.
//!
//! When events are ingested concurrently at different regions, conflicts are
//! resolved deterministically without coordination using CRDTs (Conflict-free
//! Replicated Data Types).
//!
//! # Strategy
//!
//! Events are immutable append-only facts — they use a **G-Set** (grow-only set)
//! CRDT by nature. The conflict resolution challenge is in **ordering**: when two
//! regions each write events at the same logical time, we need a deterministic
//! total order for consistency.
//!
//! ## Resolution Rules (in priority order)
//!
//! 1. **HLC timestamp**: Higher physical time wins
//! 2. **Logical counter**: If physical times are equal, higher logical counter wins
//! 3. **Node ID**: If both are equal, lower node ID wins (deterministic tiebreak)
//! 4. **Event ID (UUID)**: Final tiebreak on lexicographic UUID comparison
//!
//! This produces a total order that all regions converge to identically,
//! without any coordination.
//!
//! # Version Vectors
//!
//! Each region maintains a version vector tracking the latest HLC timestamp
//! seen from every other region. During replication:
//! - Incoming events are accepted if their HLC > the vector entry for their origin
//! - The vector is updated on receipt
//! - This prevents duplicate delivery and enables efficient delta sync
31use super::hlc::HlcTimestamp;
32use dashmap::DashMap;
33use serde::{Deserialize, Serialize};
34use std::collections::BTreeMap;
35
36/// Version vector tracking causal progress per region.
37///
38/// Maps region_id → latest HLC timestamp seen from that region.
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct VersionVector {
41    /// Region ID → latest HLC timestamp from that region.
42    entries: BTreeMap<String, HlcTimestamp>,
43}
44
45impl VersionVector {
46    /// Create an empty version vector.
47    pub fn new() -> Self {
48        Self {
49            entries: BTreeMap::new(),
50        }
51    }
52
53    /// Update the vector entry for a region if the given timestamp is newer.
54    /// Returns true if the vector was actually updated (i.e., the event is new).
55    pub fn advance(&mut self, region_id: &str, ts: HlcTimestamp) -> bool {
56        match self.entries.get(region_id) {
57            Some(existing) if ts <= *existing => false,
58            _ => {
59                self.entries.insert(region_id.to_string(), ts);
60                true
61            }
62        }
63    }
64
65    /// Check if a timestamp from a region is new (not yet seen).
66    pub fn is_new(&self, region_id: &str, ts: &HlcTimestamp) -> bool {
67        match self.entries.get(region_id) {
68            Some(existing) => ts > existing,
69            None => true,
70        }
71    }
72
73    /// Get the latest timestamp for a region.
74    pub fn get(&self, region_id: &str) -> Option<&HlcTimestamp> {
75        self.entries.get(region_id)
76    }
77
78    /// Merge another version vector into this one (pointwise max).
79    pub fn merge(&mut self, other: &VersionVector) {
80        for (region, ts) in &other.entries {
81            self.advance(region, *ts);
82        }
83    }
84
85    /// Get all entries.
86    pub fn entries(&self) -> &BTreeMap<String, HlcTimestamp> {
87        &self.entries
88    }
89
90    /// Number of regions tracked.
91    pub fn len(&self) -> usize {
92        self.entries.len()
93    }
94
95    /// Check if empty.
96    pub fn is_empty(&self) -> bool {
97        self.entries.is_empty()
98    }
99}
100
101impl Default for VersionVector {
102    fn default() -> Self {
103        Self::new()
104    }
105}
106
107/// A replicated event carrying CRDT metadata for conflict resolution.
108#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct ReplicatedEvent {
110    /// The event ID (UUID as string).
111    pub event_id: String,
112    /// HLC timestamp assigned at the origin region.
113    pub hlc_timestamp: HlcTimestamp,
114    /// Region ID where the event was originally ingested.
115    pub origin_region: String,
116    /// Serialized event data (the full Event struct as JSON).
117    pub event_data: serde_json::Value,
118}
119
/// Outcome of resolving an incoming replicated event.
///
/// The enum is fieldless, so it is `Copy` (values can be passed and compared
/// without moving them away) and `Hash` (usable as a set or map key).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConflictResolution {
    /// Accept: the incoming event should be applied.
    Accept,
    /// Skip: the incoming event is a duplicate or superseded.
    Skip,
}
128
/// Per-entity-type merge strategy for conflict resolution.
///
/// Controls how the CRDT resolver handles events for specific event types
/// (or prefixes). The default behavior is `AppendOnly`.
///
/// The enum is fieldless, so it is `Copy` (cheap to pass by value) and `Hash`
/// (usable as a set or map key).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MergeStrategy {
    /// Accept all events (default, current G-Set behavior).
    AppendOnly,
    /// Newer HLC timestamp wins for same entity+type. Earlier events are skipped.
    LastWriteWins,
    /// First event for entity+type wins. Subsequent events are rejected.
    FirstWriteWins,
}
142
143/// CRDT-based conflict resolver for geo-replicated events.
144///
145/// Thread-safe: uses DashMap internally for the version vector.
146///
147/// Supports configurable per-entity-type merge strategies via
148/// [`with_strategies`](Self::with_strategies). Strategy lookup uses prefix
149/// matching: a strategy registered for `"config."` applies to
150/// `"config.updated"`, `"config.deleted"`, etc. The most specific
151/// (longest) prefix match wins. Unmatched types fall back to `AppendOnly`.
152pub struct CrdtResolver {
153    /// Per-region version vectors.
154    version_vectors: DashMap<String, VersionVector>,
155    /// Set of event IDs already seen (deduplication).
156    seen_events: DashMap<String, ()>,
157    /// Per-event-type merge strategies (prefix → strategy).
158    /// Sorted by prefix length descending for longest-match-first lookup.
159    strategies: Vec<(String, MergeStrategy)>,
160    /// entity+type → winning HLC timestamp, used for LWW and FWW tracking.
161    entity_type_winners: DashMap<String, HlcTimestamp>,
162}
163
164impl CrdtResolver {
165    /// Create a new CRDT resolver with default AppendOnly behavior.
166    pub fn new() -> Self {
167        Self {
168            version_vectors: DashMap::new(),
169            seen_events: DashMap::new(),
170            strategies: Vec::new(),
171            entity_type_winners: DashMap::new(),
172        }
173    }
174
175    /// Create a CRDT resolver with per-event-type merge strategies.
176    ///
177    /// Each entry is `(prefix, strategy)`. Prefix matching is used:
178    /// `"config."` matches `"config.updated"`. The longest matching
179    /// prefix wins. Unmatched types use `AppendOnly`.
180    pub fn with_strategies(strategies: Vec<(String, MergeStrategy)>) -> Self {
181        let mut sorted = strategies;
182        // Sort by prefix length descending for longest-match-first
183        sorted.sort_by(|a, b| b.0.len().cmp(&a.0.len()));
184        Self {
185            version_vectors: DashMap::new(),
186            seen_events: DashMap::new(),
187            strategies: sorted,
188            entity_type_winners: DashMap::new(),
189        }
190    }
191
192    /// Look up the merge strategy for an event type (longest prefix match).
193    fn strategy_for(&self, event_type: &str) -> &MergeStrategy {
194        for (prefix, strategy) in &self.strategies {
195            if event_type.starts_with(prefix.as_str()) {
196                return strategy;
197            }
198        }
199        // Static reference to default — avoids allocation
200        &MergeStrategy::AppendOnly
201    }
202
203    /// Resolve whether an incoming replicated event should be accepted.
204    ///
205    /// Returns `Accept` if the event is new, `Skip` if it's a duplicate.
206    /// Consults the per-event-type merge strategy when one is registered.
207    pub fn resolve(&self, event: &ReplicatedEvent) -> ConflictResolution {
208        // Dedup by event ID
209        if self.seen_events.contains_key(&event.event_id) {
210            return ConflictResolution::Skip;
211        }
212
213        // Check version vector for this region
214        let is_new = self
215            .version_vectors
216            .get(&event.origin_region)
217            .map(|vv| vv.is_new(&event.origin_region, &event.hlc_timestamp))
218            .unwrap_or(true);
219
220        if !is_new {
221            return ConflictResolution::Skip;
222        }
223
224        // Extract event_type and entity_id from event_data for strategy lookup
225        let event_type = event
226            .event_data
227            .get("event_type")
228            .and_then(|v| v.as_str())
229            .unwrap_or("");
230        let entity_id = event
231            .event_data
232            .get("entity_id")
233            .and_then(|v| v.as_str())
234            .unwrap_or("");
235
236        match self.strategy_for(event_type) {
237            MergeStrategy::AppendOnly => ConflictResolution::Accept,
238            MergeStrategy::LastWriteWins => {
239                let key = format!("{}\x00{}", entity_id, event_type);
240                match self.entity_type_winners.get(&key) {
241                    Some(existing) if event.hlc_timestamp <= *existing => {
242                        ConflictResolution::Skip
243                    }
244                    _ => ConflictResolution::Accept,
245                }
246            }
247            MergeStrategy::FirstWriteWins => {
248                let key = format!("{}\x00{}", entity_id, event_type);
249                if self.entity_type_winners.contains_key(&key) {
250                    ConflictResolution::Skip
251                } else {
252                    ConflictResolution::Accept
253                }
254            }
255        }
256    }
257
258    /// Mark an event as accepted: update version vector, dedup set, and strategy state.
259    pub fn accept(&self, event: &ReplicatedEvent) {
260        self.seen_events.insert(event.event_id.clone(), ());
261
262        let mut vv = self
263            .version_vectors
264            .entry(event.origin_region.clone())
265            .or_default();
266        vv.advance(&event.origin_region, event.hlc_timestamp);
267
268        // Track entity+type winner for LWW/FWW strategies
269        let event_type = event
270            .event_data
271            .get("event_type")
272            .and_then(|v| v.as_str())
273            .unwrap_or("");
274        let entity_id = event
275            .event_data
276            .get("entity_id")
277            .and_then(|v| v.as_str())
278            .unwrap_or("");
279        if !event_type.is_empty() && !entity_id.is_empty() {
280            let key = format!("{}\x00{}", entity_id, event_type);
281            self.entity_type_winners
282                .entry(key)
283                .and_modify(|existing| {
284                    if event.hlc_timestamp > *existing {
285                        *existing = event.hlc_timestamp;
286                    }
287                })
288                .or_insert(event.hlc_timestamp);
289        }
290    }
291
292    /// Resolve and accept in one step. Returns the resolution.
293    pub fn resolve_and_accept(&self, event: &ReplicatedEvent) -> ConflictResolution {
294        let resolution = self.resolve(event);
295        if resolution == ConflictResolution::Accept {
296            self.accept(event);
297        }
298        resolution
299    }
300
301    /// Get the version vector for a specific region.
302    pub fn version_vector_for(&self, region_id: &str) -> Option<VersionVector> {
303        self.version_vectors.get(region_id).map(|vv| vv.clone())
304    }
305
306    /// Get all version vectors (for status/debug).
307    pub fn all_version_vectors(&self) -> BTreeMap<String, VersionVector> {
308        self.version_vectors
309            .iter()
310            .map(|entry| (entry.key().clone(), entry.value().clone()))
311            .collect()
312    }
313
314    /// Merge a remote version vector into our local state.
315    pub fn merge_version_vector(&self, region_id: &str, remote_vv: &VersionVector) {
316        let mut vv = self
317            .version_vectors
318            .entry(region_id.to_string())
319            .or_default();
320        vv.merge(remote_vv);
321    }
322
323    /// Number of unique events seen.
324    pub fn seen_count(&self) -> usize {
325        self.seen_events.len()
326    }
327}
328
329impl Default for CrdtResolver {
330    fn default() -> Self {
331        Self::new()
332    }
333}
334
335/// Sort replicated events into deterministic total order.
336///
337/// This produces the same ordering on every node, regardless of arrival order.
338pub fn deterministic_order(events: &mut [ReplicatedEvent]) {
339    events.sort_by(|a, b| {
340        a.hlc_timestamp
341            .cmp(&b.hlc_timestamp)
342            .then_with(|| a.event_id.cmp(&b.event_id))
343    });
344}
345
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an event whose payload carries neither `event_type` nor
    /// `entity_id`, so it always resolves under the `AppendOnly` fallback.
    fn make_event(
        id: &str,
        region: &str,
        physical_ms: u64,
        logical: u32,
        node_id: u32,
    ) -> ReplicatedEvent {
        ReplicatedEvent {
            event_id: id.to_string(),
            hlc_timestamp: HlcTimestamp::new(physical_ms, logical, node_id),
            origin_region: region.to_string(),
            event_data: serde_json::json!({"type": "test"}),
        }
    }

    /// Build an event whose payload carries the `event_type` and `entity_id`
    /// fields the resolver reads for strategy lookup.
    fn make_typed_event(
        id: &str,
        region: &str,
        physical_ms: u64,
        node_id: u32,
        event_type: &str,
        entity_id: &str,
    ) -> ReplicatedEvent {
        ReplicatedEvent {
            event_id: id.to_string(),
            hlc_timestamp: HlcTimestamp::new(physical_ms, 0, node_id),
            origin_region: region.to_string(),
            event_data: serde_json::json!({
                "event_type": event_type,
                "entity_id": entity_id,
            }),
        }
    }

    #[test]
    fn test_version_vector_advance() {
        let mut vv = VersionVector::new();
        let ts1 = HlcTimestamp::new(100, 0, 1);
        let ts2 = HlcTimestamp::new(200, 0, 1);
        let ts_old = HlcTimestamp::new(50, 0, 1);

        assert!(vv.advance("us-east", ts1));
        assert!(vv.advance("us-east", ts2)); // newer
        assert!(!vv.advance("us-east", ts_old)); // older, rejected

        assert_eq!(vv.get("us-east").unwrap().physical_ms, 200);
    }

    #[test]
    fn test_version_vector_is_new() {
        let mut vv = VersionVector::new();
        let ts = HlcTimestamp::new(100, 0, 1);
        vv.advance("us-east", ts);

        assert!(!vv.is_new("us-east", &HlcTimestamp::new(50, 0, 1)));
        assert!(!vv.is_new("us-east", &HlcTimestamp::new(100, 0, 1)));
        assert!(vv.is_new("us-east", &HlcTimestamp::new(101, 0, 1)));
        assert!(vv.is_new("eu-west", &HlcTimestamp::new(1, 0, 1))); // unknown region
    }

    #[test]
    fn test_version_vector_merge() {
        let mut vv1 = VersionVector::new();
        vv1.advance("us-east", HlcTimestamp::new(100, 0, 1));
        vv1.advance("eu-west", HlcTimestamp::new(50, 0, 2));

        let mut vv2 = VersionVector::new();
        vv2.advance("us-east", HlcTimestamp::new(80, 0, 1));
        vv2.advance("eu-west", HlcTimestamp::new(120, 0, 2));
        vv2.advance("ap-east", HlcTimestamp::new(90, 0, 3));

        vv1.merge(&vv2);

        assert_eq!(vv1.get("us-east").unwrap().physical_ms, 100); // kept higher
        assert_eq!(vv1.get("eu-west").unwrap().physical_ms, 120); // took higher
        assert_eq!(vv1.get("ap-east").unwrap().physical_ms, 90); // new entry
        assert_eq!(vv1.len(), 3);
    }

    #[test]
    fn test_crdt_resolver_accept_new_event() {
        let resolver = CrdtResolver::new();
        let event = make_event("evt-1", "us-east", 100, 0, 1);

        assert_eq!(resolver.resolve(&event), ConflictResolution::Accept);
        resolver.accept(&event);
        assert_eq!(resolver.seen_count(), 1);
    }

    #[test]
    fn test_crdt_resolver_skip_duplicate() {
        let resolver = CrdtResolver::new();
        let event = make_event("evt-1", "us-east", 100, 0, 1);

        assert_eq!(
            resolver.resolve_and_accept(&event),
            ConflictResolution::Accept
        );
        assert_eq!(resolver.resolve(&event), ConflictResolution::Skip);
    }

    #[test]
    fn test_crdt_resolver_resolve_and_accept_twice() {
        let resolver = CrdtResolver::new();
        let event = make_event("evt-1", "us-east", 100, 0, 1);

        assert_eq!(
            resolver.resolve_and_accept(&event),
            ConflictResolution::Accept
        );
        assert_eq!(
            resolver.resolve_and_accept(&event),
            ConflictResolution::Skip
        );
        assert_eq!(resolver.seen_count(), 1);
    }

    #[test]
    fn test_crdt_resolver_skip_old_version() {
        let resolver = CrdtResolver::new();
        let new_event = make_event("evt-2", "us-east", 200, 0, 1);
        let old_event = make_event("evt-1", "us-east", 100, 0, 1);

        resolver.resolve_and_accept(&new_event);
        // old_event has lower HLC than what we've seen from us-east
        assert_eq!(resolver.resolve(&old_event), ConflictResolution::Skip);
    }

    #[test]
    fn test_crdt_resolver_different_regions_independent() {
        let resolver = CrdtResolver::new();
        let us_event = make_event("evt-1", "us-east", 100, 0, 1);
        let eu_event = make_event("evt-2", "eu-west", 50, 0, 2);

        resolver.resolve_and_accept(&us_event);
        // eu-west at lower timestamp is still accepted (different region)
        assert_eq!(resolver.resolve(&eu_event), ConflictResolution::Accept);
    }

    #[test]
    fn test_unmatched_type_falls_back_to_append_only() {
        let resolver = CrdtResolver::with_strategies(vec![(
            "config.".to_string(),
            MergeStrategy::LastWriteWins,
        )]);
        let newer = make_typed_event("evt-1", "us-east", 200, 1, "order.created", "order-1");
        let older = make_typed_event("evt-2", "eu-west", 100, 2, "order.created", "order-1");

        assert_eq!(
            resolver.resolve_and_accept(&newer),
            ConflictResolution::Accept
        );
        // No strategy matches "order.created", so the older event from a
        // different region is still appended.
        assert_eq!(
            resolver.resolve_and_accept(&older),
            ConflictResolution::Accept
        );
        assert_eq!(resolver.seen_count(), 2);
    }

    #[test]
    fn test_last_write_wins_skips_older_event() {
        let resolver = CrdtResolver::with_strategies(vec![(
            "config.".to_string(),
            MergeStrategy::LastWriteWins,
        )]);
        let newer = make_typed_event("evt-new", "us-east", 200, 1, "config.updated", "tenant-1");
        let older = make_typed_event("evt-old", "eu-west", 100, 2, "config.updated", "tenant-1");
        let other = make_typed_event("evt-other", "eu-west", 50, 2, "config.updated", "tenant-2");
        let newest = make_typed_event("evt-newest", "eu-west", 300, 2, "config.updated", "tenant-1");

        assert_eq!(
            resolver.resolve_and_accept(&newer),
            ConflictResolution::Accept
        );
        // Same entity+type with a lower HLC loses.
        assert_eq!(
            resolver.resolve_and_accept(&older),
            ConflictResolution::Skip
        );
        // A different entity is tracked independently.
        assert_eq!(
            resolver.resolve_and_accept(&other),
            ConflictResolution::Accept
        );
        // A higher HLC for the same entity+type wins.
        assert_eq!(
            resolver.resolve_and_accept(&newest),
            ConflictResolution::Accept
        );
        assert_eq!(resolver.seen_count(), 3);
    }

    #[test]
    fn test_first_write_wins_skips_later_events() {
        let resolver = CrdtResolver::with_strategies(vec![(
            "user.created".to_string(),
            MergeStrategy::FirstWriteWins,
        )]);
        let first = make_typed_event("evt-a", "us-east", 100, 1, "user.created", "user-1");
        let lower_hlc = make_typed_event("evt-b", "eu-west", 50, 2, "user.created", "user-1");
        let higher_hlc = make_typed_event("evt-c", "ap-east", 500, 3, "user.created", "user-1");

        assert_eq!(
            resolver.resolve_and_accept(&first),
            ConflictResolution::Accept
        );
        // Once a winner exists, later arrivals are skipped whatever their HLC.
        assert_eq!(
            resolver.resolve_and_accept(&lower_hlc),
            ConflictResolution::Skip
        );
        assert_eq!(
            resolver.resolve_and_accept(&higher_hlc),
            ConflictResolution::Skip
        );
        assert_eq!(resolver.seen_count(), 1);
    }

    #[test]
    fn test_longest_prefix_strategy_wins() {
        // Registered shortest-first on purpose: the resolver must reorder.
        let resolver = CrdtResolver::with_strategies(vec![
            ("config.".to_string(), MergeStrategy::LastWriteWins),
            ("config.locked.".to_string(), MergeStrategy::FirstWriteWins),
        ]);
        let first = make_typed_event("evt-a", "us-east", 100, 1, "config.locked.set", "key-1");
        let later = make_typed_event("evt-b", "eu-west", 300, 2, "config.locked.set", "key-1");

        assert_eq!(
            resolver.resolve_and_accept(&first),
            ConflictResolution::Accept
        );
        // LastWriteWins would accept the higher HLC; the more specific
        // FirstWriteWins prefix must apply instead.
        assert_eq!(
            resolver.resolve_and_accept(&later),
            ConflictResolution::Skip
        );
    }

    #[test]
    fn test_deterministic_order() {
        let mut events = vec![
            make_event("evt-3", "ap-east", 100, 0, 3),
            make_event("evt-1", "us-east", 100, 0, 1),
            make_event("evt-2", "eu-west", 100, 0, 2),
        ];

        deterministic_order(&mut events);

        // Same physical time + logical: ordered by node_id, then event_id
        assert_eq!(events[0].event_id, "evt-1"); // node 1
        assert_eq!(events[1].event_id, "evt-2"); // node 2
        assert_eq!(events[2].event_id, "evt-3"); // node 3
    }

    #[test]
    fn test_deterministic_order_by_hlc() {
        let mut events = vec![
            make_event("evt-1", "us-east", 300, 0, 1),
            make_event("evt-2", "eu-west", 100, 0, 2),
            make_event("evt-3", "ap-east", 200, 0, 3),
        ];

        deterministic_order(&mut events);

        assert_eq!(events[0].event_id, "evt-2"); // 100ms
        assert_eq!(events[1].event_id, "evt-3"); // 200ms
        assert_eq!(events[2].event_id, "evt-1"); // 300ms
    }

    #[test]
    fn test_replicated_event_serialization() {
        let event = make_event("evt-1", "us-east", 1000, 5, 1);
        let json = serde_json::to_string(&event).unwrap();
        let parsed: ReplicatedEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.event_id, "evt-1");
        assert_eq!(parsed.origin_region, "us-east");
        assert_eq!(parsed.hlc_timestamp.physical_ms, 1000);
    }
}