Skip to main content

crdt_kit/
lww_register.rs

1use crate::clock::{HybridClock, HybridTimestamp};
2use crate::{Crdt, DeltaCrdt};
3
4/// A last-writer-wins register (LWW-Register).
5///
6/// Resolves concurrent writes by keeping the value with the highest
7/// [`HybridTimestamp`]. This provides causally consistent ordering even
8/// with clock drift between nodes.
9///
10/// # Example
11///
12/// ```
13/// use crdt_kit::prelude::*;
14/// use crdt_kit::clock::HybridClock;
15///
16/// let mut clock1 = HybridClock::new(1);
17/// let mut clock2 = HybridClock::new(2);
18///
19/// let mut r1 = LWWRegister::new("hello", &mut clock1);
20/// let mut r2 = LWWRegister::new("world", &mut clock2);
21///
22/// r1.merge(&r2);
23/// // Value is determined by HLC timestamp ordering
24/// ```
25#[derive(Debug, Clone)]
26#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
27pub struct LWWRegister<T: Clone> {
28    value: T,
29    timestamp: HybridTimestamp,
30}
31
32impl<T: Clone + PartialEq> PartialEq for LWWRegister<T> {
33    fn eq(&self, other: &Self) -> bool {
34        self.value == other.value && self.timestamp == other.timestamp
35    }
36}
37
38impl<T: Clone + Eq> Eq for LWWRegister<T> {}
39
40impl<T: Clone> LWWRegister<T> {
41    /// Create a new LWW-Register with an initial value.
42    ///
43    /// Uses the provided [`HybridClock`] for a causally consistent timestamp.
44    pub fn new(value: T, clock: &mut HybridClock) -> Self {
45        Self {
46            value,
47            timestamp: clock.now(),
48        }
49    }
50
51    /// Create a new LWW-Register with an explicit timestamp.
52    ///
53    /// Useful for testing or deserialization.
54    pub fn with_timestamp(value: T, timestamp: HybridTimestamp) -> Self {
55        Self { value, timestamp }
56    }
57
58    /// Update the register's value.
59    ///
60    /// Uses the provided [`HybridClock`] for a causally consistent timestamp.
61    pub fn set(&mut self, value: T, clock: &mut HybridClock) {
62        let ts = clock.now();
63        if ts >= self.timestamp {
64            self.value = value;
65            self.timestamp = ts;
66        }
67    }
68
69    /// Update the register's value with an explicit timestamp.
70    ///
71    /// Only applies if the new timestamp is >= the current one.
72    pub fn set_with_timestamp(&mut self, value: T, timestamp: HybridTimestamp) {
73        if timestamp >= self.timestamp {
74            self.value = value;
75            self.timestamp = timestamp;
76        }
77    }
78
79    /// Get the current value.
80    #[must_use]
81    pub fn value(&self) -> &T {
82        &self.value
83    }
84
85    /// Get the current timestamp.
86    #[must_use]
87    pub fn timestamp(&self) -> HybridTimestamp {
88        self.timestamp
89    }
90}
91
92/// Delta for [`LWWRegister`]: the register state if newer, or `None` if the
93/// other replica is already up to date.
94#[derive(Debug, Clone, PartialEq, Eq)]
95#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
96pub struct LWWRegisterDelta<T: Clone> {
97    /// `Some((value, timestamp))` if the source is newer, `None` otherwise.
98    pub update: Option<(T, HybridTimestamp)>,
99}
100
101impl<T: Clone> DeltaCrdt for LWWRegister<T> {
102    type Delta = LWWRegisterDelta<T>;
103
104    fn delta(&self, other: &Self) -> LWWRegisterDelta<T> {
105        if self.timestamp > other.timestamp {
106            LWWRegisterDelta {
107                update: Some((self.value.clone(), self.timestamp)),
108            }
109        } else {
110            LWWRegisterDelta { update: None }
111        }
112    }
113
114    fn apply_delta(&mut self, delta: &LWWRegisterDelta<T>) {
115        if let Some((ref value, ts)) = delta.update {
116            if ts > self.timestamp {
117                self.value = value.clone();
118                self.timestamp = ts;
119            }
120        }
121    }
122}
123
124impl<T: Clone> Crdt for LWWRegister<T> {
125    fn merge(&mut self, other: &Self) {
126        if other.timestamp > self.timestamp {
127            self.value = other.value.clone();
128            self.timestamp = other.timestamp;
129        }
130    }
131}
132
133#[cfg(test)]
134mod tests {
135    use super::*;
136    use crate::clock::HybridTimestamp;
137
138    fn ts(physical: u64, logical: u16, node_id: u16) -> HybridTimestamp {
139        HybridTimestamp {
140            physical,
141            logical,
142            node_id,
143        }
144    }
145
146    #[test]
147    fn new_register_holds_value() {
148        let r = LWWRegister::with_timestamp(42, ts(1, 0, 1));
149        assert_eq!(*r.value(), 42);
150    }
151
152    #[test]
153    fn set_updates_value() {
154        let mut r = LWWRegister::with_timestamp(1, ts(1, 0, 1));
155        r.set_with_timestamp(2, ts(2, 0, 1));
156        assert_eq!(*r.value(), 2);
157    }
158
159    #[test]
160    fn merge_keeps_later_timestamp() {
161        let mut r1 = LWWRegister::with_timestamp("old", ts(1, 0, 1));
162        let r2 = LWWRegister::with_timestamp("new", ts(2, 0, 2));
163
164        r1.merge(&r2);
165        assert_eq!(*r1.value(), "new");
166    }
167
168    #[test]
169    fn merge_keeps_self_if_later() {
170        let mut r1 = LWWRegister::with_timestamp("new", ts(2, 0, 1));
171        let r2 = LWWRegister::with_timestamp("old", ts(1, 0, 2));
172
173        r1.merge(&r2);
174        assert_eq!(*r1.value(), "new");
175    }
176
177    #[test]
178    fn merge_breaks_tie_by_node_id() {
179        let mut r1 = LWWRegister::with_timestamp("first", ts(1, 0, 1));
180        let r2 = LWWRegister::with_timestamp("second", ts(1, 0, 2));
181
182        r1.merge(&r2);
183        // node_id 2 > node_id 1, so r2 wins the tie
184        assert_eq!(*r1.value(), "second");
185    }
186
187    #[test]
188    fn with_clock_creates_register() {
189        let mut clock = HybridClock::new(1);
190        let r = LWWRegister::new("hello", &mut clock);
191        assert_eq!(*r.value(), "hello");
192    }
193
194    #[test]
195    fn set_with_clock_advances_timestamp() {
196        let mut clock = HybridClock::new(1);
197        let mut r = LWWRegister::new("v1", &mut clock);
198        let ts1 = r.timestamp();
199
200        r.set("v2", &mut clock);
201        assert_eq!(*r.value(), "v2");
202        assert!(r.timestamp() > ts1);
203    }
204
205    #[test]
206    fn hlc_register_merge_respects_causality() {
207        let mut clock1 = HybridClock::new(1);
208        let mut clock2 = HybridClock::new(2);
209
210        let mut r1 = LWWRegister::new("first", &mut clock1);
211        let r2 = LWWRegister::new("second", &mut clock2);
212
213        r1.merge(&r2);
214        assert!(*r1.value() == "first" || *r1.value() == "second");
215    }
216
217    #[test]
218    fn merge_is_idempotent() {
219        let mut r1 = LWWRegister::with_timestamp("x", ts(1, 0, 1));
220        let r2 = LWWRegister::with_timestamp("y", ts(2, 0, 2));
221
222        r1.merge(&r2);
223        let after_first = r1.clone();
224        r1.merge(&r2);
225
226        assert_eq!(r1, after_first);
227    }
228
229    #[test]
230    fn delta_apply_equivalent_to_merge() {
231        let r1 = LWWRegister::with_timestamp("old", ts(1, 0, 1));
232        let r2 = LWWRegister::with_timestamp("new", ts(2, 0, 2));
233
234        let mut via_merge = r1.clone();
235        via_merge.merge(&r2);
236
237        let mut via_delta = r1.clone();
238        let d = r2.delta(&r1);
239        via_delta.apply_delta(&d);
240
241        assert_eq!(*via_merge.value(), *via_delta.value());
242    }
243
244    #[test]
245    fn delta_is_empty_when_other_is_newer() {
246        let r1 = LWWRegister::with_timestamp("old", ts(1, 0, 1));
247        let r2 = LWWRegister::with_timestamp("new", ts(2, 0, 2));
248
249        let d = r1.delta(&r2);
250        assert!(d.update.is_none());
251    }
252}