1use super::hlc::HlcTimestamp;
32use dashmap::DashMap;
33use serde::{Deserialize, Serialize};
34use std::collections::BTreeMap;
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct VersionVector {
41 entries: BTreeMap<String, HlcTimestamp>,
43}
44
45impl VersionVector {
46 pub fn new() -> Self {
48 Self {
49 entries: BTreeMap::new(),
50 }
51 }
52
53 pub fn advance(&mut self, region_id: &str, ts: HlcTimestamp) -> bool {
56 match self.entries.get(region_id) {
57 Some(existing) if ts <= *existing => false,
58 _ => {
59 self.entries.insert(region_id.to_string(), ts);
60 true
61 }
62 }
63 }
64
65 pub fn is_new(&self, region_id: &str, ts: &HlcTimestamp) -> bool {
67 match self.entries.get(region_id) {
68 Some(existing) => ts > existing,
69 None => true,
70 }
71 }
72
73 pub fn get(&self, region_id: &str) -> Option<&HlcTimestamp> {
75 self.entries.get(region_id)
76 }
77
78 pub fn merge(&mut self, other: &VersionVector) {
80 for (region, ts) in &other.entries {
81 self.advance(region, *ts);
82 }
83 }
84
85 pub fn entries(&self) -> &BTreeMap<String, HlcTimestamp> {
87 &self.entries
88 }
89
90 pub fn len(&self) -> usize {
92 self.entries.len()
93 }
94
95 pub fn is_empty(&self) -> bool {
97 self.entries.is_empty()
98 }
99}
100
101impl Default for VersionVector {
102 fn default() -> Self {
103 Self::new()
104 }
105}
106
107#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct ReplicatedEvent {
110 pub event_id: String,
112 pub hlc_timestamp: HlcTimestamp,
114 pub origin_region: String,
116 pub event_data: serde_json::Value,
118}
119
120#[derive(Debug, Clone, PartialEq, Eq)]
122pub enum ConflictResolution {
123 Accept,
125 Skip,
127}
128
129#[derive(Debug, Clone, PartialEq, Eq)]
134pub enum MergeStrategy {
135 AppendOnly,
137 LastWriteWins,
139 FirstWriteWins,
141}
142
143pub struct CrdtResolver {
153 version_vectors: DashMap<String, VersionVector>,
155 seen_events: DashMap<String, ()>,
157 strategies: Vec<(String, MergeStrategy)>,
160 entity_type_winners: DashMap<String, HlcTimestamp>,
162}
163
164impl CrdtResolver {
165 pub fn new() -> Self {
167 Self {
168 version_vectors: DashMap::new(),
169 seen_events: DashMap::new(),
170 strategies: Vec::new(),
171 entity_type_winners: DashMap::new(),
172 }
173 }
174
175 pub fn with_strategies(strategies: Vec<(String, MergeStrategy)>) -> Self {
181 let mut sorted = strategies;
182 sorted.sort_by(|a, b| b.0.len().cmp(&a.0.len()));
184 Self {
185 version_vectors: DashMap::new(),
186 seen_events: DashMap::new(),
187 strategies: sorted,
188 entity_type_winners: DashMap::new(),
189 }
190 }
191
192 fn strategy_for(&self, event_type: &str) -> &MergeStrategy {
194 for (prefix, strategy) in &self.strategies {
195 if event_type.starts_with(prefix.as_str()) {
196 return strategy;
197 }
198 }
199 &MergeStrategy::AppendOnly
201 }
202
203 pub fn resolve(&self, event: &ReplicatedEvent) -> ConflictResolution {
208 if self.seen_events.contains_key(&event.event_id) {
210 return ConflictResolution::Skip;
211 }
212
213 let is_new = self
215 .version_vectors
216 .get(&event.origin_region)
217 .map(|vv| vv.is_new(&event.origin_region, &event.hlc_timestamp))
218 .unwrap_or(true);
219
220 if !is_new {
221 return ConflictResolution::Skip;
222 }
223
224 let event_type = event
226 .event_data
227 .get("event_type")
228 .and_then(|v| v.as_str())
229 .unwrap_or("");
230 let entity_id = event
231 .event_data
232 .get("entity_id")
233 .and_then(|v| v.as_str())
234 .unwrap_or("");
235
236 match self.strategy_for(event_type) {
237 MergeStrategy::AppendOnly => ConflictResolution::Accept,
238 MergeStrategy::LastWriteWins => {
239 let key = format!("{}\x00{}", entity_id, event_type);
240 match self.entity_type_winners.get(&key) {
241 Some(existing) if event.hlc_timestamp <= *existing => {
242 ConflictResolution::Skip
243 }
244 _ => ConflictResolution::Accept,
245 }
246 }
247 MergeStrategy::FirstWriteWins => {
248 let key = format!("{}\x00{}", entity_id, event_type);
249 if self.entity_type_winners.contains_key(&key) {
250 ConflictResolution::Skip
251 } else {
252 ConflictResolution::Accept
253 }
254 }
255 }
256 }
257
258 pub fn accept(&self, event: &ReplicatedEvent) {
260 self.seen_events.insert(event.event_id.clone(), ());
261
262 let mut vv = self
263 .version_vectors
264 .entry(event.origin_region.clone())
265 .or_default();
266 vv.advance(&event.origin_region, event.hlc_timestamp);
267
268 let event_type = event
270 .event_data
271 .get("event_type")
272 .and_then(|v| v.as_str())
273 .unwrap_or("");
274 let entity_id = event
275 .event_data
276 .get("entity_id")
277 .and_then(|v| v.as_str())
278 .unwrap_or("");
279 if !event_type.is_empty() && !entity_id.is_empty() {
280 let key = format!("{}\x00{}", entity_id, event_type);
281 self.entity_type_winners
282 .entry(key)
283 .and_modify(|existing| {
284 if event.hlc_timestamp > *existing {
285 *existing = event.hlc_timestamp;
286 }
287 })
288 .or_insert(event.hlc_timestamp);
289 }
290 }
291
292 pub fn resolve_and_accept(&self, event: &ReplicatedEvent) -> ConflictResolution {
294 let resolution = self.resolve(event);
295 if resolution == ConflictResolution::Accept {
296 self.accept(event);
297 }
298 resolution
299 }
300
301 pub fn version_vector_for(&self, region_id: &str) -> Option<VersionVector> {
303 self.version_vectors.get(region_id).map(|vv| vv.clone())
304 }
305
306 pub fn all_version_vectors(&self) -> BTreeMap<String, VersionVector> {
308 self.version_vectors
309 .iter()
310 .map(|entry| (entry.key().clone(), entry.value().clone()))
311 .collect()
312 }
313
314 pub fn merge_version_vector(&self, region_id: &str, remote_vv: &VersionVector) {
316 let mut vv = self
317 .version_vectors
318 .entry(region_id.to_string())
319 .or_default();
320 vv.merge(remote_vv);
321 }
322
323 pub fn seen_count(&self) -> usize {
325 self.seen_events.len()
326 }
327}
328
329impl Default for CrdtResolver {
330 fn default() -> Self {
331 Self::new()
332 }
333}
334
335pub fn deterministic_order(events: &mut [ReplicatedEvent]) {
339 events.sort_by(|a, b| {
340 a.hlc_timestamp
341 .cmp(&b.hlc_timestamp)
342 .then_with(|| a.event_id.cmp(&b.event_id))
343 });
344}
345
346#[cfg(test)]
347mod tests {
348 use super::*;
349
350 fn make_event(
351 id: &str,
352 region: &str,
353 physical_ms: u64,
354 logical: u32,
355 node_id: u32,
356 ) -> ReplicatedEvent {
357 ReplicatedEvent {
358 event_id: id.to_string(),
359 hlc_timestamp: HlcTimestamp::new(physical_ms, logical, node_id),
360 origin_region: region.to_string(),
361 event_data: serde_json::json!({"type": "test"}),
362 }
363 }
364
365 #[test]
366 fn test_version_vector_advance() {
367 let mut vv = VersionVector::new();
368 let ts1 = HlcTimestamp::new(100, 0, 1);
369 let ts2 = HlcTimestamp::new(200, 0, 1);
370 let ts_old = HlcTimestamp::new(50, 0, 1);
371
372 assert!(vv.advance("us-east", ts1));
373 assert!(vv.advance("us-east", ts2)); assert!(!vv.advance("us-east", ts_old)); assert_eq!(vv.get("us-east").unwrap().physical_ms, 200);
377 }
378
379 #[test]
380 fn test_version_vector_is_new() {
381 let mut vv = VersionVector::new();
382 let ts = HlcTimestamp::new(100, 0, 1);
383 vv.advance("us-east", ts);
384
385 assert!(!vv.is_new("us-east", &HlcTimestamp::new(50, 0, 1)));
386 assert!(!vv.is_new("us-east", &HlcTimestamp::new(100, 0, 1)));
387 assert!(vv.is_new("us-east", &HlcTimestamp::new(101, 0, 1)));
388 assert!(vv.is_new("eu-west", &HlcTimestamp::new(1, 0, 1))); }
390
391 #[test]
392 fn test_version_vector_merge() {
393 let mut vv1 = VersionVector::new();
394 vv1.advance("us-east", HlcTimestamp::new(100, 0, 1));
395 vv1.advance("eu-west", HlcTimestamp::new(50, 0, 2));
396
397 let mut vv2 = VersionVector::new();
398 vv2.advance("us-east", HlcTimestamp::new(80, 0, 1));
399 vv2.advance("eu-west", HlcTimestamp::new(120, 0, 2));
400 vv2.advance("ap-east", HlcTimestamp::new(90, 0, 3));
401
402 vv1.merge(&vv2);
403
404 assert_eq!(vv1.get("us-east").unwrap().physical_ms, 100); assert_eq!(vv1.get("eu-west").unwrap().physical_ms, 120); assert_eq!(vv1.get("ap-east").unwrap().physical_ms, 90); assert_eq!(vv1.len(), 3);
408 }
409
410 #[test]
411 fn test_crdt_resolver_accept_new_event() {
412 let resolver = CrdtResolver::new();
413 let event = make_event("evt-1", "us-east", 100, 0, 1);
414
415 assert_eq!(resolver.resolve(&event), ConflictResolution::Accept);
416 resolver.accept(&event);
417 assert_eq!(resolver.seen_count(), 1);
418 }
419
420 #[test]
421 fn test_crdt_resolver_skip_duplicate() {
422 let resolver = CrdtResolver::new();
423 let event = make_event("evt-1", "us-east", 100, 0, 1);
424
425 assert_eq!(
426 resolver.resolve_and_accept(&event),
427 ConflictResolution::Accept
428 );
429 assert_eq!(resolver.resolve(&event), ConflictResolution::Skip);
430 }
431
432 #[test]
433 fn test_crdt_resolver_skip_old_version() {
434 let resolver = CrdtResolver::new();
435 let new_event = make_event("evt-2", "us-east", 200, 0, 1);
436 let old_event = make_event("evt-1", "us-east", 100, 0, 1);
437
438 resolver.resolve_and_accept(&new_event);
439 assert_eq!(resolver.resolve(&old_event), ConflictResolution::Skip);
441 }
442
443 #[test]
444 fn test_crdt_resolver_different_regions_independent() {
445 let resolver = CrdtResolver::new();
446 let us_event = make_event("evt-1", "us-east", 100, 0, 1);
447 let eu_event = make_event("evt-2", "eu-west", 50, 0, 2);
448
449 resolver.resolve_and_accept(&us_event);
450 assert_eq!(resolver.resolve(&eu_event), ConflictResolution::Accept);
452 }
453
454 #[test]
455 fn test_deterministic_order() {
456 let mut events = vec![
457 make_event("evt-3", "ap-east", 100, 0, 3),
458 make_event("evt-1", "us-east", 100, 0, 1),
459 make_event("evt-2", "eu-west", 100, 0, 2),
460 ];
461
462 deterministic_order(&mut events);
463
464 assert_eq!(events[0].event_id, "evt-1"); assert_eq!(events[1].event_id, "evt-2"); assert_eq!(events[2].event_id, "evt-3"); }
469
470 #[test]
471 fn test_deterministic_order_by_hlc() {
472 let mut events = vec![
473 make_event("evt-1", "us-east", 300, 0, 1),
474 make_event("evt-2", "eu-west", 100, 0, 2),
475 make_event("evt-3", "ap-east", 200, 0, 3),
476 ];
477
478 deterministic_order(&mut events);
479
480 assert_eq!(events[0].event_id, "evt-2"); assert_eq!(events[1].event_id, "evt-3"); assert_eq!(events[2].event_id, "evt-1"); }
484
485 #[test]
486 fn test_replicated_event_serialization() {
487 let event = make_event("evt-1", "us-east", 1000, 5, 1);
488 let json = serde_json::to_string(&event).unwrap();
489 let parsed: ReplicatedEvent = serde_json::from_str(&json).unwrap();
490 assert_eq!(parsed.event_id, "evt-1");
491 assert_eq!(parsed.origin_region, "us-east");
492 assert_eq!(parsed.hlc_timestamp.physical_ms, 1000);
493 }
494}