1use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::fmt;
6use std::hash::Hash;
7use uuid::Uuid;
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
11pub struct ReplicaId(#[serde(with = "uuid_serde")] pub Uuid);
12
13mod uuid_serde {
14 use super::*;
15 use serde::{Deserializer, Serializer};
16
17 pub fn serialize<S>(uuid: &Uuid, serializer: S) -> Result<S::Ok, S::Error>
18 where
19 S: Serializer,
20 {
21 uuid.to_string().serialize(serializer)
22 }
23
24 pub fn deserialize<'de, D>(deserializer: D) -> Result<Uuid, D::Error>
25 where
26 D: Deserializer<'de>,
27 {
28 let s = String::deserialize(deserializer)?;
29 Uuid::parse_str(&s).map_err(serde::de::Error::custom)
30 }
31}
32
33impl Default for ReplicaId {
34 fn default() -> Self {
35 Self(Uuid::new_v4())
36 }
37}
38
39impl fmt::Display for ReplicaId {
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 write!(f, "{}", self.0)
42 }
43}
44
45impl From<Uuid> for ReplicaId {
46 fn from(uuid: Uuid) -> Self {
47 Self(uuid)
48 }
49}
50
51impl From<ReplicaId> for Uuid {
52 fn from(replica_id: ReplicaId) -> Self {
53 replica_id.0
54 }
55}
56
/// A value that can absorb the state of another value of the same type.
///
/// Implementors must be `Clone + Send + Sync`.
pub trait Mergeable: Clone + Send + Sync {
    /// Error produced when a merge fails.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Merges the state of `other` into `self`.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` when the implementor cannot complete the merge.
    fn merge(&mut self, other: &Self) -> Result<(), Self::Error>;

