1use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::fmt;
6use std::hash::Hash;
7use uuid::Uuid;
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
11pub struct ReplicaId(#[serde(with = "uuid_serde")] pub Uuid);
12
13mod uuid_serde {
14 use super::*;
15 use serde::{Deserializer, Serializer};
16
17 pub fn serialize<S>(uuid: &Uuid, serializer: S) -> Result<S::Ok, S::Error>
18 where
19 S: Serializer,
20 {
21 uuid.to_string().serialize(serializer)
22 }
23
24 pub fn deserialize<'de, D>(deserializer: D) -> Result<Uuid, D::Error>
25 where
26 D: Deserializer<'de>,
27 {
28 let s = String::deserialize(deserializer)?;
29 Uuid::parse_str(&s).map_err(serde::de::Error::custom)
30 }
31}
32
33impl Default for ReplicaId {
34 fn default() -> Self {
35 Self(Uuid::new_v4())
36 }
37}
38
39impl fmt::Display for ReplicaId {
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 write!(f, "{}", self.0)
42 }
43}
44
45impl From<Uuid> for ReplicaId {
46 fn from(uuid: Uuid) -> Self {
47 Self(uuid)
48 }
49}
50
51impl From<ReplicaId> for Uuid {
52 fn from(replica_id: ReplicaId) -> Self {
53 replica_id.0
54 }
55}
56
/// State that can absorb another instance of the same type.
///
/// Implementors must be cloneable and safe to send and share across threads.
pub trait Mergeable: Clone + Send + Sync {
    /// Error reported when a merge cannot be completed.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Folds the state of `other` into `self`.
    ///
    /// # Errors
    ///
    /// Returns [`Self::Error`] when the implementation fails to merge.
    fn merge(&mut self, other: &Self) -> Result<(), Self::Error>;

