1use super::hlc::HlcTimestamp;
32use dashmap::DashMap;
33use serde::{Deserialize, Serialize};
34use std::collections::BTreeMap;
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct VersionVector {
41 entries: BTreeMap<String, HlcTimestamp>,
43}
44
45impl VersionVector {
46 pub fn new() -> Self {
48 Self {
49 entries: BTreeMap::new(),
50 }
51 }
52
53 pub fn advance(&mut self, region_id: &str, ts: HlcTimestamp) -> bool {
56 match self.entries.get(region_id) {
57 Some(existing) if ts <= *existing => false,
58 _ => {
59 self.entries.insert(region_id.to_string(), ts);
60 true
61 }
62 }
63 }
64
65 pub fn is_new(&self, region_id: &str, ts: &HlcTimestamp) -> bool {
67 match self.entries.get(region_id) {
68 Some(existing) => ts > existing,
69 None => true,
70 }
71 }
72
73 pub fn get(&self, region_id: &str) -> Option<&HlcTimestamp> {
75 self.entries.get(region_id)
76 }
77
78 pub fn merge(&mut self, other: &VersionVector) {
80 for (region, ts) in &other.entries {
81 self.advance(region, *ts);
82 }
83 }
84
85 pub fn entries(&self) -> &BTreeMap<String, HlcTimestamp> {
87 &self.entries
88 }
89
90 pub fn len(&self) -> usize {
92 self.entries.len()
93 }
94
95 pub fn is_empty(&self) -> bool {
97 self.entries.is_empty()
98 }
99}
100
101impl Default for VersionVector {
102 fn default() -> Self {
103 Self::new()
104 }
105}
106
/// An event replicated between regions, as evaluated by [`CrdtResolver`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicatedEvent {
    /// Identifier that [`CrdtResolver`] uses to suppress duplicate deliveries.
    pub event_id: String,
    /// Hybrid logical clock timestamp; drives the version-vector check, the
    /// merge strategies, and [`deterministic_order`].
    pub hlc_timestamp: HlcTimestamp,
    /// Region the event originated from; selects which version vector
    /// [`CrdtResolver`] consults and advances for this event.
    pub origin_region: String,
    /// JSON payload. [`CrdtResolver`] reads the optional string fields
    /// `event_type` and `entity_id` from it; nothing else in it is inspected
    /// in this module.
    pub event_data: serde_json::Value,
}
119
/// Outcome of [`CrdtResolver::resolve`] for a replicated event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictResolution {
    /// The event should be applied locally.
    Accept,
    /// The event should be ignored: it is a duplicate, is stale for its
    /// origin region, or lost a merge conflict.
    Skip,
}
128
/// How [`CrdtResolver`] arbitrates between events that target the same
/// `(entity_id, event_type)` pair. Selected per event by `event_type` prefix.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeStrategy {
    /// Every event is accepted; no per-entity conflict check is made.
    AppendOnly,
    /// An event is accepted only if its HLC timestamp is strictly newer than
    /// the newest one already accepted for the same pair.
    LastWriteWins,
    /// Only the first event accepted for a pair is kept; later arrivals are
    /// skipped regardless of their timestamps.
    ///
    /// NOTE(review): "first" means first accepted locally, not lowest HLC
    /// timestamp, so replicas that receive events in different orders may keep
    /// different winners — confirm this is intended.
    FirstWriteWins,
}
142
143pub struct CrdtResolver {
153 version_vectors: DashMap<String, VersionVector>,
155 seen_events: DashMap<String, ()>,
157 strategies: Vec<(String, MergeStrategy)>,
160 entity_type_winners: DashMap<String, HlcTimestamp>,
162}
163
164impl CrdtResolver {
165 pub fn new() -> Self {
167 Self {
168 version_vectors: DashMap::new(),
169 seen_events: DashMap::new(),
170 strategies: Vec::new(),
171 entity_type_winners: DashMap::new(),
172 }
173 }
174
175 pub fn with_strategies(strategies: Vec<(String, MergeStrategy)>) -> Self {
181 let mut sorted = strategies;
182 sorted.sort_by(|a, b| b.0.len().cmp(&a.0.len()));
184 Self {
185 version_vectors: DashMap::new(),
186 seen_events: DashMap::new(),
187 strategies: sorted,
188 entity_type_winners: DashMap::new(),
189 }
190 }
191
192 fn strategy_for(&self, event_type: &str) -> &MergeStrategy {
194 for (prefix, strategy) in &self.strategies {
195 if event_type.starts_with(prefix.as_str()) {
196 return strategy;
197 }
198 }
199 &MergeStrategy::AppendOnly
201 }
202
203 pub fn resolve(&self, event: &ReplicatedEvent) -> ConflictResolution {
208 if self.seen_events.contains_key(&event.event_id) {
210 return ConflictResolution::Skip;
211 }
212
213 let is_new = self
215 .version_vectors
216 .get(&event.origin_region)
217 .is_none_or(|vv| vv.is_new(&event.origin_region, &event.hlc_timestamp));
218
219 if !is_new {
220 return ConflictResolution::Skip;
221 }
222
223 let event_type = event
225 .event_data
226 .get("event_type")
227 .and_then(|v| v.as_str())
228 .unwrap_or("");
229 let entity_id = event
230 .event_data
231 .get("entity_id")
232 .and_then(|v| v.as_str())
233 .unwrap_or("");
234
235 match self.strategy_for(event_type) {
236 MergeStrategy::AppendOnly => ConflictResolution::Accept,
237 MergeStrategy::LastWriteWins => {
238 let key = format!("{entity_id}\x00{event_type}");
239 match self.entity_type_winners.get(&key) {
240 Some(existing) if event.hlc_timestamp <= *existing => ConflictResolution::Skip,
241 _ => ConflictResolution::Accept,
242 }
243 }
244 MergeStrategy::FirstWriteWins => {
245 let key = format!("{entity_id}\x00{event_type}");
246 if self.entity_type_winners.contains_key(&key) {
247 ConflictResolution::Skip
248 } else {
249 ConflictResolution::Accept
250 }
251 }
252 }
253 }
254
255 pub fn accept(&self, event: &ReplicatedEvent) {
257 self.seen_events.insert(event.event_id.clone(), ());
258
259 let mut vv = self
260 .version_vectors
261 .entry(event.origin_region.clone())
262 .or_default();
263 vv.advance(&event.origin_region, event.hlc_timestamp);
264
265 let event_type = event
267 .event_data
268 .get("event_type")
269 .and_then(|v| v.as_str())
270 .unwrap_or("");
271 let entity_id = event
272 .event_data
273 .get("entity_id")
274 .and_then(|v| v.as_str())
275 .unwrap_or("");
276 if !event_type.is_empty() && !entity_id.is_empty() {
277 let key = format!("{entity_id}\x00{event_type}");
278 self.entity_type_winners
279 .entry(key)
280 .and_modify(|existing| {
281 if event.hlc_timestamp > *existing {
282 *existing = event.hlc_timestamp;
283 }
284 })
285 .or_insert(event.hlc_timestamp);
286 }
287 }
288
289 pub fn resolve_and_accept(&self, event: &ReplicatedEvent) -> ConflictResolution {
291 let resolution = self.resolve(event);
292 if resolution == ConflictResolution::Accept {
293 self.accept(event);
294 }
295 resolution
296 }
297
298 pub fn version_vector_for(&self, region_id: &str) -> Option<VersionVector> {
300 self.version_vectors.get(region_id).map(|vv| vv.clone())
301 }
302
303 pub fn all_version_vectors(&self) -> BTreeMap<String, VersionVector> {
305 self.version_vectors
306 .iter()
307 .map(|entry| (entry.key().clone(), entry.value().clone()))
308 .collect()
309 }
310
311 pub fn merge_version_vector(&self, region_id: &str, remote_vv: &VersionVector) {
313 let mut vv = self
314 .version_vectors
315 .entry(region_id.to_string())
316 .or_default();
317 vv.merge(remote_vv);
318 }
319
320 pub fn seen_count(&self) -> usize {
322 self.seen_events.len()
323 }
324}
325
326impl Default for CrdtResolver {
327 fn default() -> Self {
328 Self::new()
329 }
330}
331
332pub fn deterministic_order(events: &mut [ReplicatedEvent]) {
336 events.sort_by(|a, b| {
337 a.hlc_timestamp
338 .cmp(&b.hlc_timestamp)
339 .then_with(|| a.event_id.cmp(&b.event_id))
340 });
341}
342
#[cfg(test)]
mod tests {
    use super::*;

