1use serde::{Deserialize, Serialize};
2use std::collections::HashMap;
3use std::fmt;
4use std::str::FromStr;
5
6use crate::causality::{compare, CausalityRelation};
7
8#[derive(Debug, thiserror::Error)]
9pub enum VectorTimestampError {
10 #[error("empty vector timestamp string")]
11 Empty,
12 #[error("missing '=' delimiter in segment")]
13 MissingDelimiter,
14 #[error("invalid time value: {0}")]
15 InvalidTime(String),
16 #[error("empty node id")]
17 EmptyNodeId,
18}
19
20#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
21pub struct VectorTimestamp {
22 clocks: HashMap<String, u64>,
23}
24
25impl VectorTimestamp {
26 pub fn clocks(&self) -> &HashMap<String, u64> {
27 &self.clocks
28 }
29
30 pub fn get(&self, node_id: &str) -> u64 {
31 self.clocks.get(node_id).copied().unwrap_or(0)
32 }
33}
34
35impl From<HashMap<String, u64>> for VectorTimestamp {
36 fn from(clocks: HashMap<String, u64>) -> Self {
37 VectorTimestamp { clocks }
38 }
39}
40
41impl fmt::Display for VectorTimestamp {
42 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43 let mut keys: Vec<&String> = self.clocks.keys().collect();
44 keys.sort();
45 let parts: Vec<String> = keys.iter().map(|k| format!("{}={}", k, self.clocks[*k])).collect();
46 write!(f, "{}", parts.join(","))
47 }
48}
49
50impl FromStr for VectorTimestamp {
51 type Err = VectorTimestampError;
52
53 fn from_str(s: &str) -> Result<Self, Self::Err> {
54 if s.is_empty() {
55 return Err(VectorTimestampError::Empty);
56 }
57
58 let mut clocks = HashMap::new();
59 for segment in s.split(',') {
60 let Some((node_id, value_str)) = segment.split_once('=') else {
61 return Err(VectorTimestampError::MissingDelimiter);
62 };
63 if node_id.is_empty() {
64 return Err(VectorTimestampError::EmptyNodeId);
65 }
66 let value: u64 = value_str
67 .parse()
68 .map_err(|_| VectorTimestampError::InvalidTime(value_str.to_string()))?;
69 clocks.insert(node_id.to_string(), value);
70 }
71
72 Ok(VectorTimestamp { clocks })
73 }
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct VectorClock {
78 node_id: String,
79 clocks: HashMap<String, u64>,
80}
81
82impl VectorClock {
83 pub fn new(node_id: &str, peers: &[&str]) -> Self {
84 assert!(!node_id.is_empty(), "node_id must not be empty");
85 assert!(
86 !node_id.contains('=') && !node_id.contains(','),
87 "node_id must not contain '=' or ','"
88 );
89
90 for peer in peers {
91 assert!(!peer.is_empty(), "peer id must not be empty");
92 assert!(
93 !peer.contains('=') && !peer.contains(','),
94 "peer id must not contain '=' or ','"
95 );
96 assert!(*peer != node_id, "node_id must not appear in peers");
97 }
98
99 let mut clocks = HashMap::new();
100 clocks.insert(node_id.to_string(), 0);
101 for peer in peers {
102 clocks.insert(peer.to_string(), 0);
103 }
104
105 VectorClock {
106 node_id: node_id.to_string(),
107 clocks,
108 }
109 }
110
111 pub fn from_map(node_id: &str, clocks: HashMap<String, u64>) -> Self {
112 VectorClock {
113 node_id: node_id.to_string(),
114 clocks,
115 }
116 }
117
118 pub fn tick(&mut self) -> u64 {
119 let counter = self.clocks.entry(self.node_id.clone()).or_insert(0);
120 *counter += 1;
121 *counter
122 }
123
124 pub fn send(&mut self) -> VectorTimestamp {
125 self.tick();
126 self.snapshot()
127 }
128
129 pub fn receive(&mut self, timestamp: &VectorTimestamp) {
130 for (node, &value) in timestamp.clocks() {
131 let entry = self.clocks.entry(node.clone()).or_insert(0);
132 *entry = (*entry).max(value);
133 }
134 let local = self.clocks.entry(self.node_id.clone()).or_insert(0);
135 *local += 1;
136 }
137
138 pub fn relation(&self, other: &VectorClock) -> CausalityRelation {
139 let self_ts = self.snapshot();
140 let other_ts = other.snapshot();
141 compare(&self_ts, &other_ts)
142 }
143
144 pub fn snapshot(&self) -> VectorTimestamp {
145 VectorTimestamp {
146 clocks: self.clocks.clone(),
147 }
148 }
149
150 pub fn get(&self, node_id: &str) -> u64 {
151 self.clocks.get(node_id).copied().unwrap_or(0)
152 }
153
154 pub fn merge(&mut self, other: &VectorClock) {
155 for (node, &value) in &other.clocks {
156 let entry = self.clocks.entry(node.clone()).or_insert(0);
157 *entry = (*entry).max(value);
158 }
159 }
160}
161
/// Unit tests for `VectorClock` and `VectorTimestamp`.
#[cfg(test)]
mod tests {
    use super::*;

    // ----- construction -------------------------------------------------

    #[test]
    fn new_initializes_all_clocks_to_zero() {
        let vc = VectorClock::new("a", &["b", "c"]);
        assert_eq!(vc.get("a"), 0);
        assert_eq!(vc.get("b"), 0);
        assert_eq!(vc.get("c"), 0);
    }

    #[test]
    fn new_includes_self_in_clocks() {
        let vc = VectorClock::new("node1", &[]);
        assert_eq!(vc.get("node1"), 0);
        assert!(vc.clocks.contains_key("node1"));
    }

    // The `expected` text ensures each test fails if `new` panics for a
    // reason other than the one the test is named after.
    #[test]
    #[should_panic(expected = "node_id must not be empty")]
    fn new_rejects_empty_node_id() {
        VectorClock::new("", &["b"]);
    }

    #[test]
    #[should_panic(expected = "node_id must not appear in peers")]
    fn new_rejects_duplicate_peer() {
        VectorClock::new("a", &["a"]);
    }

    #[test]
    #[should_panic(expected = "node_id must not contain '=' or ','")]
    fn new_rejects_invalid_chars_in_node_id() {
        VectorClock::new("a=b", &[]);
    }

    #[test]
    fn from_map_preserves_values() {
        let mut map = HashMap::new();
        map.insert("x".to_string(), 5);
        map.insert("y".to_string(), 3);
        let vc = VectorClock::from_map("x", map);
        assert_eq!(vc.get("x"), 5);
        assert_eq!(vc.get("y"), 3);
    }

    // ----- tick / send --------------------------------------------------

    #[test]
    fn tick_increments_only_local_node() {
        let mut vc = VectorClock::new("a", &["b"]);
        let val = vc.tick();
        assert_eq!(val, 1);
        assert_eq!(vc.get("a"), 1);
        assert_eq!(vc.get("b"), 0);
    }

    #[test]
    fn tick_does_not_affect_other_nodes() {
        let mut vc = VectorClock::new("a", &["b", "c"]);
        vc.tick();
        vc.tick();
        assert_eq!(vc.get("a"), 2);
        assert_eq!(vc.get("b"), 0);
        assert_eq!(vc.get("c"), 0);
    }

    #[test]
    fn send_increments_and_returns_snapshot() {
        let mut vc = VectorClock::new("a", &["b"]);
        let ts = vc.send();
        assert_eq!(ts.get("a"), 1);
        assert_eq!(ts.get("b"), 0);
        assert_eq!(vc.get("a"), 1);
    }

    #[test]
    fn send_snapshot_is_independent_copy() {
        let mut vc = VectorClock::new("a", &["b"]);
        let ts = vc.send();
        vc.tick();
        assert_eq!(ts.get("a"), 1);
        assert_eq!(vc.get("a"), 2);
    }

    // ----- receive ------------------------------------------------------

    #[test]
    fn receive_takes_max_then_increments_local() {
        let mut vc_a = VectorClock::new("a", &["b"]);
        let mut vc_b = VectorClock::new("b", &["a"]);
        let ts = vc_a.send();
        vc_b.receive(&ts);
        assert_eq!(vc_b.get("a"), 1);
        assert_eq!(vc_b.get("b"), 1);
    }

    #[test]
    fn receive_takes_max_for_all_nodes() {
        let mut vc_a = VectorClock::new("a", &["b", "c"]);
        vc_a.tick();
        vc_a.tick();

        let mut vc_b = VectorClock::new("b", &["a", "c"]);
        vc_b.tick();

        let ts_a = vc_a.send();
        vc_b.receive(&ts_a);

        assert_eq!(vc_b.get("a"), 3);
        assert_eq!(vc_b.get("b"), 2);
        assert_eq!(vc_b.get("c"), 0);
    }

    #[test]
    fn receive_with_unknown_node_adds_it() {
        let mut vc_a = VectorClock::new("a", &[]);
        let mut vc_b = VectorClock::new("b", &[]);
        let ts = vc_a.send();
        vc_b.receive(&ts);
        assert_eq!(vc_b.get("a"), 1);
        assert_eq!(vc_b.get("b"), 1);
    }

    #[test]
    fn receive_with_lower_remote_still_increments_local() {
        let mut vc_a = VectorClock::new("a", &["b"]);
        let ts = vc_a.send();

        let mut vc_b = VectorClock::new("b", &["a"]);
        vc_b.tick();
        vc_b.tick();
        vc_b.tick();
        vc_b.receive(&ts);

        assert_eq!(vc_b.get("a"), 1);
        assert_eq!(vc_b.get("b"), 4);
    }

    // ----- snapshot / get -----------------------------------------------

    #[test]
    fn snapshot_returns_current_state() {
        let mut vc = VectorClock::new("a", &["b"]);
        vc.tick();
        let ts = vc.snapshot();
        assert_eq!(ts.get("a"), 1);
        assert_eq!(ts.get("b"), 0);
    }

    #[test]
    fn snapshot_does_not_mutate_clock() {
        let mut vc = VectorClock::new("a", &["b"]);
        vc.tick();
        let _ = vc.snapshot();
        assert_eq!(vc.get("a"), 1);
        let _ = vc.snapshot();
        assert_eq!(vc.get("a"), 1);
    }

    #[test]
    fn get_returns_value_for_known_node() {
        let mut vc = VectorClock::new("a", &["b"]);
        vc.tick();
        assert_eq!(vc.get("a"), 1);
    }

    #[test]
    fn get_returns_zero_for_unknown_node() {
        let vc = VectorClock::new("a", &[]);
        assert_eq!(vc.get("unknown"), 0);
    }

    // ----- merge --------------------------------------------------------

    #[test]
    fn merge_takes_max_without_incrementing() {
        let mut vc_a = VectorClock::new("a", &["b"]);
        vc_a.tick();

        let mut vc_b = VectorClock::new("b", &["a"]);
        vc_b.tick();
        vc_b.tick();

        vc_a.merge(&vc_b);

        assert_eq!(vc_a.get("a"), 1);
        assert_eq!(vc_a.get("b"), 2);
    }

    #[test]
    fn merge_is_commutative() {
        let mut vc_a = VectorClock::new("a", &["b"]);
        vc_a.tick();

        let mut vc_b = VectorClock::new("b", &["a"]);
        vc_b.tick();

        // Keep the pre-merge state of `a` so that `b` merges the original
        // `a`, not the already-merged one.
        let original_a = vc_a.clone();

        vc_a.merge(&vc_b); // a ∪ b
        vc_b.merge(&original_a); // b ∪ a

        assert_eq!(vc_a.snapshot(), vc_b.snapshot());
        assert_eq!(vc_a.get("a"), 1);
        assert_eq!(vc_a.get("b"), 1);
    }

    #[test]
    fn merge_adds_unknown_nodes() {
        let mut vc_a = VectorClock::new("a", &[]);
        let mut vc_b = VectorClock::new("b", &["c"]);
        vc_b.tick();

        vc_a.merge(&vc_b);
        assert_eq!(vc_a.get("b"), 1);
        assert_eq!(vc_a.get("c"), 0);
    }

    // ----- relation -----------------------------------------------------

    #[test]
    fn relation_delegates_to_compare() {
        let mut vc_a = VectorClock::new("a", &["b"]);
        let vc_b = VectorClock::new("b", &["a"]);
        vc_a.tick();

        let rel = vc_a.relation(&vc_b);
        assert_eq!(rel, CausalityRelation::HappensAfter);
    }

    // ----- timestamp text format ----------------------------------------

    #[test]
    fn timestamp_display_format() {
        let mut clocks = HashMap::new();
        clocks.insert("svc-b".to_string(), 1);
        clocks.insert("svc-a".to_string(), 3);
        let ts = VectorTimestamp { clocks };
        assert_eq!(ts.to_string(), "svc-a=3,svc-b=1");
    }

    #[test]
    fn timestamp_parse_roundtrip() {
        let mut clocks = HashMap::new();
        clocks.insert("a".to_string(), 2);
        clocks.insert("b".to_string(), 5);
        let ts = VectorTimestamp { clocks };
        let s = ts.to_string();
        let parsed: VectorTimestamp = s.parse().unwrap();
        assert_eq!(ts, parsed);
    }

    #[test]
    fn timestamp_parse_empty_string() {
        let result = "".parse::<VectorTimestamp>();
        assert!(matches!(result, Err(VectorTimestampError::Empty)));
    }

    #[test]
    fn timestamp_parse_missing_delimiter() {
        let result = "abc".parse::<VectorTimestamp>();
        assert!(matches!(
            result,
            Err(VectorTimestampError::MissingDelimiter)
        ));
    }

    #[test]
    fn timestamp_parse_invalid_time() {
        let result = "a=xyz".parse::<VectorTimestamp>();
        assert!(matches!(result, Err(VectorTimestampError::InvalidTime(_))));
    }

    #[test]
    fn timestamp_parse_empty_node_id() {
        let result = "=5".parse::<VectorTimestamp>();
        assert!(matches!(result, Err(VectorTimestampError::EmptyNodeId)));
    }

    // ----- serde --------------------------------------------------------

    #[test]
    fn serde_json_roundtrip_vector_clock() {
        let mut vc = VectorClock::new("a", &["b"]);
        vc.tick();
        let json = serde_json::to_string(&vc).unwrap();
        let deserialized: VectorClock = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.get("a"), 1);
        assert_eq!(deserialized.get("b"), 0);
    }

    #[test]
    fn serde_json_roundtrip_vector_timestamp() {
        let mut clocks = HashMap::new();
        clocks.insert("x".to_string(), 10);
        clocks.insert("y".to_string(), 20);
        let ts = VectorTimestamp { clocks };
        let json = serde_json::to_string(&ts).unwrap();
        let deserialized: VectorTimestamp = serde_json::from_str(&json).unwrap();
        assert_eq!(ts, deserialized);
    }

    // ----- multi-process scenarios --------------------------------------

    #[test]
    fn three_process_message_chain() {
        let mut a = VectorClock::new("a", &["b", "c"]);
        let mut b = VectorClock::new("b", &["a", "c"]);
        let mut c = VectorClock::new("c", &["a", "b"]);

        // a -> b
        let ts_a = a.send();
        b.receive(&ts_a);

        // b -> c
        let ts_b = b.send();
        c.receive(&ts_b);

        // c learned a's event through b: one tick of a, receive+send on b,
        // one receive on c.
        assert_eq!(c.get("a"), 1);
        assert_eq!(c.get("b"), 2);
        assert_eq!(c.get("c"), 1);
        assert_eq!(a.relation(&c), CausalityRelation::HappensBefore);
    }

    #[test]
    fn fork_join_causal_graph() {
        let mut a = VectorClock::new("a", &["b", "c", "d"]);
        let mut b = VectorClock::new("b", &["a", "c", "d"]);
        let mut c = VectorClock::new("c", &["a", "b", "d"]);
        let mut d = VectorClock::new("d", &["a", "b", "c"]);

        // Fork: a's message reaches both b and c.
        let ts_a = a.send();
        b.receive(&ts_a);
        c.receive(&ts_a);

        // Join: b and c both report to d.
        let ts_b = b.send();
        let ts_c = c.send();
        d.receive(&ts_b);
        d.receive(&ts_c);

        assert_eq!(d.get("a"), 1);
        assert_eq!(d.get("b"), 2);
        assert_eq!(d.get("c"), 2);
        assert_eq!(d.get("d"), 2);
    }

    #[test]
    fn concurrent_writes_detected() {
        let mut a = VectorClock::new("a", &["b"]);
        let mut b = VectorClock::new("b", &["a"]);

        a.tick();
        b.tick();

        assert_eq!(a.relation(&b), CausalityRelation::Concurrent);
    }
}