use std::collections::BTreeMap;

use dashmap::DashMap;
use serde::{Deserialize, Serialize};

use super::hlc::HlcTimestamp;
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct VersionVector {
41 entries: BTreeMap<String, HlcTimestamp>,
43}
44
45impl VersionVector {
46 pub fn new() -> Self {
48 Self {
49 entries: BTreeMap::new(),
50 }
51 }
52
53 pub fn advance(&mut self, region_id: &str, ts: HlcTimestamp) -> bool {
56 match self.entries.get(region_id) {
57 Some(existing) if ts <= *existing => false,
58 _ => {
59 self.entries.insert(region_id.to_string(), ts);
60 true
61 }
62 }
63 }
64
65 pub fn is_new(&self, region_id: &str, ts: &HlcTimestamp) -> bool {
67 match self.entries.get(region_id) {
68 Some(existing) => ts > existing,
69 None => true,
70 }
71 }
72
73 pub fn get(&self, region_id: &str) -> Option<&HlcTimestamp> {
75 self.entries.get(region_id)
76 }
77
78 pub fn merge(&mut self, other: &VersionVector) {
80 for (region, ts) in &other.entries {
81 self.advance(region, *ts);
82 }
83 }
84
85 pub fn entries(&self) -> &BTreeMap<String, HlcTimestamp> {
87 &self.entries
88 }
89
90 pub fn len(&self) -> usize {
92 self.entries.len()
93 }
94
95 pub fn is_empty(&self) -> bool {
97 self.entries.is_empty()
98 }
99}
100
101impl Default for VersionVector {
102 fn default() -> Self {
103 Self::new()
104 }
105}
106
107#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct ReplicatedEvent {
110 pub event_id: String,
112 pub hlc_timestamp: HlcTimestamp,
114 pub origin_region: String,
116 pub event_data: serde_json::Value,
118}
119
/// Outcome of checking a replicated event against the resolver's state.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConflictResolution {
    /// The event has not been seen and is newer than what is recorded for
    /// its origin region; it should be applied.
    Accept,
    /// The event is a duplicate or is not newer than what is recorded for
    /// its origin region; it should be ignored.
    Skip,
}
128
129pub struct CrdtResolver {
133 version_vectors: DashMap<String, VersionVector>,
135 seen_events: DashMap<String, ()>,
137}
138
139impl CrdtResolver {
140 pub fn new() -> Self {
142 Self {
143 version_vectors: DashMap::new(),
144 seen_events: DashMap::new(),
145 }
146 }
147
148 pub fn resolve(&self, event: &ReplicatedEvent) -> ConflictResolution {
152 if self.seen_events.contains_key(&event.event_id) {
154 return ConflictResolution::Skip;
155 }
156
157 let is_new = self
159 .version_vectors
160 .get(&event.origin_region)
161 .map(|vv| vv.is_new(&event.origin_region, &event.hlc_timestamp))
162 .unwrap_or(true);
163
164 if !is_new {
165 return ConflictResolution::Skip;
166 }
167
168 ConflictResolution::Accept
169 }
170
171 pub fn accept(&self, event: &ReplicatedEvent) {
173 self.seen_events.insert(event.event_id.clone(), ());
174
175 let mut vv = self
176 .version_vectors
177 .entry(event.origin_region.clone())
178 .or_default();
179 vv.advance(&event.origin_region, event.hlc_timestamp);
180 }
181
182 pub fn resolve_and_accept(&self, event: &ReplicatedEvent) -> ConflictResolution {
184 let resolution = self.resolve(event);
185 if resolution == ConflictResolution::Accept {
186 self.accept(event);
187 }
188 resolution
189 }
190
191 pub fn version_vector_for(&self, region_id: &str) -> Option<VersionVector> {
193 self.version_vectors.get(region_id).map(|vv| vv.clone())
194 }
195
196 pub fn all_version_vectors(&self) -> BTreeMap<String, VersionVector> {
198 self.version_vectors
199 .iter()
200 .map(|entry| (entry.key().clone(), entry.value().clone()))
201 .collect()
202 }
203
204 pub fn merge_version_vector(&self, region_id: &str, remote_vv: &VersionVector) {
206 let mut vv = self
207 .version_vectors
208 .entry(region_id.to_string())
209 .or_default();
210 vv.merge(remote_vv);
211 }
212
213 pub fn seen_count(&self) -> usize {
215 self.seen_events.len()
216 }
217}
218
219impl Default for CrdtResolver {
220 fn default() -> Self {
221 Self::new()
222 }
223}
224
225pub fn deterministic_order(events: &mut [ReplicatedEvent]) {
229 events.sort_by(|a, b| {
230 a.hlc_timestamp
231 .cmp(&b.hlc_timestamp)
232 .then_with(|| a.event_id.cmp(&b.event_id))
233 });
234}
235
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an event with a fixed JSON payload so tests only vary the
    /// fields the resolver and the ordering function actually look at.
    fn make_event(
        id: &str,
        region: &str,
        physical_ms: u64,
        logical: u32,
        node_id: u32,
    ) -> ReplicatedEvent {
        ReplicatedEvent {
            event_id: id.to_string(),
            hlc_timestamp: HlcTimestamp::new(physical_ms, logical, node_id),
            origin_region: region.to_string(),
            event_data: serde_json::json!({"type": "test"}),
        }
    }

    #[test]
    fn test_version_vector_advance() {
        let mut vv = VersionVector::new();
        let ts1 = HlcTimestamp::new(100, 0, 1);
        let ts2 = HlcTimestamp::new(200, 0, 1);
        let ts_old = HlcTimestamp::new(50, 0, 1);

        assert!(vv.advance("us-east", ts1));
        assert!(vv.advance("us-east", ts2));
        // An older timestamp must be rejected and leave the entry untouched.
        assert!(!vv.advance("us-east", ts_old));
        assert_eq!(vv.get("us-east").unwrap().physical_ms, 200);
    }

    #[test]
    fn test_version_vector_is_new() {
        let mut vv = VersionVector::new();
        let ts = HlcTimestamp::new(100, 0, 1);
        vv.advance("us-east", ts);

        assert!(!vv.is_new("us-east", &HlcTimestamp::new(50, 0, 1)));
        assert!(!vv.is_new("us-east", &HlcTimestamp::new(100, 0, 1)));
        assert!(vv.is_new("us-east", &HlcTimestamp::new(101, 0, 1)));
        // A region with no entry treats every timestamp as new.
        assert!(vv.is_new("eu-west", &HlcTimestamp::new(1, 0, 1)));
    }

    #[test]
    fn test_version_vector_merge() {
        let mut vv1 = VersionVector::new();
        vv1.advance("us-east", HlcTimestamp::new(100, 0, 1));
        vv1.advance("eu-west", HlcTimestamp::new(50, 0, 2));

        let mut vv2 = VersionVector::new();
        vv2.advance("us-east", HlcTimestamp::new(80, 0, 1));
        vv2.advance("eu-west", HlcTimestamp::new(120, 0, 2));
        vv2.advance("ap-east", HlcTimestamp::new(90, 0, 3));

        vv1.merge(&vv2);

        // Local value is greater, so it is kept.
        assert_eq!(vv1.get("us-east").unwrap().physical_ms, 100);
        // Remote value is greater, so it replaces the local one.
        assert_eq!(vv1.get("eu-west").unwrap().physical_ms, 120);
        // Region only present remotely is added.
        assert_eq!(vv1.get("ap-east").unwrap().physical_ms, 90);
        assert_eq!(vv1.len(), 3);
    }

    #[test]
    fn test_crdt_resolver_accept_new_event() {
        let resolver = CrdtResolver::new();
        let event = make_event("evt-1", "us-east", 100, 0, 1);

        assert_eq!(resolver.resolve(&event), ConflictResolution::Accept);
        resolver.accept(&event);
        assert_eq!(resolver.seen_count(), 1);
    }

    #[test]
    fn test_crdt_resolver_skip_duplicate() {
        let resolver = CrdtResolver::new();
        let event = make_event("evt-1", "us-east", 100, 0, 1);

        assert_eq!(
            resolver.resolve_and_accept(&event),
            ConflictResolution::Accept
        );
        assert_eq!(resolver.resolve(&event), ConflictResolution::Skip);
    }

    #[test]
    fn test_crdt_resolver_skip_old_version() {
        let resolver = CrdtResolver::new();
        let new_event = make_event("evt-2", "us-east", 200, 0, 1);
        let old_event = make_event("evt-1", "us-east", 100, 0, 1);

        resolver.resolve_and_accept(&new_event);
        assert_eq!(resolver.resolve(&old_event), ConflictResolution::Skip);
    }

    #[test]
    fn test_crdt_resolver_different_regions_independent() {
        let resolver = CrdtResolver::new();
        let us_event = make_event("evt-1", "us-east", 100, 0, 1);
        let eu_event = make_event("evt-2", "eu-west", 50, 0, 2);

        resolver.resolve_and_accept(&us_event);
        assert_eq!(resolver.resolve(&eu_event), ConflictResolution::Accept);
    }

    #[test]
    fn test_deterministic_order() {
        let mut events = vec![
            make_event("evt-3", "ap-east", 100, 0, 3),
            make_event("evt-1", "us-east", 100, 0, 1),
            make_event("evt-2", "eu-west", 100, 0, 2),
        ];

        deterministic_order(&mut events);

        assert_eq!(events[0].event_id, "evt-1");
        assert_eq!(events[1].event_id, "evt-2");
        assert_eq!(events[2].event_id, "evt-3");
    }

    #[test]
    fn test_deterministic_order_by_hlc() {
        let mut events = vec![
            make_event("evt-1", "us-east", 300, 0, 1),
            make_event("evt-2", "eu-west", 100, 0, 2),
            make_event("evt-3", "ap-east", 200, 0, 3),
        ];

        deterministic_order(&mut events);

        assert_eq!(events[0].event_id, "evt-2");
        assert_eq!(events[1].event_id, "evt-3");
        assert_eq!(events[2].event_id, "evt-1");
    }

    #[test]
    fn test_replicated_event_serialization() {
        let event = make_event("evt-1", "us-east", 1000, 5, 1);
        let json = serde_json::to_string(&event).unwrap();
        let parsed: ReplicatedEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.event_id, "evt-1");
        assert_eq!(parsed.origin_region, "us-east");
        assert_eq!(parsed.hlc_timestamp.physical_ms, 1000);
    }
}