Skip to main content

aegis_replication/
vector_clock.rs

1//! Aegis Vector Clocks
2//!
3//! Vector clocks for tracking causality and ordering in distributed systems.
4//!
5//! @version 0.1.0
6//! @author AutomataNexus Development Team
7
8use crate::node::NodeId;
9use serde::{Deserialize, Serialize};
10use std::cmp::Ordering;
11use std::collections::HashMap;
12
13// =============================================================================
14// Vector Clock
15// =============================================================================
16
17/// A vector clock for tracking causality between events.
18#[derive(Debug, Clone, Default, Serialize, Deserialize)]
19pub struct VectorClock {
20    clocks: HashMap<String, u64>,
21}
22
23impl VectorClock {
24    /// Create a new empty vector clock.
25    pub fn new() -> Self {
26        Self {
27            clocks: HashMap::new(),
28        }
29    }
30
31    /// Create a vector clock with initial node.
32    pub fn with_node(node_id: &NodeId) -> Self {
33        let mut clocks = HashMap::new();
34        clocks.insert(node_id.as_str().to_string(), 0);
35        Self { clocks }
36    }
37
38    /// Increment the clock for a node.
39    pub fn increment(&mut self, node_id: &NodeId) {
40        let key = node_id.as_str().to_string();
41        *self.clocks.entry(key).or_insert(0) += 1;
42    }
43
44    /// Get the clock value for a node.
45    pub fn get(&self, node_id: &NodeId) -> u64 {
46        self.clocks.get(node_id.as_str()).copied().unwrap_or(0)
47    }
48
49    /// Set the clock value for a node.
50    pub fn set(&mut self, node_id: &NodeId, value: u64) {
51        self.clocks.insert(node_id.as_str().to_string(), value);
52    }
53
54    /// Merge with another vector clock (take maximum of each component).
55    pub fn merge(&mut self, other: &VectorClock) {
56        for (node, &value) in &other.clocks {
57            let current = self.clocks.entry(node.clone()).or_insert(0);
58            *current = (*current).max(value);
59        }
60    }
61
62    /// Create a merged clock without modifying self.
63    pub fn merged(&self, other: &VectorClock) -> VectorClock {
64        let mut result = self.clone();
65        result.merge(other);
66        result
67    }
68
69    /// Check if this clock happened before another.
70    pub fn happened_before(&self, other: &VectorClock) -> bool {
71        let mut dominated = false;
72
73        // All our values must be <= other's values
74        for (node, &value) in &self.clocks {
75            let other_value = other.clocks.get(node).copied().unwrap_or(0);
76            if value > other_value {
77                return false;
78            }
79            if value < other_value {
80                dominated = true;
81            }
82        }
83
84        // Check for nodes in other that we don't have
85        for (node, &value) in &other.clocks {
86            if !self.clocks.contains_key(node) && value > 0 {
87                dominated = true;
88            }
89        }
90
91        dominated
92    }
93
94    /// Check if this clock happened after another.
95    pub fn happened_after(&self, other: &VectorClock) -> bool {
96        other.happened_before(self)
97    }
98
99    /// Check if two clocks are concurrent (neither happened before the other).
100    pub fn is_concurrent(&self, other: &VectorClock) -> bool {
101        !self.happened_before(other) && !self.happened_after(other)
102    }
103
104    /// Check if two clocks are identical.
105    pub fn equals(&self, other: &VectorClock) -> bool {
106        if self.clocks.len() != other.clocks.len() {
107            return false;
108        }
109
110        for (node, &value) in &self.clocks {
111            if other.clocks.get(node).copied().unwrap_or(0) != value {
112                return false;
113            }
114        }
115
116        true
117    }
118
119    /// Compare two vector clocks.
120    pub fn compare(&self, other: &VectorClock) -> VectorClockOrdering {
121        if self.equals(other) {
122            VectorClockOrdering::Equal
123        } else if self.happened_before(other) {
124            VectorClockOrdering::Before
125        } else if self.happened_after(other) {
126            VectorClockOrdering::After
127        } else {
128            VectorClockOrdering::Concurrent
129        }
130    }
131
132    /// Get the number of nodes tracked.
133    pub fn node_count(&self) -> usize {
134        self.clocks.len()
135    }
136
137    /// Get all nodes in this clock.
138    pub fn nodes(&self) -> Vec<String> {
139        self.clocks.keys().cloned().collect()
140    }
141
142    /// Get the maximum clock value across all nodes.
143    pub fn max_value(&self) -> u64 {
144        self.clocks.values().copied().max().unwrap_or(0)
145    }
146
147    /// Get the sum of all clock values.
148    pub fn sum(&self) -> u64 {
149        self.clocks.values().sum()
150    }
151
152    /// Check if the clock is empty (all zeros or no entries).
153    pub fn is_empty(&self) -> bool {
154        self.clocks.values().all(|&v| v == 0)
155    }
156
157    /// Reset all clock values to zero.
158    pub fn reset(&mut self) {
159        for value in self.clocks.values_mut() {
160            *value = 0;
161        }
162    }
163
164    /// Remove a node from the clock.
165    pub fn remove(&mut self, node_id: &NodeId) {
166        self.clocks.remove(node_id.as_str());
167    }
168}
169
170impl PartialEq for VectorClock {
171    fn eq(&self, other: &Self) -> bool {
172        self.equals(other)
173    }
174}
175
176impl Eq for VectorClock {}
177
178impl PartialOrd for VectorClock {
179    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
180        match self.compare(other) {
181            VectorClockOrdering::Equal => Some(Ordering::Equal),
182            VectorClockOrdering::Before => Some(Ordering::Less),
183            VectorClockOrdering::After => Some(Ordering::Greater),
184            VectorClockOrdering::Concurrent => None,
185        }
186    }
187}
188
189// =============================================================================
190// Vector Clock Ordering
191// =============================================================================
192
193/// Result of comparing two vector clocks.
194#[derive(Debug, Clone, Copy, PartialEq, Eq)]
195pub enum VectorClockOrdering {
196    /// First clock happened before second.
197    Before,
198    /// First clock happened after second.
199    After,
200    /// Clocks are equal.
201    Equal,
202    /// Clocks are concurrent (incomparable).
203    Concurrent,
204}
205
206// =============================================================================
207// Versioned Value
208// =============================================================================
209
210/// A value with an associated vector clock version.
211#[derive(Debug, Clone, Serialize, Deserialize)]
212pub struct VersionedValue<T> {
213    pub value: T,
214    pub clock: VectorClock,
215    pub timestamp: u64,
216}
217
218impl<T> VersionedValue<T> {
219    /// Create a new versioned value.
220    pub fn new(value: T, clock: VectorClock) -> Self {
221        let timestamp = std::time::SystemTime::now()
222            .duration_since(std::time::UNIX_EPOCH)
223            .unwrap_or_default()
224            .as_millis() as u64;
225
226        Self {
227            value,
228            clock,
229            timestamp,
230        }
231    }
232
233    /// Create with a specific timestamp.
234    pub fn with_timestamp(value: T, clock: VectorClock, timestamp: u64) -> Self {
235        Self {
236            value,
237            clock,
238            timestamp,
239        }
240    }
241
242    /// Check if this version happened before another.
243    pub fn happened_before(&self, other: &VersionedValue<T>) -> bool {
244        self.clock.happened_before(&other.clock)
245    }
246
247    /// Check if this version happened after another.
248    pub fn happened_after(&self, other: &VersionedValue<T>) -> bool {
249        self.clock.happened_after(&other.clock)
250    }
251
252    /// Check if this version is concurrent with another.
253    pub fn is_concurrent(&self, other: &VersionedValue<T>) -> bool {
254        self.clock.is_concurrent(&other.clock)
255    }
256}
257
258// =============================================================================
259// Hybrid Logical Clock
260// =============================================================================
261
262/// A hybrid logical clock combining physical time with logical counters.
263#[derive(Debug, Clone, Serialize, Deserialize)]
264pub struct HybridClock {
265    physical: u64,
266    logical: u32,
267    node_id: String,
268}
269
270impl HybridClock {
271    /// Create a new hybrid clock.
272    pub fn new(node_id: impl Into<String>) -> Self {
273        Self {
274            physical: Self::now(),
275            logical: 0,
276            node_id: node_id.into(),
277        }
278    }
279
280    /// Get current physical time in milliseconds.
281    fn now() -> u64 {
282        std::time::SystemTime::now()
283            .duration_since(std::time::UNIX_EPOCH)
284            .unwrap_or_default()
285            .as_millis() as u64
286    }
287
288    /// Generate a new timestamp.
289    pub fn tick(&mut self) -> HybridTimestamp {
290        let now = Self::now();
291
292        if now > self.physical {
293            self.physical = now;
294            self.logical = 0;
295        } else {
296            self.logical += 1;
297        }
298
299        HybridTimestamp {
300            physical: self.physical,
301            logical: self.logical,
302            node_id: self.node_id.clone(),
303        }
304    }
305
306    /// Update from a received timestamp.
307    pub fn receive(&mut self, other: &HybridTimestamp) -> HybridTimestamp {
308        let now = Self::now();
309
310        if now > self.physical && now > other.physical {
311            self.physical = now;
312            self.logical = 0;
313        } else if self.physical > other.physical {
314            self.logical += 1;
315        } else if other.physical > self.physical {
316            self.physical = other.physical;
317            self.logical = other.logical + 1;
318        } else {
319            // physical times are equal
320            self.logical = self.logical.max(other.logical) + 1;
321        }
322
323        HybridTimestamp {
324            physical: self.physical,
325            logical: self.logical,
326            node_id: self.node_id.clone(),
327        }
328    }
329
330    /// Get current physical component.
331    pub fn physical(&self) -> u64 {
332        self.physical
333    }
334
335    /// Get current logical component.
336    pub fn logical(&self) -> u32 {
337        self.logical
338    }
339}
340
341// =============================================================================
342// Hybrid Timestamp
343// =============================================================================
344
345/// A timestamp from a hybrid logical clock.
346#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
347pub struct HybridTimestamp {
348    pub physical: u64,
349    pub logical: u32,
350    pub node_id: String,
351}
352
353impl HybridTimestamp {
354    /// Create a new hybrid timestamp.
355    pub fn new(physical: u64, logical: u32, node_id: impl Into<String>) -> Self {
356        Self {
357            physical,
358            logical,
359            node_id: node_id.into(),
360        }
361    }
362
363    /// Compare with another timestamp.
364    pub fn compare(&self, other: &HybridTimestamp) -> Ordering {
365        match self.physical.cmp(&other.physical) {
366            Ordering::Equal => match self.logical.cmp(&other.logical) {
367                Ordering::Equal => self.node_id.cmp(&other.node_id),
368                other => other,
369            },
370            other => other,
371        }
372    }
373
374    /// Check if this timestamp is before another.
375    pub fn is_before(&self, other: &HybridTimestamp) -> bool {
376        self.compare(other) == Ordering::Less
377    }
378
379    /// Check if this timestamp is after another.
380    pub fn is_after(&self, other: &HybridTimestamp) -> bool {
381        self.compare(other) == Ordering::Greater
382    }
383
384    /// Convert to a sortable u128 value.
385    pub fn to_sortable(&self) -> u128 {
386        ((self.physical as u128) << 32) | (self.logical as u128)
387    }
388}
389
390impl PartialOrd for HybridTimestamp {
391    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
392        Some(self.cmp(other))
393    }
394}
395
396impl Ord for HybridTimestamp {
397    fn cmp(&self, other: &Self) -> Ordering {
398        self.compare(other)
399    }
400}
401
402// =============================================================================
403// Lamport Clock
404// =============================================================================
405
406/// A simple Lamport logical clock.
407#[derive(Debug, Clone, Default, Serialize, Deserialize)]
408pub struct LamportClock {
409    counter: u64,
410}
411
412impl LamportClock {
413    /// Create a new Lamport clock.
414    pub fn new() -> Self {
415        Self { counter: 0 }
416    }
417
418    /// Create with initial value.
419    pub fn with_value(value: u64) -> Self {
420        Self { counter: value }
421    }
422
423    /// Increment and return the new timestamp.
424    pub fn tick(&mut self) -> u64 {
425        self.counter += 1;
426        self.counter
427    }
428
429    /// Update from a received timestamp.
430    pub fn receive(&mut self, received: u64) -> u64 {
431        self.counter = self.counter.max(received) + 1;
432        self.counter
433    }
434
435    /// Get current value.
436    pub fn value(&self) -> u64 {
437        self.counter
438    }
439
440    /// Merge with another clock.
441    pub fn merge(&mut self, other: &LamportClock) {
442        self.counter = self.counter.max(other.counter);
443    }
444}
445
446// =============================================================================
447// Dotted Version Vector
448// =============================================================================
449
450/// A dotted version vector for tracking object versions.
451#[derive(Debug, Clone, Default, Serialize, Deserialize)]
452pub struct DottedVersionVector {
453    base: VectorClock,
454    dots: HashMap<String, u64>,
455}
456
457impl DottedVersionVector {
458    /// Create a new dotted version vector.
459    pub fn new() -> Self {
460        Self {
461            base: VectorClock::new(),
462            dots: HashMap::new(),
463        }
464    }
465
466    /// Create a new event at a node.
467    pub fn event(&mut self, node_id: &NodeId) -> (String, u64) {
468        let key = node_id.as_str().to_string();
469        let value = self.base.get(node_id) + 1;
470        self.dots.insert(key.clone(), value);
471        (key, value)
472    }
473
474    /// Sync the dot for a node into the base clock.
475    pub fn sync(&mut self, node_id: &NodeId) {
476        let key = node_id.as_str();
477        if let Some(&dot) = self.dots.get(key) {
478            let base_value = self.base.get(node_id);
479            if dot == base_value + 1 {
480                self.base.set(node_id, dot);
481                self.dots.remove(key);
482            }
483        }
484    }
485
486    /// Sync all dots.
487    pub fn sync_all(&mut self) {
488        let nodes: Vec<_> = self.dots.keys().cloned().collect();
489        for node in nodes {
490            self.sync(&NodeId::new(&node));
491        }
492    }
493
494    /// Merge with another dotted version vector.
495    pub fn merge(&mut self, other: &DottedVersionVector) {
496        self.base.merge(&other.base);
497
498        for (node, &value) in &other.dots {
499            let current = self.dots.entry(node.clone()).or_insert(0);
500            *current = (*current).max(value);
501        }
502    }
503
504    /// Check if this DVV dominates another.
505    pub fn dominates(&self, other: &DottedVersionVector) -> bool {
506        // Check all entries in other
507        for (node, &value) in &other.base.clocks {
508            let our_base = self.base.clocks.get(node).copied().unwrap_or(0);
509            let our_dot = self.dots.get(node).copied().unwrap_or(0);
510            if our_base.max(our_dot) < value {
511                return false;
512            }
513        }
514
515        for (node, &value) in &other.dots {
516            let our_base = self.base.clocks.get(node).copied().unwrap_or(0);
517            let our_dot = self.dots.get(node).copied().unwrap_or(0);
518            if our_base.max(our_dot) < value {
519                return false;
520            }
521        }
522
523        true
524    }
525
526    /// Get the base vector clock.
527    pub fn base(&self) -> &VectorClock {
528        &self.base
529    }
530
531    /// Get the dots.
532    pub fn dots(&self) -> &HashMap<String, u64> {
533        &self.dots
534    }
535}
536
537// =============================================================================
538// Tests
539// =============================================================================
540
541#[cfg(test)]
542mod tests {
543    use super::*;
544
545    #[test]
546    fn test_vector_clock_basic() {
547        let mut clock = VectorClock::new();
548        let node_a = NodeId::new("A");
549        let node_b = NodeId::new("B");
550
551        clock.increment(&node_a);
552        clock.increment(&node_a);
553        clock.increment(&node_b);
554
555        assert_eq!(clock.get(&node_a), 2);
556        assert_eq!(clock.get(&node_b), 1);
557        assert_eq!(clock.node_count(), 2);
558    }
559
560    #[test]
561    fn test_vector_clock_happened_before() {
562        let node_a = NodeId::new("A");
563        let node_b = NodeId::new("B");
564
565        let mut clock1 = VectorClock::new();
566        clock1.set(&node_a, 1);
567        clock1.set(&node_b, 1);
568
569        let mut clock2 = VectorClock::new();
570        clock2.set(&node_a, 2);
571        clock2.set(&node_b, 2);
572
573        assert!(clock1.happened_before(&clock2));
574        assert!(!clock2.happened_before(&clock1));
575    }
576
577    #[test]
578    fn test_vector_clock_concurrent() {
579        let node_a = NodeId::new("A");
580        let node_b = NodeId::new("B");
581
582        let mut clock1 = VectorClock::new();
583        clock1.set(&node_a, 2);
584        clock1.set(&node_b, 1);
585
586        let mut clock2 = VectorClock::new();
587        clock2.set(&node_a, 1);
588        clock2.set(&node_b, 2);
589
590        assert!(clock1.is_concurrent(&clock2));
591        assert!(!clock1.happened_before(&clock2));
592        assert!(!clock2.happened_before(&clock1));
593    }
594
595    #[test]
596    fn test_vector_clock_merge() {
597        let node_a = NodeId::new("A");
598        let node_b = NodeId::new("B");
599
600        let mut clock1 = VectorClock::new();
601        clock1.set(&node_a, 2);
602        clock1.set(&node_b, 1);
603
604        let mut clock2 = VectorClock::new();
605        clock2.set(&node_a, 1);
606        clock2.set(&node_b, 3);
607
608        clock1.merge(&clock2);
609
610        assert_eq!(clock1.get(&node_a), 2);
611        assert_eq!(clock1.get(&node_b), 3);
612    }
613
614    #[test]
615    fn test_vector_clock_compare() {
616        let node_a = NodeId::new("A");
617
618        let mut clock1 = VectorClock::new();
619        clock1.set(&node_a, 1);
620
621        let mut clock2 = VectorClock::new();
622        clock2.set(&node_a, 2);
623
624        let clock3 = clock1.clone();
625
626        assert_eq!(clock1.compare(&clock2), VectorClockOrdering::Before);
627        assert_eq!(clock2.compare(&clock1), VectorClockOrdering::After);
628        assert_eq!(clock1.compare(&clock3), VectorClockOrdering::Equal);
629    }
630
631    #[test]
632    fn test_versioned_value() {
633        let node_a = NodeId::new("A");
634
635        let mut clock1 = VectorClock::new();
636        clock1.set(&node_a, 1);
637
638        let mut clock2 = VectorClock::new();
639        clock2.set(&node_a, 2);
640
641        let v1 = VersionedValue::new("value1", clock1);
642        let v2 = VersionedValue::new("value2", clock2);
643
644        assert!(v1.happened_before(&v2));
645        assert!(!v1.is_concurrent(&v2));
646    }
647
648    #[test]
649    fn test_hybrid_clock() {
650        let mut clock = HybridClock::new("node1");
651
652        let ts1 = clock.tick();
653        let ts2 = clock.tick();
654
655        assert!(ts1.is_before(&ts2));
656        assert!(ts2.is_after(&ts1));
657    }
658
659    #[test]
660    fn test_hybrid_clock_receive() {
661        let mut clock1 = HybridClock::new("node1");
662        let mut clock2 = HybridClock::new("node2");
663
664        let ts1 = clock1.tick();
665        let ts2 = clock2.receive(&ts1);
666
667        assert!(ts1.is_before(&ts2));
668    }
669
670    #[test]
671    fn test_hybrid_timestamp_ordering() {
672        let ts1 = HybridTimestamp::new(100, 0, "A");
673        let ts2 = HybridTimestamp::new(100, 1, "A");
674        let ts3 = HybridTimestamp::new(101, 0, "A");
675
676        assert!(ts1 < ts2);
677        assert!(ts2 < ts3);
678        assert!(ts1 < ts3);
679    }
680
681    #[test]
682    fn test_lamport_clock() {
683        let mut clock = LamportClock::new();
684
685        assert_eq!(clock.tick(), 1);
686        assert_eq!(clock.tick(), 2);
687
688        let received = clock.receive(10);
689        assert_eq!(received, 11);
690
691        assert_eq!(clock.tick(), 12);
692    }
693
694    #[test]
695    fn test_lamport_clock_merge() {
696        let mut clock1 = LamportClock::with_value(5);
697        let clock2 = LamportClock::with_value(10);
698
699        clock1.merge(&clock2);
700        assert_eq!(clock1.value(), 10);
701    }
702
703    #[test]
704    fn test_dotted_version_vector() {
705        let mut dvv = DottedVersionVector::new();
706        let node_a = NodeId::new("A");
707
708        let (node, version) = dvv.event(&node_a);
709        assert_eq!(node, "A");
710        assert_eq!(version, 1);
711
712        dvv.sync(&node_a);
713        assert_eq!(dvv.base().get(&node_a), 1);
714    }
715
716    #[test]
717    fn test_dvv_merge() {
718        let mut dvv1 = DottedVersionVector::new();
719        let mut dvv2 = DottedVersionVector::new();
720
721        let node_a = NodeId::new("A");
722        let node_b = NodeId::new("B");
723
724        dvv1.event(&node_a);
725        dvv1.sync(&node_a);
726
727        dvv2.event(&node_b);
728        dvv2.sync(&node_b);
729
730        dvv1.merge(&dvv2);
731
732        assert_eq!(dvv1.base().get(&node_a), 1);
733        assert_eq!(dvv1.base().get(&node_b), 1);
734    }
735
736    #[test]
737    fn test_vector_clock_partial_ord() {
738        let node_a = NodeId::new("A");
739
740        let mut clock1 = VectorClock::new();
741        clock1.set(&node_a, 1);
742
743        let mut clock2 = VectorClock::new();
744        clock2.set(&node_a, 2);
745
746        assert!(clock1 < clock2);
747        assert!(clock2 > clock1);
748    }
749
750    #[test]
751    fn test_vector_clock_is_empty() {
752        let clock1 = VectorClock::new();
753        assert!(clock1.is_empty());
754
755        let mut clock2 = VectorClock::new();
756        clock2.increment(&NodeId::new("A"));
757        assert!(!clock2.is_empty());
758    }
759
760    #[test]
761    fn test_vector_clock_reset() {
762        let mut clock = VectorClock::new();
763        clock.increment(&NodeId::new("A"));
764        clock.increment(&NodeId::new("B"));
765
766        assert!(!clock.is_empty());
767
768        clock.reset();
769        assert!(clock.is_empty());
770        assert_eq!(clock.node_count(), 2);
771    }
772}