use super::hlc::HlcTimestamp;
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
/// Per-region high-water marks: for each region id, the newest HLC timestamp
/// recorded for that region.
///
/// Equality compares the whole region -> timestamp map, which lets callers
/// check whether two vectors record exactly the same progress.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct VersionVector {
    /// Region id -> newest recorded timestamp, kept sorted by region id.
    entries: BTreeMap<String, HlcTimestamp>,
}
impl VersionVector {
pub fn new() -> Self {
Self {
entries: BTreeMap::new(),
}
}
pub fn advance(&mut self, region_id: &str, ts: HlcTimestamp) -> bool {
match self.entries.get(region_id) {
Some(existing) if ts <= *existing => false,
_ => {
self.entries.insert(region_id.to_string(), ts);
true
}
}
}
pub fn is_new(&self, region_id: &str, ts: &HlcTimestamp) -> bool {
match self.entries.get(region_id) {
Some(existing) => ts > existing,
None => true,
}
}
pub fn get(&self, region_id: &str) -> Option<&HlcTimestamp> {
self.entries.get(region_id)
}
pub fn merge(&mut self, other: &VersionVector) {
for (region, ts) in &other.entries {
self.advance(region, *ts);
}
}
pub fn entries(&self) -> &BTreeMap<String, HlcTimestamp> {
&self.entries
}
pub fn len(&self) -> usize {
self.entries.len()
}
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
}
impl Default for VersionVector {
fn default() -> Self {
Self::new()
}
}
/// A replicated event together with the metadata the resolver needs to
/// deduplicate, order and merge it.
#[derive(Debug, Clone)]
#[derive(Serialize, Deserialize)]
pub struct ReplicatedEvent {
    /// Identifier that [`CrdtResolver`] uses to detect duplicate deliveries.
    pub event_id: String,
    /// Hybrid logical clock timestamp. It is used for per-region staleness
    /// checks, last-write-wins comparisons and [`deterministic_order`].
    pub hlc_timestamp: HlcTimestamp,
    /// Id of the region the event came from; selects which version vector
    /// is consulted.
    pub origin_region: String,
    /// JSON payload. The resolver reads the optional string fields
    /// `event_type` and `entity_id` from it.
    pub event_data: serde_json::Value,
}
/// Outcome of [`CrdtResolver::resolve`] for a single replicated event.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConflictResolution {
    /// The event should be applied; [`CrdtResolver::accept`] records it.
    Accept,
    /// The event should be ignored. It was already accepted, is stale for its
    /// origin region, or lost under the configured merge strategy.
    Skip,
}

/// How [`CrdtResolver`] arbitrates between events that share an `entity_id`
/// and `event_type`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MergeStrategy {
    /// Accept every event that is neither a duplicate nor stale.
    AppendOnly,
    /// Accept only events whose HLC timestamp is strictly newer than the
    /// newest one already accepted for the same entity and event type.
    LastWriteWins,
    /// Accept only the first event accepted for a given entity and event
    /// type. Later arrivals are skipped regardless of their timestamp.
    FirstWriteWins,
}
/// Decides which replicated events to apply. It deduplicates by event id,
/// drops events that are stale for their origin region (via version vectors),
/// and applies a per-event-type [`MergeStrategy`].
pub struct CrdtResolver {
    /// One version vector per origin region, keyed by region id.
    version_vectors: DashMap<String, VersionVector>,
    /// Ids of accepted events; used as a set, so the values are `()`.
    seen_events: DashMap<String, ()>,
    /// `(event_type prefix, strategy)` pairs. `with_strategies` sorts them
    /// longest prefix first.
    strategies: Vec<(String, MergeStrategy)>,
    /// Highest accepted HLC timestamp per `entity_id` + `event_type` key.
    entity_type_winners: DashMap<String, HlcTimestamp>,
}
impl CrdtResolver {
    /// Creates a resolver with no configured strategies. Every event type then
    /// falls back to [`MergeStrategy::AppendOnly`].
    pub fn new() -> Self {
        Self {
            version_vectors: DashMap::new(),
            seen_events: DashMap::new(),
            strategies: Vec::new(),
            entity_type_winners: DashMap::new(),
        }
    }

