leptos_sync_core/
crdt.rs

1//! CRDT (Conflict-free Replicated Data Type) implementations
2
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::fmt;
6use std::hash::Hash;
7use uuid::Uuid;
8
9/// Unique identifier for a replica
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
11pub struct ReplicaId(#[serde(with = "uuid_serde")] pub Uuid);
12
13mod uuid_serde {
14    use super::*;
15    use serde::{Deserializer, Serializer};
16
17    pub fn serialize<S>(uuid: &Uuid, serializer: S) -> Result<S::Ok, S::Error>
18    where
19        S: Serializer,
20    {
21        uuid.to_string().serialize(serializer)
22    }
23
24    pub fn deserialize<'de, D>(deserializer: D) -> Result<Uuid, D::Error>
25    where
26        D: Deserializer<'de>,
27    {
28        let s = String::deserialize(deserializer)?;
29        Uuid::parse_str(&s).map_err(serde::de::Error::custom)
30    }
31}
32
33impl Default for ReplicaId {
34    fn default() -> Self {
35        Self(Uuid::new_v4())
36    }
37}
38
39impl fmt::Display for ReplicaId {
40    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41        write!(f, "{}", self.0)
42    }
43}
44
45impl From<Uuid> for ReplicaId {
46    fn from(uuid: Uuid) -> Self {
47        Self(uuid)
48    }
49}
50
51impl From<ReplicaId> for Uuid {
52    fn from(replica_id: ReplicaId) -> Self {
53        replica_id.0
54    }
55}
56
/// State that can absorb the state of another instance of the same type.
pub trait Mergeable: Clone + Send + Sync {
    /// Error reported when a merge cannot be completed.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Folds the state of `other` into `self`, mutating `self` in place.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` when the implementation is unable to merge.
    fn merge(&mut self, other: &Self) -> Result<(), Self::Error>;

