1use crate::node::NodeId;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::cmp::Ordering;
12
13#[derive(Debug, Clone, Default, Serialize, Deserialize)]
19pub struct VectorClock {
20 clocks: HashMap<String, u64>,
21}
22
23impl VectorClock {
24 pub fn new() -> Self {
26 Self {
27 clocks: HashMap::new(),
28 }
29 }
30
31 pub fn with_node(node_id: &NodeId) -> Self {
33 let mut clocks = HashMap::new();
34 clocks.insert(node_id.as_str().to_string(), 0);
35 Self { clocks }
36 }
37
38 pub fn increment(&mut self, node_id: &NodeId) {
40 let key = node_id.as_str().to_string();
41 *self.clocks.entry(key).or_insert(0) += 1;
42 }
43
44 pub fn get(&self, node_id: &NodeId) -> u64 {
46 self.clocks
47 .get(node_id.as_str())
48 .copied()
49 .unwrap_or(0)
50 }
51
52 pub fn set(&mut self, node_id: &NodeId, value: u64) {
54 self.clocks.insert(node_id.as_str().to_string(), value);
55 }
56
57 pub fn merge(&mut self, other: &VectorClock) {
59 for (node, &value) in &other.clocks {
60 let current = self.clocks.entry(node.clone()).or_insert(0);
61 *current = (*current).max(value);
62 }
63 }
64
65 pub fn merged(&self, other: &VectorClock) -> VectorClock {
67 let mut result = self.clone();
68 result.merge(other);
69 result
70 }
71
72 pub fn happened_before(&self, other: &VectorClock) -> bool {
74 let mut dominated = false;
75
76 for (node, &value) in &self.clocks {
78 let other_value = other.clocks.get(node).copied().unwrap_or(0);
79 if value > other_value {
80 return false;
81 }
82 if value < other_value {
83 dominated = true;
84 }
85 }
86
87 for (node, &value) in &other.clocks {
89 if !self.clocks.contains_key(node) && value > 0 {
90 dominated = true;
91 }
92 }
93
94 dominated
95 }
96
97 pub fn happened_after(&self, other: &VectorClock) -> bool {
99 other.happened_before(self)
100 }
101
102 pub fn is_concurrent(&self, other: &VectorClock) -> bool {
104 !self.happened_before(other) && !self.happened_after(other)
105 }
106
107 pub fn equals(&self, other: &VectorClock) -> bool {
109 if self.clocks.len() != other.clocks.len() {
110 return false;
111 }
112
113 for (node, &value) in &self.clocks {
114 if other.clocks.get(node).copied().unwrap_or(0) != value {
115 return false;
116 }
117 }
118
119 true
120 }
121
122 pub fn compare(&self, other: &VectorClock) -> VectorClockOrdering {
124 if self.equals(other) {
125 VectorClockOrdering::Equal
126 } else if self.happened_before(other) {
127 VectorClockOrdering::Before
128 } else if self.happened_after(other) {
129 VectorClockOrdering::After
130 } else {
131 VectorClockOrdering::Concurrent
132 }
133 }
134
135 pub fn node_count(&self) -> usize {
137 self.clocks.len()
138 }
139
140 pub fn nodes(&self) -> Vec<String> {
142 self.clocks.keys().cloned().collect()
143 }
144
145 pub fn max_value(&self) -> u64 {
147 self.clocks.values().copied().max().unwrap_or(0)
148 }
149
150 pub fn sum(&self) -> u64 {
152 self.clocks.values().sum()
153 }
154
155 pub fn is_empty(&self) -> bool {
157 self.clocks.values().all(|&v| v == 0)
158 }
159
160 pub fn reset(&mut self) {
162 for value in self.clocks.values_mut() {
163 *value = 0;
164 }
165 }
166
167 pub fn remove(&mut self, node_id: &NodeId) {
169 self.clocks.remove(node_id.as_str());
170 }
171}
172
173impl PartialEq for VectorClock {
174 fn eq(&self, other: &Self) -> bool {
175 self.equals(other)
176 }
177}
178
179impl Eq for VectorClock {}
180
181impl PartialOrd for VectorClock {
182 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
183 match self.compare(other) {
184 VectorClockOrdering::Equal => Some(Ordering::Equal),
185 VectorClockOrdering::Before => Some(Ordering::Less),
186 VectorClockOrdering::After => Some(Ordering::Greater),
187 VectorClockOrdering::Concurrent => None,
188 }
189 }
190}
191
/// Outcome of comparing one vector clock (`self`) against another (`other`),
/// as returned by `VectorClock::compare`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VectorClockOrdering {
    /// `self` happened before `other`.
    Before,
    /// `self` happened after `other`.
    After,
    /// `VectorClock::equals` reported the two clocks as equal.
    Equal,
    /// Neither equal nor ordered in either direction.
    Concurrent,
}
208
209#[derive(Debug, Clone, Serialize, Deserialize)]
215pub struct VersionedValue<T> {
216 pub value: T,
217 pub clock: VectorClock,
218 pub timestamp: u64,
219}
220
221impl<T> VersionedValue<T> {
222 pub fn new(value: T, clock: VectorClock) -> Self {
224 let timestamp = std::time::SystemTime::now()
225 .duration_since(std::time::UNIX_EPOCH)
226 .unwrap_or_default()
227 .as_millis() as u64;
228
229 Self {
230 value,
231 clock,
232 timestamp,
233 }
234 }
235
236 pub fn with_timestamp(value: T, clock: VectorClock, timestamp: u64) -> Self {
238 Self {
239 value,
240 clock,
241 timestamp,
242 }
243 }
244
245 pub fn happened_before(&self, other: &VersionedValue<T>) -> bool {
247 self.clock.happened_before(&other.clock)
248 }
249
250 pub fn happened_after(&self, other: &VersionedValue<T>) -> bool {
252 self.clock.happened_after(&other.clock)
253 }
254
255 pub fn is_concurrent(&self, other: &VersionedValue<T>) -> bool {
257 self.clock.is_concurrent(&other.clock)
258 }
259}
260
261#[derive(Debug, Clone, Serialize, Deserialize)]
267pub struct HybridClock {
268 physical: u64,
269 logical: u32,
270 node_id: String,
271}
272
273impl HybridClock {
274 pub fn new(node_id: impl Into<String>) -> Self {
276 Self {
277 physical: Self::now(),
278 logical: 0,
279 node_id: node_id.into(),
280 }
281 }
282
283 fn now() -> u64 {
285 std::time::SystemTime::now()
286 .duration_since(std::time::UNIX_EPOCH)
287 .unwrap_or_default()
288 .as_millis() as u64
289 }
290
291 pub fn tick(&mut self) -> HybridTimestamp {
293 let now = Self::now();
294
295 if now > self.physical {
296 self.physical = now;
297 self.logical = 0;
298 } else {
299 self.logical += 1;
300 }
301
302 HybridTimestamp {
303 physical: self.physical,
304 logical: self.logical,
305 node_id: self.node_id.clone(),
306 }
307 }
308
309 pub fn receive(&mut self, other: &HybridTimestamp) -> HybridTimestamp {
311 let now = Self::now();
312
313 if now > self.physical && now > other.physical {
314 self.physical = now;
315 self.logical = 0;
316 } else if self.physical > other.physical {
317 self.logical += 1;
318 } else if other.physical > self.physical {
319 self.physical = other.physical;
320 self.logical = other.logical + 1;
321 } else {
322 self.logical = self.logical.max(other.logical) + 1;
324 }
325
326 HybridTimestamp {
327 physical: self.physical,
328 logical: self.logical,
329 node_id: self.node_id.clone(),
330 }
331 }
332
333 pub fn physical(&self) -> u64 {
335 self.physical
336 }
337
338 pub fn logical(&self) -> u32 {
340 self.logical
341 }
342}
343
344#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
350pub struct HybridTimestamp {
351 pub physical: u64,
352 pub logical: u32,
353 pub node_id: String,
354}
355
356impl HybridTimestamp {
357 pub fn new(physical: u64, logical: u32, node_id: impl Into<String>) -> Self {
359 Self {
360 physical,
361 logical,
362 node_id: node_id.into(),
363 }
364 }
365
366 pub fn compare(&self, other: &HybridTimestamp) -> Ordering {
368 match self.physical.cmp(&other.physical) {
369 Ordering::Equal => match self.logical.cmp(&other.logical) {
370 Ordering::Equal => self.node_id.cmp(&other.node_id),
371 other => other,
372 },
373 other => other,
374 }
375 }
376
377 pub fn is_before(&self, other: &HybridTimestamp) -> bool {
379 self.compare(other) == Ordering::Less
380 }
381
382 pub fn is_after(&self, other: &HybridTimestamp) -> bool {
384 self.compare(other) == Ordering::Greater
385 }
386
387 pub fn to_sortable(&self) -> u128 {
389 ((self.physical as u128) << 32) | (self.logical as u128)
390 }
391}
392
393impl PartialOrd for HybridTimestamp {
394 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
395 Some(self.cmp(other))
396 }
397}
398
399impl Ord for HybridTimestamp {
400 fn cmp(&self, other: &Self) -> Ordering {
401 self.compare(other)
402 }
403}
404
405#[derive(Debug, Clone, Default, Serialize, Deserialize)]
411pub struct LamportClock {
412 counter: u64,
413}
414
415impl LamportClock {
416 pub fn new() -> Self {
418 Self { counter: 0 }
419 }
420
421 pub fn with_value(value: u64) -> Self {
423 Self { counter: value }
424 }
425
426 pub fn tick(&mut self) -> u64 {
428 self.counter += 1;
429 self.counter
430 }
431
432 pub fn receive(&mut self, received: u64) -> u64 {
434 self.counter = self.counter.max(received) + 1;
435 self.counter
436 }
437
438 pub fn value(&self) -> u64 {
440 self.counter
441 }
442
443 pub fn merge(&mut self, other: &LamportClock) {
445 self.counter = self.counter.max(other.counter);
446 }
447}
448
449#[derive(Debug, Clone, Default, Serialize, Deserialize)]
455pub struct DottedVersionVector {
456 base: VectorClock,
457 dots: HashMap<String, u64>,
458}
459
460impl DottedVersionVector {
461 pub fn new() -> Self {
463 Self {
464 base: VectorClock::new(),
465 dots: HashMap::new(),
466 }
467 }
468
469 pub fn event(&mut self, node_id: &NodeId) -> (String, u64) {
471 let key = node_id.as_str().to_string();
472 let value = self.base.get(node_id) + 1;
473 self.dots.insert(key.clone(), value);
474 (key, value)
475 }
476
477 pub fn sync(&mut self, node_id: &NodeId) {
479 let key = node_id.as_str();
480 if let Some(&dot) = self.dots.get(key) {
481 let base_value = self.base.get(node_id);
482 if dot == base_value + 1 {
483 self.base.set(node_id, dot);
484 self.dots.remove(key);
485 }
486 }
487 }
488
489 pub fn sync_all(&mut self) {
491 let nodes: Vec<_> = self.dots.keys().cloned().collect();
492 for node in nodes {
493 self.sync(&NodeId::new(&node));
494 }
495 }
496
497 pub fn merge(&mut self, other: &DottedVersionVector) {
499 self.base.merge(&other.base);
500
501 for (node, &value) in &other.dots {
502 let current = self.dots.entry(node.clone()).or_insert(0);
503 *current = (*current).max(value);
504 }
505 }
506
507 pub fn dominates(&self, other: &DottedVersionVector) -> bool {
509 for (node, &value) in &other.base.clocks {
511 let our_base = self.base.clocks.get(node).copied().unwrap_or(0);
512 let our_dot = self.dots.get(node).copied().unwrap_or(0);
513 if our_base.max(our_dot) < value {
514 return false;
515 }
516 }
517
518 for (node, &value) in &other.dots {
519 let our_base = self.base.clocks.get(node).copied().unwrap_or(0);
520 let our_dot = self.dots.get(node).copied().unwrap_or(0);
521 if our_base.max(our_dot) < value {
522 return false;
523 }
524 }
525
526 true
527 }
528
529 pub fn base(&self) -> &VectorClock {
531 &self.base
532 }
533
534 pub fn dots(&self) -> &HashMap<String, u64> {
536 &self.dots
537 }
538}
539
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a vector clock by calling `set` once per `(node, counter)` pair.
    fn clock_of(entries: &[(&str, u64)]) -> VectorClock {
        let mut clock = VectorClock::new();
        for &(node, counter) in entries {
            clock.set(&NodeId::new(node), counter);
        }
        clock
    }

    #[test]
    fn test_vector_clock_basic() {
        let a = NodeId::new("A");
        let b = NodeId::new("B");
        let mut clock = VectorClock::new();

        clock.increment(&a);
        clock.increment(&a);
        clock.increment(&b);

        assert_eq!(clock.get(&a), 2);
        assert_eq!(clock.get(&b), 1);
        assert_eq!(clock.node_count(), 2);
    }

    #[test]
    fn test_vector_clock_happened_before() {
        let earlier = clock_of(&[("A", 1), ("B", 1)]);
        let later = clock_of(&[("A", 2), ("B", 2)]);

        assert!(earlier.happened_before(&later));
        assert!(!later.happened_before(&earlier));
    }

    #[test]
    fn test_vector_clock_concurrent() {
        // Each clock is ahead on a different node.
        let left = clock_of(&[("A", 2), ("B", 1)]);
        let right = clock_of(&[("A", 1), ("B", 2)]);

        assert!(left.is_concurrent(&right));
        assert!(!left.happened_before(&right));
        assert!(!right.happened_before(&left));
    }

    #[test]
    fn test_vector_clock_merge() {
        let a = NodeId::new("A");
        let b = NodeId::new("B");
        let mut target = clock_of(&[("A", 2), ("B", 1)]);
        let incoming = clock_of(&[("A", 1), ("B", 3)]);

        target.merge(&incoming);

        assert_eq!(target.get(&a), 2);
        assert_eq!(target.get(&b), 3);
    }

    #[test]
    fn test_vector_clock_compare() {
        let older = clock_of(&[("A", 1)]);
        let newer = clock_of(&[("A", 2)]);
        let older_copy = older.clone();

        assert_eq!(older.compare(&newer), VectorClockOrdering::Before);
        assert_eq!(newer.compare(&older), VectorClockOrdering::After);
        assert_eq!(older.compare(&older_copy), VectorClockOrdering::Equal);
    }

    #[test]
    fn test_versioned_value() {
        let first = VersionedValue::new("value1", clock_of(&[("A", 1)]));
        let second = VersionedValue::new("value2", clock_of(&[("A", 2)]));

        assert!(first.happened_before(&second));
        assert!(!first.is_concurrent(&second));
    }

    #[test]
    fn test_hybrid_clock() {
        let mut clock = HybridClock::new("node1");

        let earlier = clock.tick();
        let later = clock.tick();

        assert!(earlier.is_before(&later));
        assert!(later.is_after(&earlier));
    }

    #[test]
    fn test_hybrid_clock_receive() {
        let mut sender = HybridClock::new("node1");
        let mut receiver = HybridClock::new("node2");

        let sent = sender.tick();
        let received = receiver.receive(&sent);

        assert!(sent.is_before(&received));
    }

    #[test]
    fn test_hybrid_timestamp_ordering() {
        let base = HybridTimestamp::new(100, 0, "A");
        let same_physical_later_logical = HybridTimestamp::new(100, 1, "A");
        let later_physical = HybridTimestamp::new(101, 0, "A");

        assert!(base < same_physical_later_logical);
        assert!(same_physical_later_logical < later_physical);
        assert!(base < later_physical);
    }

    #[test]
    fn test_lamport_clock() {
        let mut clock = LamportClock::new();

        assert_eq!(clock.tick(), 1);
        assert_eq!(clock.tick(), 2);

        // Receiving 10 jumps the counter to 11.
        assert_eq!(clock.receive(10), 11);

        assert_eq!(clock.tick(), 12);
    }

    #[test]
    fn test_lamport_clock_merge() {
        let mut behind = LamportClock::with_value(5);
        let ahead = LamportClock::with_value(10);

        behind.merge(&ahead);

        assert_eq!(behind.value(), 10);
    }

    #[test]
    fn test_dotted_version_vector() {
        let a = NodeId::new("A");
        let mut dvv = DottedVersionVector::new();

        let (node, version) = dvv.event(&a);
        assert_eq!(node, "A");
        assert_eq!(version, 1);

        dvv.sync(&a);
        assert_eq!(dvv.base().get(&a), 1);
    }

    #[test]
    fn test_dvv_merge() {
        let a = NodeId::new("A");
        let b = NodeId::new("B");

        let mut left = DottedVersionVector::new();
        left.event(&a);
        left.sync(&a);

        let mut right = DottedVersionVector::new();
        right.event(&b);
        right.sync(&b);

        left.merge(&right);

        assert_eq!(left.base().get(&a), 1);
        assert_eq!(left.base().get(&b), 1);
    }

    #[test]
    fn test_vector_clock_partial_ord() {
        let smaller = clock_of(&[("A", 1)]);
        let larger = clock_of(&[("A", 2)]);

        assert!(smaller < larger);
        assert!(larger > smaller);
    }

    #[test]
    fn test_vector_clock_is_empty() {
        let untouched = VectorClock::new();
        assert!(untouched.is_empty());

        let mut advanced = VectorClock::new();
        advanced.increment(&NodeId::new("A"));
        assert!(!advanced.is_empty());
    }

    #[test]
    fn test_vector_clock_reset() {
        let mut clock = VectorClock::new();
        clock.increment(&NodeId::new("A"));
        clock.increment(&NodeId::new("B"));
        assert!(!clock.is_empty());

        clock.reset();

        // Counters are zeroed but the node entries remain.
        assert!(clock.is_empty());
        assert_eq!(clock.node_count(), 2);
    }
}