    /// Reports whether `self` and `other` hold conflicting state, as defined
    /// by the implementation.
    fn has_conflict(&self, other: &Self) -> bool;
}
67
68pub trait CRDT {
70 fn replica_id(&self) -> &ReplicaId;
71}
72
73#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
75pub struct LwwRegister<T> {
76 value: T,
77 timestamp: chrono::DateTime<chrono::Utc>,
78 replica_id: ReplicaId,
79}
80
81impl<T: Default> Default for LwwRegister<T> {
82 fn default() -> Self {
83 Self {
84 value: T::default(),
85 timestamp: chrono::Utc::now(),
86 replica_id: ReplicaId::default(),
87 }
88 }
89}
90
91impl<T> LwwRegister<T> {
92 pub fn new(value: T, replica_id: ReplicaId) -> Self {
93 Self {
94 value,
95 timestamp: chrono::Utc::now(),
96 replica_id,
97 }
98 }
99
100 pub fn value(&self) -> &T {
101 &self.value
102 }
103
104 pub fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
105 self.timestamp
106 }
107
108 pub fn replica_id(&self) -> ReplicaId {
109 self.replica_id
110 }
111
112 pub fn update(&mut self, value: T, replica_id: ReplicaId) {
113 self.value = value;
114 self.timestamp = chrono::Utc::now();
115 self.replica_id = replica_id;
116 }
117
118 pub fn with_timestamp(mut self, timestamp: chrono::DateTime<chrono::Utc>) -> Self {
119 self.timestamp = timestamp;
120 self
121 }
122}
123
124impl<T: Clone + PartialEq + Send + Sync> Mergeable for LwwRegister<T> {
125 type Error = std::io::Error;
126
127 fn merge(&mut self, other: &Self) -> Result<(), Self::Error> {
128 if other.timestamp > self.timestamp ||
129 (other.timestamp == self.timestamp && other.replica_id.0 > self.replica_id.0) {
130 self.value = other.value.clone();
131 self.timestamp = other.timestamp;
132 self.replica_id = other.replica_id;
133 }
134 Ok(())
135 }
136
137 fn has_conflict(&self, other: &Self) -> bool {
138 self.timestamp == other.timestamp && self.replica_id != other.replica_id
139 }
140}
141
142impl<T> CRDT for LwwRegister<T> {
143 fn replica_id(&self) -> &ReplicaId {
144 &self.replica_id
145 }
146}
147
148#[derive(Debug, Clone)]
150pub struct LwwMap<K, V> {
151 data: HashMap<K, LwwRegister<V>>,
152}
153
154impl<K, V> LwwMap<K, V>
155where
156 K: Clone + Eq + Hash + Send + Sync,
157 V: Clone + PartialEq + Send + Sync,
158{
159 pub fn new() -> Self {
160 Self {
161 data: HashMap::new(),
162 }
163 }
164
165 pub fn insert(&mut self, key: K, value: V, replica_id: ReplicaId) {
166 let register = LwwRegister::new(value, replica_id);
167 self.data.insert(key, register);
168 }
169
170 pub fn get(&self, key: &K) -> Option<&V> {
171 self.data.get(key).map(|register| register.value())
172 }
173
174 pub fn get_register(&self, key: &K) -> Option<&LwwRegister<V>> {
175 self.data.get(key)
176 }
177
178 pub fn remove(&mut self, key: &K) -> Option<V> {
179 self.data.remove(key).map(|register| register.value().clone())
180 }
181
182 pub fn contains_key(&self, key: &K) -> bool {
183 self.data.contains_key(key)
184 }
185
186 pub fn len(&self) -> usize {
187 self.data.len()
188 }
189
190 pub fn is_empty(&self) -> bool {
191 self.data.is_empty()
192 }
193
194 pub fn keys(&self) -> impl Iterator<Item = &K> {
195 self.data.keys()
196 }
197
198 pub fn values(&self) -> impl Iterator<Item = &V> {
199 self.data.values().map(|register| register.value())
200 }
201
202 pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
203 self.data.iter().map(|(k, v)| (k, v.value()))
204 }
205}
206
207impl<K, V> Default for LwwMap<K, V>
208where
209 K: Clone + Eq + Hash + Send + Sync,
210 V: Clone + PartialEq + Send + Sync,
211{
212 fn default() -> Self {
213 Self::new()
214 }
215}
216
217impl<K, V> Mergeable for LwwMap<K, V>
218where
219 K: Clone + Eq + Hash + Send + Sync,
220 V: Clone + PartialEq + Send + Sync,
221{
222 type Error = std::io::Error;
223
224 fn merge(&mut self, other: &Self) -> Result<(), Self::Error> {
225 for (key, other_register) in &other.data {
226 match self.data.get_mut(key) {
227 Some(existing_register) => {
228 existing_register.merge(other_register)?;
229 }
230 None => {
231 self.data.insert(key.clone(), other_register.clone());
232 }
233 }
234 }
235 Ok(())
236 }
237
238 fn has_conflict(&self, other: &Self) -> bool {
239 for (key, other_register) in &other.data {
240 if let Some(existing_register) = self.data.get(key) {
241 if existing_register.has_conflict(other_register) {
242 return true;
243 }
244 }
245 }
246 false
247 }
248}
249
250impl<K, V> CRDT for LwwMap<K, V> {
251 fn replica_id(&self) -> &ReplicaId {
252 static DEFAULT_REPLICA: std::sync::LazyLock<ReplicaId> = std::sync::LazyLock::new(|| ReplicaId::from(uuid::Uuid::nil()));
255 &DEFAULT_REPLICA
256 }
257}
258
259#[derive(Debug, Clone, Serialize, Deserialize)]
261pub struct GCounter {
262 increments: HashMap<ReplicaId, u64>,
263}
264
265impl GCounter {
266 pub fn new() -> Self {
267 Self {
268 increments: HashMap::new(),
269 }
270 }
271
272 pub fn increment(&mut self, replica_id: ReplicaId) {
273 *self.increments.entry(replica_id).or_insert(0) += 1;
274 }
275
276 pub fn value(&self) -> u64 {
277 self.increments.values().sum()
278 }
279
280 pub fn replica_value(&self, replica_id: ReplicaId) -> u64 {
281 self.increments.get(&replica_id).copied().unwrap_or(0)
282 }
283}
284
285impl Default for GCounter {
286 fn default() -> Self {
287 Self::new()
288 }
289}
290
291impl Mergeable for GCounter {
292 type Error = std::io::Error;
293
294 fn merge(&mut self, other: &Self) -> Result<(), Self::Error> {
295 for (replica_id, increment) in &other.increments {
296 let current = self.increments.entry(*replica_id).or_insert(0);
297 *current = (*current).max(*increment);
298 }
299 Ok(())
300 }
301
302 fn has_conflict(&self, _other: &Self) -> bool {
303 false
305 }
306}
307
308impl CRDT for GCounter {
309 fn replica_id(&self) -> &ReplicaId {
310 static DEFAULT_REPLICA: std::sync::LazyLock<ReplicaId> = std::sync::LazyLock::new(|| ReplicaId::from(uuid::Uuid::nil()));
313 &DEFAULT_REPLICA
314 }
315}
316
#[cfg(test)]
mod tests {
    use super::*;