    /// Reports whether `self` and `other` are in conflict; what counts as a
    /// conflict is defined by each implementor.
    fn has_conflict(&self, other: &Self) -> bool;
}
67
68#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
70pub struct LwwRegister<T> {
71 value: T,
72 timestamp: chrono::DateTime<chrono::Utc>,
73 replica_id: ReplicaId,
74}
75
76impl<T: Default> Default for LwwRegister<T> {
77 fn default() -> Self {
78 Self {
79 value: T::default(),
80 timestamp: chrono::Utc::now(),
81 replica_id: ReplicaId::default(),
82 }
83 }
84}
85
86impl<T> LwwRegister<T> {
87 pub fn new(value: T, replica_id: ReplicaId) -> Self {
88 Self {
89 value,
90 timestamp: chrono::Utc::now(),
91 replica_id,
92 }
93 }
94
95 pub fn value(&self) -> &T {
96 &self.value
97 }
98
99 pub fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
100 self.timestamp
101 }
102
103 pub fn replica_id(&self) -> ReplicaId {
104 self.replica_id
105 }
106
107 pub fn update(&mut self, value: T, replica_id: ReplicaId) {
108 self.value = value;
109 self.timestamp = chrono::Utc::now();
110 self.replica_id = replica_id;
111 }
112
113 pub fn with_timestamp(mut self, timestamp: chrono::DateTime<chrono::Utc>) -> Self {
114 self.timestamp = timestamp;
115 self
116 }
117}
118
119impl<T: Clone + PartialEq + Send + Sync> Mergeable for LwwRegister<T> {
120 type Error = std::io::Error;
121
122 fn merge(&mut self, other: &Self) -> Result<(), Self::Error> {
123 if other.timestamp > self.timestamp ||
124 (other.timestamp == self.timestamp && other.replica_id.0 > self.replica_id.0) {
125 self.value = other.value.clone();
126 self.timestamp = other.timestamp;
127 self.replica_id = other.replica_id;
128 }
129 Ok(())
130 }
131
132 fn has_conflict(&self, other: &Self) -> bool {
133 self.timestamp == other.timestamp && self.replica_id != other.replica_id
134 }
135}
136
137#[derive(Debug, Clone)]
139pub struct LwwMap<K, V> {
140 data: HashMap<K, LwwRegister<V>>,
141}
142
143impl<K, V> LwwMap<K, V>
144where
145 K: Clone + Eq + Hash + Send + Sync,
146 V: Clone + PartialEq + Send + Sync,
147{
148 pub fn new() -> Self {
149 Self {
150 data: HashMap::new(),
151 }
152 }
153
154 pub fn insert(&mut self, key: K, value: V, replica_id: ReplicaId) {
155 let register = LwwRegister::new(value, replica_id);
156 self.data.insert(key, register);
157 }
158
159 pub fn get(&self, key: &K) -> Option<&V> {
160 self.data.get(key).map(|register| register.value())
161 }
162
163 pub fn get_register(&self, key: &K) -> Option<&LwwRegister<V>> {
164 self.data.get(key)
165 }
166
167 pub fn remove(&mut self, key: &K) -> Option<V> {
168 self.data.remove(key).map(|register| register.value().clone())
169 }
170
171 pub fn contains_key(&self, key: &K) -> bool {
172 self.data.contains_key(key)
173 }
174
175 pub fn len(&self) -> usize {
176 self.data.len()
177 }
178
179 pub fn is_empty(&self) -> bool {
180 self.data.is_empty()
181 }
182
183 pub fn keys(&self) -> impl Iterator<Item = &K> {
184 self.data.keys()
185 }
186
187 pub fn values(&self) -> impl Iterator<Item = &V> {
188 self.data.values().map(|register| register.value())
189 }
190
191 pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
192 self.data.iter().map(|(k, v)| (k, v.value()))
193 }
194}
195
196impl<K, V> Default for LwwMap<K, V>
197where
198 K: Clone + Eq + Hash + Send + Sync,
199 V: Clone + PartialEq + Send + Sync,
200{
201 fn default() -> Self {
202 Self::new()
203 }
204}
205
206impl<K, V> Mergeable for LwwMap<K, V>
207where
208 K: Clone + Eq + Hash + Send + Sync,
209 V: Clone + PartialEq + Send + Sync,
210{
211 type Error = std::io::Error;
212
213 fn merge(&mut self, other: &Self) -> Result<(), Self::Error> {
214 for (key, other_register) in &other.data {
215 match self.data.get_mut(key) {
216 Some(existing_register) => {
217 existing_register.merge(other_register)?;
218 }
219 None => {
220 self.data.insert(key.clone(), other_register.clone());
221 }
222 }
223 }
224 Ok(())
225 }
226
227 fn has_conflict(&self, other: &Self) -> bool {
228 for (key, other_register) in &other.data {
229 if let Some(existing_register) = self.data.get(key) {
230 if existing_register.has_conflict(other_register) {
231 return true;
232 }
233 }
234 }
235 false
236 }
237}
238
239#[derive(Debug, Clone, Serialize, Deserialize)]
241pub struct GCounter {
242 increments: HashMap<ReplicaId, u64>,
243}
244
245impl GCounter {
246 pub fn new() -> Self {
247 Self {
248 increments: HashMap::new(),
249 }
250 }
251
252 pub fn increment(&mut self, replica_id: ReplicaId) {
253 *self.increments.entry(replica_id).or_insert(0) += 1;
254 }
255
256 pub fn value(&self) -> u64 {
257 self.increments.values().sum()
258 }
259
260 pub fn replica_value(&self, replica_id: ReplicaId) -> u64 {
261 self.increments.get(&replica_id).copied().unwrap_or(0)
262 }
263}
264
265impl Default for GCounter {
266 fn default() -> Self {
267 Self::new()
268 }
269}
270
271impl Mergeable for GCounter {
272 type Error = std::io::Error;
273
274 fn merge(&mut self, other: &Self) -> Result<(), Self::Error> {
275 for (replica_id, increment) in &other.increments {
276 let current = self.increments.entry(*replica_id).or_insert(0);
277 *current = (*current).max(*increment);
278 }
279 Ok(())
280 }
281
282 fn has_conflict(&self, _other: &Self) -> bool {
283 false
285 }
286}
287
#[cfg(test)]
mod tests {
    use super::*;

