// allsource-core 0.10.4
//
// High-performance event store core built in Rust.
// Documentation
//! CRDT-based conflict resolution for geo-replicated events.
//!
//! When events are ingested concurrently at different regions, conflicts are
//! resolved deterministically, without coordination, using CRDTs (Conflict-free
//! Replicated Data Types).
//!
//! # Strategy
//!
//! Events are immutable, append-only facts, so by nature they form a **G-Set**
//! (grow-only set) CRDT. The conflict-resolution challenge is **ordering**: when
//! two regions each write events at the same logical time, every region needs the
//! same deterministic total order.
//!
//! ## Ordering rules (in priority order)
//!
//! [`deterministic_order`] sorts events in ascending order by:
//!
//! 1. **HLC timestamp**, compared via `HlcTimestamp`'s `Ord`: physical time
//!    first, then the logical counter, then the node ID. At each step the lower
//!    value sorts first.
//! 2. **Event ID**: the final tiebreak, a lexicographic comparison of the event ID
//!    (UUID) strings.
//!
//! This produces a total order that all regions converge to identically,
//! without any coordination.
//!
//! # Version vectors
//!
//! Each region maintains a version vector tracking the latest HLC timestamp
//! accepted from each origin region. During replication:
//! - An incoming event is accepted only if its ID has not been seen and its HLC
//!   timestamp is strictly greater than the vector entry for its origin region.
//! - The vector entry is advanced when the event is accepted.
//! - This prevents duplicate delivery and enables efficient delta sync.
//!
//! Because of the timestamp check, an event is skipped if its HLC is not newer
//! than the latest one already accepted from its origin, even when its ID was
//! never seen. Replication must therefore deliver each origin's events in HLC
//! order.
use super::hlc::HlcTimestamp;
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;

/// Version vector tracking causal progress per region.
///
/// Maps region_id → latest HLC timestamp seen from that region. It is backed by
/// a `BTreeMap`, so iteration (and serialized output) is in region-ID order.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionVector {
    /// Region ID → latest HLC timestamp from that region.
    entries: BTreeMap<String, HlcTimestamp>,
}

impl VersionVector {
    /// Create an empty version vector.
    pub fn new() -> Self {
        Self {
            entries: BTreeMap::new(),
        }
    }

    /// Update the vector entry for a region if the given timestamp is newer.
    /// Returns true if the vector was actually updated (i.e., the event is new).
    pub fn advance(&mut self, region_id: &str, ts: HlcTimestamp) -> bool {
        match self.entries.get(region_id) {
            Some(existing) if ts <= *existing => false,
            _ => {
                self.entries.insert(region_id.to_string(), ts);
                true
            }
        }
    }

    /// Check if a timestamp from a region is new (not yet seen).
    pub fn is_new(&self, region_id: &str, ts: &HlcTimestamp) -> bool {
        match self.entries.get(region_id) {
            Some(existing) => ts > existing,
            None => true,
        }
    }

    /// Get the latest timestamp for a region.
    pub fn get(&self, region_id: &str) -> Option<&HlcTimestamp> {
        self.entries.get(region_id)
    }

    /// Merge another version vector into this one (pointwise max).
    pub fn merge(&mut self, other: &VersionVector) {
        for (region, ts) in &other.entries {
            self.advance(region, *ts);
        }
    }

    /// Get all entries.
    pub fn entries(&self) -> &BTreeMap<String, HlcTimestamp> {
        &self.entries
    }

    /// Number of regions tracked.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Check if empty.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}

impl Default for VersionVector {
    fn default() -> Self {
        Self::new()
    }
}

/// A replicated event carrying CRDT metadata for conflict resolution.
///
/// `event_id` and `origin_region` drive deduplication in `CrdtResolver`.
/// `hlc_timestamp` plus `event_id` define the ordering used by
/// `deterministic_order`. This module never inspects `event_data`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicatedEvent {
    /// The event ID (UUID as string).
    pub event_id: String,
    /// HLC timestamp assigned at the origin region.
    pub hlc_timestamp: HlcTimestamp,
    /// Region ID where the event was originally ingested.
    pub origin_region: String,
    /// Serialized event data (the full Event struct as JSON).
    pub event_data: serde_json::Value,
}

/// Conflict resolution outcome.
///
/// A fieldless enum, so it is `Copy`. It is also `Hash`, so outcomes can be
/// stored in hash-based sets and maps.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConflictResolution {
    /// Accept: the incoming event is new and should be applied.
    Accept,
    /// Skip: the incoming event is a duplicate (its ID was already accepted), or
    /// its HLC timestamp is not newer than the latest one recorded for its
    /// origin region.
    Skip,
}