    /// A replica id survives a JSON round trip unchanged.
    #[test]
    fn test_replica_id_serialization() {
        let replica_id = ReplicaId::default();
        let serialized = serde_json::to_string(&replica_id).unwrap();
        let deserialized: ReplicaId = serde_json::from_str(&serialized).unwrap();
        assert_eq!(replica_id, deserialized);
    }

    /// Merging a strictly newer register replaces the older value.
    #[test]
    fn test_lww_register_merge() {
        // Pin explicit timestamps so the outcome does not depend on clock
        // resolution or on the random replica-id tie-break.
        let earlier = chrono::DateTime::from_timestamp(1_000, 0).expect("valid timestamp");
        let later = chrono::DateTime::from_timestamp(2_000, 0).expect("valid timestamp");

        let mut reg1 = LwwRegister::new("value1", ReplicaId::default()).with_timestamp(earlier);
        let reg2 = LwwRegister::new("value2", ReplicaId::default()).with_timestamp(later);

        reg1.merge(&reg2).unwrap();
        assert_eq!(reg1.value(), &"value2");
    }

    /// Insert, lookup and removal behave like a regular map.
    #[test]
    fn test_lww_map_operations() {
        let mut map = LwwMap::new();
        let replica_id = ReplicaId::default();

        map.insert("key1".to_string(), "value1".to_string(), replica_id);
        assert_eq!(map.get(&"key1".to_string()), Some(&"value1".to_string()));
        assert_eq!(map.len(), 1);

        map.remove(&"key1".to_string());
        assert_eq!(map.len(), 0);
    }

    /// Increments accumulate both in the total and in the per-replica count.
    #[test]
    fn test_gcounter_operations() {
        let mut counter = GCounter::new();
        let replica_id = ReplicaId::default();

        counter.increment(replica_id);
        counter.increment(replica_id);

        assert_eq!(counter.value(), 2);
        assert_eq!(counter.replica_value(replica_id), 2);
    }
}
366
#[cfg(test)]
mod property_tests {
    use super::*;
    use proptest::prelude::*;

    /// Strategy producing replica ids from arbitrary 16-byte UUIDs.
    fn arb_replica_id() -> impl Strategy<Value = ReplicaId> {
        any::<[u8; 16]>().prop_map(|bytes| {
            let uuid = uuid::Uuid::from_bytes(bytes);
            ReplicaId::from(uuid)
        })
    }

    /// Strategy producing whole-second UTC timestamps.
    fn arb_timestamp() -> impl Strategy<Value = chrono::DateTime<chrono::Utc>> {
        (0i64..1_000_000_000_000i64).prop_map(|seconds| {
            chrono::DateTime::from_timestamp(seconds, 0).unwrap_or_else(chrono::Utc::now)
        })
    }

    /// Strategy producing registers with arbitrary value, writer and timestamp.
    fn arb_lww_register() -> impl Strategy<Value = LwwRegister<String>> {
        (any::<String>(), arb_replica_id(), arb_timestamp()).prop_map(
            |(value, replica_id, timestamp)| LwwRegister {
                value,
                timestamp,
                replica_id,
            },
        )
    }