    fn make_event(
        id: &str,
        region: &str,
        physical_ms: u64,
        logical: u32,
        node_id: u32,
    ) -> ReplicatedEvent {
        ReplicatedEvent {
            event_id: id.to_string(),
            hlc_timestamp: HlcTimestamp::new(physical_ms, logical, node_id),
            origin_region: region.to_string(),
            event_data: serde_json::json!({"type": "test"}),
        }
    }

    /// Builds an event whose payload carries the `event_type` and `entity_id`
    /// fields that `CrdtResolver` uses to select and apply a merge strategy.
    fn make_typed_event(
        id: &str,
        region: &str,
        physical_ms: u64,
        node_id: u32,
        event_type: &str,
        entity_id: &str,
    ) -> ReplicatedEvent {
        ReplicatedEvent {
            event_id: id.to_string(),
            hlc_timestamp: HlcTimestamp::new(physical_ms, 0, node_id),
            origin_region: region.to_string(),
            event_data: serde_json::json!({
                "event_type": event_type,
                "entity_id": entity_id
            }),
        }
    }

    #[test]
    fn test_version_vector_advance() {
        let mut vv = VersionVector::new();
        let ts1 = HlcTimestamp::new(100, 0, 1);
        let ts2 = HlcTimestamp::new(200, 0, 1);
        let ts_old = HlcTimestamp::new(50, 0, 1);

        assert!(vv.advance("us-east", ts1));
        assert!(vv.advance("us-east", ts2));
        assert!(!vv.advance("us-east", ts_old));
        assert_eq!(vv.get("us-east").unwrap().physical_ms, 200);
    }