    /// A `ReplicaId` survives a JSON round trip unchanged.
    #[test]
    fn test_replica_id_serialization() {
        let replica_id = ReplicaId::default();
        let serialized = serde_json::to_string(&replica_id).unwrap();
        let deserialized: ReplicaId = serde_json::from_str(&serialized).unwrap();
        assert_eq!(replica_id, deserialized);
    }

    /// Merging a register with a strictly later timestamp replaces the value.
    #[test]
    fn test_lww_register_merge() {
        // Pin both timestamps explicitly. Relying on the wall clock (or on a
        // sleep placed after both registers already exist) can yield equal
        // timestamps, in which case the random replica UUIDs decide the winner
        // and the test becomes flaky.
        let earlier = chrono::DateTime::from_timestamp(1_000, 0).expect("valid timestamp");
        let later = chrono::DateTime::from_timestamp(1_001, 0).expect("valid timestamp");

        let mut reg1 = LwwRegister::new("value1", ReplicaId::default()).with_timestamp(earlier);
        let reg2 = LwwRegister::new("value2", ReplicaId::default()).with_timestamp(later);

        reg1.merge(&reg2).unwrap();
        assert_eq!(reg1.value(), &"value2");
    }

    /// Insert, lookup, length and removal behave like a plain map.
    #[test]
    fn test_lww_map_operations() {
        let mut map = LwwMap::new();
        let replica_id = ReplicaId::default();

        map.insert("key1".to_string(), "value1".to_string(), replica_id);
        assert_eq!(map.get(&"key1".to_string()), Some(&"value1".to_string()));
        assert_eq!(map.len(), 1);

        map.remove(&"key1".to_string());
        assert_eq!(map.len(), 0);
    }