    /// Strategy producing maps with up to nine arbitrary entries.
    fn arb_lww_map() -> impl Strategy<Value = LwwMap<String, String>> {
        prop::collection::hash_map(any::<String>(), arb_lww_register(), 0..10)
            .prop_map(|data| LwwMap { data })
    }

    /// Strategy producing counters with up to four replicas.
    fn arb_gcounter() -> impl Strategy<Value = GCounter> {
        prop::collection::hash_map(arb_replica_id(), 0u64..100, 0..5)
            .prop_map(|increments| GCounter { increments })
    }

    // LwwRegister properties.
    proptest! {
        /// Merge order of two registers does not affect the resulting value.
        #[test]
        fn test_lww_register_commutativity(
            reg1 in arb_lww_register(),
            reg2 in arb_lww_register(),
        ) {
            let mut merged1 = reg1.clone();
            let mut merged2 = reg2.clone();

            merged1.merge(&reg2).unwrap();
            merged2.merge(&reg1).unwrap();

            prop_assert_eq!(merged1.value, merged2.value);
        }

        /// Grouping of merges does not affect the resulting value.
        #[test]
        fn test_lww_register_associativity(
            reg1 in arb_lww_register(),
            reg2 in arb_lww_register(),
            reg3 in arb_lww_register(),
        ) {
            let mut merged_left = reg1.clone();
            let mut temp = reg2.clone();
            temp.merge(&reg3).unwrap();
            merged_left.merge(&temp).unwrap();

            let mut merged_right = reg1.clone();
            merged_right.merge(&reg2).unwrap();
            merged_right.merge(&reg3).unwrap();

            prop_assert_eq!(merged_left.value, merged_right.value);
        }

        /// Merging a register with itself changes nothing.
        #[test]
        fn test_lww_register_idempotency(
            reg in arb_lww_register(),
        ) {
            let mut merged = reg.clone();
            merged.merge(&reg).unwrap();

            prop_assert_eq!(&merged.value, &reg.value);
            prop_assert_eq!(merged.timestamp, reg.timestamp);
            prop_assert_eq!(merged.replica_id, reg.replica_id);
        }

        /// Forward and reverse merge orders converge on the same value.
        #[test]
        fn test_lww_register_convergence(
            regs in prop::collection::vec(arb_lww_register(), 2..10),
        ) {
            let mut final_state = regs[0].clone();

            for reg in &regs[1..] {
                final_state.merge(reg).unwrap();
            }

            let mut reverse_final_state = regs[regs.len() - 1].clone();
            for reg in regs.iter().rev().skip(1) {
                reverse_final_state.merge(reg).unwrap();
            }

            prop_assert_eq!(final_state.value, reverse_final_state.value);
        }

        /// With the same writer, only a strictly later timestamp wins.
        #[test]
        fn test_lww_register_timestamp_ordering(
            value in any::<String>(),
            replica_id in arb_replica_id(),
            timestamp1 in arb_timestamp(),
            timestamp2 in arb_timestamp(),
        ) {
            let mut reg1 = LwwRegister {
                value: value.clone(),
                timestamp: timestamp1,
                replica_id,
            };

            let reg2 = LwwRegister {
                value: "different_value".to_string(),
                timestamp: timestamp2,
                replica_id,
            };

            reg1.merge(&reg2).unwrap();

            if timestamp2 > timestamp1 {
                prop_assert_eq!(reg1.value, "different_value");
            } else {
                prop_assert_eq!(reg1.value, value);
            }
        }

        /// With equal timestamps, the greater replica UUID wins.
        #[test]
        fn test_lww_register_replica_id_tie_breaking(
            value1 in any::<String>(),
            value2 in any::<String>(),
            replica_id1 in arb_replica_id(),
            replica_id2 in arb_replica_id(),
            timestamp in arb_timestamp(),
        ) {
            prop_assume!(replica_id1 != replica_id2);

            let mut reg1 = LwwRegister {
                value: value1.clone(),
                timestamp,
                replica_id: replica_id1,
            };

            let reg2 = LwwRegister {
                value: value2.clone(),
                timestamp,
                replica_id: replica_id2,
            };

            reg1.merge(&reg2).unwrap();

            let winner = if replica_id2.0 > replica_id1.0 { value2 } else { value1 };
            prop_assert_eq!(reg1.value, winner);
        }
    }

