// like_a_clockwork/vector.rs

1use serde::{Deserialize, Serialize};
2use std::collections::HashMap;
3use std::fmt;
4use std::str::FromStr;
5
6use crate::causality::{compare, CausalityRelation};
7
8#[derive(Debug, thiserror::Error)]
9pub enum VectorTimestampError {
10    #[error("empty vector timestamp string")]
11    Empty,
12    #[error("missing '=' delimiter in segment")]
13    MissingDelimiter,
14    #[error("invalid time value: {0}")]
15    InvalidTime(String),
16    #[error("empty node id")]
17    EmptyNodeId,
18}
19
20#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
21pub struct VectorTimestamp {
22    clocks: HashMap<String, u64>,
23}
24
25impl VectorTimestamp {
26    pub fn clocks(&self) -> &HashMap<String, u64> {
27        &self.clocks
28    }
29
30    pub fn get(&self, node_id: &str) -> u64 {
31        self.clocks.get(node_id).copied().unwrap_or(0)
32    }
33}
34
35impl From<HashMap<String, u64>> for VectorTimestamp {
36    fn from(clocks: HashMap<String, u64>) -> Self {
37        VectorTimestamp { clocks }
38    }
39}
40
41impl fmt::Display for VectorTimestamp {
42    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43        let mut keys: Vec<&String> = self.clocks.keys().collect();
44        keys.sort();
45        let parts: Vec<String> = keys.iter().map(|k| format!("{}={}", k, self.clocks[*k])).collect();
46        write!(f, "{}", parts.join(","))
47    }
48}
49
50impl FromStr for VectorTimestamp {
51    type Err = VectorTimestampError;
52
53    fn from_str(s: &str) -> Result<Self, Self::Err> {
54        if s.is_empty() {
55            return Err(VectorTimestampError::Empty);
56        }
57
58        let mut clocks = HashMap::new();
59        for segment in s.split(',') {
60            let Some((node_id, value_str)) = segment.split_once('=') else {
61                return Err(VectorTimestampError::MissingDelimiter);
62            };
63            if node_id.is_empty() {
64                return Err(VectorTimestampError::EmptyNodeId);
65            }
66            let value: u64 = value_str
67                .parse()
68                .map_err(|_| VectorTimestampError::InvalidTime(value_str.to_string()))?;
69            clocks.insert(node_id.to_string(), value);
70        }
71
72        Ok(VectorTimestamp { clocks })
73    }
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct VectorClock {
78    node_id: String,
79    clocks: HashMap<String, u64>,
80}
81
82impl VectorClock {
83    pub fn new(node_id: &str, peers: &[&str]) -> Self {
84        assert!(!node_id.is_empty(), "node_id must not be empty");
85        assert!(
86            !node_id.contains('=') && !node_id.contains(','),
87            "node_id must not contain '=' or ','"
88        );
89
90        for peer in peers {
91            assert!(!peer.is_empty(), "peer id must not be empty");
92            assert!(
93                !peer.contains('=') && !peer.contains(','),
94                "peer id must not contain '=' or ','"
95            );
96            assert!(*peer != node_id, "node_id must not appear in peers");
97        }
98
99        let mut clocks = HashMap::new();
100        clocks.insert(node_id.to_string(), 0);
101        for peer in peers {
102            clocks.insert(peer.to_string(), 0);
103        }
104
105        VectorClock {
106            node_id: node_id.to_string(),
107            clocks,
108        }
109    }
110
111    pub fn from_map(node_id: &str, clocks: HashMap<String, u64>) -> Self {
112        VectorClock {
113            node_id: node_id.to_string(),
114            clocks,
115        }
116    }
117
118    pub fn tick(&mut self) -> u64 {
119        let counter = self.clocks.entry(self.node_id.clone()).or_insert(0);
120        *counter += 1;
121        *counter
122    }
123
124    pub fn send(&mut self) -> VectorTimestamp {
125        self.tick();
126        self.snapshot()
127    }
128
129    pub fn receive(&mut self, timestamp: &VectorTimestamp) {
130        for (node, &value) in timestamp.clocks() {
131            let entry = self.clocks.entry(node.clone()).or_insert(0);
132            *entry = (*entry).max(value);
133        }
134        let local = self.clocks.entry(self.node_id.clone()).or_insert(0);
135        *local += 1;
136    }
137
138    pub fn relation(&self, other: &VectorClock) -> CausalityRelation {
139        let self_ts = self.snapshot();
140        let other_ts = other.snapshot();
141        compare(&self_ts, &other_ts)
142    }
143
144    pub fn snapshot(&self) -> VectorTimestamp {
145        VectorTimestamp {
146            clocks: self.clocks.clone(),
147        }
148    }
149
150    pub fn get(&self, node_id: &str) -> u64 {
151        self.clocks.get(node_id).copied().unwrap_or(0)
152    }
153
154    pub fn merge(&mut self, other: &VectorClock) {
155        for (node, &value) in &other.clocks {
156            let entry = self.clocks.entry(node.clone()).or_insert(0);
157            *entry = (*entry).max(value);
158        }
159    }
160}
161
#[cfg(test)]
mod tests {
    use super::*;

    // ----- construction ---------------------------------------------------

    #[test]
    fn new_initializes_all_clocks_to_zero() {
        let vc = VectorClock::new("a", &["b", "c"]);
        assert_eq!(vc.get("a"), 0);
        assert_eq!(vc.get("b"), 0);
        assert_eq!(vc.get("c"), 0);
    }

    #[test]
    fn new_includes_self_in_clocks() {
        let vc = VectorClock::new("node1", &[]);
        assert_eq!(vc.get("node1"), 0);
        assert!(vc.clocks.contains_key("node1"));
    }

    #[test]
    fn new_lists_peers_in_snapshot() {
        let vc = VectorClock::new("a", &["b"]);
        let ts = vc.snapshot();
        assert_eq!(ts.clocks().len(), 2);
        assert!(ts.clocks().contains_key("a"));
        assert!(ts.clocks().contains_key("b"));
    }

    // `expected` pins which precondition fired, not just that something panicked.
    #[test]
    #[should_panic(expected = "node_id must not be empty")]
    fn new_rejects_empty_node_id() {
        VectorClock::new("", &["b"]);
    }

    #[test]
    #[should_panic(expected = "node_id must not appear in peers")]
    fn new_rejects_self_in_peers() {
        VectorClock::new("a", &["a"]);
    }

    #[test]
    #[should_panic(expected = "node_id must not contain '=' or ','")]
    fn new_rejects_invalid_chars_in_node_id() {
        VectorClock::new("a=b", &[]);
    }

    #[test]
    #[should_panic(expected = "peer id must not be empty")]
    fn new_rejects_empty_peer_id() {
        VectorClock::new("a", &[""]);
    }

    #[test]
    #[should_panic(expected = "peer id must not contain '=' or ','")]
    fn new_rejects_invalid_chars_in_peer_id() {
        VectorClock::new("a", &["b,c"]);
    }

    #[test]
    fn from_map_preserves_values() {
        let mut map = HashMap::new();
        map.insert("x".to_string(), 5);
        map.insert("y".to_string(), 3);
        let vc = VectorClock::from_map("x", map);
        assert_eq!(vc.get("x"), 5);
        assert_eq!(vc.get("y"), 3);
    }

    #[test]
    fn from_map_without_self_entry_ticks_from_zero() {
        let mut map = HashMap::new();
        map.insert("y".to_string(), 3);
        let mut vc = VectorClock::from_map("x", map);
        assert_eq!(vc.get("x"), 0);
        assert_eq!(vc.tick(), 1);
        assert_eq!(vc.get("x"), 1);
        assert_eq!(vc.get("y"), 3);
    }

    // ----- tick / send ----------------------------------------------------

    #[test]
    fn tick_increments_only_local_node() {
        let mut vc = VectorClock::new("a", &["b"]);
        assert_eq!(vc.tick(), 1);
        assert_eq!(vc.get("a"), 1);
        assert_eq!(vc.get("b"), 0);
    }

    #[test]
    fn tick_does_not_affect_other_nodes() {
        let mut vc = VectorClock::new("a", &["b", "c"]);
        assert_eq!(vc.tick(), 1);
        assert_eq!(vc.tick(), 2);
        assert_eq!(vc.get("a"), 2);
        assert_eq!(vc.get("b"), 0);
        assert_eq!(vc.get("c"), 0);
    }

    #[test]
    fn send_increments_and_returns_snapshot() {
        let mut vc = VectorClock::new("a", &["b"]);
        let ts = vc.send();
        assert_eq!(ts.get("a"), 1);
        assert_eq!(ts.get("b"), 0);
        assert_eq!(vc.get("a"), 1);
    }

    #[test]
    fn send_snapshot_is_independent_copy() {
        let mut vc = VectorClock::new("a", &["b"]);
        let ts = vc.send();
        vc.tick();
        assert_eq!(ts.get("a"), 1);
        assert_eq!(vc.get("a"), 2);
    }

    // ----- receive --------------------------------------------------------

    #[test]
    fn receive_takes_max_then_increments_local() {
        let mut vc_a = VectorClock::new("a", &["b"]);
        let mut vc_b = VectorClock::new("b", &["a"]);
        let ts = vc_a.send();
        vc_b.receive(&ts);
        assert_eq!(vc_b.get("a"), 1);
        assert_eq!(vc_b.get("b"), 1);
    }

    #[test]
    fn receive_takes_max_for_all_nodes() {
        let mut vc_a = VectorClock::new("a", &["b", "c"]);
        vc_a.tick(); // a=1
        vc_a.tick(); // a=2

        let mut vc_b = VectorClock::new("b", &["a", "c"]);
        vc_b.tick(); // b=1

        let ts_a = vc_a.send(); // a=3
        vc_b.receive(&ts_a);
        // b should have max(a)=3, b=1+1=2, c=0
        assert_eq!(vc_b.get("a"), 3);
        assert_eq!(vc_b.get("b"), 2);
        assert_eq!(vc_b.get("c"), 0);
    }

    #[test]
    fn receive_with_unknown_node_adds_it() {
        let mut vc_a = VectorClock::new("a", &[]);
        let mut vc_b = VectorClock::new("b", &[]);
        let ts = vc_a.send(); // a=1
        vc_b.receive(&ts);
        assert_eq!(vc_b.get("a"), 1);
        assert_eq!(vc_b.get("b"), 1);
        assert!(vc_b.clocks.contains_key("a"));
    }

    #[test]
    fn receive_with_lower_remote_still_increments_local() {
        let mut vc_a = VectorClock::new("a", &["b"]);
        let ts = vc_a.send(); // a=1

        let mut vc_b = VectorClock::new("b", &["a"]);
        vc_b.tick(); // b=1
        vc_b.tick(); // b=2
        vc_b.tick(); // b=3

        vc_b.receive(&ts);
        assert_eq!(vc_b.get("a"), 1);
        assert_eq!(vc_b.get("b"), 4); // 3 + 1
    }

    // ----- snapshot / get -------------------------------------------------

    #[test]
    fn snapshot_returns_current_state() {
        let mut vc = VectorClock::new("a", &["b"]);
        vc.tick();
        let ts = vc.snapshot();
        assert_eq!(ts.get("a"), 1);
        assert_eq!(ts.get("b"), 0);
    }

    #[test]
    fn snapshot_does_not_mutate_clock() {
        let mut vc = VectorClock::new("a", &["b"]);
        vc.tick();
        let _ = vc.snapshot();
        assert_eq!(vc.get("a"), 1);
        let _ = vc.snapshot();
        assert_eq!(vc.get("a"), 1);
    }

    #[test]
    fn get_returns_value_for_known_node() {
        let mut vc = VectorClock::new("a", &["b"]);
        vc.tick();
        assert_eq!(vc.get("a"), 1);
    }

    #[test]
    fn get_returns_zero_for_unknown_node() {
        let vc = VectorClock::new("a", &[]);
        assert_eq!(vc.get("unknown"), 0);
    }

    // ----- merge ----------------------------------------------------------

    #[test]
    fn merge_takes_max_without_incrementing() {
        let mut vc_a = VectorClock::new("a", &["b"]);
        vc_a.tick(); // a=1

        let mut vc_b = VectorClock::new("b", &["a"]);
        vc_b.tick(); // b=1
        vc_b.tick(); // b=2

        vc_a.merge(&vc_b);
        assert_eq!(vc_a.get("a"), 1); // unchanged
        assert_eq!(vc_a.get("b"), 2); // max(0, 2)
    }

    #[test]
    fn merge_is_commutative() {
        let mut vc_a = VectorClock::new("a", &["b"]);
        vc_a.tick();

        let mut vc_b = VectorClock::new("b", &["a"]);
        vc_b.tick();

        // Merge in both directions, starting from the same pre-merge states.
        let a_before = vc_a.clone();
        vc_a.merge(&vc_b);
        vc_b.merge(&a_before);

        // merge(a, b) and merge(b, a) must produce identical clocks.
        assert_eq!(vc_a.snapshot(), vc_b.snapshot());
        assert_eq!(vc_a.get("a"), 1);
        assert_eq!(vc_a.get("b"), 1);
    }

    #[test]
    fn merge_adds_unknown_nodes() {
        let mut vc_a = VectorClock::new("a", &[]);
        let mut vc_b = VectorClock::new("b", &["c"]);
        vc_b.tick();

        vc_a.merge(&vc_b);
        assert_eq!(vc_a.get("b"), 1);
        assert_eq!(vc_a.get("c"), 0);
        assert!(vc_a.clocks.contains_key("c"));
    }

    // ----- relation -------------------------------------------------------

    #[test]
    fn relation_delegates_to_compare() {
        let mut vc_a = VectorClock::new("a", &["b"]);
        let vc_b = VectorClock::new("b", &["a"]);
        vc_a.tick();

        let rel = vc_a.relation(&vc_b);
        assert_eq!(rel, CausalityRelation::HappensAfter);
    }

    // ----- VectorTimestamp text format ------------------------------------

    #[test]
    fn timestamp_display_format() {
        let mut clocks = HashMap::new();
        clocks.insert("svc-b".to_string(), 1);
        clocks.insert("svc-a".to_string(), 3);
        let ts = VectorTimestamp { clocks };
        assert_eq!(format!("{}", ts), "svc-a=3,svc-b=1");
    }

    #[test]
    fn timestamp_display_empty_is_empty_string() {
        let empty: HashMap<String, u64> = HashMap::new();
        assert_eq!(VectorTimestamp::from(empty).to_string(), "");
    }

    #[test]
    fn timestamp_parse_roundtrip() {
        let mut clocks = HashMap::new();
        clocks.insert("a".to_string(), 2);
        clocks.insert("b".to_string(), 5);
        let ts = VectorTimestamp { clocks };
        let s = ts.to_string();
        let parsed: VectorTimestamp = s.parse().unwrap();
        assert_eq!(ts, parsed);
    }

    #[test]
    fn timestamp_parse_empty_string() {
        let result = "".parse::<VectorTimestamp>();
        assert!(matches!(result, Err(VectorTimestampError::Empty)));
    }

    #[test]
    fn timestamp_parse_missing_delimiter() {
        let result = "abc".parse::<VectorTimestamp>();
        assert!(matches!(result, Err(VectorTimestampError::MissingDelimiter)));
    }

    #[test]
    fn timestamp_parse_trailing_comma() {
        // The empty segment after the trailing comma has no '='.
        let result = "a=1,".parse::<VectorTimestamp>();
        assert!(matches!(result, Err(VectorTimestampError::MissingDelimiter)));
    }

    #[test]
    fn timestamp_parse_invalid_time() {
        let result = "a=xyz".parse::<VectorTimestamp>();
        assert!(matches!(
            result,
            Err(VectorTimestampError::InvalidTime(ref raw)) if raw == "xyz"
        ));
    }

    #[test]
    fn timestamp_parse_empty_node_id() {
        let result = "=5".parse::<VectorTimestamp>();
        assert!(matches!(result, Err(VectorTimestampError::EmptyNodeId)));
    }

    // ----- serde ----------------------------------------------------------

    #[test]
    fn serde_json_roundtrip_vector_clock() {
        let mut vc = VectorClock::new("a", &["b"]);
        vc.tick();
        let json = serde_json::to_string(&vc).unwrap();
        let deserialized: VectorClock = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.get("a"), 1);
        assert_eq!(deserialized.get("b"), 0);
    }

    #[test]
    fn serde_json_roundtrip_vector_timestamp() {
        let mut clocks = HashMap::new();
        clocks.insert("x".to_string(), 10);
        clocks.insert("y".to_string(), 20);
        let ts = VectorTimestamp { clocks };
        let json = serde_json::to_string(&ts).unwrap();
        let deserialized: VectorTimestamp = serde_json::from_str(&json).unwrap();
        assert_eq!(ts, deserialized);
    }

    // ----- multi-process scenarios ----------------------------------------

    #[test]
    fn three_process_message_chain() {
        let mut a = VectorClock::new("a", &["b", "c"]);
        let mut b = VectorClock::new("b", &["a", "c"]);
        let mut c = VectorClock::new("c", &["a", "b"]);

        let ts_a = a.send(); // a: a=1
        b.receive(&ts_a); // b: a=1, b=1

        let ts_b = b.send(); // b: a=1, b=2
        c.receive(&ts_b); // c: a=1, b=2, c=1

        // C has causal knowledge of A through B.
        assert_eq!(c.get("a"), 1);
        assert_eq!(c.get("b"), 2);
        assert_eq!(c.get("c"), 1);
        assert_eq!(a.relation(&c), CausalityRelation::HappensBefore);
    }

    #[test]
    fn fork_join_causal_graph() {
        let mut a = VectorClock::new("a", &["b", "c", "d"]);
        let mut b = VectorClock::new("b", &["a", "c", "d"]);
        let mut c = VectorClock::new("c", &["a", "b", "d"]);
        let mut d = VectorClock::new("d", &["a", "b", "c"]);

        // A sends to B and C.
        let ts_a = a.send();
        b.receive(&ts_a); // b: a=1, b=1
        c.receive(&ts_a); // c: a=1, c=1

        // B and C send to D.
        let ts_b = b.send(); // b: a=1, b=2
        let ts_c = c.send(); // c: a=1, c=2
        d.receive(&ts_b); // d: a=1, b=2, d=1
        d.receive(&ts_c); // d: a=1, b=2, c=2, d=2

        // D has joined both branches.
        assert_eq!(d.get("a"), 1);
        assert_eq!(d.get("b"), 2);
        assert_eq!(d.get("c"), 2);
        assert_eq!(d.get("d"), 2);

        // The two branches are concurrent; the join follows both.
        assert_eq!(b.relation(&c), CausalityRelation::Concurrent);
        assert_eq!(d.relation(&b), CausalityRelation::HappensAfter);
        assert_eq!(a.relation(&d), CausalityRelation::HappensBefore);
    }

    #[test]
    fn concurrent_writes_detected() {
        let mut a = VectorClock::new("a", &["b"]);
        let mut b = VectorClock::new("b", &["a"]);

        a.tick();
        b.tick();

        assert_eq!(a.relation(&b), CausalityRelation::Concurrent);
    }
}