    /// Creates a resolver that applies `strategies`, a list of
    /// `(event_type prefix, strategy)` pairs.
    ///
    /// When several prefixes match an event type, the longest one wins. Ties
    /// between equally long prefixes go to the one listed first, because the
    /// sort below is stable.
    pub fn with_strategies(mut strategies: Vec<(String, MergeStrategy)>) -> Self {
        strategies.sort_by_key(|(prefix, _)| std::cmp::Reverse(prefix.len()));
        Self {
            strategies,
            ..Self::new()
        }
    }

    /// Returns the strategy whose prefix is the longest match for `event_type`,
    /// or [`MergeStrategy::AppendOnly`] when no prefix matches.
    fn strategy_for(&self, event_type: &str) -> &MergeStrategy {
        self.strategies
            .iter()
            .find(|(prefix, _)| event_type.starts_with(prefix.as_str()))
            .map_or(&MergeStrategy::AppendOnly, |(_, strategy)| strategy)
    }

    /// Reads a string field from `event.event_data`. A missing or non-string
    /// value is treated as `""`.
    fn data_str<'e>(event: &'e ReplicatedEvent, field: &str) -> &'e str {
        event
            .event_data
            .get(field)
            .and_then(|value| value.as_str())
            .unwrap_or("")
    }

    /// Builds the `entity_type_winners` key for `event`. Returns `None` when
    /// the payload lacks a non-empty `entity_id` or `event_type`; such events
    /// are never recorded as winners.
    fn winner_key(event: &ReplicatedEvent) -> Option<String> {
        let event_type = Self::data_str(event, "event_type");
        let entity_id = Self::data_str(event, "entity_id");
        (!event_type.is_empty() && !entity_id.is_empty())
            .then(|| format!("{entity_id}\x00{event_type}"))
    }

    /// Decides whether `event` should be applied, without recording anything.
    ///
    /// Returns [`ConflictResolution::Skip`] when any of these hold:
    /// - the event id has already been accepted;
    /// - the origin region's version vector already holds a timestamp equal to
    ///   or newer than the event's;
    /// - the merge strategy selected by the payload's `event_type` rejects it
    ///   (see [`MergeStrategy`]).
    ///
    /// Some events lack a non-empty string `entity_id` or `event_type` in
    /// their payload. [`Self::accept`] never records those as winners, so the
    /// `LastWriteWins` and `FirstWriteWins` checks always pass for them.
    ///
    /// NOTE(review): `FirstWriteWins` keeps whichever event is accepted first
    /// (arrival order), not the one with the smallest HLC timestamp. Replicas
    /// that receive events in different orders may therefore keep different
    /// winners. Confirm this is intended.
    pub fn resolve(&self, event: &ReplicatedEvent) -> ConflictResolution {
        if self.seen_events.contains_key(&event.event_id) {
            return ConflictResolution::Skip;
        }
        // The read guard is a temporary and is released at the end of this statement.
        let is_new = self
            .version_vectors
            .get(&event.origin_region)
            .is_none_or(|vv| vv.is_new(&event.origin_region, &event.hlc_timestamp));
        if !is_new {
            return ConflictResolution::Skip;
        }
        match self.strategy_for(Self::data_str(event, "event_type")) {
            MergeStrategy::AppendOnly => ConflictResolution::Accept,
            MergeStrategy::LastWriteWins => {
                let beaten = Self::winner_key(event)
                    .and_then(|key| self.entity_type_winners.get(&key).map(|winner| *winner))
                    .is_some_and(|winner| event.hlc_timestamp <= winner);
                if beaten {
                    ConflictResolution::Skip
                } else {
                    ConflictResolution::Accept
                }
            }
            MergeStrategy::FirstWriteWins => {
                let already_won = Self::winner_key(event)
                    .is_some_and(|key| self.entity_type_winners.contains_key(&key));
                if already_won {
                    ConflictResolution::Skip
                } else {
                    ConflictResolution::Accept
                }
            }
        }
    }

    /// Records `event` as accepted. It:
    /// - marks the event id as seen;
    /// - advances the origin region's version vector;
    /// - raises the stored winner timestamp for the pair, when the payload
    ///   has non-empty `entity_id` and `event_type` strings.
    ///
    /// It performs no checks of its own; [`Self::resolve_and_accept`] combines
    /// both steps.
    pub fn accept(&self, event: &ReplicatedEvent) {
        self.seen_events.insert(event.event_id.clone(), ());
        // Chained on a temporary so the `version_vectors` shard write guard is
        // dropped before `entity_type_winners` is locked below.
        self.version_vectors
            .entry(event.origin_region.clone())
            .or_default()
            .advance(&event.origin_region, event.hlc_timestamp);
        if let Some(key) = Self::winner_key(event) {
            self.entity_type_winners
                .entry(key)
                .and_modify(|winner| {
                    if event.hlc_timestamp > *winner {
                        *winner = event.hlc_timestamp;
                    }
                })
                .or_insert(event.hlc_timestamp);
        }
    }

    /// Runs [`Self::resolve`], then [`Self::accept`] if the result is
    /// [`ConflictResolution::Accept`].
    ///
    /// The two steps take separate map locks, so the pair is not atomic.
    /// Concurrent callers can each observe `Accept` for the same or
    /// conflicting events before either one records its result.
    pub fn resolve_and_accept(&self, event: &ReplicatedEvent) -> ConflictResolution {
        let resolution = self.resolve(event);
        if resolution == ConflictResolution::Accept {
            self.accept(event);
        }
        resolution
    }

    /// Returns a clone of the version vector tracked for `region_id`, if any.
    pub fn version_vector_for(&self, region_id: &str) -> Option<VersionVector> {
        self.version_vectors
            .get(region_id)
            .map(|vv| vv.value().clone())
    }

    /// Returns a clone of every tracked version vector, keyed and ordered by
    /// region id.
    pub fn all_version_vectors(&self) -> BTreeMap<String, VersionVector> {
        self.version_vectors
            .iter()
            .map(|entry| (entry.key().clone(), entry.value().clone()))
            .collect()
    }

    /// Merges `remote_vv` into the vector tracked for `region_id`, creating the
    /// vector if absent. Each entry keeps the newer timestamp.
    pub fn merge_version_vector(&self, region_id: &str, remote_vv: &VersionVector) {
        self.version_vectors
            .entry(region_id.to_string())
            .or_default()
            .merge(remote_vv);
    }

    /// Number of distinct event ids accepted so far.
    pub fn seen_count(&self) -> usize {
        self.seen_events.len()
    }
}