    /// Reports whether `self` and `other` are considered to be in conflict.
    fn has_conflict(&self, other: &Self) -> bool;
}
67
68/// Last-Write-Wins Register
69#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
70pub struct LwwRegister<T> {
71    value: T,
72    timestamp: chrono::DateTime<chrono::Utc>,
73    replica_id: ReplicaId,
74}
75
76impl<T: Default> Default for LwwRegister<T> {
77    fn default() -> Self {
78        Self {
79            value: T::default(),
80            timestamp: chrono::Utc::now(),
81            replica_id: ReplicaId::default(),
82        }
83    }
84}
85
86impl<T> LwwRegister<T> {
87    pub fn new(value: T, replica_id: ReplicaId) -> Self {
88        Self {
89            value,
90            timestamp: chrono::Utc::now(),
91            replica_id,
92        }
93    }
94
95    pub fn value(&self) -> &T {
96        &self.value
97    }
98
99    pub fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
100        self.timestamp
101    }
102
103    pub fn replica_id(&self) -> ReplicaId {
104        self.replica_id
105    }
106
107    pub fn update(&mut self, value: T, replica_id: ReplicaId) {
108        self.value = value;
109        self.timestamp = chrono::Utc::now();
110        self.replica_id = replica_id;
111    }
112
113    pub fn with_timestamp(mut self, timestamp: chrono::DateTime<chrono::Utc>) -> Self {
114        self.timestamp = timestamp;
115        self
116    }
117}
118
119impl<T: Clone + PartialEq + Send + Sync> Mergeable for LwwRegister<T> {
120    type Error = std::io::Error;
121    
122    fn merge(&mut self, other: &Self) -> Result<(), Self::Error> {
123        if other.timestamp > self.timestamp || 
124           (other.timestamp == self.timestamp && other.replica_id.0 > self.replica_id.0) {
125            self.value = other.value.clone();
126            self.timestamp = other.timestamp;
127            self.replica_id = other.replica_id;
128        }
129        Ok(())
130    }
131    
132    fn has_conflict(&self, other: &Self) -> bool {
133        self.timestamp == other.timestamp && self.replica_id != other.replica_id
134    }
135}
136
137/// Last-Write-Wins Map
138#[derive(Debug, Clone)]
139pub struct LwwMap<K, V> {
140    data: HashMap<K, LwwRegister<V>>,
141}
142
143impl<K, V> LwwMap<K, V>
144where
145    K: Clone + Eq + Hash + Send + Sync,
146    V: Clone + PartialEq + Send + Sync,
147{
148    pub fn new() -> Self {
149        Self {
150            data: HashMap::new(),
151        }
152    }
153
154    pub fn insert(&mut self, key: K, value: V, replica_id: ReplicaId) {
155        let register = LwwRegister::new(value, replica_id);
156        self.data.insert(key, register);
157    }
158
159    pub fn get(&self, key: &K) -> Option<&V> {
160        self.data.get(key).map(|register| register.value())
161    }
162
163    pub fn get_register(&self, key: &K) -> Option<&LwwRegister<V>> {
164        self.data.get(key)
165    }
166
167    pub fn remove(&mut self, key: &K) -> Option<V> {
168        self.data.remove(key).map(|register| register.value().clone())
169    }
170
171    pub fn contains_key(&self, key: &K) -> bool {
172        self.data.contains_key(key)
173    }
174
175    pub fn len(&self) -> usize {
176        self.data.len()
177    }
178
179    pub fn is_empty(&self) -> bool {
180        self.data.is_empty()
181    }
182
183    pub fn keys(&self) -> impl Iterator<Item = &K> {
184        self.data.keys()
185    }
186
187    pub fn values(&self) -> impl Iterator<Item = &V> {
188        self.data.values().map(|register| register.value())
189    }
190
191    pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
192        self.data.iter().map(|(k, v)| (k, v.value()))
193    }
194}
195
196impl<K, V> Default for LwwMap<K, V>
197where
198    K: Clone + Eq + Hash + Send + Sync,
199    V: Clone + PartialEq + Send + Sync,
200{
201    fn default() -> Self {
202        Self::new()
203    }
204}
205
206impl<K, V> Mergeable for LwwMap<K, V>
207where
208    K: Clone + Eq + Hash + Send + Sync,
209    V: Clone + PartialEq + Send + Sync,
210{
211    type Error = std::io::Error;
212    
213    fn merge(&mut self, other: &Self) -> Result<(), Self::Error> {
214        for (key, other_register) in &other.data {
215            match self.data.get_mut(key) {
216                Some(existing_register) => {
217                    existing_register.merge(other_register)?;
218                }
219                None => {
220                    self.data.insert(key.clone(), other_register.clone());
221                }
222            }
223        }
224        Ok(())
225    }
226    
227    fn has_conflict(&self, other: &Self) -> bool {
228        for (key, other_register) in &other.data {
229            if let Some(existing_register) = self.data.get(key) {
230                if existing_register.has_conflict(other_register) {
231                    return true;
232                }
233            }
234        }
235        false
236    }
237}
238
239/// Counter that can be incremented/decremented
240#[derive(Debug, Clone, Serialize, Deserialize)]
241pub struct GCounter {
242    increments: HashMap<ReplicaId, u64>,
243}
244
245impl GCounter {
246    pub fn new() -> Self {
247        Self {
248            increments: HashMap::new(),
249        }
250    }
251
252    pub fn increment(&mut self, replica_id: ReplicaId) {
253        *self.increments.entry(replica_id).or_insert(0) += 1;
254    }
255
256    pub fn value(&self) -> u64 {
257        self.increments.values().sum()
258    }
259
260    pub fn replica_value(&self, replica_id: ReplicaId) -> u64 {
261        self.increments.get(&replica_id).copied().unwrap_or(0)
262    }
263}
264
265impl Default for GCounter {
266    fn default() -> Self {
267        Self::new()
268    }
269}
270
271impl Mergeable for GCounter {
272    type Error = std::io::Error;
273    
274    fn merge(&mut self, other: &Self) -> Result<(), Self::Error> {
275        for (replica_id, increment) in &other.increments {
276            let current = self.increments.entry(*replica_id).or_insert(0);
277            *current = (*current).max(*increment);
278        }
279        Ok(())
280    }
281    
282    fn has_conflict(&self, _other: &Self) -> bool {
283        // G-Counters are conflict-free by design
284        false
285    }
286}
287
#[cfg(test)]
mod tests {
    use super::*;