    #[test]
    fn test_version_vector_is_new() {
        let mut vv = VersionVector::new();
        let ts = HlcTimestamp::new(100, 0, 1);
        vv.advance("us-east", ts);

        assert!(!vv.is_new("us-east", &HlcTimestamp::new(50, 0, 1)));
        assert!(!vv.is_new("us-east", &HlcTimestamp::new(100, 0, 1)));
        assert!(vv.is_new("us-east", &HlcTimestamp::new(101, 0, 1)));
        assert!(vv.is_new("eu-west", &HlcTimestamp::new(1, 0, 1)));
    }

    #[test]
    fn test_version_vector_merge() {
        let mut vv1 = VersionVector::new();
        vv1.advance("us-east", HlcTimestamp::new(100, 0, 1));
        vv1.advance("eu-west", HlcTimestamp::new(50, 0, 2));

        let mut vv2 = VersionVector::new();
        vv2.advance("us-east", HlcTimestamp::new(80, 0, 1));
        vv2.advance("eu-west", HlcTimestamp::new(120, 0, 2));
        vv2.advance("ap-east", HlcTimestamp::new(90, 0, 3));

        vv1.merge(&vv2);

        assert_eq!(vv1.get("us-east").unwrap().physical_ms, 100);
        assert_eq!(vv1.get("eu-west").unwrap().physical_ms, 120);
        assert_eq!(vv1.get("ap-east").unwrap().physical_ms, 90);
        assert_eq!(vv1.len(), 3);
    }

