1use super::hlc::HlcTimestamp;
32use dashmap::DashMap;
33use serde::{Deserialize, Serialize};
34use std::collections::BTreeMap;
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct VersionVector {
41 entries: BTreeMap<String, HlcTimestamp>,
43}
44
45impl VersionVector {
46 pub fn new() -> Self {
48 Self {
49 entries: BTreeMap::new(),
50 }
51 }
52
53 pub fn advance(&mut self, region_id: &str, ts: HlcTimestamp) -> bool {
56 match self.entries.get(region_id) {
57 Some(existing) if ts <= *existing => false,
58 _ => {
59 self.entries.insert(region_id.to_string(), ts);
60 true
61 }
62 }
63 }
64
65 pub fn is_new(&self, region_id: &str, ts: &HlcTimestamp) -> bool {
67 match self.entries.get(region_id) {
68 Some(existing) => ts > existing,
69 None => true,
70 }
71 }
72
73 pub fn get(&self, region_id: &str) -> Option<&HlcTimestamp> {
75 self.entries.get(region_id)
76 }
77
78 pub fn merge(&mut self, other: &VersionVector) {
80 for (region, ts) in &other.entries {
81 self.advance(region, *ts);
82 }
83 }
84
85 pub fn entries(&self) -> &BTreeMap<String, HlcTimestamp> {
87 &self.entries
88 }
89
90 pub fn len(&self) -> usize {
92 self.entries.len()
93 }
94
95 pub fn is_empty(&self) -> bool {
97 self.entries.is_empty()
98 }
99}
100
101impl Default for VersionVector {
102 fn default() -> Self {
103 Self::new()
104 }
105}
106
107#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct ReplicatedEvent {
110 pub event_id: String,
112 pub hlc_timestamp: HlcTimestamp,
114 pub origin_region: String,
116 pub event_data: serde_json::Value,
118}
119
/// Verdict produced when an incoming event is checked against local state.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ConflictResolution {
    /// The event passed every check and should be applied.
    Accept,
    /// The event is a duplicate, is stale, or lost under its merge strategy.
    Skip,
}
128
/// How concurrent events for the same entity and event type are reconciled.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum MergeStrategy {
    /// Every event that is neither a duplicate nor stale is accepted.
    AppendOnly,
    /// An event is accepted only if its timestamp is strictly newer than the
    /// newest one already accepted for the same entity and event type.
    LastWriteWins,
    /// Only the first accepted event for an entity and event type is kept;
    /// later ones are skipped regardless of timestamp.
    FirstWriteWins,
}
142
143pub struct CrdtResolver {
153 version_vectors: DashMap<String, VersionVector>,
155 seen_events: DashMap<String, ()>,
157 strategies: Vec<(String, MergeStrategy)>,
160 entity_type_winners: DashMap<String, HlcTimestamp>,
162}
163
164impl CrdtResolver {
165 pub fn new() -> Self {
167 Self {
168 version_vectors: DashMap::new(),
169 seen_events: DashMap::new(),
170 strategies: Vec::new(),
171 entity_type_winners: DashMap::new(),
172 }
173 }
174
175 pub fn with_strategies(strategies: Vec<(String, MergeStrategy)>) -> Self {
181 let mut sorted = strategies;
182 sorted.sort_by(|a, b| b.0.len().cmp(&a.0.len()));
184 Self {
185 version_vectors: DashMap::new(),
186 seen_events: DashMap::new(),
187 strategies: sorted,
188 entity_type_winners: DashMap::new(),
189 }
190 }
191
192 fn strategy_for(&self, event_type: &str) -> &MergeStrategy {
194 for (prefix, strategy) in &self.strategies {
195 if event_type.starts_with(prefix.as_str()) {
196 return strategy;
197 }
198 }
199 &MergeStrategy::AppendOnly
201 }
202
203 pub fn resolve(&self, event: &ReplicatedEvent) -> ConflictResolution {
208 if self.seen_events.contains_key(&event.event_id) {
210 return ConflictResolution::Skip;
211 }
212
213 let is_new = self
215 .version_vectors
216 .get(&event.origin_region)
217 .map(|vv| vv.is_new(&event.origin_region, &event.hlc_timestamp))
218 .unwrap_or(true);
219
220 if !is_new {
221 return ConflictResolution::Skip;
222 }
223
224 let event_type = event
226 .event_data
227 .get("event_type")
228 .and_then(|v| v.as_str())
229 .unwrap_or("");
230 let entity_id = event
231 .event_data
232 .get("entity_id")
233 .and_then(|v| v.as_str())
234 .unwrap_or("");
235
236 match self.strategy_for(event_type) {
237 MergeStrategy::AppendOnly => ConflictResolution::Accept,
238 MergeStrategy::LastWriteWins => {
239 let key = format!("{}\x00{}", entity_id, event_type);
240 match self.entity_type_winners.get(&key) {
241 Some(existing) if event.hlc_timestamp <= *existing => ConflictResolution::Skip,
242 _ => ConflictResolution::Accept,
243 }
244 }
245 MergeStrategy::FirstWriteWins => {
246 let key = format!("{}\x00{}", entity_id, event_type);
247 if self.entity_type_winners.contains_key(&key) {
248 ConflictResolution::Skip
249 } else {
250 ConflictResolution::Accept
251 }
252 }
253 }
254 }
255
256 pub fn accept(&self, event: &ReplicatedEvent) {
258 self.seen_events.insert(event.event_id.clone(), ());
259
260 let mut vv = self
261 .version_vectors
262 .entry(event.origin_region.clone())
263 .or_default();
264 vv.advance(&event.origin_region, event.hlc_timestamp);
265
266 let event_type = event
268 .event_data
269 .get("event_type")
270 .and_then(|v| v.as_str())
271 .unwrap_or("");
272 let entity_id = event
273 .event_data
274 .get("entity_id")
275 .and_then(|v| v.as_str())
276 .unwrap_or("");
277 if !event_type.is_empty() && !entity_id.is_empty() {
278 let key = format!("{}\x00{}", entity_id, event_type);
279 self.entity_type_winners
280 .entry(key)
281 .and_modify(|existing| {
282 if event.hlc_timestamp > *existing {
283 *existing = event.hlc_timestamp;
284 }
285 })
286 .or_insert(event.hlc_timestamp);
287 }
288 }
289
290 pub fn resolve_and_accept(&self, event: &ReplicatedEvent) -> ConflictResolution {
292 let resolution = self.resolve(event);
293 if resolution == ConflictResolution::Accept {
294 self.accept(event);
295 }
296 resolution
297 }
298
299 pub fn version_vector_for(&self, region_id: &str) -> Option<VersionVector> {
301 self.version_vectors.get(region_id).map(|vv| vv.clone())
302 }
303
304 pub fn all_version_vectors(&self) -> BTreeMap<String, VersionVector> {
306 self.version_vectors
307 .iter()
308 .map(|entry| (entry.key().clone(), entry.value().clone()))
309 .collect()
310 }
311
312 pub fn merge_version_vector(&self, region_id: &str, remote_vv: &VersionVector) {
314 let mut vv = self
315 .version_vectors
316 .entry(region_id.to_string())
317 .or_default();
318 vv.merge(remote_vv);
319 }
320
321 pub fn seen_count(&self) -> usize {
323 self.seen_events.len()
324 }
325}
326
327impl Default for CrdtResolver {
328 fn default() -> Self {
329 Self::new()
330 }
331}
332
333pub fn deterministic_order(events: &mut [ReplicatedEvent]) {
337 events.sort_by(|a, b| {
338 a.hlc_timestamp
339 .cmp(&b.hlc_timestamp)
340 .then_with(|| a.event_id.cmp(&b.event_id))
341 });
342}
343
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an event whose payload carries neither `event_type` nor
    /// `entity_id`, so it always resolves under the default strategy.
    fn make_event(
        id: &str,
        region: &str,
        physical_ms: u64,
        logical: u32,
        node_id: u32,
    ) -> ReplicatedEvent {
        ReplicatedEvent {
            event_id: id.to_string(),
            hlc_timestamp: HlcTimestamp::new(physical_ms, logical, node_id),
            origin_region: region.to_string(),
            event_data: serde_json::json!({"type": "test"}),
        }
    }

    /// Builds an event whose payload carries the `event_type` and `entity_id`
    /// fields that strategy selection and winner tracking read.
    fn make_typed_event(
        id: &str,
        region: &str,
        physical_ms: u64,
        node_id: u32,
        event_type: &str,
        entity_id: &str,
    ) -> ReplicatedEvent {
        ReplicatedEvent {
            event_id: id.to_string(),
            hlc_timestamp: HlcTimestamp::new(physical_ms, 0, node_id),
            origin_region: region.to_string(),
            event_data: serde_json::json!({
                "event_type": event_type,
                "entity_id": entity_id,
            }),
        }
    }

    #[test]
    fn test_version_vector_advance() {
        let mut vv = VersionVector::new();
        let ts1 = HlcTimestamp::new(100, 0, 1);
        let ts2 = HlcTimestamp::new(200, 0, 1);
        let ts_old = HlcTimestamp::new(50, 0, 1);

        assert!(vv.advance("us-east", ts1));
        assert!(vv.advance("us-east", ts2));
        // An older timestamp must not move the entry backwards.
        assert!(!vv.advance("us-east", ts_old));
        assert_eq!(vv.get("us-east").unwrap().physical_ms, 200);
    }

    #[test]
    fn test_version_vector_is_new() {
        let mut vv = VersionVector::new();
        let ts = HlcTimestamp::new(100, 0, 1);
        vv.advance("us-east", ts);

        assert!(!vv.is_new("us-east", &HlcTimestamp::new(50, 0, 1)));
        assert!(!vv.is_new("us-east", &HlcTimestamp::new(100, 0, 1)));
        assert!(vv.is_new("us-east", &HlcTimestamp::new(101, 0, 1)));
        // A region that is not tracked treats everything as new.
        assert!(vv.is_new("eu-west", &HlcTimestamp::new(1, 0, 1)));
    }

    #[test]
    fn test_version_vector_merge() {
        let mut vv1 = VersionVector::new();
        vv1.advance("us-east", HlcTimestamp::new(100, 0, 1));
        vv1.advance("eu-west", HlcTimestamp::new(50, 0, 2));

        let mut vv2 = VersionVector::new();
        vv2.advance("us-east", HlcTimestamp::new(80, 0, 1));
        vv2.advance("eu-west", HlcTimestamp::new(120, 0, 2));
        vv2.advance("ap-east", HlcTimestamp::new(90, 0, 3));

        vv1.merge(&vv2);

        // Local value is newer, remote value is newer, remote-only region.
        assert_eq!(vv1.get("us-east").unwrap().physical_ms, 100);
        assert_eq!(vv1.get("eu-west").unwrap().physical_ms, 120);
        assert_eq!(vv1.get("ap-east").unwrap().physical_ms, 90);
        assert_eq!(vv1.len(), 3);
    }

    #[test]
    fn test_crdt_resolver_accept_new_event() {
        let resolver = CrdtResolver::new();
        let event = make_event("evt-1", "us-east", 100, 0, 1);

        assert_eq!(resolver.resolve(&event), ConflictResolution::Accept);
        resolver.accept(&event);
        assert_eq!(resolver.seen_count(), 1);
    }

    #[test]
    fn test_crdt_resolver_skip_duplicate() {
        let resolver = CrdtResolver::new();
        let event = make_event("evt-1", "us-east", 100, 0, 1);

        assert_eq!(
            resolver.resolve_and_accept(&event),
            ConflictResolution::Accept
        );
        assert_eq!(resolver.resolve(&event), ConflictResolution::Skip);
        assert_eq!(
            resolver.resolve_and_accept(&event),
            ConflictResolution::Skip
        );
        assert_eq!(resolver.seen_count(), 1);
    }

    #[test]
    fn test_crdt_resolver_skip_old_version() {
        let resolver = CrdtResolver::new();
        let new_event = make_event("evt-2", "us-east", 200, 0, 1);
        let old_event = make_event("evt-1", "us-east", 100, 0, 1);

        resolver.resolve_and_accept(&new_event);
        assert_eq!(resolver.resolve(&old_event), ConflictResolution::Skip);
    }

    #[test]
    fn test_crdt_resolver_different_regions_independent() {
        let resolver = CrdtResolver::new();
        let us_event = make_event("evt-1", "us-east", 100, 0, 1);
        let eu_event = make_event("evt-2", "eu-west", 50, 0, 2);

        resolver.resolve_and_accept(&us_event);
        assert_eq!(resolver.resolve(&eu_event), ConflictResolution::Accept);
    }

    #[test]
    fn test_crdt_resolver_last_write_wins() {
        let resolver = CrdtResolver::with_strategies(vec![(
            "profile.".to_string(),
            MergeStrategy::LastWriteWins,
        )]);
        let newer = make_typed_event("evt-1", "us-east", 200, 1, "profile.updated", "user-1");
        let older = make_typed_event("evt-2", "eu-west", 100, 2, "profile.updated", "user-1");
        let newest = make_typed_event("evt-3", "eu-west", 300, 2, "profile.updated", "user-1");
        let other_entity =
            make_typed_event("evt-4", "ap-east", 100, 3, "profile.updated", "user-2");

        assert_eq!(
            resolver.resolve_and_accept(&newer),
            ConflictResolution::Accept
        );
        // Older write for the same entity loses even though its region is new.
        assert_eq!(
            resolver.resolve_and_accept(&older),
            ConflictResolution::Skip
        );
        assert_eq!(
            resolver.resolve_and_accept(&newest),
            ConflictResolution::Accept
        );
        // Winners are tracked per entity, so another entity is unaffected.
        assert_eq!(
            resolver.resolve_and_accept(&other_entity),
            ConflictResolution::Accept
        );
    }

    #[test]
    fn test_crdt_resolver_first_write_wins() {
        let resolver = CrdtResolver::with_strategies(vec![(
            "account.created".to_string(),
            MergeStrategy::FirstWriteWins,
        )]);
        let first = make_typed_event("evt-1", "us-east", 100, 1, "account.created", "acct-1");
        let later = make_typed_event("evt-2", "eu-west", 200, 2, "account.created", "acct-1");

        assert_eq!(
            resolver.resolve_and_accept(&first),
            ConflictResolution::Accept
        );
        // A newer timestamp does not help once a first write is recorded.
        assert_eq!(
            resolver.resolve_and_accept(&later),
            ConflictResolution::Skip
        );
    }

    #[test]
    fn test_crdt_resolver_longest_prefix_wins() {
        // The shorter prefix is listed first on purpose; sorting must still
        // let the longer, more specific prefix take precedence.
        let resolver = CrdtResolver::with_strategies(vec![
            ("order".to_string(), MergeStrategy::AppendOnly),
            ("order.status".to_string(), MergeStrategy::FirstWriteWins),
        ]);

        let status_a = make_typed_event("evt-1", "us-east", 100, 1, "order.status.changed", "o-1");
        let status_b = make_typed_event("evt-2", "eu-west", 200, 2, "order.status.changed", "o-1");
        assert_eq!(
            resolver.resolve_and_accept(&status_a),
            ConflictResolution::Accept
        );
        assert_eq!(
            resolver.resolve_and_accept(&status_b),
            ConflictResolution::Skip
        );

        let note_a = make_typed_event("evt-3", "us-east", 300, 1, "order.note", "o-1");
        let note_b = make_typed_event("evt-4", "eu-west", 50, 2, "order.note", "o-1");
        assert_eq!(
            resolver.resolve_and_accept(&note_a),
            ConflictResolution::Accept
        );
        assert_eq!(
            resolver.resolve_and_accept(&note_b),
            ConflictResolution::Accept
        );
    }

    #[test]
    fn test_crdt_resolver_merge_version_vector() {
        let resolver = CrdtResolver::new();
        let mut remote = VersionVector::new();
        remote.advance("us-east", HlcTimestamp::new(500, 0, 1));

        resolver.merge_version_vector("us-east", &remote);

        let stale = make_event("evt-1", "us-east", 400, 0, 1);
        let fresh = make_event("evt-2", "us-east", 600, 0, 1);
        assert_eq!(resolver.resolve(&stale), ConflictResolution::Skip);
        assert_eq!(resolver.resolve(&fresh), ConflictResolution::Accept);
        assert_eq!(
            resolver
                .version_vector_for("us-east")
                .unwrap()
                .get("us-east")
                .unwrap()
                .physical_ms,
            500
        );
        assert!(resolver.version_vector_for("eu-west").is_none());
    }

    #[test]
    fn test_deterministic_order() {
        let mut events = vec![
            make_event("evt-3", "ap-east", 100, 0, 3),
            make_event("evt-1", "us-east", 100, 0, 1),
            make_event("evt-2", "eu-west", 100, 0, 2),
        ];

        deterministic_order(&mut events);

        assert_eq!(events[0].event_id, "evt-1");
        assert_eq!(events[1].event_id, "evt-2");
        assert_eq!(events[2].event_id, "evt-3");
    }

    #[test]
    fn test_deterministic_order_by_hlc() {
        let mut events = vec![
            make_event("evt-1", "us-east", 300, 0, 1),
            make_event("evt-2", "eu-west", 100, 0, 2),
            make_event("evt-3", "ap-east", 200, 0, 3),
        ];

        deterministic_order(&mut events);

        assert_eq!(events[0].event_id, "evt-2");
        assert_eq!(events[1].event_id, "evt-3");
        assert_eq!(events[2].event_id, "evt-1");
    }

    #[test]
    fn test_replicated_event_serialization() {
        let event = make_event("evt-1", "us-east", 1000, 5, 1);
        let json = serde_json::to_string(&event).unwrap();
        let parsed: ReplicatedEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.event_id, "evt-1");
        assert_eq!(parsed.origin_region, "us-east");
        assert_eq!(parsed.hlc_timestamp.physical_ms, 1000);
    }
}