    /// A replica id survives a JSON round trip unchanged.
    #[test]
    fn test_replica_id_serialization() {
        let replica_id = ReplicaId::default();
        let serialized = serde_json::to_string(&replica_id).unwrap();
        let deserialized: ReplicaId = serde_json::from_str(&serialized).unwrap();
        assert_eq!(replica_id, deserialized);
    }

    /// Merging a later write into an earlier one adopts the later value.
    #[test]
    fn test_lww_register_merge() {
        // Pin both timestamps explicitly. Relying on two consecutive
        // `Utc::now()` calls can yield equal timestamps, in which case the
        // winner is decided by the (random) replica ids and the test flakes.
        let earlier = chrono::DateTime::from_timestamp(1_000, 0).expect("timestamp in range");
        let later = chrono::DateTime::from_timestamp(2_000, 0).expect("timestamp in range");

        let mut reg1 = LwwRegister::new("value1", ReplicaId::default()).with_timestamp(earlier);
        let reg2 = LwwRegister::new("value2", ReplicaId::default()).with_timestamp(later);

        reg1.merge(&reg2).unwrap();
        assert_eq!(reg1.value(), &"value2");
        assert_eq!(reg1.timestamp(), later);
    }

    /// Basic insert / get / remove behaviour of the map.
    #[test]
    fn test_lww_map_operations() {
        let mut map = LwwMap::new();
        let replica_id = ReplicaId::default();

        map.insert("key1".to_string(), "value1".to_string(), replica_id);
        assert_eq!(map.get(&"key1".to_string()), Some(&"value1".to_string()));
        assert_eq!(map.len(), 1);

        map.remove(&"key1".to_string());
        assert_eq!(map.len(), 0);
    }

    /// Increments by one replica are reflected in both the total and the
    /// per-replica value.
    #[test]
    fn test_gcounter_operations() {
        let mut counter = GCounter::new();
        let replica_id = ReplicaId::default();

        counter.increment(replica_id);
        counter.increment(replica_id);

        assert_eq!(counter.value(), 2);
        assert_eq!(counter.replica_value(replica_id), 2);
    }
}
337
#[cfg(test)]
mod property_tests {
    use super::*;
    use proptest::prelude::*;

    /// Generate arbitrary ReplicaId for testing
    fn arb_replica_id() -> impl Strategy<Value = ReplicaId> {
        any::<[u8; 16]>().prop_map(|bytes| ReplicaId::from(uuid::Uuid::from_bytes(bytes)))
    }

    /// Generate arbitrary whole-second UTC timestamps for testing
    fn arb_timestamp() -> impl Strategy<Value = chrono::DateTime<chrono::Utc>> {
        (0i64..1_000_000_000_000i64).prop_map(|seconds| {
            // A wall-clock fallback here would make generated cases
            // non-reproducible, so an out-of-range value fails loudly instead.
            chrono::DateTime::from_timestamp(seconds, 0)
                .expect("generated seconds are within chrono's supported range")
        })
    }

    /// Generate arbitrary LwwRegister for testing
    fn arb_lww_register() -> impl Strategy<Value = LwwRegister<String>> {
        (any::<String>(), arb_replica_id(), arb_timestamp()).prop_map(
            |(value, replica_id, timestamp)| LwwRegister {
                value,
                timestamp,
                replica_id,
            },
        )
    }

    /// Generate arbitrary LwwMap for testing
    fn arb_lww_map() -> impl Strategy<Value = LwwMap<String, String>> {
        prop::collection::hash_map(any::<String>(), arb_lww_register(), 0..10)
            .prop_map(|data| LwwMap { data })
    }

    /// Generate arbitrary GCounter for testing
    fn arb_gcounter() -> impl Strategy<Value = GCounter> {
        prop::collection::hash_map(arb_replica_id(), 0u64..100, 0..5)
            .prop_map(|increments| GCounter { increments })
    }