    #[test]
    fn test_crdt_resolver_accept_new_event() {
        let resolver = CrdtResolver::new();
        let event = make_event("evt-1", "us-east", 100, 0, 1);

        assert_eq!(resolver.resolve(&event), ConflictResolution::Accept);
        resolver.accept(&event);
        assert_eq!(resolver.seen_count(), 1);
    }

    #[test]
    fn test_crdt_resolver_skip_duplicate() {
        let resolver = CrdtResolver::new();
        let event = make_event("evt-1", "us-east", 100, 0, 1);

        assert_eq!(
            resolver.resolve_and_accept(&event),
            ConflictResolution::Accept
        );
        assert_eq!(resolver.resolve(&event), ConflictResolution::Skip);
    }

    #[test]
    fn test_crdt_resolver_skip_old_version() {
        let resolver = CrdtResolver::new();
        let new_event = make_event("evt-2", "us-east", 200, 0, 1);
        let old_event = make_event("evt-1", "us-east", 100, 0, 1);

        resolver.resolve_and_accept(&new_event);
        assert_eq!(resolver.resolve(&old_event), ConflictResolution::Skip);
    }

    #[test]
    fn test_crdt_resolver_different_regions_independent() {
        let resolver = CrdtResolver::new();
        let us_event = make_event("evt-1", "us-east", 100, 0, 1);
        let eu_event = make_event("evt-2", "eu-west", 50, 0, 2);

        resolver.resolve_and_accept(&us_event);
        assert_eq!(resolver.resolve(&eu_event), ConflictResolution::Accept);
    }

    #[test]
    fn test_merge_version_vector_blocks_stale_events() {
        let resolver = CrdtResolver::new();
        let mut remote = VersionVector::new();
        remote.advance("us-east", HlcTimestamp::new(500, 0, 1));
        resolver.merge_version_vector("us-east", &remote);

        let snapshot = resolver.version_vector_for("us-east").unwrap();
        assert_eq!(snapshot.get("us-east").unwrap().physical_ms, 500);

        let stale = make_event("evt-1", "us-east", 400, 0, 1);
        assert_eq!(resolver.resolve(&stale), ConflictResolution::Skip);
        let fresh = make_event("evt-2", "us-east", 600, 0, 1);
        assert_eq!(resolver.resolve(&fresh), ConflictResolution::Accept);
    }

    #[test]
    fn test_last_write_wins_keeps_newest_per_entity() {
        let resolver = CrdtResolver::with_strategies(vec![(
            "order.".to_string(),
            MergeStrategy::LastWriteWins,
        )]);
        let first = make_typed_event("evt-1", "us-east", 200, 1, "order.updated", "o-1");
        let stale = make_typed_event("evt-2", "eu-west", 100, 2, "order.updated", "o-1");
        let newer = make_typed_event("evt-3", "eu-west", 300, 2, "order.updated", "o-1");
        let other_entity = make_typed_event("evt-4", "ap-east", 100, 3, "order.updated", "o-2");

        assert_eq!(resolver.resolve_and_accept(&first), ConflictResolution::Accept);
        assert_eq!(resolver.resolve_and_accept(&stale), ConflictResolution::Skip);
        assert_eq!(resolver.resolve_and_accept(&newer), ConflictResolution::Accept);
        assert_eq!(
            resolver.resolve_and_accept(&other_entity),
            ConflictResolution::Accept
        );
    }

