Skip to main content

synckit_core/sync/
vector_clock.rs

1//! Vector Clock implementation for causality tracking
2//!
3//! This implementation follows the TLA+ verified specification in
4//! protocol/tla/vector_clock.tla
5//!
6//! Properties verified:
7//! - CausalityPreserved: Happens-before relationship is correct
8//! - Transitivity: If A→B and B→C, then A→C
9//! - Monotonicity: Clock values only increase
10//! - ConcurrentDetection: Concurrent operations detected correctly
11//! - MergeCorrectness: Clock merging preserves causality
12
13use crate::ClientID;
14use serde::{Deserialize, Serialize};
15use std::cmp::Ordering;
16use std::collections::HashMap;
17
18/// Vector clock for tracking causality between operations
19#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
20pub struct VectorClock {
21    /// Map from ClientID to logical clock value
22    pub clocks: HashMap<ClientID, u64>,
23}
24
25impl VectorClock {
26    /// Create a new empty vector clock
27    pub fn new() -> Self {
28        Self {
29            clocks: HashMap::new(),
30        }
31    }
32
33    /// Create a VectorClock from a Timestamp
34    pub fn from_timestamp(timestamp: &crate::sync::Timestamp) -> Self {
35        let mut clock = Self::new();
36        clock
37            .clocks
38            .insert(timestamp.client_id.clone(), timestamp.clock);
39        clock
40    }
41
42    /// Increment the clock for a specific client
43    pub fn tick(&mut self, client_id: &ClientID) {
44        let counter = self.clocks.entry(client_id.clone()).or_insert(0);
45        *counter += 1;
46    }
47
48    /// Get the clock value for a specific client
49    pub fn get(&self, client_id: &ClientID) -> u64 {
50        *self.clocks.get(client_id).unwrap_or(&0)
51    }
52
53    /// Update the clock for a specific client to a specific value
54    pub fn update(&mut self, client_id: &ClientID, value: u64) {
55        self.clocks.insert(client_id.clone(), value);
56    }
57
58    /// Get all client clocks
59    pub fn clocks(&self) -> &HashMap<ClientID, u64> {
60        &self.clocks
61    }
62
63    /// Merge with another vector clock (take max of each entry)
64    ///
65    /// This operation is used when receiving remote operations.
66    /// It ensures that all causal dependencies are tracked.
67    pub fn merge(&mut self, other: &VectorClock) {
68        for (client_id, &other_clock) in &other.clocks {
69            let entry = self.clocks.entry(client_id.clone()).or_insert(0);
70            *entry = (*entry).max(other_clock);
71        }
72    }
73
74    /// Compare two vector clocks to determine happens-before relationship
75    ///
76    /// Returns:
77    /// - Ordering::Less: self happened before other (self < other)
78    /// - Ordering::Greater: other happened before self (self > other)
79    /// - Ordering::Equal: clocks are identical (rare in distributed systems)
80    ///
81    /// Note: This function returns Equal for concurrent events where neither
82    /// happened before the other. Use `is_concurrent` to explicitly check.
83    pub fn compare(&self, other: &VectorClock) -> Ordering {
84        let mut less = false;
85        let mut greater = false;
86
87        // Get all unique client IDs from both clocks
88        let all_clients: std::collections::HashSet<_> =
89            self.clocks.keys().chain(other.clocks.keys()).collect();
90
91        for client_id in all_clients {
92            let self_clock = self.get(client_id);
93            let other_clock = other.get(client_id);
94
95            match self_clock.cmp(&other_clock) {
96                std::cmp::Ordering::Less => less = true,
97                std::cmp::Ordering::Greater => greater = true,
98                std::cmp::Ordering::Equal => {}
99            }
100        }
101
102        match (less, greater) {
103            (true, false) => Ordering::Less,    // self < other (happened before)
104            (false, true) => Ordering::Greater, // self > other (happened after)
105            (false, false) => Ordering::Equal,  // Identical clocks
106            (true, true) => Ordering::Equal,    // Concurrent (neither happened before)
107        }
108    }
109
110    /// Check if two vector clocks are concurrent (neither happened before the other)
111    pub fn is_concurrent(&self, other: &VectorClock) -> bool {
112        let mut less = false;
113        let mut greater = false;
114
115        let all_clients: std::collections::HashSet<_> =
116            self.clocks.keys().chain(other.clocks.keys()).collect();
117
118        for client_id in all_clients {
119            let self_clock = self.get(client_id);
120            let other_clock = other.get(client_id);
121
122            match self_clock.cmp(&other_clock) {
123                std::cmp::Ordering::Less => less = true,
124                std::cmp::Ordering::Greater => greater = true,
125                std::cmp::Ordering::Equal => {}
126            }
127        }
128
129        // Concurrent if we found both less and greater comparisons
130        less && greater
131    }
132
133    /// Check if self happened before other (self < other)
134    pub fn happened_before(&self, other: &VectorClock) -> bool {
135        self.compare(other) == Ordering::Less
136    }
137}
138
139impl Default for VectorClock {
140    fn default() -> Self {
141        Self::new()
142    }
143}
144
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a clock by ticking each named client the given number of times.
    fn ticked(counts: &[(&str, u32)]) -> VectorClock {
        let mut clock = VectorClock::new();
        for &(name, times) in counts {
            let id = name.to_string();
            for _ in 0..times {
                clock.tick(&id);
            }
        }
        clock
    }

    /// Read the counter stored for `name`.
    fn value_of(clock: &VectorClock, name: &str) -> u64 {
        clock.get(&name.to_string())
    }

    #[test]
    fn test_tick() {
        let c1 = "c1".to_string();
        let mut clock = VectorClock::new();

        // An unseen client reads as 0, then counts up one per tick.
        for expected in 0..=2u64 {
            assert_eq!(clock.get(&c1), expected);
            if expected < 2 {
                clock.tick(&c1);
            }
        }
    }

    #[test]
    fn test_merge() {
        let mut clock1 = ticked(&[("c1", 2)]); // c1: 2
        let clock2 = ticked(&[("c2", 3)]); // c2: 3

        // Merge clock2 into clock1
        clock1.merge(&clock2);

        // Should have max of both
        assert_eq!(value_of(&clock1, "c1"), 2);
        assert_eq!(value_of(&clock1, "c2"), 3);
    }

    #[test]
    fn test_compare_happened_before() {
        let clock1 = ticked(&[("c1", 1)]); // {c1: 1}
        let clock2 = ticked(&[("c1", 2)]); // {c1: 2}

        // clock1 happened before clock2
        assert_eq!(clock1.compare(&clock2), Ordering::Less);
        assert!(clock1.happened_before(&clock2));

        // clock2 happened after clock1
        assert_eq!(clock2.compare(&clock1), Ordering::Greater);
        assert!(!clock2.happened_before(&clock1));
    }

    #[test]
    fn test_concurrent() {
        let clock1 = ticked(&[("c1", 1)]); // {c1: 1}
        let clock2 = ticked(&[("c2", 1)]); // {c2: 1}

        // These are concurrent (neither happened before the other)
        assert!(clock1.is_concurrent(&clock2));
        assert!(clock2.is_concurrent(&clock1));

        // compare returns Equal for concurrent clocks
        assert_eq!(clock1.compare(&clock2), Ordering::Equal);
    }

    #[test]
    fn test_identical_clocks() {
        let clock1 = ticked(&[("c1", 1)]);
        let clock2 = ticked(&[("c1", 1)]);

        // Identical clocks
        assert_eq!(clock1.compare(&clock2), Ordering::Equal);
        assert!(!clock1.is_concurrent(&clock2)); // Not concurrent, just equal
    }

    #[test]
    fn test_merge_preserves_causality() {
        // Test the MergeCorrectness property from TLA+
        let clock_a = ticked(&[("c1", 1)]);
        let clock_b = ticked(&[("c2", 1)]);

        let mut clock_merged = clock_a.clone();
        clock_merged.merge(&clock_b);

        // Merged clock should be >= both inputs
        for input in [&clock_a, &clock_b] {
            assert!(clock_merged.compare(input) != Ordering::Less);
        }
    }
}