impl Default for CrdtResolver {
    /// Equivalent to [`CrdtResolver::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Sorts `events` in place by ascending HLC timestamp, using `event_id` to
/// break ties.
///
/// The sort is stable, so events that compare equal on both keys keep their
/// relative input order.
pub fn deterministic_order(events: &mut [ReplicatedEvent]) {
    events.sort_by(|lhs, rhs| {
        let lhs_key = (&lhs.hlc_timestamp, &lhs.event_id);
        let rhs_key = (&rhs.hlc_timestamp, &rhs.event_id);
        lhs_key.cmp(&rhs_key)
    });
}
#[cfg(test)]
mod tests {
    use super::*;

    fn make_event(
        id: &str,
        region: &str,
        physical_ms: u64,
        logical: u32,
        node_id: u32,
    ) -> ReplicatedEvent {
        ReplicatedEvent {
            event_id: id.to_string(),
            hlc_timestamp: HlcTimestamp::new(physical_ms, logical, node_id),
            origin_region: region.to_string(),
            event_data: serde_json::json!({"type": "test"}),
        }
    }

    /// Builds an event whose payload carries the `event_type` / `entity_id`
    /// fields that `CrdtResolver` uses to select and apply a merge strategy.
    fn make_typed_event(
        id: &str,
        region: &str,
        physical_ms: u64,
        event_type: &str,
        entity_id: &str,
    ) -> ReplicatedEvent {
        let mut event = make_event(id, region, physical_ms, 0, 1);
        event.event_data = serde_json::json!({
            "event_type": event_type,
            "entity_id": entity_id
        });
        event
    }

