Skip to main content

constraint_crdt/
vclock.rs

//! # Vector Clock
//!
//! Causal ordering for fleet node operations. Before you can merge states,
//! you need to know which events happened-before which.
//!
//! Vector clocks give us:
//! - **Happened-before**: `a < b` means `a` definitely happened before `b`
//! - **Concurrent**: neither `a < b` nor `b < a` — both happened independently
//! - **Causality**: merge operations are only valid between causally related states
10
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::fmt;
14
15/// A vector clock for causal ordering across fleet nodes.
16#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
17pub struct VectorClock {
18    /// Per-node logical timestamps
19    clock: HashMap<String, u64>,
20}
21
/// How two vector clocks `a` and `b` relate causally, as seen from `a`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CausalOrder {
    /// `a` happened-before `b`: `a` is strictly older.
    Before,
    /// `b` happened-before `a`: `a` is strictly newer.
    After,
    /// Neither clock happened-before the other; the events are concurrent.
    Concurrent,
    /// Both clocks are identical.
    Equal,
}
34
35impl VectorClock {
36    /// Create a new empty vector clock.
37    pub fn new() -> Self {
38        Self {
39            clock: HashMap::new(),
40        }
41    }
42
43    /// Create with a single node's timestamp.
44    pub fn from_node(node: &str, time: u64) -> Self {
45        let mut clock = HashMap::new();
46        clock.insert(node.to_string(), time);
47        Self { clock }
48    }
49
50    /// Increment a node's counter (after a local event).
51    pub fn increment(&mut self, node: &str) -> u64 {
52        let entry = self.clock.entry(node.to_string()).or_insert(0);
53        *entry += 1;
54        *entry
55    }
56
57    /// Get a node's counter.
58    pub fn get(&self, node: &str) -> u64 {
59        self.clock.get(node).copied().unwrap_or(0)
60    }
61
62    /// Merge with another vector clock (element-wise max).
63    /// Returns self after merging.
64    pub fn merge(&mut self, other: &Self) {
65        for (node, time) in &other.clock {
66            let entry = self.clock.entry(node.clone()).or_insert(0);
67            *entry = (*entry).max(*time);
68        }
69    }
70
71    /// Compare causal ordering with another clock.
72    pub fn compare(&self, other: &Self) -> CausalOrder {
73        let all_nodes: std::collections::HashSet<&String> =
74            self.clock.keys().chain(other.clock.keys()).collect();
75
76        let mut self_less = false;
77        let mut other_less = false;
78
79        for node in &all_nodes {
80            let s = self.clock.get(*node).copied().unwrap_or(0);
81            let o = other.clock.get(*node).copied().unwrap_or(0);
82            if s < o { self_less = true; }
83            if s > o { other_less = true; }
84        }
85
86        match (self_less, other_less) {
87            (false, false) => CausalOrder::Equal,
88            (true, false) => CausalOrder::Before,
89            (false, true) => CausalOrder::After,
90            (true, true) => CausalOrder::Concurrent,
91        }
92    }
93
94    /// Is this clock happened-before or equal to other?
95    pub fn happened_before_or_equal(&self, other: &Self) -> bool {
96        matches!(self.compare(other), CausalOrder::Before | CausalOrder::Equal)
97    }
98
99    /// Is this clock concurrent with other?
100    pub fn is_concurrent(&self, other: &Self) -> bool {
101        self.compare(other) == CausalOrder::Concurrent
102    }
103
104    /// Number of nodes in this clock.
105    pub fn node_count(&self) -> usize {
106        self.clock.len()
107    }
108
109    /// Total logical time (sum of all counters).
110    pub fn total_time(&self) -> u64 {
111        self.clock.values().sum()
112    }
113
114    /// All nodes tracked.
115    pub fn nodes(&self) -> Vec<&String> {
116        self.clock.keys().collect()
117    }
118
119    /// Serialize to JSON.
120    pub fn to_json(&self) -> String {
121        serde_json::to_string(self).unwrap_or_default()
122    }
123
124    /// Deserialize from JSON.
125    pub fn from_json(s: &str) -> Option<Self> {
126        serde_json::from_str(s).ok()
127    }
128}
129
130impl fmt::Display for VectorClock {
131    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132        let mut entries: Vec<_> = self.clock.iter().collect();
133        entries.sort_by_key(|(k, _)| *k);
134        write!(f, "VC{{")?;
135        for (i, (node, time)) in entries.iter().enumerate() {
136            if i > 0 { write!(f, ", ")?; }
137            write!(f, "{}:{}", node, time)?;
138        }
139        write!(f, "}}")
140    }
141}
142
#[cfg(test)]
mod tests {
    use super::*;

