Skip to main content

oxigdal_sync/crdt/
lww_register.rs

1//! Last-Write-Wins Register CRDT
2//!
3//! A register that resolves conflicts by keeping the value with the
4//! highest timestamp (last write wins).
5
6use crate::crdt::{Crdt, DeviceAware};
7use crate::vector_clock::{ClockOrdering, VectorClock};
8use crate::{DeviceId, SyncResult};
9use serde::{Deserialize, Serialize};
10
11/// Last-Write-Wins Register
12///
13/// A simple CRDT that stores a single value and resolves conflicts
14/// by keeping the value with the most recent timestamp.
15///
16/// # Example
17///
18/// ```rust
19/// use oxigdal_sync::crdt::{LwwRegister, Crdt};
20///
21/// let mut reg1 = LwwRegister::new("device-1".to_string(), "hello".to_string());
22/// let mut reg2 = LwwRegister::new("device-2".to_string(), "world".to_string());
23///
24/// reg1.set("updated".to_string());
25/// reg1.merge(&reg2).ok();
26/// ```
27#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
28#[serde(bound(serialize = "T: Serialize"))]
29#[serde(bound(deserialize = "T: serde::de::DeserializeOwned"))]
30pub struct LwwRegister<T>
31where
32    T: Clone,
33{
34    /// The current value
35    value: T,
36    /// Vector clock for causality tracking
37    clock: VectorClock,
38    /// Device ID
39    device_id: DeviceId,
40}
41
42impl<T> LwwRegister<T>
43where
44    T: Clone,
45{
46    /// Creates a new LWW register with an initial value
47    ///
48    /// # Arguments
49    ///
50    /// * `device_id` - The device ID
51    /// * `initial_value` - The initial value
52    pub fn new(device_id: DeviceId, initial_value: T) -> Self {
53        Self {
54            value: initial_value,
55            clock: VectorClock::new(device_id.clone()),
56            device_id,
57        }
58    }
59
60    /// Gets the current value
61    pub fn get(&self) -> &T {
62        &self.value
63    }
64
65    /// Sets a new value
66    ///
67    /// This increments the local clock to track the update.
68    ///
69    /// # Arguments
70    ///
71    /// * `value` - The new value to set
72    pub fn set(&mut self, value: T) {
73        self.value = value;
74        self.clock.tick();
75    }
76
77    /// Gets the vector clock
78    pub fn clock(&self) -> &VectorClock {
79        &self.clock
80    }
81
82    /// Updates the value with a specific clock
83    ///
84    /// This is useful for applying remote updates.
85    ///
86    /// # Arguments
87    ///
88    /// * `value` - The new value
89    /// * `clock` - The vector clock for the update
90    pub fn set_with_clock(&mut self, value: T, clock: VectorClock) {
91        match self.clock.compare(&clock) {
92            ClockOrdering::Before => {
93                // Remote update is newer
94                self.value = value;
95                self.clock = clock;
96            }
97            ClockOrdering::Concurrent => {
98                // Concurrent updates - use deterministic tie-breaking
99                if self.tie_break(&clock) {
100                    self.value = value;
101                }
102                self.clock.merge(&clock);
103            }
104            _ => {
105                // Our update is newer or equal, keep it
106                self.clock.merge(&clock);
107            }
108        }
109    }
110
111    /// Deterministic tie-breaking for concurrent updates
112    ///
113    /// Uses device ID comparison as a tie-breaker.
114    fn tie_break(&self, other_clock: &VectorClock) -> bool {
115        other_clock.device_id() > &self.device_id
116    }
117}
118
119impl<T> Crdt for LwwRegister<T>
120where
121    T: Clone + Serialize + for<'de> serde::Deserialize<'de>,
122{
123    fn merge(&mut self, other: &Self) -> SyncResult<()> {
124        match self.clock.compare(&other.clock) {
125            ClockOrdering::Before => {
126                // Other is newer, adopt its value
127                self.value = other.value.clone();
128                self.clock = other.clock.clone();
129            }
130            ClockOrdering::Concurrent => {
131                // Concurrent updates - use tie-breaking
132                if self.tie_break(&other.clock) {
133                    self.value = other.value.clone();
134                }
135                self.clock.merge(&other.clock);
136            }
137            _ => {
138                // We are newer or equal, just merge clocks
139                self.clock.merge(&other.clock);
140            }
141        }
142        Ok(())
143    }
144
145    fn dominated_by(&self, other: &Self) -> bool {
146        matches!(
147            self.clock.compare(&other.clock),
148            ClockOrdering::Before | ClockOrdering::Equal
149        )
150    }
151}
152
153impl<T> DeviceAware for LwwRegister<T>
154where
155    T: Clone + Serialize + for<'de> serde::Deserialize<'de>,
156{
157    fn device_id(&self) -> &DeviceId {
158        &self.device_id
159    }
160
161    fn set_device_id(&mut self, device_id: DeviceId) {
162        self.device_id = device_id.clone();
163        self.clock = self.clock.with_device_id(device_id);
164    }
165}
166
167#[cfg(test)]
168mod tests {
169    use super::*;
170
171    #[test]
172    fn test_lww_register_creation() {
173        let reg = LwwRegister::new("device-1".to_string(), 42);
174        assert_eq!(*reg.get(), 42);
175        assert_eq!(reg.device_id(), "device-1");
176    }
177
178    #[test]
179    fn test_lww_register_set() {
180        let mut reg = LwwRegister::new("device-1".to_string(), 42);
181        reg.set(100);
182        assert_eq!(*reg.get(), 100);
183    }
184
185    #[test]
186    fn test_lww_register_merge_before() {
187        let mut reg1 = LwwRegister::new("device-1".to_string(), 42);
188        let mut reg2 = LwwRegister::new("device-1".to_string(), 42);
189
190        reg2.set(100);
191
192        reg1.merge(&reg2).ok();
193        assert_eq!(*reg1.get(), 100);
194    }
195
196    #[test]
197    fn test_lww_register_merge_after() {
198        let mut reg1 = LwwRegister::new("device-1".to_string(), 42);
199        let reg2 = LwwRegister::new("device-1".to_string(), 42);
200
201        reg1.set(100);
202
203        reg1.merge(&reg2).ok();
204        assert_eq!(*reg1.get(), 100);
205    }
206
207    #[test]
208    fn test_lww_register_concurrent() {
209        let mut reg1 = LwwRegister::new("device-1".to_string(), 42);
210        let mut reg2 = LwwRegister::new("device-2".to_string(), 42);
211
212        reg1.set(100);
213        reg2.set(200);
214
215        let _initial = *reg1.get();
216        reg1.merge(&reg2).ok();
217
218        // Should use tie-breaking (device-2 > device-1)
219        assert_eq!(*reg1.get(), 200);
220    }
221
222    #[test]
223    fn test_dominated_by() {
224        let reg1 = LwwRegister::new("device-1".to_string(), 42);
225        let mut reg2 = LwwRegister::new("device-1".to_string(), 42);
226
227        reg2.set(100);
228
229        assert!(reg1.dominated_by(&reg2));
230        assert!(!reg2.dominated_by(&reg1));
231    }
232}