    // LwwMap properties.
    proptest! {
        /// Merge order of two maps does not affect keys or values.
        #[test]
        fn test_lww_map_commutativity(
            map1 in arb_lww_map(),
            map2 in arb_lww_map(),
        ) {
            let mut merged1 = map1.clone();
            let mut merged2 = map2.clone();

            merged1.merge(&map2).unwrap();
            merged2.merge(&map1).unwrap();

            prop_assert_eq!(merged1.data.len(), merged2.data.len());

            for key in merged1.data.keys() {
                prop_assert!(merged2.data.contains_key(key));
                let reg1 = &merged1.data[key];
                let reg2 = &merged2.data[key];
                prop_assert_eq!(&reg1.value, &reg2.value);
            }
        }

        /// Grouping of map merges does not affect keys or values.
        #[test]
        fn test_lww_map_associativity(
            map1 in arb_lww_map(),
            map2 in arb_lww_map(),
            map3 in arb_lww_map(),
        ) {
            let mut merged_left = map1.clone();
            let mut temp = map2.clone();
            temp.merge(&map3).unwrap();
            merged_left.merge(&temp).unwrap();

            let mut merged_right = map1.clone();
            merged_right.merge(&map2).unwrap();
            merged_right.merge(&map3).unwrap();

            prop_assert_eq!(merged_left.data.len(), merged_right.data.len());

            for key in merged_left.data.keys() {
                prop_assert!(merged_right.data.contains_key(key));
                let reg1 = &merged_left.data[key];
                let reg2 = &merged_right.data[key];
                prop_assert_eq!(&reg1.value, &reg2.value);
            }
        }

        /// Merging a map with itself changes nothing.
        #[test]
        fn test_lww_map_idempotency(
            map in arb_lww_map(),
        ) {
            let mut merged = map.clone();
            merged.merge(&map).unwrap();

            prop_assert_eq!(merged.data.len(), map.data.len());

            for (key, original_register) in &map.data {
                let merged_register = &merged.data[key];
                prop_assert_eq!(&merged_register.value, &original_register.value);
                prop_assert_eq!(merged_register.timestamp, original_register.timestamp);
                prop_assert_eq!(merged_register.replica_id, original_register.replica_id);
            }
        }

        /// Forward and reverse merge orders converge on the same map contents.
        #[test]
        fn test_lww_map_convergence(
            maps in prop::collection::vec(arb_lww_map(), 2..5),
        ) {
            let mut final_state = maps[0].clone();

            for map in &maps[1..] {
                final_state.merge(map).unwrap();
            }

            let mut reverse_final_state = maps[maps.len() - 1].clone();
            for map in maps.iter().rev().skip(1) {
                reverse_final_state.merge(map).unwrap();
            }

            prop_assert_eq!(final_state.data.len(), reverse_final_state.data.len());

            for key in final_state.data.keys() {
                prop_assert!(reverse_final_state.data.contains_key(key));
                let reg1 = &final_state.data[key];
                let reg2 = &reverse_final_state.data[key];
                prop_assert_eq!(&reg1.value, &reg2.value);
            }
        }
    }