    // Counters start at zero, advance by one per call, and are tracked per node.
    #[test]
    fn test_increment() {
        let mut vc = VectorClock::new();
        assert_eq!(vc.increment("a"), 1);
        assert_eq!(vc.increment("a"), 2);
        assert_eq!(vc.increment("b"), 1);
        assert_eq!(vc.get("a"), 2);
        assert_eq!(vc.get("b"), 1);
        assert_eq!(vc.get("c"), 0); // Unknown node
    }

    #[test]
    fn test_compare_equal() {
        let mut a = VectorClock::new();
        a.increment("a");
        assert_eq!(a.compare(&a), CausalOrder::Equal);
    }

    // A node without an entry compares the same as an explicit zero entry.
    #[test]
    fn test_compare_missing_equals_zero() {
        let zero = VectorClock::from_node("a", 0);
        let empty = VectorClock::new();
        assert_eq!(zero.compare(&empty), CausalOrder::Equal);
        assert_eq!(empty.compare(&zero), CausalOrder::Equal);
    }

    #[test]
    fn test_compare_before() {
        let mut a = VectorClock::new();
        a.increment("a"); // a: {a:1}

        let mut b = VectorClock::new();
        b.increment("a"); // b: {a:1}
        b.increment("a"); // b: {a:2}

        assert_eq!(a.compare(&b), CausalOrder::Before);
        assert_eq!(b.compare(&a), CausalOrder::After);
    }

    #[test]
    fn test_compare_concurrent() {
        let mut a = VectorClock::new();
        a.increment("a"); // a: {a:1}

        let mut b = VectorClock::new();
        b.increment("b"); // b: {b:1}

        assert_eq!(a.compare(&b), CausalOrder::Concurrent);
        assert!(a.is_concurrent(&b));
    }

    #[test]
    fn test_happened_before() {
        let mut a = VectorClock::new();
        a.increment("a");
        let mut b = a.clone();
        b.increment("a");
        assert!(a.happened_before_or_equal(&b));
        assert!(!b.happened_before_or_equal(&a));
    }

    #[test]
    fn test_merge() {
        let mut a = VectorClock::new();
        a.increment("a"); // {a:1}

        let mut b = VectorClock::new();
        b.increment("b"); // {b:1}

        a.merge(&b); // {a:1, b:1}
        assert_eq!(a.get("a"), 1);
        assert_eq!(a.get("b"), 1);
    }

    #[test]
    fn test_merge_takes_max() {
        let mut a = VectorClock::from_node("x", 5);
        // `b` is only read, so it does not need to be mutable.
        let b = VectorClock::from_node("x", 10);
        a.merge(&b);
        assert_eq!(a.get("x"), 10);
    }

    #[test]
    fn test_display() {
        let mut vc = VectorClock::new();
        vc.increment("forgemaster");
        vc.increment("oracle1");
        let s = vc.to_string();
        assert!(s.contains("forgemaster:1"));
        assert!(s.contains("oracle1:1"));
        // Entries are sorted by node name, so the full output is deterministic.
        assert_eq!(s, "VC{forgemaster:1, oracle1:1}");
    }

    #[test]
    fn test_json_roundtrip() {
        let mut vc = VectorClock::new();
        vc.increment("a");
        vc.increment("b");
        let json = vc.to_json();
        let restored = VectorClock::from_json(&json).unwrap();
        assert_eq!(vc, restored);
    }

    #[test]
    fn test_total_time() {
        let mut vc = VectorClock::new();
        vc.increment("a"); // 1
        vc.increment("a"); // 2
        vc.increment("b"); // 1
        assert_eq!(vc.total_time(), 3);
    }

    #[test]
    fn test_three_node_causality() {
        // a → b → c chain
        let mut a = VectorClock::new();
        a.increment("a"); // {a:1}

        let mut b = a.clone();
        b.increment("b"); // {a:1, b:1}

        let mut c = b.clone();
        c.increment("c"); // {a:1, b:1, c:1}

        assert_eq!(a.compare(&c), CausalOrder::Before);
        assert_eq!(c.compare(&a), CausalOrder::After);
        assert!(a.happened_before_or_equal(&c));
    }
}