    #[test]
    fn test_first_write_wins_keeps_first_accepted() {
        let resolver = CrdtResolver::with_strategies(vec![(
            "config.".to_string(),
            MergeStrategy::FirstWriteWins,
        )]);
        let first = make_typed_event("evt-1", "us-east", 200, 1, "config.set", "k");
        let earlier_but_late = make_typed_event("evt-2", "eu-west", 100, 2, "config.set", "k");
        let other_key = make_typed_event("evt-3", "eu-west", 150, 2, "config.set", "k2");

        assert_eq!(resolver.resolve_and_accept(&first), ConflictResolution::Accept);
        assert_eq!(
            resolver.resolve_and_accept(&earlier_but_late),
            ConflictResolution::Skip
        );
        assert_eq!(
            resolver.resolve_and_accept(&other_key),
            ConflictResolution::Accept
        );
    }

    #[test]
    fn test_longest_prefix_strategy_wins() {
        let resolver = CrdtResolver::with_strategies(vec![
            ("order.".to_string(), MergeStrategy::LastWriteWins),
            ("order.note".to_string(), MergeStrategy::AppendOnly),
        ]);
        // "order.note" is append-only, so an older note is still accepted.
        let note_new = make_typed_event("evt-1", "us-east", 200, 1, "order.note", "o-1");
        let note_old = make_typed_event("evt-2", "eu-west", 100, 2, "order.note", "o-1");
        assert_eq!(resolver.resolve_and_accept(&note_new), ConflictResolution::Accept);
        assert_eq!(resolver.resolve_and_accept(&note_old), ConflictResolution::Accept);

        // Other "order." types fall back to last-write-wins.
        let update_new = make_typed_event("evt-3", "us-east", 300, 1, "order.updated", "o-1");
        let update_old = make_typed_event("evt-4", "eu-west", 250, 2, "order.updated", "o-1");
        assert_eq!(
            resolver.resolve_and_accept(&update_new),
            ConflictResolution::Accept
        );
        assert_eq!(
            resolver.resolve_and_accept(&update_old),
            ConflictResolution::Skip
        );
    }

    #[test]
    fn test_strategy_ignored_without_entity_id() {
        let resolver = CrdtResolver::with_strategies(vec![(
            "config.".to_string(),
            MergeStrategy::FirstWriteWins,
        )]);
        let a = make_typed_event("evt-1", "us-east", 100, 1, "config.set", "");
        let b = make_typed_event("evt-2", "eu-west", 100, 2, "config.set", "");

        assert_eq!(resolver.resolve_and_accept(&a), ConflictResolution::Accept);
        assert_eq!(resolver.resolve_and_accept(&b), ConflictResolution::Accept);
        assert_eq!(resolver.resolve(&a), ConflictResolution::Skip);
    }

    #[test]
    fn test_deterministic_order() {
        let mut events = vec![
            make_event("evt-3", "ap-east", 100, 0, 3),
            make_event("evt-1", "us-east", 100, 0, 1),
            make_event("evt-2", "eu-west", 100, 0, 2),
        ];

        deterministic_order(&mut events);

        assert_eq!(events[0].event_id, "evt-1");
        assert_eq!(events[1].event_id, "evt-2");
        assert_eq!(events[2].event_id, "evt-3");
    }

    #[test]
    fn test_deterministic_order_by_hlc() {
        let mut events = vec![
            make_event("evt-1", "us-east", 300, 0, 1),
            make_event("evt-2", "eu-west", 100, 0, 2),
            make_event("evt-3", "ap-east", 200, 0, 3),
        ];

        deterministic_order(&mut events);

        assert_eq!(events[0].event_id, "evt-2");
        assert_eq!(events[1].event_id, "evt-3");
        assert_eq!(events[2].event_id, "evt-1");
    }

    #[test]
    fn test_replicated_event_serialization() {
        let event = make_event("evt-1", "us-east", 1000, 5, 1);
        let json = serde_json::to_string(&event).unwrap();
        let parsed: ReplicatedEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.event_id, "evt-1");
        assert_eq!(parsed.origin_region, "us-east");
        assert_eq!(parsed.hlc_timestamp.physical_ms, 1000);
    }
}