aegis_replication/
vector_clock.rs

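//! Aegis Vector Clocks
//!
//! Logical clocks for tracking causality and ordering in distributed systems:
//! vector clocks, versioned values, hybrid logical clocks, Lamport clocks,
//! and dotted version vectors.
//!
//! @version 0.1.0
//! @author AutomataNexus Development Team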
7
8use crate::node::NodeId;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::cmp::Ordering;
12
13// =============================================================================
14// Vector Clock
15// =============================================================================
16
/// A vector clock for tracking causality between events.
///
/// Holds one `u64` counter per node, keyed by the node's string identifier.
/// A node without an entry reads as `0` through [`VectorClock::get`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct VectorClock {
    // Per-node counters, keyed by the string form of the node id.
    clocks: HashMap<String, u64>,
}
22
23impl VectorClock {
24    /// Create a new empty vector clock.
25    pub fn new() -> Self {
26        Self {
27            clocks: HashMap::new(),
28        }
29    }
30
31    /// Create a vector clock with initial node.
32    pub fn with_node(node_id: &NodeId) -> Self {
33        let mut clocks = HashMap::new();
34        clocks.insert(node_id.as_str().to_string(), 0);
35        Self { clocks }
36    }
37
38    /// Increment the clock for a node.
39    pub fn increment(&mut self, node_id: &NodeId) {
40        let key = node_id.as_str().to_string();
41        *self.clocks.entry(key).or_insert(0) += 1;
42    }
43
44    /// Get the clock value for a node.
45    pub fn get(&self, node_id: &NodeId) -> u64 {
46        self.clocks
47            .get(node_id.as_str())
48            .copied()
49            .unwrap_or(0)
50    }
51
52    /// Set the clock value for a node.
53    pub fn set(&mut self, node_id: &NodeId, value: u64) {
54        self.clocks.insert(node_id.as_str().to_string(), value);
55    }
56
57    /// Merge with another vector clock (take maximum of each component).
58    pub fn merge(&mut self, other: &VectorClock) {
59        for (node, &value) in &other.clocks {
60            let current = self.clocks.entry(node.clone()).or_insert(0);
61            *current = (*current).max(value);
62        }
63    }
64
65    /// Create a merged clock without modifying self.
66    pub fn merged(&self, other: &VectorClock) -> VectorClock {
67        let mut result = self.clone();
68        result.merge(other);
69        result
70    }
71
72    /// Check if this clock happened before another.
73    pub fn happened_before(&self, other: &VectorClock) -> bool {
74        let mut dominated = false;
75
76        // All our values must be <= other's values
77        for (node, &value) in &self.clocks {
78            let other_value = other.clocks.get(node).copied().unwrap_or(0);
79            if value > other_value {
80                return false;
81            }
82            if value < other_value {
83                dominated = true;
84            }
85        }
86
87        // Check for nodes in other that we don't have
88        for (node, &value) in &other.clocks {
89            if !self.clocks.contains_key(node) && value > 0 {
90                dominated = true;
91            }
92        }
93
94        dominated
95    }
96
97    /// Check if this clock happened after another.
98    pub fn happened_after(&self, other: &VectorClock) -> bool {
99        other.happened_before(self)
100    }
101
102    /// Check if two clocks are concurrent (neither happened before the other).
103    pub fn is_concurrent(&self, other: &VectorClock) -> bool {
104        !self.happened_before(other) && !self.happened_after(other)
105    }
106
107    /// Check if two clocks are identical.
108    pub fn equals(&self, other: &VectorClock) -> bool {
109        if self.clocks.len() != other.clocks.len() {
110            return false;
111        }
112
113        for (node, &value) in &self.clocks {
114            if other.clocks.get(node).copied().unwrap_or(0) != value {
115                return false;
116            }
117        }
118
119        true
120    }
121
122    /// Compare two vector clocks.
123    pub fn compare(&self, other: &VectorClock) -> VectorClockOrdering {
124        if self.equals(other) {
125            VectorClockOrdering::Equal
126        } else if self.happened_before(other) {
127            VectorClockOrdering::Before
128        } else if self.happened_after(other) {
129            VectorClockOrdering::After
130        } else {
131            VectorClockOrdering::Concurrent
132        }
133    }
134
135    /// Get the number of nodes tracked.
136    pub fn node_count(&self) -> usize {
137        self.clocks.len()
138    }
139
140    /// Get all nodes in this clock.
141    pub fn nodes(&self) -> Vec<String> {
142        self.clocks.keys().cloned().collect()
143    }
144
145    /// Get the maximum clock value across all nodes.
146    pub fn max_value(&self) -> u64 {
147        self.clocks.values().copied().max().unwrap_or(0)
148    }
149
150    /// Get the sum of all clock values.
151    pub fn sum(&self) -> u64 {
152        self.clocks.values().sum()
153    }
154
155    /// Check if the clock is empty (all zeros or no entries).
156    pub fn is_empty(&self) -> bool {
157        self.clocks.values().all(|&v| v == 0)
158    }
159
160    /// Reset all clock values to zero.
161    pub fn reset(&mut self) {
162        for value in self.clocks.values_mut() {
163            *value = 0;
164        }
165    }
166
167    /// Remove a node from the clock.
168    pub fn remove(&mut self, node_id: &NodeId) {
169        self.clocks.remove(node_id.as_str());
170    }
171}
172
173impl PartialEq for VectorClock {
174    fn eq(&self, other: &Self) -> bool {
175        self.equals(other)
176    }
177}
178
179impl Eq for VectorClock {}
180
181impl PartialOrd for VectorClock {
182    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
183        match self.compare(other) {
184            VectorClockOrdering::Equal => Some(Ordering::Equal),
185            VectorClockOrdering::Before => Some(Ordering::Less),
186            VectorClockOrdering::After => Some(Ordering::Greater),
187            VectorClockOrdering::Concurrent => None,
188        }
189    }
190}
191
192// =============================================================================
193// Vector Clock Ordering
194// =============================================================================
195
/// Result of comparing two vector clocks.
///
/// Returned by `VectorClock::compare`; "first" is the receiver of that call
/// and "second" is its argument.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VectorClockOrdering {
    /// First clock happened before second.
    Before,
    /// First clock happened after second.
    After,
    /// Clocks are equal.
    Equal,
    /// Clocks are concurrent (incomparable).
    Concurrent,
}
208
209// =============================================================================
210// Versioned Value
211// =============================================================================
212
/// A value with an associated vector clock version.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionedValue<T> {
    /// The payload being versioned.
    pub value: T,
    /// Vector clock used for the causal comparisons between versions.
    pub clock: VectorClock,
    /// Timestamp in milliseconds since the Unix epoch when created through
    /// `VersionedValue::new`; caller-supplied through `with_timestamp`.
    pub timestamp: u64,
}
220
221impl<T> VersionedValue<T> {
222    /// Create a new versioned value.
223    pub fn new(value: T, clock: VectorClock) -> Self {
224        let timestamp = std::time::SystemTime::now()
225            .duration_since(std::time::UNIX_EPOCH)
226            .unwrap_or_default()
227            .as_millis() as u64;
228
229        Self {
230            value,
231            clock,
232            timestamp,
233        }
234    }
235
236    /// Create with a specific timestamp.
237    pub fn with_timestamp(value: T, clock: VectorClock, timestamp: u64) -> Self {
238        Self {
239            value,
240            clock,
241            timestamp,
242        }
243    }
244
245    /// Check if this version happened before another.
246    pub fn happened_before(&self, other: &VersionedValue<T>) -> bool {
247        self.clock.happened_before(&other.clock)
248    }
249
250    /// Check if this version happened after another.
251    pub fn happened_after(&self, other: &VersionedValue<T>) -> bool {
252        self.clock.happened_after(&other.clock)
253    }
254
255    /// Check if this version is concurrent with another.
256    pub fn is_concurrent(&self, other: &VersionedValue<T>) -> bool {
257        self.clock.is_concurrent(&other.clock)
258    }
259}
260
261// =============================================================================
262// Hybrid Logical Clock
263// =============================================================================
264
/// A hybrid logical clock combining physical time with logical counters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HybridClock {
    // Physical component; the constructor seeds it with the wall-clock time
    // in milliseconds since the Unix epoch.
    physical: u64,
    // Logical counter; reset to 0 whenever the physical component moves to a
    // newer wall-clock reading.
    logical: u32,
    // Identifier copied into every timestamp this clock produces.
    node_id: String,
}
272
273impl HybridClock {
274    /// Create a new hybrid clock.
275    pub fn new(node_id: impl Into<String>) -> Self {
276        Self {
277            physical: Self::now(),
278            logical: 0,
279            node_id: node_id.into(),
280        }
281    }
282
283    /// Get current physical time in milliseconds.
284    fn now() -> u64 {
285        std::time::SystemTime::now()
286            .duration_since(std::time::UNIX_EPOCH)
287            .unwrap_or_default()
288            .as_millis() as u64
289    }
290
291    /// Generate a new timestamp.
292    pub fn tick(&mut self) -> HybridTimestamp {
293        let now = Self::now();
294
295        if now > self.physical {
296            self.physical = now;
297            self.logical = 0;
298        } else {
299            self.logical += 1;
300        }
301
302        HybridTimestamp {
303            physical: self.physical,
304            logical: self.logical,
305            node_id: self.node_id.clone(),
306        }
307    }
308
309    /// Update from a received timestamp.
310    pub fn receive(&mut self, other: &HybridTimestamp) -> HybridTimestamp {
311        let now = Self::now();
312
313        if now > self.physical && now > other.physical {
314            self.physical = now;
315            self.logical = 0;
316        } else if self.physical > other.physical {
317            self.logical += 1;
318        } else if other.physical > self.physical {
319            self.physical = other.physical;
320            self.logical = other.logical + 1;
321        } else {
322            // physical times are equal
323            self.logical = self.logical.max(other.logical) + 1;
324        }
325
326        HybridTimestamp {
327            physical: self.physical,
328            logical: self.logical,
329            node_id: self.node_id.clone(),
330        }
331    }
332
333    /// Get current physical component.
334    pub fn physical(&self) -> u64 {
335        self.physical
336    }
337
338    /// Get current logical component.
339    pub fn logical(&self) -> u32 {
340        self.logical
341    }
342}
343
344// =============================================================================
345// Hybrid Timestamp
346// =============================================================================
347
/// A timestamp from a hybrid logical clock.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct HybridTimestamp {
    /// Physical component of the issuing clock.
    pub physical: u64,
    /// Logical counter of the issuing clock.
    pub logical: u32,
    /// Identifier of the node whose clock issued this timestamp.
    pub node_id: String,
}
355
356impl HybridTimestamp {
357    /// Create a new hybrid timestamp.
358    pub fn new(physical: u64, logical: u32, node_id: impl Into<String>) -> Self {
359        Self {
360            physical,
361            logical,
362            node_id: node_id.into(),
363        }
364    }
365
366    /// Compare with another timestamp.
367    pub fn compare(&self, other: &HybridTimestamp) -> Ordering {
368        match self.physical.cmp(&other.physical) {
369            Ordering::Equal => match self.logical.cmp(&other.logical) {
370                Ordering::Equal => self.node_id.cmp(&other.node_id),
371                other => other,
372            },
373            other => other,
374        }
375    }
376
377    /// Check if this timestamp is before another.
378    pub fn is_before(&self, other: &HybridTimestamp) -> bool {
379        self.compare(other) == Ordering::Less
380    }
381
382    /// Check if this timestamp is after another.
383    pub fn is_after(&self, other: &HybridTimestamp) -> bool {
384        self.compare(other) == Ordering::Greater
385    }
386
387    /// Convert to a sortable u128 value.
388    pub fn to_sortable(&self) -> u128 {
389        ((self.physical as u128) << 32) | (self.logical as u128)
390    }
391}
392
393impl PartialOrd for HybridTimestamp {
394    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
395        Some(self.compare(other))
396    }
397}
398
399impl Ord for HybridTimestamp {
400    fn cmp(&self, other: &Self) -> Ordering {
401        self.compare(other)
402    }
403}
404
405// =============================================================================
406// Lamport Clock
407// =============================================================================
408
/// A simple Lamport logical clock.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct LamportClock {
    // Current logical time; a clock built with `new` or `Default` starts at 0.
    counter: u64,
}
414
415impl LamportClock {
416    /// Create a new Lamport clock.
417    pub fn new() -> Self {
418        Self { counter: 0 }
419    }
420
421    /// Create with initial value.
422    pub fn with_value(value: u64) -> Self {
423        Self { counter: value }
424    }
425
426    /// Increment and return the new timestamp.
427    pub fn tick(&mut self) -> u64 {
428        self.counter += 1;
429        self.counter
430    }
431
432    /// Update from a received timestamp.
433    pub fn receive(&mut self, received: u64) -> u64 {
434        self.counter = self.counter.max(received) + 1;
435        self.counter
436    }
437
438    /// Get current value.
439    pub fn value(&self) -> u64 {
440        self.counter
441    }
442
443    /// Merge with another clock.
444    pub fn merge(&mut self, other: &LamportClock) {
445        self.counter = self.counter.max(other.counter);
446    }
447}
448
449// =============================================================================
450// Dotted Version Vector
451// =============================================================================
452
/// A dotted version vector for tracking object versions.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct DottedVersionVector {
    // Base vector clock; `sync` folds dots into it.
    base: VectorClock,
    // Per-node event counters ("dots") recorded by `event` and not yet
    // folded into `base`.
    dots: HashMap<String, u64>,
}
459
460impl DottedVersionVector {
461    /// Create a new dotted version vector.
462    pub fn new() -> Self {
463        Self {
464            base: VectorClock::new(),
465            dots: HashMap::new(),
466        }
467    }
468
469    /// Create a new event at a node.
470    pub fn event(&mut self, node_id: &NodeId) -> (String, u64) {
471        let key = node_id.as_str().to_string();
472        let value = self.base.get(node_id) + 1;
473        self.dots.insert(key.clone(), value);
474        (key, value)
475    }
476
477    /// Sync the dot for a node into the base clock.
478    pub fn sync(&mut self, node_id: &NodeId) {
479        let key = node_id.as_str();
480        if let Some(&dot) = self.dots.get(key) {
481            let base_value = self.base.get(node_id);
482            if dot == base_value + 1 {
483                self.base.set(node_id, dot);
484                self.dots.remove(key);
485            }
486        }
487    }
488
489    /// Sync all dots.
490    pub fn sync_all(&mut self) {
491        let nodes: Vec<_> = self.dots.keys().cloned().collect();
492        for node in nodes {
493            self.sync(&NodeId::new(&node));
494        }
495    }
496
497    /// Merge with another dotted version vector.
498    pub fn merge(&mut self, other: &DottedVersionVector) {
499        self.base.merge(&other.base);
500
501        for (node, &value) in &other.dots {
502            let current = self.dots.entry(node.clone()).or_insert(0);
503            *current = (*current).max(value);
504        }
505    }
506
507    /// Check if this DVV dominates another.
508    pub fn dominates(&self, other: &DottedVersionVector) -> bool {
509        // Check all entries in other
510        for (node, &value) in &other.base.clocks {
511            let our_base = self.base.clocks.get(node).copied().unwrap_or(0);
512            let our_dot = self.dots.get(node).copied().unwrap_or(0);
513            if our_base.max(our_dot) < value {
514                return false;
515            }
516        }
517
518        for (node, &value) in &other.dots {
519            let our_base = self.base.clocks.get(node).copied().unwrap_or(0);
520            let our_dot = self.dots.get(node).copied().unwrap_or(0);
521            if our_base.max(our_dot) < value {
522                return false;
523            }
524        }
525
526        true
527    }
528
529    /// Get the base vector clock.
530    pub fn base(&self) -> &VectorClock {
531        &self.base
532    }
533
534    /// Get the dots.
535    pub fn dots(&self) -> &HashMap<String, u64> {
536        &self.dots
537    }
538}
539
540// =============================================================================
541// Tests
542// =============================================================================
543
// Unit tests for the clock types defined in this file.
#[cfg(test)]
mod tests {
    use super::*;

    // `increment` counts per node and `node_count` reports the tracked nodes.
    #[test]
    fn test_vector_clock_basic() {
        let mut clock = VectorClock::new();
        let node_a = NodeId::new("A");
        let node_b = NodeId::new("B");

        clock.increment(&node_a);
        clock.increment(&node_a);
        clock.increment(&node_b);

        assert_eq!(clock.get(&node_a), 2);
        assert_eq!(clock.get(&node_b), 1);
        assert_eq!(clock.node_count(), 2);
    }

    // A clock that is smaller on every component happened before the larger
    // one, and not the other way round.
    #[test]
    fn test_vector_clock_happened_before() {
        let node_a = NodeId::new("A");
        let node_b = NodeId::new("B");

        let mut clock1 = VectorClock::new();
        clock1.set(&node_a, 1);
        clock1.set(&node_b, 1);

        let mut clock2 = VectorClock::new();
        clock2.set(&node_a, 2);
        clock2.set(&node_b, 2);

        assert!(clock1.happened_before(&clock2));
        assert!(!clock2.happened_before(&clock1));
    }

    // Each clock is ahead on one node, so neither happened before the other.
    #[test]
    fn test_vector_clock_concurrent() {
        let node_a = NodeId::new("A");
        let node_b = NodeId::new("B");

        let mut clock1 = VectorClock::new();
        clock1.set(&node_a, 2);
        clock1.set(&node_b, 1);

        let mut clock2 = VectorClock::new();
        clock2.set(&node_a, 1);
        clock2.set(&node_b, 2);

        assert!(clock1.is_concurrent(&clock2));
        assert!(!clock1.happened_before(&clock2));
        assert!(!clock2.happened_before(&clock1));
    }

    // `merge` keeps the per-node maximum.
    #[test]
    fn test_vector_clock_merge() {
        let node_a = NodeId::new("A");
        let node_b = NodeId::new("B");

        let mut clock1 = VectorClock::new();
        clock1.set(&node_a, 2);
        clock1.set(&node_b, 1);

        let mut clock2 = VectorClock::new();
        clock2.set(&node_a, 1);
        clock2.set(&node_b, 3);

        clock1.merge(&clock2);

        assert_eq!(clock1.get(&node_a), 2);
        assert_eq!(clock1.get(&node_b), 3);
    }

    // `compare` reports Before, After and Equal for a single-node clock.
    #[test]
    fn test_vector_clock_compare() {
        let node_a = NodeId::new("A");

        let mut clock1 = VectorClock::new();
        clock1.set(&node_a, 1);

        let mut clock2 = VectorClock::new();
        clock2.set(&node_a, 2);

        let clock3 = clock1.clone();

        assert_eq!(clock1.compare(&clock2), VectorClockOrdering::Before);
        assert_eq!(clock2.compare(&clock1), VectorClockOrdering::After);
        assert_eq!(clock1.compare(&clock3), VectorClockOrdering::Equal);
    }

    // Versioned values compare through their vector clocks.
    #[test]
    fn test_versioned_value() {
        let node_a = NodeId::new("A");

        let mut clock1 = VectorClock::new();
        clock1.set(&node_a, 1);

        let mut clock2 = VectorClock::new();
        clock2.set(&node_a, 2);

        let v1 = VersionedValue::new("value1", clock1);
        let v2 = VersionedValue::new("value2", clock2);

        assert!(v1.happened_before(&v2));
        assert!(!v1.is_concurrent(&v2));
    }

    // Two consecutive ticks from one clock are strictly ordered.
    #[test]
    fn test_hybrid_clock() {
        let mut clock = HybridClock::new("node1");

        let ts1 = clock.tick();
        let ts2 = clock.tick();

        assert!(ts1.is_before(&ts2));
        assert!(ts2.is_after(&ts1));
    }

    // The timestamp returned by `receive` orders after the received one.
    #[test]
    fn test_hybrid_clock_receive() {
        let mut clock1 = HybridClock::new("node1");
        let mut clock2 = HybridClock::new("node2");

        let ts1 = clock1.tick();
        let ts2 = clock2.receive(&ts1);

        assert!(ts1.is_before(&ts2));
    }

    // `<` on timestamps: logical breaks physical ties, physical dominates.
    #[test]
    fn test_hybrid_timestamp_ordering() {
        let ts1 = HybridTimestamp::new(100, 0, "A");
        let ts2 = HybridTimestamp::new(100, 1, "A");
        let ts3 = HybridTimestamp::new(101, 0, "A");

        assert!(ts1 < ts2);
        assert!(ts2 < ts3);
        assert!(ts1 < ts3);
    }

    // `tick` increments by one; `receive` jumps to one past the larger value.
    #[test]
    fn test_lamport_clock() {
        let mut clock = LamportClock::new();

        assert_eq!(clock.tick(), 1);
        assert_eq!(clock.tick(), 2);

        let received = clock.receive(10);
        assert_eq!(received, 11);

        assert_eq!(clock.tick(), 12);
    }

    // `merge` takes the larger counter without incrementing.
    #[test]
    fn test_lamport_clock_merge() {
        let mut clock1 = LamportClock::with_value(5);
        let clock2 = LamportClock::with_value(10);

        clock1.merge(&clock2);
        assert_eq!(clock1.value(), 10);
    }

    // The first event at a node yields dot 1; `sync` folds it into the base.
    #[test]
    fn test_dotted_version_vector() {
        let mut dvv = DottedVersionVector::new();
        let node_a = NodeId::new("A");

        let (node, version) = dvv.event(&node_a);
        assert_eq!(node, "A");
        assert_eq!(version, 1);

        dvv.sync(&node_a);
        assert_eq!(dvv.base().get(&node_a), 1);
    }

    // Merging two synced vectors combines their base clocks.
    #[test]
    fn test_dvv_merge() {
        let mut dvv1 = DottedVersionVector::new();
        let mut dvv2 = DottedVersionVector::new();

        let node_a = NodeId::new("A");
        let node_b = NodeId::new("B");

        dvv1.event(&node_a);
        dvv1.sync(&node_a);

        dvv2.event(&node_b);
        dvv2.sync(&node_b);

        dvv1.merge(&dvv2);

        assert_eq!(dvv1.base().get(&node_a), 1);
        assert_eq!(dvv1.base().get(&node_b), 1);
    }

    // The `<` and `>` operators follow the causal order.
    #[test]
    fn test_vector_clock_partial_ord() {
        let node_a = NodeId::new("A");

        let mut clock1 = VectorClock::new();
        clock1.set(&node_a, 1);

        let mut clock2 = VectorClock::new();
        clock2.set(&node_a, 2);

        assert!(clock1 < clock2);
        assert!(clock2 > clock1);
    }

    // A fresh clock is empty; one increment makes it non-empty.
    #[test]
    fn test_vector_clock_is_empty() {
        let clock1 = VectorClock::new();
        assert!(clock1.is_empty());

        let mut clock2 = VectorClock::new();
        clock2.increment(&NodeId::new("A"));
        assert!(!clock2.is_empty());
    }

    // `reset` zeroes the counters but keeps the tracked nodes.
    #[test]
    fn test_vector_clock_reset() {
        let mut clock = VectorClock::new();
        clock.increment(&NodeId::new("A"));
        clock.increment(&NodeId::new("B"));

        assert!(!clock.is_empty());

        clock.reset();
        assert!(clock.is_empty());
        assert_eq!(clock.node_count(), 2);
    }
}
775}