    proptest! {
        #[test]
        fn test_lww_register_commutativity(
            reg1 in arb_lww_register(),
            reg2 in arb_lww_register(),
        ) {
            let mut merged1 = reg1.clone();
            let mut merged2 = reg2.clone();

            // Test commutativity: merge(a, b) should equal merge(b, a)
            merged1.merge(&reg2).unwrap();
            merged2.merge(&reg1).unwrap();

            // The final value should be the same regardless of merge order
            prop_assert_eq!(merged1.value, merged2.value);
        }

        #[test]
        fn test_lww_register_associativity(
            reg1 in arb_lww_register(),
            reg2 in arb_lww_register(),
            reg3 in arb_lww_register(),
        ) {
            // merged_left = merge(a, merge(b, c))
            let mut merged_left = reg1.clone();
            let mut temp = reg2.clone();
            temp.merge(&reg3).unwrap();
            merged_left.merge(&temp).unwrap();

            // merged_right = merge(merge(a, b), c)
            let mut merged_right = reg1.clone();
            merged_right.merge(&reg2).unwrap();
            merged_right.merge(&reg3).unwrap();

            // Test associativity: merge(a, merge(b, c)) == merge(merge(a, b), c)
            prop_assert_eq!(merged_left.value, merged_right.value);
        }

        #[test]
        fn test_lww_register_idempotency(
            reg in arb_lww_register(),
        ) {
            let mut merged = reg.clone();
            merged.merge(&reg).unwrap();

            // Test idempotency: merge(a, a) == a
            prop_assert_eq!(merged.value, reg.value);
            prop_assert_eq!(merged.timestamp, reg.timestamp);
            prop_assert_eq!(merged.replica_id, reg.replica_id);
        }

        #[test]
        fn test_lww_register_convergence(
            regs in prop::collection::vec(arb_lww_register(), 2..10),
        ) {
            // Test convergence: multiple replicas should converge to the same state
            let mut final_state = regs[0].clone();

            for reg in &regs[1..] {
                final_state.merge(reg).unwrap();
            }

            // Now merge in reverse order
            let mut reverse_final_state = regs[regs.len() - 1].clone();
            for reg in regs.iter().rev().skip(1) {
                reverse_final_state.merge(reg).unwrap();
            }

            // Both should converge to the same value
            prop_assert_eq!(final_state.value, reverse_final_state.value);
        }

        #[test]
        fn test_lww_register_timestamp_ordering(
            value in any::<String>(),
            replica_id in arb_replica_id(),
            timestamp1 in arb_timestamp(),
            timestamp2 in arb_timestamp(),
        ) {
            let mut reg1 = LwwRegister {
                value: value.clone(),
                timestamp: timestamp1,
                replica_id,
            };

            let reg2 = LwwRegister {
                value: "different_value".to_string(),
                timestamp: timestamp2,
                replica_id,
            };

            reg1.merge(&reg2).unwrap();

            // The register should contain the value from the register with the later timestamp
            if timestamp2 > timestamp1 {
                prop_assert_eq!(reg1.value, "different_value");
            } else {
                prop_assert_eq!(reg1.value, value);
            }
        }

        #[test]
        fn test_lww_register_replica_id_tie_breaking(
            value1 in any::<String>(),
            value2 in any::<String>(),
            replica_id1 in arb_replica_id(),
            replica_id2 in arb_replica_id(),
            timestamp in arb_timestamp(),
        ) {
            // Skip if replica IDs are the same (no tie to break)
            prop_assume!(replica_id1 != replica_id2);

            let mut reg1 = LwwRegister {
                value: value1.clone(),
                timestamp,
                replica_id: replica_id1,
            };

            let reg2 = LwwRegister {
                value: value2.clone(),
                timestamp,
                replica_id: replica_id2,
            };

            reg1.merge(&reg2).unwrap();

            // When timestamps are equal, the register with the "larger" replica_id should win
            let winner = if replica_id2.0 > replica_id1.0 { value2 } else { value1 };
            prop_assert_eq!(reg1.value, winner);
        }
    }

