spec_ai_config/sync/
vector_clock.rs

1use anyhow::{Context, Result};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4
5/// Vector clock for tracking causality in distributed systems
6/// Each instance maintains its own logical clock version
7#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
8pub struct VectorClock {
9    /// Map of instance_id to version number
10    versions: HashMap<String, i64>,
11}
12
/// Causal relationship between two vector clocks, as reported by a comparison.
///
/// "First" is the clock the comparison was invoked on; "second" is the clock
/// passed as the argument.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClockOrder {
    /// The first clock happened-before the second (the first is older).
    Before,
    /// The second clock happened-before the first (the first is newer).
    After,
    /// Neither clock happened-before the other; the updates conflict and need
    /// conflict resolution.
    Concurrent,
    /// Both clocks carry the same versions.
    Equal,
}
25
26impl VectorClock {
27    /// Create a new empty vector clock
28    pub fn new() -> Self {
29        Self {
30            versions: HashMap::new(),
31        }
32    }
33
34    /// Create a vector clock from a JSON string
35    pub fn from_json(json: &str) -> Result<Self> {
36        if json.is_empty() || json == "{}" {
37            return Ok(Self::new());
38        }
39        serde_json::from_str(json).context("parsing vector clock JSON")
40    }
41
42    /// Serialize to JSON string for storage
43    pub fn to_json(&self) -> Result<String> {
44        serde_json::to_string(&self).context("serializing vector clock")
45    }
46
47    /// Get the version for a specific instance (0 if not present)
48    pub fn get(&self, instance_id: &str) -> i64 {
49        self.versions.get(instance_id).copied().unwrap_or(0)
50    }
51
52    /// Set the version for a specific instance
53    pub fn set(&mut self, instance_id: String, version: i64) {
54        self.versions.insert(instance_id, version);
55    }
56
57    /// Increment the version for the given instance
58    pub fn increment(&mut self, instance_id: &str) -> i64 {
59        let version = self.get(instance_id) + 1;
60        self.versions.insert(instance_id.to_string(), version);
61        version
62    }
63
64    /// Merge another vector clock, taking the maximum version for each instance
65    /// This is used when receiving updates from other instances
66    pub fn merge(&mut self, other: &VectorClock) {
67        for (instance_id, &other_version) in &other.versions {
68            let current_version = self.get(instance_id);
69            if other_version > current_version {
70                self.versions.insert(instance_id.clone(), other_version);
71            }
72        }
73    }
74
75    /// Compare this clock with another to determine causality relationship
76    /// Returns:
77    /// - Before: self happened-before other (self is older)
78    /// - After: other happened-before self (self is newer)
79    /// - Concurrent: neither happened-before the other (conflict)
80    /// - Equal: clocks are identical
81    pub fn compare(&self, other: &VectorClock) -> ClockOrder {
82        let mut self_less_or_equal = true;
83        let mut other_less_or_equal = true;
84
85        // Get all instance IDs from both clocks
86        let all_instances: std::collections::HashSet<_> =
87            self.versions.keys().chain(other.versions.keys()).collect();
88
89        for instance_id in all_instances {
90            let self_version = self.get(instance_id);
91            let other_version = other.get(instance_id);
92
93            if self_version > other_version {
94                self_less_or_equal = false;
95            }
96            if other_version > self_version {
97                other_less_or_equal = false;
98            }
99        }
100
101        match (self_less_or_equal, other_less_or_equal) {
102            (true, true) => ClockOrder::Equal,
103            (true, false) => ClockOrder::Before,
104            (false, true) => ClockOrder::After,
105            (false, false) => ClockOrder::Concurrent,
106        }
107    }
108
109    /// Check if this clock happened-before another (causally precedes)
110    pub fn happens_before(&self, other: &VectorClock) -> bool {
111        matches!(self.compare(other), ClockOrder::Before)
112    }
113
114    /// Check if this clock is concurrent with another (conflict)
115    pub fn is_concurrent(&self, other: &VectorClock) -> bool {
116        matches!(self.compare(other), ClockOrder::Concurrent)
117    }
118
119    /// Check if this clock is equal to another
120    pub fn is_equal(&self, other: &VectorClock) -> bool {
121        matches!(self.compare(other), ClockOrder::Equal)
122    }
123
124    /// Get all instance IDs tracked by this clock
125    pub fn instances(&self) -> Vec<String> {
126        self.versions.keys().cloned().collect()
127    }
128
129    /// Get the number of instances tracked
130    pub fn instance_count(&self) -> usize {
131        self.versions.len()
132    }
133
134    /// Check if this clock is empty (no versions tracked)
135    pub fn is_empty(&self) -> bool {
136        self.versions.is_empty()
137    }
138
139    /// Clear all versions
140    pub fn clear(&mut self) {
141        self.versions.clear();
142    }
143}
144
145impl Default for VectorClock {
146    fn default() -> Self {
147        Self::new()
148    }
149}
150
151impl std::fmt::Display for VectorClock {
152    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153        write!(f, "{{")?;
154        let mut first = true;
155        for (instance_id, version) in &self.versions {
156            if !first {
157                write!(f, ", ")?;
158            }
159            write!(f, "{}: {}", instance_id, version)?;
160            first = false;
161        }
162        write!(f, "}}")
163    }
164}
165
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a clock by calling `set` for each `(instance_id, version)` pair.
    fn clock_of(entries: &[(&str, i64)]) -> VectorClock {
        let mut clock = VectorClock::new();
        for &(instance_id, version) in entries {
            clock.set(instance_id.to_string(), version);
        }
        clock
    }

    #[test]
    fn test_new_clock_is_empty() {
        let clock = VectorClock::new();
        assert!(clock.is_empty());
        assert_eq!(clock.get("instance1"), 0);
    }

    #[test]
    fn test_increment() {
        let mut clock = VectorClock::new();
        assert_eq!(clock.increment("instance1"), 1);
        assert_eq!(clock.increment("instance1"), 2);
        assert_eq!(clock.get("instance1"), 2);
    }

    #[test]
    fn test_merge() {
        let mut merged = clock_of(&[("a", 1), ("b", 2)]);
        let incoming = clock_of(&[("b", 3), ("c", 1)]);

        merged.merge(&incoming);

        assert_eq!(merged.get("a"), 1);
        assert_eq!(merged.get("b"), 3); // max(2, 3)
        assert_eq!(merged.get("c"), 1);
    }

    #[test]
    fn test_compare_equal() {
        let left = clock_of(&[("a", 1)]);
        let right = clock_of(&[("a", 1)]);

        assert_eq!(left.compare(&right), ClockOrder::Equal);
        assert!(left.is_equal(&right));
    }

    #[test]
    fn test_compare_before() {
        let older = clock_of(&[("a", 1)]);
        let newer = clock_of(&[("a", 2)]);

        assert_eq!(older.compare(&newer), ClockOrder::Before);
        assert!(older.happens_before(&newer));
    }

    #[test]
    fn test_compare_after() {
        let newer = clock_of(&[("a", 2)]);
        let older = clock_of(&[("a", 1)]);

        assert_eq!(newer.compare(&older), ClockOrder::After);
        assert!(!newer.happens_before(&older));
    }

    #[test]
    fn test_compare_concurrent() {
        let left = clock_of(&[("a", 2), ("b", 1)]);
        let right = clock_of(&[("a", 1), ("b", 2)]);

        assert_eq!(left.compare(&right), ClockOrder::Concurrent);
        assert!(left.is_concurrent(&right));
    }

    #[test]
    fn test_json_serialization() {
        let clock = clock_of(&[("instance1", 5), ("instance2", 3)]);

        let json = clock.to_json().unwrap();
        let parsed = VectorClock::from_json(&json).unwrap();

        assert_eq!(clock, parsed);
        assert_eq!(parsed.get("instance1"), 5);
        assert_eq!(parsed.get("instance2"), 3);
    }

    #[test]
    fn test_empty_json() {
        let from_braces = VectorClock::from_json("{}").unwrap();
        assert!(from_braces.is_empty());

        let from_empty = VectorClock::from_json("").unwrap();
        assert!(from_empty.is_empty());
    }

    #[test]
    fn test_display() {
        let clock = clock_of(&[("a", 1), ("b", 2)]);

        let rendered = clock.to_string();
        assert!(rendered.contains("a: 1"));
        assert!(rendered.contains("b: 2"));
    }
}