Skip to main content

allsource_core/infrastructure/cluster/
crdt.rs

//! CRDT-based Conflict Resolution for Geo-Replicated Events.
//!
//! When events are ingested concurrently at different regions, conflicts are
//! resolved deterministically without coordination using CRDTs (Conflict-free
//! Replicated Data Types).
//!
//! # Strategy
//!
//! Events are immutable append-only facts — they use a **G-Set** (grow-only set)
//! CRDT by nature. The conflict resolution challenge is in **ordering**: when two
//! regions each write events at the same logical time, we need a deterministic
//! total order for consistency.
//!
//! ## Resolution Rules (in priority order)
//!
//! 1. **HLC timestamp**: Higher physical time wins
//! 2. **Logical counter**: If physical times are equal, higher logical counter wins
//! 3. **Node ID**: If both are equal, lower node ID wins (deterministic tiebreak)
//! 4. **Event ID (UUID)**: Final tiebreak on lexicographic UUID comparison
//!
//! This produces a total order that all regions converge to identically,
//! without any coordination.
//!
//! # Version Vectors
//!
//! Each region maintains a version vector tracking the latest HLC timestamp
//! seen from every other region. During replication:
//! - Incoming events are accepted if their HLC > the vector entry for their origin
//! - The vector is updated on receipt
//! - This prevents duplicate delivery and enables efficient delta sync
31use super::hlc::HlcTimestamp;
32use dashmap::DashMap;
33use serde::{Deserialize, Serialize};
34use std::collections::BTreeMap;
35
36/// Version vector tracking causal progress per region.
37///
38/// Maps region_id → latest HLC timestamp seen from that region.
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct VersionVector {
41    /// Region ID → latest HLC timestamp from that region.
42    entries: BTreeMap<String, HlcTimestamp>,
43}
44
45impl VersionVector {
46    /// Create an empty version vector.
47    pub fn new() -> Self {
48        Self {
49            entries: BTreeMap::new(),
50        }
51    }
52
53    /// Update the vector entry for a region if the given timestamp is newer.
54    /// Returns true if the vector was actually updated (i.e., the event is new).
55    pub fn advance(&mut self, region_id: &str, ts: HlcTimestamp) -> bool {
56        match self.entries.get(region_id) {
57            Some(existing) if ts <= *existing => false,
58            _ => {
59                self.entries.insert(region_id.to_string(), ts);
60                true
61            }
62        }
63    }
64
65    /// Check if a timestamp from a region is new (not yet seen).
66    pub fn is_new(&self, region_id: &str, ts: &HlcTimestamp) -> bool {
67        match self.entries.get(region_id) {
68            Some(existing) => ts > existing,
69            None => true,
70        }
71    }
72
73    /// Get the latest timestamp for a region.
74    pub fn get(&self, region_id: &str) -> Option<&HlcTimestamp> {
75        self.entries.get(region_id)
76    }
77
78    /// Merge another version vector into this one (pointwise max).
79    pub fn merge(&mut self, other: &VersionVector) {
80        for (region, ts) in &other.entries {
81            self.advance(region, *ts);
82        }
83    }
84
85    /// Get all entries.
86    pub fn entries(&self) -> &BTreeMap<String, HlcTimestamp> {
87        &self.entries
88    }
89
90    /// Number of regions tracked.
91    pub fn len(&self) -> usize {
92        self.entries.len()
93    }
94
95    /// Check if empty.
96    pub fn is_empty(&self) -> bool {
97        self.entries.is_empty()
98    }
99}
100
101impl Default for VersionVector {
102    fn default() -> Self {
103        Self::new()
104    }
105}
106
107/// A replicated event carrying CRDT metadata for conflict resolution.
108#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct ReplicatedEvent {
110    /// The event ID (UUID as string).
111    pub event_id: String,
112    /// HLC timestamp assigned at the origin region.
113    pub hlc_timestamp: HlcTimestamp,
114    /// Region ID where the event was originally ingested.
115    pub origin_region: String,
116    /// Serialized event data (the full Event struct as JSON).
117    pub event_data: serde_json::Value,
118}
119
/// Verdict returned by the resolver for an incoming replicated event.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ConflictResolution {
    /// The incoming event is new and should be applied.
    Accept,
    /// The incoming event is a duplicate or has been superseded; do not apply it.
    Skip,
}
128
/// How the resolver treats competing events of a given event type.
///
/// Strategies are registered per event type (or event-type prefix). Event
/// types without a registered strategy behave as `AppendOnly`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum MergeStrategy {
    /// Every new event is accepted (plain G-Set behavior; the default).
    AppendOnly,
    /// For the same entity+type, only an event with a newer HLC timestamp than
    /// the current winner is accepted; older or equal ones are skipped.
    LastWriteWins,
    /// For the same entity+type, the first accepted event wins and every later
    /// one is skipped.
    FirstWriteWins,
}
142
143/// CRDT-based conflict resolver for geo-replicated events.
144///
145/// Thread-safe: uses DashMap internally for the version vector.
146///
147/// Supports configurable per-entity-type merge strategies via
148/// [`with_strategies`](Self::with_strategies). Strategy lookup uses prefix
149/// matching: a strategy registered for `"config."` applies to
150/// `"config.updated"`, `"config.deleted"`, etc. The most specific
151/// (longest) prefix match wins. Unmatched types fall back to `AppendOnly`.
152pub struct CrdtResolver {
153    /// Per-region version vectors.
154    version_vectors: DashMap<String, VersionVector>,
155    /// Set of event IDs already seen (deduplication).
156    seen_events: DashMap<String, ()>,
157    /// Per-event-type merge strategies (prefix → strategy).
158    /// Sorted by prefix length descending for longest-match-first lookup.
159    strategies: Vec<(String, MergeStrategy)>,
160    /// entity+type → winning HLC timestamp, used for LWW and FWW tracking.
161    entity_type_winners: DashMap<String, HlcTimestamp>,
162}
163
164impl CrdtResolver {
165    /// Create a new CRDT resolver with default AppendOnly behavior.
166    pub fn new() -> Self {
167        Self {
168            version_vectors: DashMap::new(),
169            seen_events: DashMap::new(),
170            strategies: Vec::new(),
171            entity_type_winners: DashMap::new(),
172        }
173    }
174
175    /// Create a CRDT resolver with per-event-type merge strategies.
176    ///
177    /// Each entry is `(prefix, strategy)`. Prefix matching is used:
178    /// `"config."` matches `"config.updated"`. The longest matching
179    /// prefix wins. Unmatched types use `AppendOnly`.
180    pub fn with_strategies(strategies: Vec<(String, MergeStrategy)>) -> Self {
181        let mut sorted = strategies;
182        // Sort by prefix length descending for longest-match-first
183        sorted.sort_by(|a, b| b.0.len().cmp(&a.0.len()));
184        Self {
185            version_vectors: DashMap::new(),
186            seen_events: DashMap::new(),
187            strategies: sorted,
188            entity_type_winners: DashMap::new(),
189        }
190    }
191
192    /// Look up the merge strategy for an event type (longest prefix match).
193    fn strategy_for(&self, event_type: &str) -> &MergeStrategy {
194        for (prefix, strategy) in &self.strategies {
195            if event_type.starts_with(prefix.as_str()) {
196                return strategy;
197            }
198        }
199        // Static reference to default — avoids allocation
200        &MergeStrategy::AppendOnly
201    }
202
203    /// Resolve whether an incoming replicated event should be accepted.
204    ///
205    /// Returns `Accept` if the event is new, `Skip` if it's a duplicate.
206    /// Consults the per-event-type merge strategy when one is registered.
207    pub fn resolve(&self, event: &ReplicatedEvent) -> ConflictResolution {
208        // Dedup by event ID
209        if self.seen_events.contains_key(&event.event_id) {
210            return ConflictResolution::Skip;
211        }
212
213        // Check version vector for this region
214        let is_new = self
215            .version_vectors
216            .get(&event.origin_region)
217            .map(|vv| vv.is_new(&event.origin_region, &event.hlc_timestamp))
218            .unwrap_or(true);
219
220        if !is_new {
221            return ConflictResolution::Skip;
222        }
223
224        // Extract event_type and entity_id from event_data for strategy lookup
225        let event_type = event
226            .event_data
227            .get("event_type")
228            .and_then(|v| v.as_str())
229            .unwrap_or("");
230        let entity_id = event
231            .event_data
232            .get("entity_id")
233            .and_then(|v| v.as_str())
234            .unwrap_or("");
235
236        match self.strategy_for(event_type) {
237            MergeStrategy::AppendOnly => ConflictResolution::Accept,
238            MergeStrategy::LastWriteWins => {
239                let key = format!("{}\x00{}", entity_id, event_type);
240                match self.entity_type_winners.get(&key) {
241                    Some(existing) if event.hlc_timestamp <= *existing => ConflictResolution::Skip,
242                    _ => ConflictResolution::Accept,
243                }
244            }
245            MergeStrategy::FirstWriteWins => {
246                let key = format!("{}\x00{}", entity_id, event_type);
247                if self.entity_type_winners.contains_key(&key) {
248                    ConflictResolution::Skip
249                } else {
250                    ConflictResolution::Accept
251                }
252            }
253        }
254    }
255
256    /// Mark an event as accepted: update version vector, dedup set, and strategy state.
257    pub fn accept(&self, event: &ReplicatedEvent) {
258        self.seen_events.insert(event.event_id.clone(), ());
259
260        let mut vv = self
261            .version_vectors
262            .entry(event.origin_region.clone())
263            .or_default();
264        vv.advance(&event.origin_region, event.hlc_timestamp);
265
266        // Track entity+type winner for LWW/FWW strategies
267        let event_type = event
268            .event_data
269            .get("event_type")
270            .and_then(|v| v.as_str())
271            .unwrap_or("");
272        let entity_id = event
273            .event_data
274            .get("entity_id")
275            .and_then(|v| v.as_str())
276            .unwrap_or("");
277        if !event_type.is_empty() && !entity_id.is_empty() {
278            let key = format!("{}\x00{}", entity_id, event_type);
279            self.entity_type_winners
280                .entry(key)
281                .and_modify(|existing| {
282                    if event.hlc_timestamp > *existing {
283                        *existing = event.hlc_timestamp;
284                    }
285                })
286                .or_insert(event.hlc_timestamp);
287        }
288    }
289
290    /// Resolve and accept in one step. Returns the resolution.
291    pub fn resolve_and_accept(&self, event: &ReplicatedEvent) -> ConflictResolution {
292        let resolution = self.resolve(event);
293        if resolution == ConflictResolution::Accept {
294            self.accept(event);
295        }
296        resolution
297    }
298
299    /// Get the version vector for a specific region.
300    pub fn version_vector_for(&self, region_id: &str) -> Option<VersionVector> {
301        self.version_vectors.get(region_id).map(|vv| vv.clone())
302    }
303
304    /// Get all version vectors (for status/debug).
305    pub fn all_version_vectors(&self) -> BTreeMap<String, VersionVector> {
306        self.version_vectors
307            .iter()
308            .map(|entry| (entry.key().clone(), entry.value().clone()))
309            .collect()
310    }
311
312    /// Merge a remote version vector into our local state.
313    pub fn merge_version_vector(&self, region_id: &str, remote_vv: &VersionVector) {
314        let mut vv = self
315            .version_vectors
316            .entry(region_id.to_string())
317            .or_default();
318        vv.merge(remote_vv);
319    }
320
321    /// Number of unique events seen.
322    pub fn seen_count(&self) -> usize {
323        self.seen_events.len()
324    }
325}
326
327impl Default for CrdtResolver {
328    fn default() -> Self {
329        Self::new()
330    }
331}
332
333/// Sort replicated events into deterministic total order.
334///
335/// This produces the same ordering on every node, regardless of arrival order.
336pub fn deterministic_order(events: &mut [ReplicatedEvent]) {
337    events.sort_by(|a, b| {
338        a.hlc_timestamp
339            .cmp(&b.hlc_timestamp)
340            .then_with(|| a.event_id.cmp(&b.event_id))
341    });
342}
343
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an event whose payload carries no `event_type`/`entity_id`, so it
    /// is always handled as `AppendOnly`.
    fn make_event(
        id: &str,
        region: &str,
        physical_ms: u64,
        logical: u32,
        node_id: u32,
    ) -> ReplicatedEvent {
        ReplicatedEvent {
            event_id: id.to_string(),
            hlc_timestamp: HlcTimestamp::new(physical_ms, logical, node_id),
            origin_region: region.to_string(),
            event_data: serde_json::json!({"type": "test"}),
        }
    }

    /// Build an event whose payload carries the `event_type` and `entity_id`
    /// fields the resolver uses for strategy lookup.
    fn make_typed_event(
        id: &str,
        region: &str,
        physical_ms: u64,
        node_id: u32,
        event_type: &str,
        entity_id: &str,
    ) -> ReplicatedEvent {
        ReplicatedEvent {
            event_id: id.to_string(),
            hlc_timestamp: HlcTimestamp::new(physical_ms, 0, node_id),
            origin_region: region.to_string(),
            event_data: serde_json::json!({
                "event_type": event_type,
                "entity_id": entity_id,
            }),
        }
    }

    #[test]
    fn test_version_vector_advance() {
        let mut vv = VersionVector::new();
        let ts1 = HlcTimestamp::new(100, 0, 1);
        let ts2 = HlcTimestamp::new(200, 0, 1);
        let ts_old = HlcTimestamp::new(50, 0, 1);

        assert!(vv.advance("us-east", ts1));
        assert!(vv.advance("us-east", ts2)); // newer
        assert!(!vv.advance("us-east", ts_old)); // older, rejected

        assert_eq!(vv.get("us-east").unwrap().physical_ms, 200);
    }

    #[test]
    fn test_version_vector_is_new() {
        let mut vv = VersionVector::new();
        let ts = HlcTimestamp::new(100, 0, 1);
        vv.advance("us-east", ts);

        assert!(!vv.is_new("us-east", &HlcTimestamp::new(50, 0, 1)));
        assert!(!vv.is_new("us-east", &HlcTimestamp::new(100, 0, 1)));
        assert!(vv.is_new("us-east", &HlcTimestamp::new(101, 0, 1)));
        assert!(vv.is_new("eu-west", &HlcTimestamp::new(1, 0, 1))); // unknown region
    }

    #[test]
    fn test_version_vector_merge() {
        let mut vv1 = VersionVector::new();
        vv1.advance("us-east", HlcTimestamp::new(100, 0, 1));
        vv1.advance("eu-west", HlcTimestamp::new(50, 0, 2));

        let mut vv2 = VersionVector::new();
        vv2.advance("us-east", HlcTimestamp::new(80, 0, 1));
        vv2.advance("eu-west", HlcTimestamp::new(120, 0, 2));
        vv2.advance("ap-east", HlcTimestamp::new(90, 0, 3));

        vv1.merge(&vv2);

        assert_eq!(vv1.get("us-east").unwrap().physical_ms, 100); // kept higher
        assert_eq!(vv1.get("eu-west").unwrap().physical_ms, 120); // took higher
        assert_eq!(vv1.get("ap-east").unwrap().physical_ms, 90); // new entry
        assert_eq!(vv1.len(), 3);
    }

    #[test]
    fn test_crdt_resolver_accept_new_event() {
        let resolver = CrdtResolver::new();
        let event = make_event("evt-1", "us-east", 100, 0, 1);

        assert_eq!(resolver.resolve(&event), ConflictResolution::Accept);
        resolver.accept(&event);
        assert_eq!(resolver.seen_count(), 1);
    }

    #[test]
    fn test_crdt_resolver_skip_duplicate() {
        let resolver = CrdtResolver::new();
        let event = make_event("evt-1", "us-east", 100, 0, 1);

        assert_eq!(
            resolver.resolve_and_accept(&event),
            ConflictResolution::Accept
        );
        assert_eq!(resolver.resolve(&event), ConflictResolution::Skip);
    }

    #[test]
    fn test_crdt_resolver_skip_old_version() {
        let resolver = CrdtResolver::new();
        let new_event = make_event("evt-2", "us-east", 200, 0, 1);
        let old_event = make_event("evt-1", "us-east", 100, 0, 1);

        resolver.resolve_and_accept(&new_event);
        // old_event has lower HLC than what we've seen from us-east
        assert_eq!(resolver.resolve(&old_event), ConflictResolution::Skip);
    }

    #[test]
    fn test_crdt_resolver_different_regions_independent() {
        let resolver = CrdtResolver::new();
        let us_event = make_event("evt-1", "us-east", 100, 0, 1);
        let eu_event = make_event("evt-2", "eu-west", 50, 0, 2);

        resolver.resolve_and_accept(&us_event);
        // eu-west at lower timestamp is still accepted (different region)
        assert_eq!(resolver.resolve(&eu_event), ConflictResolution::Accept);
    }

    #[test]
    fn test_append_only_ignores_entity_history() {
        // No strategies registered: same entity+type from two regions is never
        // a conflict, even when the second event carries an older timestamp.
        let resolver = CrdtResolver::new();
        let first = make_typed_event("evt-1", "us-east", 200, 1, "order.placed", "o-1");
        let second = make_typed_event("evt-2", "eu-west", 100, 2, "order.placed", "o-1");

        assert_eq!(
            resolver.resolve_and_accept(&first),
            ConflictResolution::Accept
        );
        assert_eq!(
            resolver.resolve_and_accept(&second),
            ConflictResolution::Accept
        );
        assert_eq!(resolver.seen_count(), 2);
    }

    #[test]
    fn test_last_write_wins_skips_older_event() {
        let resolver = CrdtResolver::with_strategies(vec![(
            "config.".to_string(),
            MergeStrategy::LastWriteWins,
        )]);
        let newer = make_typed_event("evt-1", "us-east", 200, 1, "config.updated", "cfg-1");
        let older = make_typed_event("evt-2", "eu-west", 100, 2, "config.updated", "cfg-1");
        let newest = make_typed_event("evt-3", "eu-west", 300, 2, "config.updated", "cfg-1");
        let other_entity = make_typed_event("evt-4", "ap-east", 50, 3, "config.updated", "cfg-2");

        assert_eq!(
            resolver.resolve_and_accept(&newer),
            ConflictResolution::Accept
        );
        // Older write for the same entity+type loses, even from another region.
        assert_eq!(
            resolver.resolve_and_accept(&older),
            ConflictResolution::Skip
        );
        assert_eq!(
            resolver.resolve_and_accept(&newest),
            ConflictResolution::Accept
        );
        // A different entity has its own winner.
        assert_eq!(
            resolver.resolve_and_accept(&other_entity),
            ConflictResolution::Accept
        );
    }

    #[test]
    fn test_first_write_wins_skips_later_event() {
        let resolver = CrdtResolver::with_strategies(vec![(
            "user.created".to_string(),
            MergeStrategy::FirstWriteWins,
        )]);
        let first = make_typed_event("evt-1", "us-east", 100, 1, "user.created", "u-1");
        let later = make_typed_event("evt-2", "eu-west", 200, 2, "user.created", "u-1");
        let other_entity = make_typed_event("evt-3", "eu-west", 300, 2, "user.created", "u-2");

        assert_eq!(
            resolver.resolve_and_accept(&first),
            ConflictResolution::Accept
        );
        // Same entity+type already has a winner, newer timestamp or not.
        assert_eq!(
            resolver.resolve_and_accept(&later),
            ConflictResolution::Skip
        );
        assert_eq!(
            resolver.resolve_and_accept(&other_entity),
            ConflictResolution::Accept
        );
    }

    #[test]
    fn test_strategy_longest_prefix_wins() {
        // Registered shortest-first on purpose: lookup must not depend on
        // registration order.
        let resolver = CrdtResolver::with_strategies(vec![
            ("config.".to_string(), MergeStrategy::LastWriteWins),
            ("config.secret.".to_string(), MergeStrategy::FirstWriteWins),
        ]);

        assert_eq!(
            resolver.strategy_for("config.secret.rotated"),
            &MergeStrategy::FirstWriteWins
        );
        assert_eq!(
            resolver.strategy_for("config.updated"),
            &MergeStrategy::LastWriteWins
        );
        assert_eq!(
            resolver.strategy_for("order.placed"),
            &MergeStrategy::AppendOnly
        );
    }

    #[test]
    fn test_deterministic_order() {
        let mut events = vec![
            make_event("evt-3", "ap-east", 100, 0, 3),
            make_event("evt-1", "us-east", 100, 0, 1),
            make_event("evt-2", "eu-west", 100, 0, 2),
        ];

        deterministic_order(&mut events);

        // Same physical time + logical: ordered by node_id, then event_id
        assert_eq!(events[0].event_id, "evt-1"); // node 1
        assert_eq!(events[1].event_id, "evt-2"); // node 2
        assert_eq!(events[2].event_id, "evt-3"); // node 3
    }

    #[test]
    fn test_deterministic_order_by_hlc() {
        let mut events = vec![
            make_event("evt-1", "us-east", 300, 0, 1),
            make_event("evt-2", "eu-west", 100, 0, 2),
            make_event("evt-3", "ap-east", 200, 0, 3),
        ];

        deterministic_order(&mut events);

        assert_eq!(events[0].event_id, "evt-2"); // 100ms
        assert_eq!(events[1].event_id, "evt-3"); // 200ms
        assert_eq!(events[2].event_id, "evt-1"); // 300ms
    }

    #[test]
    fn test_replicated_event_serialization() {
        let event = make_event("evt-1", "us-east", 1000, 5, 1);
        let json = serde_json::to_string(&event).unwrap();
        let parsed: ReplicatedEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.event_id, "evt-1");
        assert_eq!(parsed.origin_region, "us-east");
        assert_eq!(parsed.hlc_timestamp.physical_ms, 1000);
    }
}