    proptest! {
        #[test]
        fn test_lww_map_commutativity(
            map1 in arb_lww_map(),
            map2 in arb_lww_map(),
        ) {
            let mut merged1 = map1.clone();
            let mut merged2 = map2.clone();

            merged1.merge(&map2).unwrap();
            merged2.merge(&map1).unwrap();

            // Test commutativity: merge(a, b) should equal merge(b, a)
            prop_assert_eq!(merged1.data.len(), merged2.data.len());

            for key in merged1.data.keys() {
                prop_assert!(merged2.data.contains_key(key));
                let reg1 = &merged1.data[key];
                let reg2 = &merged2.data[key];
                prop_assert_eq!(&reg1.value, &reg2.value);
            }
        }

        #[test]
        fn test_lww_map_associativity(
            map1 in arb_lww_map(),
            map2 in arb_lww_map(),
            map3 in arb_lww_map(),
        ) {
            // merged_left = merge(a, merge(b, c))
            let mut merged_left = map1.clone();
            let mut temp = map2.clone();
            temp.merge(&map3).unwrap();
            merged_left.merge(&temp).unwrap();

            // merged_right = merge(merge(a, b), c)
            let mut merged_right = map1.clone();
            merged_right.merge(&map2).unwrap();
            merged_right.merge(&map3).unwrap();

            // Test associativity: both groupings must hold the same keys and values
            prop_assert_eq!(merged_left.data.len(), merged_right.data.len());

            for key in merged_left.data.keys() {
                prop_assert!(merged_right.data.contains_key(key));
                let reg1 = &merged_left.data[key];
                let reg2 = &merged_right.data[key];
                prop_assert_eq!(&reg1.value, &reg2.value);
            }
        }

        #[test]
        fn test_lww_map_idempotency(
            map in arb_lww_map(),
        ) {
            let mut merged = map.clone();
            merged.merge(&map).unwrap();

            // Test idempotency: merge(a, a) == a
            prop_assert_eq!(merged.data.len(), map.data.len());

            for (key, original_register) in &map.data {
                let merged_register = &merged.data[key];
                prop_assert_eq!(&merged_register.value, &original_register.value);
                prop_assert_eq!(merged_register.timestamp, original_register.timestamp);
                prop_assert_eq!(merged_register.replica_id, original_register.replica_id);
            }
        }

        #[test]
        fn test_lww_map_convergence(
            maps in prop::collection::vec(arb_lww_map(), 2..5),
        ) {
            // Test convergence: multiple maps should converge to the same state
            let mut final_state = maps[0].clone();

            for map in &maps[1..] {
                final_state.merge(map).unwrap();
            }

            // Now merge in reverse order
            let mut reverse_final_state = maps[maps.len() - 1].clone();
            for map in maps.iter().rev().skip(1) {
                reverse_final_state.merge(map).unwrap();
            }

            // Both should converge to the same state
            prop_assert_eq!(final_state.data.len(), reverse_final_state.data.len());

            for key in final_state.data.keys() {
                prop_assert!(reverse_final_state.data.contains_key(key));
                let reg1 = &final_state.data[key];
                let reg2 = &reverse_final_state.data[key];
                prop_assert_eq!(&reg1.value, &reg2.value);
            }
        }
    }