    // GCounter properties.
    proptest! {
        /// Merge order of two counters does not affect the result.
        #[test]
        fn test_gcounter_commutativity(
            counter1 in arb_gcounter(),
            counter2 in arb_gcounter(),
        ) {
            let mut merged1 = counter1.clone();
            let mut merged2 = counter2.clone();

            merged1.merge(&counter2).unwrap();
            merged2.merge(&counter1).unwrap();

            let value1 = merged1.value();
            let value2 = merged2.value();
            prop_assert_eq!(&merged1.increments, &merged2.increments);
            prop_assert_eq!(value1, value2);
        }

        /// Grouping of counter merges does not affect the result.
        #[test]
        fn test_gcounter_associativity(
            counter1 in arb_gcounter(),
            counter2 in arb_gcounter(),
            counter3 in arb_gcounter(),
        ) {
            let mut merged_left = counter1.clone();
            let mut temp = counter2.clone();
            temp.merge(&counter3).unwrap();
            merged_left.merge(&temp).unwrap();

            let mut merged_right = counter1.clone();
            merged_right.merge(&counter2).unwrap();
            merged_right.merge(&counter3).unwrap();

            let value_left = merged_left.value();
            let value_right = merged_right.value();
            prop_assert_eq!(&merged_left.increments, &merged_right.increments);
            prop_assert_eq!(value_left, value_right);
        }

        /// Merging a counter with itself changes nothing.
        #[test]
        fn test_gcounter_idempotency(
            counter in arb_gcounter(),
        ) {
            let mut merged = counter.clone();
            merged.merge(&counter).unwrap();

            let merged_value = merged.value();
            let original_value = counter.value();
            prop_assert_eq!(&merged.increments, &counter.increments);
            prop_assert_eq!(merged_value, original_value);
        }

        /// Forward and reverse merge orders converge on the same counter.
        #[test]
        fn test_gcounter_convergence(
            counters in prop::collection::vec(arb_gcounter(), 2..5),
        ) {
            let mut final_state = counters[0].clone();

            for counter in &counters[1..] {
                final_state.merge(counter).unwrap();
            }

            let mut reverse_final_state = counters[counters.len() - 1].clone();
            for counter in counters.iter().rev().skip(1) {
                reverse_final_state.merge(counter).unwrap();
            }

            let final_value = final_state.value();
            let reverse_value = reverse_final_state.value();
            prop_assert_eq!(&final_state.increments, &reverse_final_state.increments);
            prop_assert_eq!(final_value, reverse_value);
        }

        /// Each increment raises the total by exactly one.
        #[test]
        fn test_gcounter_monotonicity(
            replica_id in arb_replica_id(),
            initial_value in 0u64..50,
            increment_amount in 1u64..10,
        ) {
            let mut counter = GCounter::new();
            counter.increments.insert(replica_id, initial_value);

            let old_value = counter.value();

            for _ in 0..increment_amount {
                counter.increment(replica_id);
            }

            let new_value = counter.value();

            prop_assert!(new_value > old_value);
            prop_assert_eq!(new_value, old_value + increment_amount);
        }

        /// Merging keeps the larger per-replica count.
        #[test]
        fn test_gcounter_maximum_merge(
            replica_id in arb_replica_id(),
            value1 in 0u64..100,
            value2 in 0u64..100,
        ) {
            let mut counter1 = GCounter::new();
            counter1.increments.insert(replica_id, value1);

            let mut counter2 = GCounter::new();
            counter2.increments.insert(replica_id, value2);

            counter1.merge(&counter2).unwrap();

            let expected_value = value1.max(value2);
            prop_assert_eq!(counter1.replica_value(replica_id), expected_value);
        }
    }

    // Serialization properties.
    proptest! {
        /// A register survives a JSON round trip unchanged.
        #[test]
        fn test_crdt_serialization_roundtrip(
            register in arb_lww_register(),
        ) {
            let serialized = serde_json::to_string(&register).unwrap();
            let deserialized: LwwRegister<String> = serde_json::from_str(&serialized).unwrap();

            prop_assert_eq!(register.value, deserialized.value);
            prop_assert_eq!(register.timestamp, deserialized.timestamp);
            prop_assert_eq!(register.replica_id, deserialized.replica_id);
        }

        /// A merged register survives a JSON round trip unchanged.
        #[test]
        fn test_crdt_merge_preserves_serialization(
            reg1 in arb_lww_register(),
            reg2 in arb_lww_register(),
        ) {
            let mut merged = reg1.clone();
            merged.merge(&reg2).unwrap();

            let serialized = serde_json::to_string(&merged).unwrap();
            let deserialized: LwwRegister<String> = serde_json::from_str(&serialized).unwrap();

            prop_assert_eq!(merged.value, deserialized.value);
            prop_assert_eq!(merged.timestamp, deserialized.timestamp);
            prop_assert_eq!(merged.replica_id, deserialized.replica_id);
        }
    }
}