Skip to main content

oxigdal_sync/
vector_clock.rs

1//! Vector clock implementation for causality tracking
2//!
3//! Vector clocks provide a mechanism to capture causal relationships between
4//! events in distributed systems. They allow us to determine if events are
5//! causally related or concurrent.
6
7use crate::{DeviceId, Timestamp, VersionVector};
8use serde::{Deserialize, Serialize};
9use std::cmp::Ordering;
10use std::collections::HashMap;
11
12/// Comparison result for vector clocks
13#[derive(Debug, Clone, Copy, PartialEq, Eq)]
14pub enum ClockOrdering {
15    /// This clock is less than (happened before) the other
16    Before,
17    /// This clock is greater than (happened after) the other
18    After,
19    /// Clocks are equal (same causal history)
20    Equal,
21    /// Clocks are concurrent (independent causal histories)
22    Concurrent,
23}
24
25/// Vector clock for tracking causality in distributed systems
26///
27/// A vector clock is a data structure used for determining the partial ordering
28/// of events in a distributed system and detecting causality violations.
29#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
30pub struct VectorClock {
31    /// Map of device ID to logical timestamp
32    clock: VersionVector,
33    /// The device ID that owns this clock
34    device_id: DeviceId,
35}
36
37impl VectorClock {
38    /// Creates a new vector clock for the given device
39    ///
40    /// # Arguments
41    ///
42    /// * `device_id` - The identifier of the device owning this clock
43    ///
44    /// # Example
45    ///
46    /// ```rust
47    /// use oxigdal_sync::vector_clock::VectorClock;
48    ///
49    /// let clock = VectorClock::new("device-1".to_string());
50    /// ```
51    pub fn new(device_id: DeviceId) -> Self {
52        let mut clock = HashMap::new();
53        clock.insert(device_id.clone(), 0);
54
55        Self { clock, device_id }
56    }
57
58    /// Creates a vector clock from an existing version vector
59    ///
60    /// # Arguments
61    ///
62    /// * `device_id` - The device ID
63    /// * `clock` - Existing version vector
64    pub fn from_version_vector(device_id: DeviceId, clock: VersionVector) -> Self {
65        Self { clock, device_id }
66    }
67
68    /// Increments the clock for this device
69    ///
70    /// This should be called when a new event occurs on this device.
71    ///
72    /// # Example
73    ///
74    /// ```rust
75    /// use oxigdal_sync::vector_clock::VectorClock;
76    ///
77    /// let mut clock = VectorClock::new("device-1".to_string());
78    /// clock.tick();
79    /// assert_eq!(clock.get_time(&"device-1".to_string()), Some(1));
80    /// ```
81    pub fn tick(&mut self) -> Timestamp {
82        let entry = self.clock.entry(self.device_id.clone()).or_insert(0);
83        *entry += 1;
84        *entry
85    }
86
87    /// Gets the timestamp for a specific device
88    ///
89    /// # Arguments
90    ///
91    /// * `device_id` - The device to query
92    ///
93    /// # Returns
94    ///
95    /// The timestamp for the device, or None if the device is not in the clock
96    pub fn get_time(&self, device_id: &DeviceId) -> Option<Timestamp> {
97        self.clock.get(device_id).copied()
98    }
99
100    /// Gets the current timestamp for this device
101    pub fn current_time(&self) -> Timestamp {
102        self.clock.get(&self.device_id).copied().unwrap_or(0)
103    }
104
105    /// Merges another vector clock into this one
106    ///
107    /// Takes the maximum timestamp for each device. This is used when
108    /// receiving an event from another device.
109    ///
110    /// # Arguments
111    ///
112    /// * `other` - The clock to merge
113    ///
114    /// # Example
115    ///
116    /// ```rust
117    /// use oxigdal_sync::vector_clock::VectorClock;
118    ///
119    /// let mut clock1 = VectorClock::new("device-1".to_string());
120    /// let mut clock2 = VectorClock::new("device-2".to_string());
121    ///
122    /// clock1.tick();
123    /// clock2.tick();
124    ///
125    /// clock1.merge(&clock2);
126    /// assert_eq!(clock1.get_time(&"device-2".to_string()), Some(1));
127    /// ```
128    pub fn merge(&mut self, other: &VectorClock) {
129        for (device_id, &timestamp) in &other.clock {
130            let entry = self.clock.entry(device_id.clone()).or_insert(0);
131            *entry = (*entry).max(timestamp);
132        }
133    }
134
135    /// Compares this clock with another to determine causal ordering
136    ///
137    /// # Arguments
138    ///
139    /// * `other` - The clock to compare with
140    ///
141    /// # Returns
142    ///
143    /// The ordering relationship between the clocks
144    ///
145    /// # Example
146    ///
147    /// ```rust
148    /// use oxigdal_sync::vector_clock::{VectorClock, ClockOrdering};
149    ///
150    /// let mut clock1 = VectorClock::new("device-1".to_string());
151    /// let mut clock2 = VectorClock::new("device-1".to_string());
152    ///
153    /// clock2.tick();
154    ///
155    /// assert_eq!(clock1.compare(&clock2), ClockOrdering::Before);
156    /// assert_eq!(clock2.compare(&clock1), ClockOrdering::After);
157    /// ```
158    pub fn compare(&self, other: &VectorClock) -> ClockOrdering {
159        let mut less_than = false;
160        let mut greater_than = false;
161
162        // Collect all device IDs from both clocks
163        let mut all_devices = self.clock.keys().collect::<Vec<_>>();
164        for device in other.clock.keys() {
165            if !all_devices.contains(&device) {
166                all_devices.push(device);
167            }
168        }
169
170        // Compare timestamps for each device
171        for device_id in all_devices {
172            let self_time = self.clock.get(device_id).copied().unwrap_or(0);
173            let other_time = other.clock.get(device_id).copied().unwrap_or(0);
174
175            match self_time.cmp(&other_time) {
176                Ordering::Less => less_than = true,
177                Ordering::Greater => greater_than = true,
178                Ordering::Equal => {}
179            }
180        }
181
182        match (less_than, greater_than) {
183            (false, false) => ClockOrdering::Equal,
184            (true, false) => ClockOrdering::Before,
185            (false, true) => ClockOrdering::After,
186            (true, true) => ClockOrdering::Concurrent,
187        }
188    }
189
190    /// Checks if this clock happened before the other
191    pub fn happened_before(&self, other: &VectorClock) -> bool {
192        matches!(self.compare(other), ClockOrdering::Before)
193    }
194
195    /// Checks if this clock happened after the other
196    pub fn happened_after(&self, other: &VectorClock) -> bool {
197        matches!(self.compare(other), ClockOrdering::After)
198    }
199
200    /// Checks if clocks are concurrent (causally independent)
201    pub fn is_concurrent(&self, other: &VectorClock) -> bool {
202        matches!(self.compare(other), ClockOrdering::Concurrent)
203    }
204
205    /// Gets the device ID associated with this clock
206    pub fn device_id(&self) -> &DeviceId {
207        &self.device_id
208    }
209
210    /// Gets a reference to the underlying version vector
211    pub fn as_version_vector(&self) -> &VersionVector {
212        &self.clock
213    }
214
215    /// Converts the clock into a version vector
216    pub fn into_version_vector(self) -> VersionVector {
217        self.clock
218    }
219
220    /// Creates a clone with a new device ID
221    pub fn with_device_id(&self, device_id: DeviceId) -> Self {
222        Self {
223            clock: self.clock.clone(),
224            device_id,
225        }
226    }
227
228    /// Updates the timestamp for a specific device
229    ///
230    /// # Arguments
231    ///
232    /// * `device_id` - The device to update
233    /// * `timestamp` - The new timestamp
234    pub fn update(&mut self, device_id: DeviceId, timestamp: Timestamp) {
235        let entry = self.clock.entry(device_id).or_insert(0);
236        *entry = (*entry).max(timestamp);
237    }
238
239    /// Gets the sum of all timestamps (useful for debugging)
240    pub fn sum(&self) -> Timestamp {
241        self.clock.values().sum()
242    }
243
244    /// Checks if the clock is empty (no devices)
245    pub fn is_empty(&self) -> bool {
246        self.clock.is_empty()
247    }
248
249    /// Gets the number of devices in the clock
250    pub fn len(&self) -> usize {
251        self.clock.len()
252    }
253
254    /// Gets all device IDs in the clock
255    pub fn devices(&self) -> Vec<DeviceId> {
256        self.clock.keys().cloned().collect()
257    }
258}
259
260impl Default for VectorClock {
261    fn default() -> Self {
262        Self::new("default".to_string())
263    }
264}
265
266/// Extension trait for working with causality
267pub trait CausalOrdering {
268    /// Checks if this event causally precedes another
269    fn precedes(&self, other: &Self) -> bool;
270
271    /// Checks if this event is concurrent with another
272    fn concurrent_with(&self, other: &Self) -> bool;
273}
274
275impl CausalOrdering for VectorClock {
276    fn precedes(&self, other: &Self) -> bool {
277        self.happened_before(other)
278    }
279
280    fn concurrent_with(&self, other: &Self) -> bool {
281        self.is_concurrent(other)
282    }
283}
284
285#[cfg(test)]
286mod tests {
287    use super::*;
288
289    #[test]
290    fn test_vector_clock_creation() {
291        let clock = VectorClock::new("device-1".to_string());
292        assert_eq!(clock.get_time(&"device-1".to_string()), Some(0));
293        assert_eq!(clock.device_id(), "device-1");
294    }
295
296    #[test]
297    fn test_tick() {
298        let mut clock = VectorClock::new("device-1".to_string());
299        assert_eq!(clock.tick(), 1);
300        assert_eq!(clock.tick(), 2);
301        assert_eq!(clock.get_time(&"device-1".to_string()), Some(2));
302    }
303
304    #[test]
305    fn test_merge() {
306        let mut clock1 = VectorClock::new("device-1".to_string());
307        let mut clock2 = VectorClock::new("device-2".to_string());
308
309        clock1.tick();
310        clock1.tick();
311        clock2.tick();
312
313        clock1.merge(&clock2);
314
315        assert_eq!(clock1.get_time(&"device-1".to_string()), Some(2));
316        assert_eq!(clock1.get_time(&"device-2".to_string()), Some(1));
317    }
318
319    #[test]
320    fn test_compare_equal() {
321        let clock1 = VectorClock::new("device-1".to_string());
322        let clock2 = VectorClock::new("device-1".to_string());
323
324        assert_eq!(clock1.compare(&clock2), ClockOrdering::Equal);
325    }
326
327    #[test]
328    fn test_compare_before() {
329        let clock1 = VectorClock::new("device-1".to_string());
330        let mut clock2 = VectorClock::new("device-1".to_string());
331
332        clock2.tick();
333
334        assert_eq!(clock1.compare(&clock2), ClockOrdering::Before);
335        assert!(clock1.happened_before(&clock2));
336    }
337
338    #[test]
339    fn test_compare_after() {
340        let mut clock1 = VectorClock::new("device-1".to_string());
341        let clock2 = VectorClock::new("device-1".to_string());
342
343        clock1.tick();
344
345        assert_eq!(clock1.compare(&clock2), ClockOrdering::After);
346        assert!(clock1.happened_after(&clock2));
347    }
348
349    #[test]
350    fn test_compare_concurrent() {
351        let mut clock1 = VectorClock::new("device-1".to_string());
352        let mut clock2 = VectorClock::new("device-2".to_string());
353
354        clock1.tick();
355        clock2.tick();
356
357        assert_eq!(clock1.compare(&clock2), ClockOrdering::Concurrent);
358        assert!(clock1.is_concurrent(&clock2));
359    }
360
361    #[test]
362    fn test_complex_merge() {
363        let mut clock1 = VectorClock::new("device-1".to_string());
364        let mut clock2 = VectorClock::new("device-2".to_string());
365        let mut clock3 = VectorClock::new("device-3".to_string());
366
367        clock1.tick();
368        clock1.tick();
369
370        clock2.tick();
371        clock2.merge(&clock1);
372        clock2.tick();
373
374        clock3.tick();
375
376        clock1.merge(&clock2);
377        clock1.merge(&clock3);
378
379        assert_eq!(clock1.get_time(&"device-1".to_string()), Some(2));
380        assert_eq!(clock1.get_time(&"device-2".to_string()), Some(2));
381        assert_eq!(clock1.get_time(&"device-3".to_string()), Some(1));
382    }
383}