/// CRDT-based conflict resolver for geo-replicated events.
///
/// It tracks, per origin region, a [`VersionVector`] holding the latest HLC
/// timestamp accepted from that region, plus the set of event IDs already
/// accepted.
///
/// Both maps are `DashMap`s, so each method can be called through a shared
/// reference from several threads. However, calling [`CrdtResolver::resolve`]
/// and then [`CrdtResolver::accept`] separately is a check-then-act sequence.
/// It is not atomic with respect to other threads.
#[derive(Debug)]
pub struct CrdtResolver {
    /// Origin region ID → version vector for that region.
    version_vectors: DashMap<String, VersionVector>,
    /// Event IDs already accepted, used for deduplication. Entries are only ever
    /// inserted (no method removes them), so this grows with accepted events.
    seen_events: DashMap<String, ()>,
}

impl CrdtResolver {
    /// Create a new CRDT resolver with no version vectors and an empty dedup set.
    pub fn new() -> Self {
        Self {
            version_vectors: DashMap::new(),
            seen_events: DashMap::new(),
        }
    }

    /// Decide whether an incoming replicated event should be accepted, without
    /// recording anything.
    ///
    /// Returns `Skip` if the event ID was already accepted. Also returns `Skip`
    /// if the event's HLC timestamp is not strictly newer than the latest one
    /// recorded for its origin region. Otherwise returns `Accept`.
    ///
    /// This is a read-only check. Following it with a separate [`Self::accept`]
    /// call is not atomic: concurrent callers can both observe `Accept` for the
    /// same event. Use [`Self::resolve_and_accept`] when events may be processed
    /// concurrently.
    pub fn resolve(&self, event: &ReplicatedEvent) -> ConflictResolution {
        // Dedup by event ID
        if self.seen_events.contains_key(&event.event_id) {
            return ConflictResolution::Skip;
        }

        // A region with no version vector yet has seen nothing, so anything is new.
        let is_new = self
            .version_vectors
            .get(&event.origin_region)
            .map(|vv| vv.is_new(&event.origin_region, &event.hlc_timestamp))
            .unwrap_or(true);

        if !is_new {
            return ConflictResolution::Skip;
        }

        ConflictResolution::Accept
    }

    /// Mark an event as accepted: record its ID in the dedup set and advance the
    /// version vector of its origin region (creating that vector if needed).
    ///
    /// This does not check whether the event should be accepted. Callers are
    /// expected to have obtained `Accept` from [`Self::resolve`] first.
    pub fn accept(&self, event: &ReplicatedEvent) {
        self.seen_events.insert(event.event_id.clone(), ());

        let mut vv = self
            .version_vectors
            .entry(event.origin_region.clone())
            .or_default();
        vv.advance(&event.origin_region, event.hlc_timestamp);
    }

    /// Resolve and accept in one atomic step. Returns the resolution.
    ///
    /// The write guard on the origin region's version-vector entry is held
    /// across both the check and the update. Concurrent calls for the same event
    /// (same ID and origin region) are therefore serialized, and at most one of
    /// them returns `Accept`. Run sequentially, this behaves exactly like
    /// [`Self::resolve`] followed by [`Self::accept`] on `Accept`.
    pub fn resolve_and_accept(&self, event: &ReplicatedEvent) -> ConflictResolution {
        // Fast path: known duplicates are rejected without taking a write guard
        // or creating a version-vector entry for the region.
        if self.seen_events.contains_key(&event.event_id) {
            return ConflictResolution::Skip;
        }

        // This guard locks the region's entry until the function returns. Only
        // `seen_events` (a separate map) is touched while it is held, so no
        // method of this type can deadlock against it.
        let mut vv = self
            .version_vectors
            .entry(event.origin_region.clone())
            .or_default();

        // Re-check under the guard: another thread may have accepted this event
        // between the fast-path check above and acquiring the guard.
        if self.seen_events.contains_key(&event.event_id)
            || !vv.is_new(&event.origin_region, &event.hlc_timestamp)
        {
            return ConflictResolution::Skip;
        }

        self.seen_events.insert(event.event_id.clone(), ());
        vv.advance(&event.origin_region, event.hlc_timestamp);
        ConflictResolution::Accept
    }

    /// Get a copy of the version vector stored for a specific region, if any.
    pub fn version_vector_for(&self, region_id: &str) -> Option<VersionVector> {
        self.version_vectors
            .get(region_id)
            .map(|vv| vv.value().clone())
    }

    /// Copy all version vectors, keyed by region ID (for status/debug).
    pub fn all_version_vectors(&self) -> BTreeMap<String, VersionVector> {
        self.version_vectors
            .iter()
            .map(|entry| (entry.key().clone(), entry.value().clone()))
            .collect()
    }

