Skip to main content

kyu_delta/
vector_clock.rs

1use hashbrown::HashMap;
2use smol_str::SmolStr;
3
4/// Logical vector clock for causal ordering between workers.
5///
6/// Each entry maps a worker ID to its logical timestamp.
7/// Used only when workers have causal dependencies. For most workloads,
8/// this is `None` on `DeltaBatch` — timestamp alone handles
9/// last-write-wins convergence.
10#[derive(Clone, Debug, Default, PartialEq, Eq)]
11pub struct VectorClock {
12    entries: HashMap<SmolStr, u64>,
13}
14
15impl VectorClock {
16    pub fn new() -> Self {
17        Self::default()
18    }
19
20    /// Get the timestamp for a specific worker (0 if absent).
21    pub fn get(&self, worker_id: &str) -> u64 {
22        self.entries.get(worker_id).copied().unwrap_or(0)
23    }
24
25    /// Set the timestamp for a specific worker.
26    pub fn set(&mut self, worker_id: impl Into<SmolStr>, ts: u64) {
27        self.entries.insert(worker_id.into(), ts);
28    }
29
30    /// Increment the timestamp for a worker, returning the new value.
31    pub fn tick(&mut self, worker_id: impl Into<SmolStr>) -> u64 {
32        let key = worker_id.into();
33        let entry = self.entries.entry(key).or_insert(0);
34        *entry += 1;
35        *entry
36    }
37
38    /// Merge another clock (component-wise maximum).
39    pub fn merge(&mut self, other: &VectorClock) {
40        for (k, &v) in &other.entries {
41            let entry = self.entries.entry(k.clone()).or_insert(0);
42            *entry = (*entry).max(v);
43        }
44    }
45
46    /// Returns true if every entry in `self` is <= the corresponding entry in `other`.
47    pub fn happens_before(&self, other: &VectorClock) -> bool {
48        self.entries.iter().all(|(k, &v)| v <= other.get(k))
49    }
50
51    pub fn len(&self) -> usize {
52        self.entries.len()
53    }
54
55    pub fn is_empty(&self) -> bool {
56        self.entries.is_empty()
57    }
58}
59
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a clock by calling `set` once per `(worker, timestamp)` pair.
    fn clock_of(pairs: &[(&str, u64)]) -> VectorClock {
        let mut clock = VectorClock::new();
        for &(worker, ts) in pairs {
            clock.set(worker, ts);
        }
        clock
    }

    /// A freshly created clock has no entries.
    #[test]
    fn new_is_empty() {
        let clock = VectorClock::new();
        assert!(clock.is_empty());
        assert_eq!(clock.len(), 0);
    }

    /// A value written with `set` is read back by `get`.
    #[test]
    fn set_and_get() {
        let clock = clock_of(&[("w1", 5)]);
        assert_eq!(clock.get("w1"), 5);
    }

    /// Unknown workers read as zero.
    #[test]
    fn get_missing_returns_zero() {
        let clock = VectorClock::new();
        assert_eq!(clock.get("nonexistent"), 0);
    }

    /// Each tick returns the next value, starting at one.
    #[test]
    fn tick_increments() {
        let mut clock = VectorClock::new();
        for expected in 1..=3 {
            assert_eq!(clock.tick("w1"), expected);
        }
        assert_eq!(clock.get("w1"), 3);
    }

    /// Merging keeps the larger timestamp for every shared worker.
    #[test]
    fn merge_takes_max() {
        let mut left = clock_of(&[("w1", 3), ("w2", 5)]);
        let right = clock_of(&[("w1", 7), ("w2", 2)]);
        left.merge(&right);
        assert_eq!(left.get("w1"), 7);
        assert_eq!(left.get("w2"), 5);
    }

    /// Merging brings in workers that only the other clock knows about.
    #[test]
    fn merge_adds_new_entries() {
        let mut left = clock_of(&[("w1", 3)]);
        let right = clock_of(&[("w2", 5)]);
        left.merge(&right);
        assert_eq!(left.get("w2"), 5);
        assert_eq!(left.len(), 2);
    }

    /// A clock that is smaller in every component happens before the other.
    #[test]
    fn happens_before_true() {
        let earlier = clock_of(&[("w1", 1), ("w2", 2)]);
        let later = clock_of(&[("w1", 3), ("w2", 4)]);
        assert!(earlier.happens_before(&later));
    }

    /// Clocks that are each ahead on a different worker are concurrent.
    #[test]
    fn happens_before_false_concurrent() {
        let left = clock_of(&[("w1", 3), ("w2", 1)]);
        let right = clock_of(&[("w1", 1), ("w2", 3)]);
        assert!(!left.happens_before(&right));
        assert!(!right.happens_before(&left));
    }
}
139}