    /// Two increments by one replica show up in both the total and the
    /// per-replica count.
    #[test]
    fn test_gcounter_operations() {
        let mut counter = GCounter::new();
        let replica_id = ReplicaId::default();

        counter.increment(replica_id);
        counter.increment(replica_id);

        assert_eq!(counter.value(), 2);
        assert_eq!(counter.replica_value(replica_id), 2);
    }
}
337
338#[cfg(test)]
339mod property_tests {
340 use super::*;
341 use proptest::prelude::*;
342
343 fn arb_replica_id() -> impl Strategy<Value = ReplicaId> {
345 any::<[u8; 16]>().prop_map(|bytes| {
346 let uuid = uuid::Uuid::from_bytes(bytes);
347 ReplicaId::from(uuid)
348 })
349 }
350
351 fn arb_timestamp() -> impl Strategy<Value = chrono::DateTime<chrono::Utc>> {
353 (0i64..1_000_000_000_000i64).prop_map(|seconds| {
354 chrono::DateTime::from_timestamp(seconds, 0).unwrap_or_else(|| chrono::Utc::now())
355 })
356 }
357
358 fn arb_lww_register() -> impl Strategy<Value = LwwRegister<String>> {
360 (any::<String>(), arb_replica_id(), arb_timestamp()).prop_map(|(value, replica_id, timestamp)| {
361 LwwRegister {
362 value,
363 timestamp,
364 replica_id,
365 }
366 })
367 }
368
369 fn arb_lww_map() -> impl Strategy<Value = LwwMap<String, String>> {
371 prop::collection::hash_map(
372 any::<String>(),
373 arb_lww_register(),
374 0..10
375 ).prop_map(|data| {
376 LwwMap { data }
377 })
378 }
379
380 fn arb_gcounter() -> impl Strategy<Value = GCounter> {
382 prop::collection::hash_map(
383 arb_replica_id(),
384 0u64..100,
385 0..5
386 ).prop_map(|increments| {
387 GCounter { increments }
388 })
389 }
390
391 proptest! {
392 #[test]
393 fn test_lww_register_commutativity(
394 reg1 in arb_lww_register(),
395 reg2 in arb_lww_register(),
396 ) {
397 let mut merged1 = reg1.clone();
398 let mut merged2 = reg2.clone();
399
400 merged1.merge(®2).unwrap();
402 merged2.merge(®1).unwrap();
403
404 prop_assert_eq!(merged1.value, merged2.value);
406 }
407
408 #[test]
409 fn test_lww_register_associativity(
410 reg1 in arb_lww_register(),
411 reg2 in arb_lww_register(),
412 reg3 in arb_lww_register(),
413 ) {
414 let mut merged_left = reg1.clone();
415 let mut temp = reg2.clone();
416 temp.merge(®3).unwrap();
417 merged_left.merge(&temp).unwrap();
418
419 let mut merged_right = reg1.clone();
420 merged_right.merge(®2).unwrap();
421 merged_right.merge(®3).unwrap();
422
423 prop_assert_eq!(merged_left.value, merged_right.value);
425 }
426
427 #[test]
428 fn test_lww_register_idempotency(
429 reg in arb_lww_register(),
430 ) {
431 let mut merged = reg.clone();
432 merged.merge(®).unwrap();
433
434 prop_assert_eq!(merged.value, reg.value);
436 prop_assert_eq!(merged.timestamp, reg.timestamp);
437 prop_assert_eq!(merged.replica_id, reg.replica_id);
438 }
439
440 #[test]
441 fn test_lww_register_convergence(
442 regs in prop::collection::vec(arb_lww_register(), 2..10),
443 ) {
444 let mut final_state = regs[0].clone();
446
447 for reg in ®s[1..] {
448 final_state.merge(reg).unwrap();
449 }
450
451 let mut reverse_final_state = regs[regs.len() - 1].clone();
453 for reg in regs.iter().rev().skip(1) {
454 reverse_final_state.merge(reg).unwrap();
455 }
456
457 prop_assert_eq!(final_state.value, reverse_final_state.value);
459 }
460
461 #[test]
462 fn test_lww_register_timestamp_ordering(
463 value in any::<String>(),
464 replica_id in arb_replica_id(),
465 timestamp1 in arb_timestamp(),
466 timestamp2 in arb_timestamp(),
467 ) {
468 let mut reg1 = LwwRegister {
469 value: value.clone(),
470 timestamp: timestamp1,
471 replica_id,
472 };
473
474 let reg2 = LwwRegister {
475 value: "different_value".to_string(),
476 timestamp: timestamp2,
477 replica_id,
478 };
479
480 reg1.merge(®2).unwrap();
481
482 if timestamp2 > timestamp1 {
484 prop_assert_eq!(reg1.value, "different_value");
485 } else {
486 prop_assert_eq!(reg1.value, value);
487 }
488 }
489
490 #[test]
491 fn test_lww_register_replica_id_tie_breaking(
492 value1 in any::<String>(),
493 value2 in any::<String>(),
494 replica_id1 in arb_replica_id(),
495 replica_id2 in arb_replica_id(),
496 timestamp in arb_timestamp(),
497 ) {
498 prop_assume!(replica_id1 != replica_id2);
500
501 let mut reg1 = LwwRegister {
502 value: value1.clone(),
503 timestamp,
504 replica_id: replica_id1,
505 };
506
507 let reg2 = LwwRegister {
508 value: value2.clone(),
509 timestamp,
510 replica_id: replica_id2,
511 };
512
513 reg1.merge(®2).unwrap();
514
515 let winner = if replica_id2.0 > replica_id1.0 { value2 } else { value1 };
517 prop_assert_eq!(reg1.value, winner);
518 }
519 }
520
521 proptest! {
522 #[test]
523 fn test_lww_map_commutativity(
524 map1 in arb_lww_map(),
525 map2 in arb_lww_map(),
526 ) {
527 let mut merged1 = map1.clone();
528 let mut merged2 = map2.clone();
529
530 merged1.merge(&map2).unwrap();
531 merged2.merge(&map1).unwrap();
532
533 prop_assert_eq!(merged1.data.len(), merged2.data.len());
535
536 for key in merged1.data.keys() {
537 prop_assert!(merged2.data.contains_key(key));
538 let reg1 = &merged1.data[key];
539 let reg2 = &merged2.data[key];
540 prop_assert_eq!(®1.value, ®2.value);
541 }
542 }
543
544 #[test]
545 fn test_lww_map_associativity(
546 map1 in arb_lww_map(),
547 map2 in arb_lww_map(),
548 map3 in arb_lww_map(),
549 ) {
550 let mut merged_left = map1.clone();
551 let mut temp = map2.clone();
552 temp.merge(&map3).unwrap();
553 merged_left.merge(&temp).unwrap();
554
555 let mut merged_right = map1.clone();
556 merged_right.merge(&map2).unwrap();
557 merged_right.merge(&map3).unwrap();
558
559 prop_assert_eq!(merged_left.data.len(), merged_right.data.len());
561
562 for key in merged_left.data.keys() {
563 prop_assert!(merged_right.data.contains_key(key));
564 let reg1 = &merged_left.data[key];
565 let reg2 = &merged_right.data[key];
566 prop_assert_eq!(®1.value, ®2.value);
567 }
568 }
569
570 #[test]
571 fn test_lww_map_idempotency(
572 map in arb_lww_map(),
573 ) {
574 let mut merged = map.clone();
575 merged.merge(&map).unwrap();
576
577 prop_assert_eq!(merged.data.len(), map.data.len());
579
580 for (key, original_register) in &map.data {
581 let merged_register = &merged.data[key];
582 prop_assert_eq!(&merged_register.value, &original_register.value);
583 prop_assert_eq!(merged_register.timestamp, original_register.timestamp);
584 prop_assert_eq!(merged_register.replica_id, original_register.replica_id);
585 }
586 }
587
588 #[test]
589 fn test_lww_map_convergence(
590 maps in prop::collection::vec(arb_lww_map(), 2..5),
591 ) {
592 let mut final_state = maps[0].clone();
594
595 for map in &maps[1..] {
596 final_state.merge(map).unwrap();
597 }
598
599 let mut reverse_final_state = maps[maps.len() - 1].clone();
601 for map in maps.iter().rev().skip(1) {
602 reverse_final_state.merge(map).unwrap();
603 }
604
605 prop_assert_eq!(final_state.data.len(), reverse_final_state.data.len());
607
608 for key in final_state.data.keys() {
609 prop_assert!(reverse_final_state.data.contains_key(key));
610 let reg1 = &final_state.data[key];
611 let reg2 = &reverse_final_state.data[key];
612 prop_assert_eq!(®1.value, ®2.value);
613 }
614 }
615 }
616
617 proptest! {
618 #[test]
619 fn test_gcounter_commutativity(
620 counter1 in arb_gcounter(),
621 counter2 in arb_gcounter(),
622 ) {
623 let mut merged1 = counter1.clone();
624 let mut merged2 = counter2.clone();
625
626 merged1.merge(&counter2).unwrap();
627 merged2.merge(&counter1).unwrap();
628
629 let value1 = merged1.value();
631 let value2 = merged2.value();
632 prop_assert_eq!(merged1.increments, merged2.increments);
633 prop_assert_eq!(value1, value2);
634 }
635
636 #[test]
637 fn test_gcounter_associativity(
638 counter1 in arb_gcounter(),
639 counter2 in arb_gcounter(),
640 counter3 in arb_gcounter(),
641 ) {
642 let mut merged_left = counter1.clone();
643 let mut temp = counter2.clone();
644 temp.merge(&counter3).unwrap();
645 merged_left.merge(&temp).unwrap();
646
647 let mut merged_right = counter1.clone();
648 merged_right.merge(&counter2).unwrap();
649 merged_right.merge(&counter3).unwrap();
650
651 let value_left = merged_left.value();
653 let value_right = merged_right.value();
654 prop_assert_eq!(merged_left.increments, merged_right.increments);
655 prop_assert_eq!(value_left, value_right);
656 }
657
658 #[test]
659 fn test_gcounter_idempotency(
660 counter in arb_gcounter(),
661 ) {
662 let mut merged = counter.clone();
663 merged.merge(&counter).unwrap();
664
665 let merged_value = merged.value();
667 let original_value = counter.value();
668 prop_assert_eq!(merged.increments, counter.increments);
669 prop_assert_eq!(merged_value, original_value);
670 }
671
672 #[test]
673 fn test_gcounter_convergence(
674 counters in prop::collection::vec(arb_gcounter(), 2..5),
675 ) {
676 let mut final_state = counters[0].clone();
678
679 for counter in &counters[1..] {
680 final_state.merge(counter).unwrap();
681 }
682
683 let mut reverse_final_state = counters[counters.len() - 1].clone();
685 for counter in counters.iter().rev().skip(1) {
686 reverse_final_state.merge(counter).unwrap();
687 }
688
689 let final_value = final_state.value();
691 let reverse_value = reverse_final_state.value();
692 prop_assert_eq!(final_state.increments, reverse_final_state.increments);
693 prop_assert_eq!(final_value, reverse_value);
694 }
695
696 #[test]
697 fn test_gcounter_monotonicity(
698 replica_id in arb_replica_id(),
699 initial_value in 0u64..50,
700 increment_amount in 1u64..10,
701 ) {
702 let mut counter = GCounter::new();
703 counter.increments.insert(replica_id, initial_value);
704
705 let old_value = counter.value();
706
707 for _ in 0..increment_amount {
709 counter.increment(replica_id);
710 }
711
712 let new_value = counter.value();
713
714 prop_assert!(new_value > old_value);
716 prop_assert_eq!(new_value, old_value + increment_amount);
717 }
718
719 #[test]
720 fn test_gcounter_maximum_merge(
721 replica_id in arb_replica_id(),
722 value1 in 0u64..100,
723 value2 in 0u64..100,
724 ) {
725 let mut counter1 = GCounter::new();
726 counter1.increments.insert(replica_id, value1);
727
728 let mut counter2 = GCounter::new();
729 counter2.increments.insert(replica_id, value2);
730
731 counter1.merge(&counter2).unwrap();
732
733 let expected_value = value1.max(value2);
735 prop_assert_eq!(counter1.replica_value(replica_id), expected_value);
736 }
737 }
738
739 proptest! {
740 #[test]
741 fn test_crdt_serialization_roundtrip(
742 register in arb_lww_register(),
743 ) {
744 let serialized = serde_json::to_string(®ister).unwrap();
746 let deserialized: LwwRegister<String> = serde_json::from_str(&serialized).unwrap();
747
748 prop_assert_eq!(register.value, deserialized.value);
749 prop_assert_eq!(register.timestamp, deserialized.timestamp);
750 prop_assert_eq!(register.replica_id, deserialized.replica_id);
751 }
752
753 #[test]
754 fn test_crdt_merge_preserves_serialization(
755 reg1 in arb_lww_register(),
756 reg2 in arb_lww_register(),
757 ) {
758 let mut merged = reg1.clone();
759 merged.merge(®2).unwrap();
760
761 let serialized = serde_json::to_string(&merged).unwrap();
763 let deserialized: LwwRegister<String> = serde_json::from_str(&serialized).unwrap();
764
765 prop_assert_eq!(merged.value, deserialized.value);
766 prop_assert_eq!(merged.timestamp, deserialized.timestamp);
767 prop_assert_eq!(merged.replica_id, deserialized.replica_id);
768 }
769 }
770}