    #[test]
    fn test_version_vector_advance() {
        let mut vv = VersionVector::new();
        let ts1 = HlcTimestamp::new(100, 0, 1);
        let ts2 = HlcTimestamp::new(200, 0, 1);
        let ts_old = HlcTimestamp::new(50, 0, 1);
        assert!(vv.advance("us-east", ts1));
        assert!(vv.advance("us-east", ts2));
        assert!(!vv.advance("us-east", ts_old));
        assert_eq!(vv.get("us-east").unwrap().physical_ms, 200);
    }

    #[test]
    fn test_version_vector_is_new() {
        let mut vv = VersionVector::new();
        let ts = HlcTimestamp::new(100, 0, 1);
        vv.advance("us-east", ts);
        assert!(!vv.is_new("us-east", &HlcTimestamp::new(50, 0, 1)));
        assert!(!vv.is_new("us-east", &HlcTimestamp::new(100, 0, 1)));
        assert!(vv.is_new("us-east", &HlcTimestamp::new(101, 0, 1)));
        assert!(vv.is_new("eu-west", &HlcTimestamp::new(1, 0, 1)));
    }

    #[test]
    fn test_version_vector_merge() {
        let mut vv1 = VersionVector::new();
        vv1.advance("us-east", HlcTimestamp::new(100, 0, 1));
        vv1.advance("eu-west", HlcTimestamp::new(50, 0, 2));
        let mut vv2 = VersionVector::new();
        vv2.advance("us-east", HlcTimestamp::new(80, 0, 1));
        vv2.advance("eu-west", HlcTimestamp::new(120, 0, 2));
        vv2.advance("ap-east", HlcTimestamp::new(90, 0, 3));
        vv1.merge(&vv2);
        assert_eq!(vv1.get("us-east").unwrap().physical_ms, 100);
        assert_eq!(vv1.get("eu-west").unwrap().physical_ms, 120);
        assert_eq!(vv1.get("ap-east").unwrap().physical_ms, 90);
        assert_eq!(vv1.len(), 3);
    }

    #[test]
    fn test_crdt_resolver_accept_new_event() {
        let resolver = CrdtResolver::new();
        let event = make_event("evt-1", "us-east", 100, 0, 1);
        assert_eq!(resolver.resolve(&event), ConflictResolution::Accept);
        resolver.accept(&event);
        assert_eq!(resolver.seen_count(), 1);
    }

    #[test]
    fn test_crdt_resolver_skip_duplicate() {
        let resolver = CrdtResolver::new();
        let event = make_event("evt-1", "us-east", 100, 0, 1);
        assert_eq!(
            resolver.resolve_and_accept(&event),
            ConflictResolution::Accept
        );
        assert_eq!(resolver.resolve(&event), ConflictResolution::Skip);
    }

    #[test]
    fn test_crdt_resolver_skip_old_version() {
        let resolver = CrdtResolver::new();
        let new_event = make_event("evt-2", "us-east", 200, 0, 1);
        let old_event = make_event("evt-1", "us-east", 100, 0, 1);
        resolver.resolve_and_accept(&new_event);
        assert_eq!(resolver.resolve(&old_event), ConflictResolution::Skip);
    }

    #[test]
    fn test_crdt_resolver_different_regions_independent() {
        let resolver = CrdtResolver::new();
        let us_event = make_event("evt-1", "us-east", 100, 0, 1);
        let eu_event = make_event("evt-2", "eu-west", 50, 0, 2);
        resolver.resolve_and_accept(&us_event);
        assert_eq!(resolver.resolve(&eu_event), ConflictResolution::Accept);
    }

    #[test]
    fn test_last_write_wins_skips_older_write() {
        let resolver = CrdtResolver::with_strategies(vec![(
            "order.".to_string(),
            MergeStrategy::LastWriteWins,
        )]);
        let newer = make_typed_event("evt-1", "us-east", 200, "order.updated", "o-1");
        let older = make_typed_event("evt-2", "eu-west", 100, "order.updated", "o-1");
        let newest = make_typed_event("evt-3", "eu-west", 300, "order.updated", "o-1");
        assert_eq!(resolver.resolve_and_accept(&newer), ConflictResolution::Accept);
        assert_eq!(resolver.resolve_and_accept(&older), ConflictResolution::Skip);
        assert_eq!(resolver.resolve_and_accept(&newest), ConflictResolution::Accept);
    }

