Skip to main content

mdcs_core/
lwwreg.rs

1//! Last-Write-Wins (LWW) Register CRDT
2//!
3//! The LWW Register always retains the value with the highest timestamp.
4//! In case of a tie, the replica with the highest ID wins.
5//!
6//! This is a simple eventual-consistency mechanism that resolves concurrent
7//! writes by always choosing the "latest" update based on timestamp and
8//! replica ordering.
9
10use crate::lattice::Lattice;
11use serde::{Deserialize, Serialize};
12
13/// A Last-Write-Wins Register CRDT
14///
15/// Stores a value along with a timestamp and replica ID.
16/// The value with the highest timestamp (tie-break on replica_id) always wins.
17#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
18pub struct LWWRegister<T: Ord + Clone, K: Ord + Clone> {
19    /// The current value
20    value: Option<T>,
21    /// The timestamp of the last write
22    timestamp: u64,
23    /// The replica ID that wrote this value (for tie-breaking)
24    replica_id: K,
25}
26
27impl<T: Ord + Clone, K: Ord + Clone> LWWRegister<T, K> {
28    /// Create a new LWW Register with no value
29    pub fn new(replica_id: K) -> Self {
30        Self {
31            value: None,
32            timestamp: 0,
33            replica_id,
34        }
35    }
36
37    /// Set a new value with the given timestamp
38    pub fn set(&mut self, value: T, timestamp: u64, replica_id: K) {
39        if timestamp > self.timestamp
40            || (timestamp == self.timestamp && replica_id >= self.replica_id)
41        {
42            self.value = Some(value);
43            self.timestamp = timestamp;
44            self.replica_id = replica_id;
45        }
46    }
47
48    /// Get the current value if it exists
49    pub fn get(&self) -> Option<&T> {
50        self.value.as_ref()
51    }
52
53    /// Get the timestamp of the current value
54    pub fn timestamp(&self) -> u64 {
55        self.timestamp
56    }
57
58    /// Get the replica ID that wrote the current value
59    pub fn replica_id(&self) -> &K {
60        &self.replica_id
61    }
62
63    /// Check if the register is empty (no value set)
64    pub fn is_empty(&self) -> bool {
65        self.value.is_none()
66    }
67
68    /// Clear the register (set to empty state)
69    pub fn clear(&mut self) {
70        self.value = None;
71        self.timestamp = 0;
72    }
73}
74
75impl<T: Ord + Clone, K: Ord + Clone + Default> Default for LWWRegister<T, K> {
76    fn default() -> Self {
77        Self::new(K::default())
78    }
79}
80
81impl<T: Ord + Clone, K: Ord + Clone + Default> Lattice for LWWRegister<T, K> {
82    fn bottom() -> Self {
83        Self {
84            value: None,
85            timestamp: 0,
86            replica_id: K::default(),
87        }
88    }
89
90    /// Join operation: keep the value with the highest timestamp
91    /// Tie-break on replica_id (higher wins), then on value (higher wins)
92    fn join(&self, other: &Self) -> Self {
93        // Compare by (timestamp, replica_id, value) tuple
94        let self_wins = match self.timestamp.cmp(&other.timestamp) {
95            std::cmp::Ordering::Greater => true,
96            std::cmp::Ordering::Less => false,
97            std::cmp::Ordering::Equal => {
98                match self.replica_id.cmp(&other.replica_id) {
99                    std::cmp::Ordering::Greater => true,
100                    std::cmp::Ordering::Less => false,
101                    std::cmp::Ordering::Equal => {
102                        // Same timestamp and replica_id: compare values for determinism
103                        self.value >= other.value
104                    }
105                }
106            }
107        };
108
109        if self_wins {
110            self.clone()
111        } else {
112            other.clone()
113        }
114    }
115}
116
#[cfg(test)]
mod tests {
    use super::*;

    /// Concrete register type shared by all tests in this module.
    type Reg = LWWRegister<i32, String>;

    /// Create a register for `replica` and apply one write from that replica.
    fn written(value: i32, timestamp: u64, replica: &str) -> Reg {
        let mut reg = Reg::new(replica.to_string());
        reg.set(value, timestamp, replica.to_string());
        reg
    }

    #[test]
    fn test_lwwreg_basic_operations() {
        let mut reg = Reg::new("replica1".to_string());

        // A new register holds nothing.
        assert!(reg.is_empty());
        assert_eq!(reg.get(), None);

        // The first write is stored together with its timestamp.
        reg.set(42, 100, "replica1".to_string());
        assert_eq!(reg.get(), Some(&42));
        assert_eq!(reg.timestamp(), 100);
    }

    #[test]
    fn test_lwwreg_higher_timestamp_wins() {
        let mut reg = written(10, 100, "replica1");
        assert_eq!(reg.get(), Some(&10));

        // A newer timestamp replaces the stored value.
        reg.set(20, 200, "replica2".to_string());
        assert_eq!(reg.get(), Some(&20));

        // A stale timestamp is ignored.
        reg.set(30, 150, "replica1".to_string());
        assert_eq!(reg.get(), Some(&20));
    }

    #[test]
    fn test_lwwreg_tie_break_replica_id() {
        let mut reg = written(10, 100, "replica1");
        assert_eq!(reg.get(), Some(&10));

        // Equal timestamp: the greater replica ID takes over.
        reg.set(20, 100, "replica2".to_string());
        assert_eq!(reg.get(), Some(&20));

        // Equal timestamp: the smaller replica ID is rejected.
        reg.set(30, 100, "replica1".to_string());
        assert_eq!(reg.get(), Some(&20));
    }

    #[test]
    fn test_lwwreg_join_idempotent() {
        let reg1 = written(42, 100, "replica1");

        let joined = reg1.join(&reg1);
        assert_eq!(joined.get(), Some(&42));
        assert_eq!(joined.timestamp(), 100);
    }

    #[test]
    fn test_lwwreg_join_commutative() {
        let reg1 = written(10, 100, "replica1");
        let reg2 = written(20, 150, "replica2");

        let joined1 = reg1.join(&reg2);
        let joined2 = reg2.join(&reg1);

        assert_eq!(joined1.get(), joined2.get());
        assert_eq!(joined1.timestamp(), joined2.timestamp());
    }

    #[test]
    fn test_lwwreg_join_associative() {
        let reg1 = written(10, 100, "replica1");
        let reg2 = written(20, 150, "replica2");
        let reg3 = written(30, 120, "replica3");

        let left = reg1.join(&reg2).join(&reg3);
        let right = reg1.join(&reg2.join(&reg3));

        assert_eq!(left.get(), right.get());
        assert_eq!(left.timestamp(), right.timestamp());
    }

    #[test]
    fn test_lwwreg_bottom_is_identity() {
        let reg = written(42, 100, "replica1");

        let bottom = Reg::bottom();
        let joined = reg.join(&bottom);

        assert_eq!(joined.get(), reg.get());
        assert_eq!(joined.timestamp(), reg.timestamp());
    }

    #[test]
    fn test_lwwreg_serialization() {
        let reg = written(42, 100, "replica1");

        // Round-trip through JSON and check the observable state survives.
        let serialized = serde_json::to_string(&reg).unwrap();
        let deserialized: Reg = serde_json::from_str(&serialized).unwrap();

        assert_eq!(deserialized.get(), Some(&42));
        assert_eq!(deserialized.timestamp(), 100);
    }
}