1use crate::node::NodeId;
9use serde::{Deserialize, Serialize};
10use std::cmp::Ordering;
11use std::collections::HashMap;
12
13#[derive(Debug, Clone, Default, Serialize, Deserialize)]
19pub struct VectorClock {
20 clocks: HashMap<String, u64>,
21}
22
23impl VectorClock {
24 pub fn new() -> Self {
26 Self {
27 clocks: HashMap::new(),
28 }
29 }
30
31 pub fn with_node(node_id: &NodeId) -> Self {
33 let mut clocks = HashMap::new();
34 clocks.insert(node_id.as_str().to_string(), 0);
35 Self { clocks }
36 }
37
38 pub fn increment(&mut self, node_id: &NodeId) {
40 let key = node_id.as_str().to_string();
41 *self.clocks.entry(key).or_insert(0) += 1;
42 }
43
44 pub fn get(&self, node_id: &NodeId) -> u64 {
46 self.clocks.get(node_id.as_str()).copied().unwrap_or(0)
47 }
48
49 pub fn set(&mut self, node_id: &NodeId, value: u64) {
51 self.clocks.insert(node_id.as_str().to_string(), value);
52 }
53
54 pub fn merge(&mut self, other: &VectorClock) {
56 for (node, &value) in &other.clocks {
57 let current = self.clocks.entry(node.clone()).or_insert(0);
58 *current = (*current).max(value);
59 }
60 }
61
62 pub fn merged(&self, other: &VectorClock) -> VectorClock {
64 let mut result = self.clone();
65 result.merge(other);
66 result
67 }
68
69 pub fn happened_before(&self, other: &VectorClock) -> bool {
71 let mut dominated = false;
72
73 for (node, &value) in &self.clocks {
75 let other_value = other.clocks.get(node).copied().unwrap_or(0);
76 if value > other_value {
77 return false;
78 }
79 if value < other_value {
80 dominated = true;
81 }
82 }
83
84 for (node, &value) in &other.clocks {
86 if !self.clocks.contains_key(node) && value > 0 {
87 dominated = true;
88 }
89 }
90
91 dominated
92 }
93
94 pub fn happened_after(&self, other: &VectorClock) -> bool {
96 other.happened_before(self)
97 }
98
99 pub fn is_concurrent(&self, other: &VectorClock) -> bool {
101 !self.happened_before(other) && !self.happened_after(other)
102 }
103
104 pub fn equals(&self, other: &VectorClock) -> bool {
106 if self.clocks.len() != other.clocks.len() {
107 return false;
108 }
109
110 for (node, &value) in &self.clocks {
111 if other.clocks.get(node).copied().unwrap_or(0) != value {
112 return false;
113 }
114 }
115
116 true
117 }
118
119 pub fn compare(&self, other: &VectorClock) -> VectorClockOrdering {
121 if self.equals(other) {
122 VectorClockOrdering::Equal
123 } else if self.happened_before(other) {
124 VectorClockOrdering::Before
125 } else if self.happened_after(other) {
126 VectorClockOrdering::After
127 } else {
128 VectorClockOrdering::Concurrent
129 }
130 }
131
132 pub fn node_count(&self) -> usize {
134 self.clocks.len()
135 }
136
137 pub fn nodes(&self) -> Vec<String> {
139 self.clocks.keys().cloned().collect()
140 }
141
142 pub fn max_value(&self) -> u64 {
144 self.clocks.values().copied().max().unwrap_or(0)
145 }
146
147 pub fn sum(&self) -> u64 {
149 self.clocks.values().sum()
150 }
151
152 pub fn is_empty(&self) -> bool {
154 self.clocks.values().all(|&v| v == 0)
155 }
156
157 pub fn reset(&mut self) {
159 for value in self.clocks.values_mut() {
160 *value = 0;
161 }
162 }
163
164 pub fn remove(&mut self, node_id: &NodeId) {
166 self.clocks.remove(node_id.as_str());
167 }
168}
169
170impl PartialEq for VectorClock {
171 fn eq(&self, other: &Self) -> bool {
172 self.equals(other)
173 }
174}
175
176impl Eq for VectorClock {}
177
178impl PartialOrd for VectorClock {
179 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
180 match self.compare(other) {
181 VectorClockOrdering::Equal => Some(Ordering::Equal),
182 VectorClockOrdering::Before => Some(Ordering::Less),
183 VectorClockOrdering::After => Some(Ordering::Greater),
184 VectorClockOrdering::Concurrent => None,
185 }
186 }
187}
188
/// Outcome of comparing one vector clock against another.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum VectorClockOrdering {
    /// The left-hand clock happened before the right-hand clock.
    Before,
    /// The left-hand clock happened after the right-hand clock.
    After,
    /// Both clocks are equal.
    Equal,
    /// Neither clock happened before the other and they are not equal.
    Concurrent,
}
205
206#[derive(Debug, Clone, Serialize, Deserialize)]
212pub struct VersionedValue<T> {
213 pub value: T,
214 pub clock: VectorClock,
215 pub timestamp: u64,
216}
217
218impl<T> VersionedValue<T> {
219 pub fn new(value: T, clock: VectorClock) -> Self {
221 let timestamp = std::time::SystemTime::now()
222 .duration_since(std::time::UNIX_EPOCH)
223 .unwrap_or_default()
224 .as_millis() as u64;
225
226 Self {
227 value,
228 clock,
229 timestamp,
230 }
231 }
232
233 pub fn with_timestamp(value: T, clock: VectorClock, timestamp: u64) -> Self {
235 Self {
236 value,
237 clock,
238 timestamp,
239 }
240 }
241
242 pub fn happened_before(&self, other: &VersionedValue<T>) -> bool {
244 self.clock.happened_before(&other.clock)
245 }
246
247 pub fn happened_after(&self, other: &VersionedValue<T>) -> bool {
249 self.clock.happened_after(&other.clock)
250 }
251
252 pub fn is_concurrent(&self, other: &VersionedValue<T>) -> bool {
254 self.clock.is_concurrent(&other.clock)
255 }
256}
257
258#[derive(Debug, Clone, Serialize, Deserialize)]
264pub struct HybridClock {
265 physical: u64,
266 logical: u32,
267 node_id: String,
268}
269
270impl HybridClock {
271 pub fn new(node_id: impl Into<String>) -> Self {
273 Self {
274 physical: Self::now(),
275 logical: 0,
276 node_id: node_id.into(),
277 }
278 }
279
280 fn now() -> u64 {
282 std::time::SystemTime::now()
283 .duration_since(std::time::UNIX_EPOCH)
284 .unwrap_or_default()
285 .as_millis() as u64
286 }
287
288 pub fn tick(&mut self) -> HybridTimestamp {
290 let now = Self::now();
291
292 if now > self.physical {
293 self.physical = now;
294 self.logical = 0;
295 } else {
296 self.logical += 1;
297 }
298
299 HybridTimestamp {
300 physical: self.physical,
301 logical: self.logical,
302 node_id: self.node_id.clone(),
303 }
304 }
305
306 pub fn receive(&mut self, other: &HybridTimestamp) -> HybridTimestamp {
308 let now = Self::now();
309
310 if now > self.physical && now > other.physical {
311 self.physical = now;
312 self.logical = 0;
313 } else if self.physical > other.physical {
314 self.logical += 1;
315 } else if other.physical > self.physical {
316 self.physical = other.physical;
317 self.logical = other.logical + 1;
318 } else {
319 self.logical = self.logical.max(other.logical) + 1;
321 }
322
323 HybridTimestamp {
324 physical: self.physical,
325 logical: self.logical,
326 node_id: self.node_id.clone(),
327 }
328 }
329
330 pub fn physical(&self) -> u64 {
332 self.physical
333 }
334
335 pub fn logical(&self) -> u32 {
337 self.logical
338 }
339}
340
341#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
347pub struct HybridTimestamp {
348 pub physical: u64,
349 pub logical: u32,
350 pub node_id: String,
351}
352
353impl HybridTimestamp {
354 pub fn new(physical: u64, logical: u32, node_id: impl Into<String>) -> Self {
356 Self {
357 physical,
358 logical,
359 node_id: node_id.into(),
360 }
361 }
362
363 pub fn compare(&self, other: &HybridTimestamp) -> Ordering {
365 match self.physical.cmp(&other.physical) {
366 Ordering::Equal => match self.logical.cmp(&other.logical) {
367 Ordering::Equal => self.node_id.cmp(&other.node_id),
368 other => other,
369 },
370 other => other,
371 }
372 }
373
374 pub fn is_before(&self, other: &HybridTimestamp) -> bool {
376 self.compare(other) == Ordering::Less
377 }
378
379 pub fn is_after(&self, other: &HybridTimestamp) -> bool {
381 self.compare(other) == Ordering::Greater
382 }
383
384 pub fn to_sortable(&self) -> u128 {
386 ((self.physical as u128) << 32) | (self.logical as u128)
387 }
388}
389
390impl PartialOrd for HybridTimestamp {
391 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
392 Some(self.cmp(other))
393 }
394}
395
396impl Ord for HybridTimestamp {
397 fn cmp(&self, other: &Self) -> Ordering {
398 self.compare(other)
399 }
400}
401
402#[derive(Debug, Clone, Default, Serialize, Deserialize)]
408pub struct LamportClock {
409 counter: u64,
410}
411
412impl LamportClock {
413 pub fn new() -> Self {
415 Self { counter: 0 }
416 }
417
418 pub fn with_value(value: u64) -> Self {
420 Self { counter: value }
421 }
422
423 pub fn tick(&mut self) -> u64 {
425 self.counter += 1;
426 self.counter
427 }
428
429 pub fn receive(&mut self, received: u64) -> u64 {
431 self.counter = self.counter.max(received) + 1;
432 self.counter
433 }
434
435 pub fn value(&self) -> u64 {
437 self.counter
438 }
439
440 pub fn merge(&mut self, other: &LamportClock) {
442 self.counter = self.counter.max(other.counter);
443 }
444}
445
446#[derive(Debug, Clone, Default, Serialize, Deserialize)]
452pub struct DottedVersionVector {
453 base: VectorClock,
454 dots: HashMap<String, u64>,
455}
456
457impl DottedVersionVector {
458 pub fn new() -> Self {
460 Self {
461 base: VectorClock::new(),
462 dots: HashMap::new(),
463 }
464 }
465
466 pub fn event(&mut self, node_id: &NodeId) -> (String, u64) {
468 let key = node_id.as_str().to_string();
469 let value = self.base.get(node_id) + 1;
470 self.dots.insert(key.clone(), value);
471 (key, value)
472 }
473
474 pub fn sync(&mut self, node_id: &NodeId) {
476 let key = node_id.as_str();
477 if let Some(&dot) = self.dots.get(key) {
478 let base_value = self.base.get(node_id);
479 if dot == base_value + 1 {
480 self.base.set(node_id, dot);
481 self.dots.remove(key);
482 }
483 }
484 }
485
486 pub fn sync_all(&mut self) {
488 let nodes: Vec<_> = self.dots.keys().cloned().collect();
489 for node in nodes {
490 self.sync(&NodeId::new(&node));
491 }
492 }
493
494 pub fn merge(&mut self, other: &DottedVersionVector) {
496 self.base.merge(&other.base);
497
498 for (node, &value) in &other.dots {
499 let current = self.dots.entry(node.clone()).or_insert(0);
500 *current = (*current).max(value);
501 }
502 }
503
504 pub fn dominates(&self, other: &DottedVersionVector) -> bool {
506 for (node, &value) in &other.base.clocks {
508 let our_base = self.base.clocks.get(node).copied().unwrap_or(0);
509 let our_dot = self.dots.get(node).copied().unwrap_or(0);
510 if our_base.max(our_dot) < value {
511 return false;
512 }
513 }
514
515 for (node, &value) in &other.dots {
516 let our_base = self.base.clocks.get(node).copied().unwrap_or(0);
517 let our_dot = self.dots.get(node).copied().unwrap_or(0);
518 if our_base.max(our_dot) < value {
519 return false;
520 }
521 }
522
523 true
524 }
525
526 pub fn base(&self) -> &VectorClock {
528 &self.base
529 }
530
531 pub fn dots(&self) -> &HashMap<String, u64> {
533 &self.dots
534 }
535}
536
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a clock by `set`-ting each `(node, counter)` pair in order.
    fn clock_with(entries: &[(&str, u64)]) -> VectorClock {
        let mut clock = VectorClock::new();
        for &(node, counter) in entries {
            clock.set(&NodeId::new(node), counter);
        }
        clock
    }

    #[test]
    fn test_vector_clock_basic() {
        let a = NodeId::new("A");
        let b = NodeId::new("B");
        let mut clock = VectorClock::new();

        // Two increments for A, one for B.
        for node in [&a, &a, &b].iter() {
            clock.increment(node);
        }

        assert_eq!(clock.get(&a), 2);
        assert_eq!(clock.get(&b), 1);
        assert_eq!(clock.node_count(), 2);
    }

    #[test]
    fn test_vector_clock_happened_before() {
        let earlier = clock_with(&[("A", 1), ("B", 1)]);
        let later = clock_with(&[("A", 2), ("B", 2)]);

        assert!(earlier.happened_before(&later));
        assert!(!later.happened_before(&earlier));
    }

    #[test]
    fn test_vector_clock_concurrent() {
        let left = clock_with(&[("A", 2), ("B", 1)]);
        let right = clock_with(&[("A", 1), ("B", 2)]);

        assert!(left.is_concurrent(&right));
        assert!(!left.happened_before(&right));
        assert!(!right.happened_before(&left));
    }

    #[test]
    fn test_vector_clock_merge() {
        let mut target = clock_with(&[("A", 2), ("B", 1)]);
        let incoming = clock_with(&[("A", 1), ("B", 3)]);

        target.merge(&incoming);

        assert_eq!(target.get(&NodeId::new("A")), 2);
        assert_eq!(target.get(&NodeId::new("B")), 3);
    }

    #[test]
    fn test_vector_clock_compare() {
        let first = clock_with(&[("A", 1)]);
        let second = clock_with(&[("A", 2)]);
        let first_copy = first.clone();

        assert_eq!(first.compare(&second), VectorClockOrdering::Before);
        assert_eq!(second.compare(&first), VectorClockOrdering::After);
        assert_eq!(first.compare(&first_copy), VectorClockOrdering::Equal);
    }

    #[test]
    fn test_versioned_value() {
        let older = VersionedValue::new("value1", clock_with(&[("A", 1)]));
        let newer = VersionedValue::new("value2", clock_with(&[("A", 2)]));

        assert!(older.happened_before(&newer));
        assert!(!older.is_concurrent(&newer));
    }

    #[test]
    fn test_hybrid_clock() {
        let mut clock = HybridClock::new("node1");

        let first = clock.tick();
        let second = clock.tick();

        assert!(first.is_before(&second));
        assert!(second.is_after(&first));
    }

    #[test]
    fn test_hybrid_clock_receive() {
        let mut sender = HybridClock::new("node1");
        let mut receiver = HybridClock::new("node2");

        let sent = sender.tick();
        let received = receiver.receive(&sent);

        assert!(sent.is_before(&received));
    }

    #[test]
    fn test_hybrid_timestamp_ordering() {
        let base = HybridTimestamp::new(100, 0, "A");
        let next_logical = HybridTimestamp::new(100, 1, "A");
        let next_physical = HybridTimestamp::new(101, 0, "A");

        assert!(base < next_logical);
        assert!(next_logical < next_physical);
        assert!(base < next_physical);
    }

    #[test]
    fn test_lamport_clock() {
        let mut clock = LamportClock::new();

        assert_eq!(clock.tick(), 1);
        assert_eq!(clock.tick(), 2);

        // Receiving a larger value jumps past it.
        assert_eq!(clock.receive(10), 11);

        assert_eq!(clock.tick(), 12);
    }

    #[test]
    fn test_lamport_clock_merge() {
        let mut behind = LamportClock::with_value(5);
        let ahead = LamportClock::with_value(10);

        behind.merge(&ahead);
        assert_eq!(behind.value(), 10);
    }

    #[test]
    fn test_dotted_version_vector() {
        let a = NodeId::new("A");
        let mut dvv = DottedVersionVector::new();

        let (node, version) = dvv.event(&a);
        assert_eq!(node, "A");
        assert_eq!(version, 1);

        dvv.sync(&a);
        assert_eq!(dvv.base().get(&a), 1);
    }

    #[test]
    fn test_dvv_merge() {
        let a = NodeId::new("A");
        let b = NodeId::new("B");

        let mut left = DottedVersionVector::new();
        left.event(&a);
        left.sync(&a);

        let mut right = DottedVersionVector::new();
        right.event(&b);
        right.sync(&b);

        left.merge(&right);

        assert_eq!(left.base().get(&a), 1);
        assert_eq!(left.base().get(&b), 1);
    }

    #[test]
    fn test_vector_clock_partial_ord() {
        let smaller = clock_with(&[("A", 1)]);
        let larger = clock_with(&[("A", 2)]);

        assert!(smaller < larger);
        assert!(larger > smaller);
    }

    #[test]
    fn test_vector_clock_is_empty() {
        let untouched = VectorClock::new();
        assert!(untouched.is_empty());

        let mut touched = VectorClock::new();
        touched.increment(&NodeId::new("A"));
        assert!(!touched.is_empty());
    }

    #[test]
    fn test_vector_clock_reset() {
        let mut clock = VectorClock::new();
        for name in ["A", "B"].iter() {
            clock.increment(&NodeId::new(name));
        }

        assert!(!clock.is_empty());

        // Resetting zeroes the counters but keeps both entries.
        clock.reset();
        assert!(clock.is_empty());
        assert_eq!(clock.node_count(), 2);
    }
}