    #[test]
    fn test_first_write_wins_skips_later_write() {
        let resolver = CrdtResolver::with_strategies(vec![(
            "user.".to_string(),
            MergeStrategy::FirstWriteWins,
        )]);
        let first = make_typed_event("evt-1", "us-east", 100, "user.created", "u-1");
        let second = make_typed_event("evt-2", "eu-west", 200, "user.created", "u-1");
        let other_entity = make_typed_event("evt-3", "eu-west", 300, "user.created", "u-2");
        assert_eq!(resolver.resolve_and_accept(&first), ConflictResolution::Accept);
        assert_eq!(resolver.resolve_and_accept(&second), ConflictResolution::Skip);
        assert_eq!(
            resolver.resolve_and_accept(&other_entity),
            ConflictResolution::Accept
        );
    }

    #[test]
    fn test_longest_strategy_prefix_wins() {
        let resolver = CrdtResolver::with_strategies(vec![
            ("order.".to_string(), MergeStrategy::LastWriteWins),
            ("order.audit.".to_string(), MergeStrategy::AppendOnly),
        ]);
        let newer = make_typed_event("evt-1", "us-east", 200, "order.audit.entry", "o-1");
        let older = make_typed_event("evt-2", "eu-west", 100, "order.audit.entry", "o-1");
        assert_eq!(resolver.resolve_and_accept(&newer), ConflictResolution::Accept);
        // Under the shorter "order." LastWriteWins prefix this would be skipped.
        assert_eq!(resolver.resolve_and_accept(&older), ConflictResolution::Accept);
    }

    #[test]
    fn test_merge_version_vector_marks_events_stale() {
        let resolver = CrdtResolver::new();
        let mut remote = VersionVector::new();
        remote.advance("us-east", HlcTimestamp::new(500, 0, 1));
        resolver.merge_version_vector("us-east", &remote);
        let stale = make_event("evt-1", "us-east", 300, 0, 1);
        let fresh = make_event("evt-2", "us-east", 600, 0, 1);
        assert_eq!(resolver.resolve(&stale), ConflictResolution::Skip);
        assert_eq!(resolver.resolve(&fresh), ConflictResolution::Accept);
        let vv = resolver
            .version_vector_for("us-east")
            .expect("merge_version_vector creates the vector");
        assert_eq!(vv.get("us-east").unwrap().physical_ms, 500);
    }

    #[test]
    fn test_deterministic_order() {
        let mut events = vec![
            make_event("evt-3", "ap-east", 100, 0, 3),
            make_event("evt-1", "us-east", 100, 0, 1),
            make_event("evt-2", "eu-west", 100, 0, 2),
        ];
        deterministic_order(&mut events);
        assert_eq!(events[0].event_id, "evt-1");
        assert_eq!(events[1].event_id, "evt-2");
        assert_eq!(events[2].event_id, "evt-3");
    }

    #[test]
    fn test_deterministic_order_by_hlc() {
        let mut events = vec![
            make_event("evt-1", "us-east", 300, 0, 1),
            make_event("evt-2", "eu-west", 100, 0, 2),
            make_event("evt-3", "ap-east", 200, 0, 3),
        ];
        deterministic_order(&mut events);
        assert_eq!(events[0].event_id, "evt-2");
        assert_eq!(events[1].event_id, "evt-3");
        assert_eq!(events[2].event_id, "evt-1");
    }

    #[test]
    fn test_replicated_event_serialization() {
        let event = make_event("evt-1", "us-east", 1000, 5, 1);
        let json = serde_json::to_string(&event).unwrap();
        let parsed: ReplicatedEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.event_id, "evt-1");
        assert_eq!(parsed.origin_region, "us-east");
        assert_eq!(parsed.hlc_timestamp.physical_ms, 1000);
    }
}