    proptest! {
        #[test]
        fn test_gcounter_commutativity(
            counter1 in arb_gcounter(),
            counter2 in arb_gcounter(),
        ) {
            let mut merged1 = counter1.clone();
            let mut merged2 = counter2.clone();

            merged1.merge(&counter2).unwrap();
            merged2.merge(&counter1).unwrap();

            // Test commutativity: merge(a, b) should equal merge(b, a).
            // Totals are read first because the map assertion consumes the fields.
            let value1 = merged1.value();
            let value2 = merged2.value();
            prop_assert_eq!(merged1.increments, merged2.increments);
            prop_assert_eq!(value1, value2);
        }

        #[test]
        fn test_gcounter_associativity(
            counter1 in arb_gcounter(),
            counter2 in arb_gcounter(),
            counter3 in arb_gcounter(),
        ) {
            // merged_left = merge(a, merge(b, c))
            let mut merged_left = counter1.clone();
            let mut temp = counter2.clone();
            temp.merge(&counter3).unwrap();
            merged_left.merge(&temp).unwrap();

            // merged_right = merge(merge(a, b), c)
            let mut merged_right = counter1.clone();
            merged_right.merge(&counter2).unwrap();
            merged_right.merge(&counter3).unwrap();

            // Test associativity
            let value_left = merged_left.value();
            let value_right = merged_right.value();
            prop_assert_eq!(merged_left.increments, merged_right.increments);
            prop_assert_eq!(value_left, value_right);
        }

        #[test]
        fn test_gcounter_idempotency(
            counter in arb_gcounter(),
        ) {
            let mut merged = counter.clone();
            merged.merge(&counter).unwrap();

            // Test idempotency: merge(a, a) == a
            let merged_value = merged.value();
            let original_value = counter.value();
            prop_assert_eq!(merged.increments, counter.increments);
            prop_assert_eq!(merged_value, original_value);
        }

        #[test]
        fn test_gcounter_convergence(
            counters in prop::collection::vec(arb_gcounter(), 2..5),
        ) {
            // Test convergence: multiple counters should converge to the same state
            let mut final_state = counters[0].clone();

            for counter in &counters[1..] {
                final_state.merge(counter).unwrap();
            }

            // Now merge in reverse order
            let mut reverse_final_state = counters[counters.len() - 1].clone();
            for counter in counters.iter().rev().skip(1) {
                reverse_final_state.merge(counter).unwrap();
            }

            // Both should converge to the same state
            let final_value = final_state.value();
            let reverse_value = reverse_final_state.value();
            prop_assert_eq!(final_state.increments, reverse_final_state.increments);
            prop_assert_eq!(final_value, reverse_value);
        }

        #[test]
        fn test_gcounter_monotonicity(
            replica_id in arb_replica_id(),
            initial_value in 0u64..50,
            increment_amount in 1u64..10,
        ) {
            let mut counter = GCounter::new();
            counter.increments.insert(replica_id, initial_value);

            let old_value = counter.value();

            // Increment the counter
            for _ in 0..increment_amount {
                counter.increment(replica_id);
            }

            let new_value = counter.value();

            // Test monotonicity: counter should only increase
            prop_assert!(new_value > old_value);
            prop_assert_eq!(new_value, old_value + increment_amount);
        }

        #[test]
        fn test_gcounter_maximum_merge(
            replica_id in arb_replica_id(),
            value1 in 0u64..100,
            value2 in 0u64..100,
        ) {
            let mut counter1 = GCounter::new();
            counter1.increments.insert(replica_id, value1);

            let mut counter2 = GCounter::new();
            counter2.increments.insert(replica_id, value2);

            counter1.merge(&counter2).unwrap();

            // After merge, should have the maximum of both values
            let expected_value = value1.max(value2);
            prop_assert_eq!(counter1.replica_value(replica_id), expected_value);
        }
    }

    proptest! {
        #[test]
        fn test_crdt_serialization_roundtrip(
            register in arb_lww_register(),
        ) {
            // Test that CRDTs can be serialized and deserialized correctly
            let serialized = serde_json::to_string(&register).unwrap();
            let deserialized: LwwRegister<String> = serde_json::from_str(&serialized).unwrap();

            prop_assert_eq!(register.value, deserialized.value);
            prop_assert_eq!(register.timestamp, deserialized.timestamp);
            prop_assert_eq!(register.replica_id, deserialized.replica_id);
        }

        #[test]
        fn test_crdt_merge_preserves_serialization(
            reg1 in arb_lww_register(),
            reg2 in arb_lww_register(),
        ) {
            let mut merged = reg1.clone();
            merged.merge(&reg2).unwrap();

            // Test that merged CRDT can still be serialized
            let serialized = serde_json::to_string(&merged).unwrap();
            let deserialized: LwwRegister<String> = serde_json::from_str(&serialized).unwrap();

            prop_assert_eq!(merged.value, deserialized.value);
            prop_assert_eq!(merged.timestamp, deserialized.timestamp);
            prop_assert_eq!(merged.replica_id, deserialized.replica_id);
        }
    }
}