    /// Merge a remote version vector (pointwise max) into the local vector
    /// stored under `region_id`, creating that vector if it does not exist.
    pub fn merge_version_vector(&self, region_id: &str, remote_vv: &VersionVector) {
        let mut vv = self
            .version_vectors
            .entry(region_id.to_string())
            .or_default();
        vv.merge(remote_vv);
    }

    /// Number of unique event IDs accepted so far.
    pub fn seen_count(&self) -> usize {
        self.seen_events.len()
    }
}

impl Default for CrdtResolver {
    fn default() -> Self {
        Self::new()
    }
}

/// Sort replicated events into a deterministic total order.
///
/// Events are ordered ascending by HLC timestamp. Events whose timestamps
/// compare equal are then ordered by event ID, using a byte-wise string
/// comparison. For a given set of distinct events this yields the same sequence
/// on every node, whatever the arrival order. The sort is stable, so entries
/// equal on both keys keep their relative input order.
pub fn deterministic_order(events: &mut [ReplicatedEvent]) {
    events.sort_by(|lhs, rhs| {
        let by_clock = lhs.hlc_timestamp.cmp(&rhs.hlc_timestamp);
        if by_clock.is_ne() {
            return by_clock;
        }
        lhs.event_id.cmp(&rhs.event_id)
    });
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `ReplicatedEvent` with a fixed `{"type": "test"}` payload.
    fn make_event(
        id: &str,
        region: &str,
        physical_ms: u64,
        logical: u32,
        node_id: u32,
    ) -> ReplicatedEvent {
        ReplicatedEvent {
            event_id: id.to_string(),
            hlc_timestamp: HlcTimestamp::new(physical_ms, logical, node_id),
            origin_region: region.to_string(),
            event_data: serde_json::json!({"type": "test"}),
        }
    }

    #[test]
    fn test_version_vector_advance() {
        let mut vv = VersionVector::new();
        let ts1 = HlcTimestamp::new(100, 0, 1);
        let ts2 = HlcTimestamp::new(200, 0, 1);
        let ts_old = HlcTimestamp::new(50, 0, 1);

        assert!(vv.advance("us-east", ts1));
        assert!(vv.advance("us-east", ts2)); // newer
        assert!(!vv.advance("us-east", ts_old)); // older, rejected

        assert_eq!(vv.get("us-east").unwrap().physical_ms, 200);
    }

    #[test]
    fn test_version_vector_is_new() {
        let mut vv = VersionVector::new();
        let ts = HlcTimestamp::new(100, 0, 1);
        vv.advance("us-east", ts);

        assert!(!vv.is_new("us-east", &HlcTimestamp::new(50, 0, 1)));
        assert!(!vv.is_new("us-east", &HlcTimestamp::new(100, 0, 1)));
        assert!(vv.is_new("us-east", &HlcTimestamp::new(101, 0, 1)));
        assert!(vv.is_new("eu-west", &HlcTimestamp::new(1, 0, 1))); // unknown region
    }

    #[test]
    fn test_version_vector_merge() {
        let mut vv1 = VersionVector::new();
        vv1.advance("us-east", HlcTimestamp::new(100, 0, 1));
        vv1.advance("eu-west", HlcTimestamp::new(50, 0, 2));

        let mut vv2 = VersionVector::new();
        vv2.advance("us-east", HlcTimestamp::new(80, 0, 1));
        vv2.advance("eu-west", HlcTimestamp::new(120, 0, 2));
        vv2.advance("ap-east", HlcTimestamp::new(90, 0, 3));

        vv1.merge(&vv2);

        assert_eq!(vv1.get("us-east").unwrap().physical_ms, 100); // kept higher
        assert_eq!(vv1.get("eu-west").unwrap().physical_ms, 120); // took higher
        assert_eq!(vv1.get("ap-east").unwrap().physical_ms, 90); // new entry
        assert_eq!(vv1.len(), 3);
    }

    #[test]
    fn test_crdt_resolver_accept_new_event() {
        let resolver = CrdtResolver::new();
        let event = make_event("evt-1", "us-east", 100, 0, 1);

        assert_eq!(resolver.resolve(&event), ConflictResolution::Accept);
        resolver.accept(&event);
        assert_eq!(resolver.seen_count(), 1);
    }

    #[test]
    fn test_crdt_resolver_skip_duplicate() {
        let resolver = CrdtResolver::new();
        let event = make_event("evt-1", "us-east", 100, 0, 1);

        assert_eq!(
            resolver.resolve_and_accept(&event),
            ConflictResolution::Accept
        );
        assert_eq!(resolver.resolve(&event), ConflictResolution::Skip);
    }

    #[test]
    fn test_crdt_resolver_skip_old_version() {
        let resolver = CrdtResolver::new();
        let new_event = make_event("evt-2", "us-east", 200, 0, 1);
        let old_event = make_event("evt-1", "us-east", 100, 0, 1);

        resolver.resolve_and_accept(&new_event);
        // old_event has lower HLC than what we've seen from us-east
        assert_eq!(resolver.resolve(&old_event), ConflictResolution::Skip);
    }

    #[test]
    fn test_crdt_resolver_different_regions_independent() {
        let resolver = CrdtResolver::new();
        let us_event = make_event("evt-1", "us-east", 100, 0, 1);
        let eu_event = make_event("evt-2", "eu-west", 50, 0, 2);

        resolver.resolve_and_accept(&us_event);
        // eu-west at lower timestamp is still accepted (different region)
        assert_eq!(resolver.resolve(&eu_event), ConflictResolution::Accept);
    }

    #[test]
    fn test_crdt_resolver_resolve_is_read_only() {
        let resolver = CrdtResolver::new();
        let event = make_event("evt-1", "us-east", 100, 0, 1);

        // Resolving without accepting must not record the event or the region.
        assert_eq!(resolver.resolve(&event), ConflictResolution::Accept);
        assert_eq!(resolver.resolve(&event), ConflictResolution::Accept);
        assert_eq!(resolver.seen_count(), 0);
        assert!(resolver.version_vector_for("us-east").is_none());
    }

    #[test]
    fn test_crdt_resolver_merge_version_vector_raises_watermark() {
        let resolver = CrdtResolver::new();
        resolver.resolve_and_accept(&make_event("evt-1", "us-east", 100, 0, 1));

        let mut remote = VersionVector::new();
        remote.advance("us-east", HlcTimestamp::new(250, 0, 1));
        resolver.merge_version_vector("us-east", &remote);

        let vv = resolver.version_vector_for("us-east").unwrap();
        assert_eq!(vv.get("us-east").unwrap().physical_ms, 250);
        // Events at or below the merged watermark are now skipped.
        assert_eq!(
            resolver.resolve(&make_event("evt-2", "us-east", 200, 0, 1)),
            ConflictResolution::Skip
        );
    }

    #[test]
    fn test_deterministic_order() {
        let mut events = vec![
            make_event("evt-3", "ap-east", 100, 0, 3),
            make_event("evt-1", "us-east", 100, 0, 1),
            make_event("evt-2", "eu-west", 100, 0, 2),
        ];

        deterministic_order(&mut events);

        // Same physical time + logical: ordered by node_id, then event_id
        assert_eq!(events[0].event_id, "evt-1"); // node 1
        assert_eq!(events[1].event_id, "evt-2"); // node 2
        assert_eq!(events[2].event_id, "evt-3"); // node 3
    }

    #[test]
    fn test_deterministic_order_by_hlc() {
        let mut events = vec![
            make_event("evt-1", "us-east", 300, 0, 1),
            make_event("evt-2", "eu-west", 100, 0, 2),
            make_event("evt-3", "ap-east", 200, 0, 3),
        ];

        deterministic_order(&mut events);

        assert_eq!(events[0].event_id, "evt-2"); // 100ms
        assert_eq!(events[1].event_id, "evt-3"); // 200ms
        assert_eq!(events[2].event_id, "evt-1"); // 300ms
    }

    #[test]
    fn test_deterministic_order_event_id_tiebreak() {
        // Identical HLC timestamps: the event ID is the only remaining key.
        let mut events = vec![
            make_event("evt-b", "us-east", 100, 0, 1),
            make_event("evt-a", "eu-west", 100, 0, 1),
        ];

        deterministic_order(&mut events);

        assert_eq!(events[0].event_id, "evt-a");
        assert_eq!(events[1].event_id, "evt-b");
    }

    #[test]
    fn test_deterministic_order_independent_of_arrival() {
        let mut forward = vec![
            make_event("evt-1", "us-east", 100, 0, 1),
            make_event("evt-2", "eu-west", 100, 0, 2),
            make_event("evt-3", "ap-east", 150, 0, 3),
            make_event("evt-4", "us-east", 100, 0, 1),
        ];
        let mut reversed = forward.clone();
        reversed.reverse();

        deterministic_order(&mut forward);
        deterministic_order(&mut reversed);

        let ids = |evs: &[ReplicatedEvent]| {
            evs.iter()
                .map(|e| e.event_id.clone())
                .collect::<Vec<_>>()
        };
        assert_eq!(ids(&forward), ids(&reversed));
    }

    #[test]
    fn test_replicated_event_serialization() {
        let event = make_event("evt-1", "us-east", 1000, 5, 1);
        let json = serde_json::to_string(&event).unwrap();
        let parsed: ReplicatedEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.event_id, "evt-1");
        assert_eq!(parsed.origin_region, "us-east");
        assert_eq!(parsed.hlc_timestamp.physical_ms, 1000);
    }
}