Skip to main content

allsource_core/infrastructure/cluster/
crdt.rs

/// CRDT-based Conflict Resolution for Geo-Replicated Events.
///
/// When events are ingested concurrently at different regions, conflicts are
/// resolved deterministically without coordination using CRDTs (Conflict-free
/// Replicated Data Types).
///
/// # Strategy
///
/// Events are immutable append-only facts — they use a **G-Set** (grow-only set)
/// CRDT by nature. The conflict resolution challenge is in **ordering**: when two
/// regions each write events at the same logical time, we need a deterministic
/// total order for consistency.
///
/// ## Resolution Rules (in priority order)
///
/// 1. **HLC timestamp**: Higher physical time wins
/// 2. **Logical counter**: If physical times are equal, higher logical counter wins
/// 3. **Node ID**: If both are equal, lower node ID wins (deterministic tiebreak)
/// 4. **Event ID (UUID)**: Final tiebreak on lexicographic UUID comparison
///
/// This produces a total order that all regions converge to identically,
/// without any coordination.
///
/// # Version Vectors
///
/// Each region maintains a version vector tracking the latest HLC timestamp
/// seen from every other region. During replication:
/// - Incoming events are accepted if their HLC > the vector entry for their origin
/// - The vector is updated on receipt
/// - This prevents duplicate delivery and enables efficient delta sync
use super::hlc::HlcTimestamp;
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::cmp::Reverse;
use std::collections::BTreeMap;
35
36/// Version vector tracking causal progress per region.
37///
38/// Maps region_id → latest HLC timestamp seen from that region.
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct VersionVector {
41    /// Region ID → latest HLC timestamp from that region.
42    entries: BTreeMap<String, HlcTimestamp>,
43}
44
45impl VersionVector {
46    /// Create an empty version vector.
47    pub fn new() -> Self {
48        Self {
49            entries: BTreeMap::new(),
50        }
51    }
52
53    /// Update the vector entry for a region if the given timestamp is newer.
54    /// Returns true if the vector was actually updated (i.e., the event is new).
55    pub fn advance(&mut self, region_id: &str, ts: HlcTimestamp) -> bool {
56        match self.entries.get(region_id) {
57            Some(existing) if ts <= *existing => false,
58            _ => {
59                self.entries.insert(region_id.to_string(), ts);
60                true
61            }
62        }
63    }
64
65    /// Check if a timestamp from a region is new (not yet seen).
66    pub fn is_new(&self, region_id: &str, ts: &HlcTimestamp) -> bool {
67        match self.entries.get(region_id) {
68            Some(existing) => ts > existing,
69            None => true,
70        }
71    }
72
73    /// Get the latest timestamp for a region.
74    pub fn get(&self, region_id: &str) -> Option<&HlcTimestamp> {
75        self.entries.get(region_id)
76    }
77
78    /// Merge another version vector into this one (pointwise max).
79    pub fn merge(&mut self, other: &VersionVector) {
80        for (region, ts) in &other.entries {
81            self.advance(region, *ts);
82        }
83    }
84
85    /// Get all entries.
86    pub fn entries(&self) -> &BTreeMap<String, HlcTimestamp> {
87        &self.entries
88    }
89
90    /// Number of regions tracked.
91    pub fn len(&self) -> usize {
92        self.entries.len()
93    }
94
95    /// Check if empty.
96    pub fn is_empty(&self) -> bool {
97        self.entries.is_empty()
98    }
99}
100
101impl Default for VersionVector {
102    fn default() -> Self {
103        Self::new()
104    }
105}
106
107/// A replicated event carrying CRDT metadata for conflict resolution.
108#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct ReplicatedEvent {
110    /// The event ID (UUID as string).
111    pub event_id: String,
112    /// HLC timestamp assigned at the origin region.
113    pub hlc_timestamp: HlcTimestamp,
114    /// Region ID where the event was originally ingested.
115    pub origin_region: String,
116    /// Serialized event data (the full Event struct as JSON).
117    pub event_data: serde_json::Value,
118}
119
/// Conflict resolution outcome.
///
/// A fieldless enum, so it is `Copy` and can be passed or compared by value
/// without moves or clones.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConflictResolution {
    /// Accept: the incoming event should be applied.
    Accept,
    /// Skip: the incoming event is a duplicate or superseded.
    Skip,
}
128
/// Per-entity-type merge strategy for conflict resolution.
///
/// Controls how the CRDT resolver handles events for specific event types
/// (or prefixes). The default behavior is `AppendOnly`, which is also what
/// `MergeStrategy::default()` returns.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum MergeStrategy {
    /// Accept all events (default, current G-Set behavior).
    #[default]
    AppendOnly,
    /// Newer HLC timestamp wins for same entity+type. Earlier events are skipped.
    LastWriteWins,
    /// First event for entity+type wins. Subsequent events are rejected.
    FirstWriteWins,
}
142
143/// CRDT-based conflict resolver for geo-replicated events.
144///
145/// Thread-safe: uses DashMap internally for the version vector.
146///
147/// Supports configurable per-entity-type merge strategies via
148/// [`with_strategies`](Self::with_strategies). Strategy lookup uses prefix
149/// matching: a strategy registered for `"config."` applies to
150/// `"config.updated"`, `"config.deleted"`, etc. The most specific
151/// (longest) prefix match wins. Unmatched types fall back to `AppendOnly`.
152pub struct CrdtResolver {
153    /// Per-region version vectors.
154    version_vectors: DashMap<String, VersionVector>,
155    /// Set of event IDs already seen (deduplication).
156    seen_events: DashMap<String, ()>,
157    /// Per-event-type merge strategies (prefix → strategy).
158    /// Sorted by prefix length descending for longest-match-first lookup.
159    strategies: Vec<(String, MergeStrategy)>,
160    /// entity+type → winning HLC timestamp, used for LWW and FWW tracking.
161    entity_type_winners: DashMap<String, HlcTimestamp>,
162}
163
164impl CrdtResolver {
165    /// Create a new CRDT resolver with default AppendOnly behavior.
166    pub fn new() -> Self {
167        Self {
168            version_vectors: DashMap::new(),
169            seen_events: DashMap::new(),
170            strategies: Vec::new(),
171            entity_type_winners: DashMap::new(),
172        }
173    }
174
175    /// Create a CRDT resolver with per-event-type merge strategies.
176    ///
177    /// Each entry is `(prefix, strategy)`. Prefix matching is used:
178    /// `"config."` matches `"config.updated"`. The longest matching
179    /// prefix wins. Unmatched types use `AppendOnly`.
180    pub fn with_strategies(strategies: Vec<(String, MergeStrategy)>) -> Self {
181        let mut sorted = strategies;
182        // Sort by prefix length descending for longest-match-first
183        sorted.sort_by(|a, b| b.0.len().cmp(&a.0.len()));
184        Self {
185            version_vectors: DashMap::new(),
186            seen_events: DashMap::new(),
187            strategies: sorted,
188            entity_type_winners: DashMap::new(),
189        }
190    }
191
192    /// Look up the merge strategy for an event type (longest prefix match).
193    fn strategy_for(&self, event_type: &str) -> &MergeStrategy {
194        for (prefix, strategy) in &self.strategies {
195            if event_type.starts_with(prefix.as_str()) {
196                return strategy;
197            }
198        }
199        // Static reference to default — avoids allocation
200        &MergeStrategy::AppendOnly
201    }
202
203    /// Resolve whether an incoming replicated event should be accepted.
204    ///
205    /// Returns `Accept` if the event is new, `Skip` if it's a duplicate.
206    /// Consults the per-event-type merge strategy when one is registered.
207    pub fn resolve(&self, event: &ReplicatedEvent) -> ConflictResolution {
208        // Dedup by event ID
209        if self.seen_events.contains_key(&event.event_id) {
210            return ConflictResolution::Skip;
211        }
212
213        // Check version vector for this region
214        let is_new = self
215            .version_vectors
216            .get(&event.origin_region)
217            .is_none_or(|vv| vv.is_new(&event.origin_region, &event.hlc_timestamp));
218
219        if !is_new {
220            return ConflictResolution::Skip;
221        }
222
223        // Extract event_type and entity_id from event_data for strategy lookup
224        let event_type = event
225            .event_data
226            .get("event_type")
227            .and_then(|v| v.as_str())
228            .unwrap_or("");
229        let entity_id = event
230            .event_data
231            .get("entity_id")
232            .and_then(|v| v.as_str())
233            .unwrap_or("");
234
235        match self.strategy_for(event_type) {
236            MergeStrategy::AppendOnly => ConflictResolution::Accept,
237            MergeStrategy::LastWriteWins => {
238                let key = format!("{entity_id}\x00{event_type}");
239                match self.entity_type_winners.get(&key) {
240                    Some(existing) if event.hlc_timestamp <= *existing => ConflictResolution::Skip,
241                    _ => ConflictResolution::Accept,
242                }
243            }
244            MergeStrategy::FirstWriteWins => {
245                let key = format!("{entity_id}\x00{event_type}");
246                if self.entity_type_winners.contains_key(&key) {
247                    ConflictResolution::Skip
248                } else {
249                    ConflictResolution::Accept
250                }
251            }
252        }
253    }
254
255    /// Mark an event as accepted: update version vector, dedup set, and strategy state.
256    pub fn accept(&self, event: &ReplicatedEvent) {
257        self.seen_events.insert(event.event_id.clone(), ());
258
259        let mut vv = self
260            .version_vectors
261            .entry(event.origin_region.clone())
262            .or_default();
263        vv.advance(&event.origin_region, event.hlc_timestamp);
264
265        // Track entity+type winner for LWW/FWW strategies
266        let event_type = event
267            .event_data
268            .get("event_type")
269            .and_then(|v| v.as_str())
270            .unwrap_or("");
271        let entity_id = event
272            .event_data
273            .get("entity_id")
274            .and_then(|v| v.as_str())
275            .unwrap_or("");
276        if !event_type.is_empty() && !entity_id.is_empty() {
277            let key = format!("{entity_id}\x00{event_type}");
278            self.entity_type_winners
279                .entry(key)
280                .and_modify(|existing| {
281                    if event.hlc_timestamp > *existing {
282                        *existing = event.hlc_timestamp;
283                    }
284                })
285                .or_insert(event.hlc_timestamp);
286        }
287    }
288
289    /// Resolve and accept in one step. Returns the resolution.
290    pub fn resolve_and_accept(&self, event: &ReplicatedEvent) -> ConflictResolution {
291        let resolution = self.resolve(event);
292        if resolution == ConflictResolution::Accept {
293            self.accept(event);
294        }
295        resolution
296    }
297
298    /// Get the version vector for a specific region.
299    pub fn version_vector_for(&self, region_id: &str) -> Option<VersionVector> {
300        self.version_vectors.get(region_id).map(|vv| vv.clone())
301    }
302
303    /// Get all version vectors (for status/debug).
304    pub fn all_version_vectors(&self) -> BTreeMap<String, VersionVector> {
305        self.version_vectors
306            .iter()
307            .map(|entry| (entry.key().clone(), entry.value().clone()))
308            .collect()
309    }
310
311    /// Merge a remote version vector into our local state.
312    pub fn merge_version_vector(&self, region_id: &str, remote_vv: &VersionVector) {
313        let mut vv = self
314            .version_vectors
315            .entry(region_id.to_string())
316            .or_default();
317        vv.merge(remote_vv);
318    }
319
320    /// Number of unique events seen.
321    pub fn seen_count(&self) -> usize {
322        self.seen_events.len()
323    }
324}
325
326impl Default for CrdtResolver {
327    fn default() -> Self {
328        Self::new()
329    }
330}
331
332/// Sort replicated events into deterministic total order.
333///
334/// This produces the same ordering on every node, regardless of arrival order.
335pub fn deterministic_order(events: &mut [ReplicatedEvent]) {
336    events.sort_by(|a, b| {
337        a.hlc_timestamp
338            .cmp(&b.hlc_timestamp)
339            .then_with(|| a.event_id.cmp(&b.event_id))
340    });
341}
342
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an event whose payload carries no `event_type` / `entity_id`,
    /// so it always resolves under the AppendOnly fallback.
    fn make_event(
        id: &str,
        region: &str,
        physical_ms: u64,
        logical: u32,
        node_id: u32,
    ) -> ReplicatedEvent {
        ReplicatedEvent {
            event_id: id.to_string(),
            hlc_timestamp: HlcTimestamp::new(physical_ms, logical, node_id),
            origin_region: region.to_string(),
            event_data: serde_json::json!({"type": "test"}),
        }
    }

    /// Build an event whose payload carries the `entity_id` and `event_type`
    /// fields that the merge strategies read.
    fn make_typed_event(
        id: &str,
        region: &str,
        physical_ms: u64,
        entity_id: &str,
        event_type: &str,
    ) -> ReplicatedEvent {
        ReplicatedEvent {
            event_id: id.to_string(),
            hlc_timestamp: HlcTimestamp::new(physical_ms, 0, 1),
            origin_region: region.to_string(),
            event_data: serde_json::json!({
                "entity_id": entity_id,
                "event_type": event_type,
            }),
        }
    }

    #[test]
    fn test_version_vector_advance() {
        let mut vv = VersionVector::new();
        let ts1 = HlcTimestamp::new(100, 0, 1);
        let ts2 = HlcTimestamp::new(200, 0, 1);
        let ts_old = HlcTimestamp::new(50, 0, 1);

        assert!(vv.advance("us-east", ts1));
        assert!(vv.advance("us-east", ts2)); // newer
        assert!(!vv.advance("us-east", ts_old)); // older, rejected

        assert_eq!(vv.get("us-east").unwrap().physical_ms, 200);
    }

    #[test]
    fn test_version_vector_is_new() {
        let mut vv = VersionVector::new();
        let ts = HlcTimestamp::new(100, 0, 1);
        vv.advance("us-east", ts);

        assert!(!vv.is_new("us-east", &HlcTimestamp::new(50, 0, 1)));
        assert!(!vv.is_new("us-east", &HlcTimestamp::new(100, 0, 1)));
        assert!(vv.is_new("us-east", &HlcTimestamp::new(101, 0, 1)));
        assert!(vv.is_new("eu-west", &HlcTimestamp::new(1, 0, 1))); // unknown region
    }

    #[test]
    fn test_version_vector_merge() {
        let mut vv1 = VersionVector::new();
        vv1.advance("us-east", HlcTimestamp::new(100, 0, 1));
        vv1.advance("eu-west", HlcTimestamp::new(50, 0, 2));

        let mut vv2 = VersionVector::new();
        vv2.advance("us-east", HlcTimestamp::new(80, 0, 1));
        vv2.advance("eu-west", HlcTimestamp::new(120, 0, 2));
        vv2.advance("ap-east", HlcTimestamp::new(90, 0, 3));

        vv1.merge(&vv2);

        assert_eq!(vv1.get("us-east").unwrap().physical_ms, 100); // kept higher
        assert_eq!(vv1.get("eu-west").unwrap().physical_ms, 120); // took higher
        assert_eq!(vv1.get("ap-east").unwrap().physical_ms, 90); // new entry
        assert_eq!(vv1.len(), 3);
    }

    #[test]
    fn test_crdt_resolver_accept_new_event() {
        let resolver = CrdtResolver::new();
        let event = make_event("evt-1", "us-east", 100, 0, 1);

        assert_eq!(resolver.resolve(&event), ConflictResolution::Accept);
        resolver.accept(&event);
        assert_eq!(resolver.seen_count(), 1);
    }

    #[test]
    fn test_crdt_resolver_skip_duplicate() {
        let resolver = CrdtResolver::new();
        let event = make_event("evt-1", "us-east", 100, 0, 1);

        assert_eq!(
            resolver.resolve_and_accept(&event),
            ConflictResolution::Accept
        );
        assert_eq!(resolver.resolve(&event), ConflictResolution::Skip);
    }

    #[test]
    fn test_crdt_resolver_skip_old_version() {
        let resolver = CrdtResolver::new();
        let new_event = make_event("evt-2", "us-east", 200, 0, 1);
        let old_event = make_event("evt-1", "us-east", 100, 0, 1);

        resolver.resolve_and_accept(&new_event);
        // old_event has lower HLC than what we've seen from us-east
        assert_eq!(resolver.resolve(&old_event), ConflictResolution::Skip);
    }

    #[test]
    fn test_crdt_resolver_different_regions_independent() {
        let resolver = CrdtResolver::new();
        let us_event = make_event("evt-1", "us-east", 100, 0, 1);
        let eu_event = make_event("evt-2", "eu-west", 50, 0, 2);

        resolver.resolve_and_accept(&us_event);
        // eu-west at lower timestamp is still accepted (different region)
        assert_eq!(resolver.resolve(&eu_event), ConflictResolution::Accept);
    }

    #[test]
    fn test_last_write_wins_skips_older_event_for_same_entity() {
        let resolver = CrdtResolver::with_strategies(vec![(
            "config.".to_string(),
            MergeStrategy::LastWriteWins,
        )]);
        let newer = make_typed_event("evt-1", "us-east", 200, "e1", "config.updated");
        let older = make_typed_event("evt-2", "eu-west", 100, "e1", "config.updated");
        let newest = make_typed_event("evt-3", "eu-west", 300, "e1", "config.updated");

        assert_eq!(
            resolver.resolve_and_accept(&newer),
            ConflictResolution::Accept
        );
        // Different region, so the version vector passes; LWW rejects it.
        assert_eq!(
            resolver.resolve_and_accept(&older),
            ConflictResolution::Skip
        );
        assert_eq!(
            resolver.resolve_and_accept(&newest),
            ConflictResolution::Accept
        );
    }

    #[test]
    fn test_first_write_wins_rejects_later_event_for_same_entity() {
        let resolver = CrdtResolver::with_strategies(vec![(
            "user.created".to_string(),
            MergeStrategy::FirstWriteWins,
        )]);
        let first = make_typed_event("evt-1", "us-east", 100, "u1", "user.created");
        let second = make_typed_event("evt-2", "eu-west", 200, "u1", "user.created");
        let other_entity = make_typed_event("evt-3", "eu-west", 300, "u2", "user.created");

        assert_eq!(
            resolver.resolve_and_accept(&first),
            ConflictResolution::Accept
        );
        assert_eq!(
            resolver.resolve_and_accept(&second),
            ConflictResolution::Skip
        );
        // A different entity is tracked independently.
        assert_eq!(
            resolver.resolve_and_accept(&other_entity),
            ConflictResolution::Accept
        );
    }

    #[test]
    fn test_longest_prefix_strategy_wins() {
        let resolver = CrdtResolver::with_strategies(vec![
            ("config.".to_string(), MergeStrategy::LastWriteWins),
            ("config.audit.".to_string(), MergeStrategy::AppendOnly),
        ]);
        let newer = make_typed_event("evt-1", "us-east", 200, "e1", "config.audit.logged");
        let older = make_typed_event("evt-2", "eu-west", 100, "e1", "config.audit.logged");

        assert_eq!(
            resolver.resolve_and_accept(&newer),
            ConflictResolution::Accept
        );
        // "config.audit." is more specific than "config.", so AppendOnly applies
        // and the older event is still accepted.
        assert_eq!(
            resolver.resolve_and_accept(&older),
            ConflictResolution::Accept
        );
    }

    #[test]
    fn test_deterministic_order() {
        let mut events = vec![
            make_event("evt-3", "ap-east", 100, 0, 3),
            make_event("evt-1", "us-east", 100, 0, 1),
            make_event("evt-2", "eu-west", 100, 0, 2),
        ];

        deterministic_order(&mut events);

        // Same physical time + logical: ordered by node_id, then event_id
        assert_eq!(events[0].event_id, "evt-1"); // node 1
        assert_eq!(events[1].event_id, "evt-2"); // node 2
        assert_eq!(events[2].event_id, "evt-3"); // node 3
    }

    #[test]
    fn test_deterministic_order_by_hlc() {
        let mut events = vec![
            make_event("evt-1", "us-east", 300, 0, 1),
            make_event("evt-2", "eu-west", 100, 0, 2),
            make_event("evt-3", "ap-east", 200, 0, 3),
        ];

        deterministic_order(&mut events);

        assert_eq!(events[0].event_id, "evt-2"); // 100ms
        assert_eq!(events[1].event_id, "evt-3"); // 200ms
        assert_eq!(events[2].event_id, "evt-1"); // 300ms
    }

    #[test]
    fn test_replicated_event_serialization() {
        let event = make_event("evt-1", "us-east", 1000, 5, 1);
        let json = serde_json::to_string(&event).unwrap();
        let parsed: ReplicatedEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.event_id, "evt-1");
        assert_eq!(parsed.origin_region, "us-east");
        assert_eq!(parsed.hlc_timestamp.physical_ms, 1000);
    }
}