Skip to main content

silk/
clock.rs

1use serde::{Deserialize, Serialize};
2use std::time::SystemTime;
3
4/// Hybrid Logical Clock for causal + real-time ordering across distributed nodes.
5///
6/// R-01: Combines wall-clock time (physical_ms) with a logical counter.
7/// The physical component captures *when* an event happened in real time.
8/// The logical component orders events that happen within the same millisecond.
9///
10/// Based on Kulkarni, Demirbas, Madeppa, Avva & Leone (2014).
11/// Used in production by CockroachDB for MVCC timestamps.
12///
13/// Rules:
14/// - On local event: physical = max(old_physical, wall_clock).
15///   If physical advanced → logical = 0. Else → logical += 1.
16/// - On merge: physical = max(local, remote, wall_clock).
17///   Logical follows the same advancement/increment rule.
18/// - Total order: (physical, logical, id). Higher physical wins.
19///   Same physical → higher logical wins. Both equal → lower id wins.
20#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
21pub struct HybridClock {
22    /// Unique identifier of the instance that owns this clock
23    pub id: String,
24    /// Physical time in milliseconds since Unix epoch
25    pub physical_ms: u64,
26    /// Logical counter — incremented when physical time doesn't advance
27    pub logical: u32,
28}
29
/// Returns the current wall-clock time in whole milliseconds since the Unix epoch.
///
/// The reading is best-effort and never fails:
/// - if the system clock is set before the epoch (`duration_since` returns an
///   error), `0` is returned;
/// - if the millisecond count does not fit in a `u64`, it saturates to
///   `u64::MAX` rather than being silently truncated by an `as` cast.
fn current_time_ms() -> u64 {
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        // `as_millis` yields a `u128`, so narrow it with an explicit range check.
        .map_or(0, |elapsed| {
            u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
        })
}
37
38impl HybridClock {
39    /// Create a new clock with the current wall-clock time.
40    pub fn new(id: impl Into<String>) -> Self {
41        Self {
42            id: id.into(),
43            physical_ms: current_time_ms(),
44            logical: 0,
45        }
46    }
47
48    /// Create a clock with explicit values (for tests and deserialization).
49    pub fn with_values(id: impl Into<String>, physical_ms: u64, logical: u32) -> Self {
50        Self {
51            id: id.into(),
52            physical_ms,
53            logical,
54        }
55    }
56
57    /// Increment the clock for a local event.
58    /// Returns (physical_ms, logical) after advancement.
59    pub fn tick(&mut self) -> (u64, u32) {
60        let wall = current_time_ms();
61        if wall > self.physical_ms {
62            self.physical_ms = wall;
63            self.logical = 0;
64        } else {
65            self.logical = self.logical.saturating_add(1);
66        }
67        (self.physical_ms, self.logical)
68    }
69
70    /// Merge with a remote clock.
71    /// physical = max(local, remote, wall_clock).
72    /// If physical advanced past both → logical = 0.
73    /// If tied with one side → logical = max of tied sides + 1.
74    pub fn merge(&mut self, remote: &HybridClock) -> (u64, u32) {
75        let wall = current_time_ms();
76        let new_physical = self.physical_ms.max(remote.physical_ms).max(wall);
77
78        if new_physical > self.physical_ms && new_physical > remote.physical_ms {
79            // Wall clock advanced past both — reset logical
80            self.logical = 0;
81        } else if new_physical == self.physical_ms && new_physical == remote.physical_ms {
82            // All three tied — increment max logical
83            self.logical = self.logical.max(remote.logical).saturating_add(1);
84        } else if new_physical == self.physical_ms {
85            // Local physical matches — increment our logical
86            self.logical = self.logical.saturating_add(1);
87        } else {
88            // Remote physical matches — take remote logical + 1
89            self.logical = remote.logical.saturating_add(1);
90        }
91
92        self.physical_ms = new_physical;
93        (self.physical_ms, self.logical)
94    }
95
96    /// Compare two clocks for total ordering.
97    /// Higher physical wins. Same physical → higher logical wins.
98    /// Both equal → lower id wins (deterministic tiebreaker).
99    pub fn cmp_order(&self, other: &HybridClock) -> std::cmp::Ordering {
100        self.physical_ms
101            .cmp(&other.physical_ms)
102            .then_with(|| self.logical.cmp(&other.logical))
103            .then_with(|| other.id.cmp(&self.id)) // lower id wins → reverse comparison
104    }
105
106    /// Compact representation for sorting: (physical_ms, logical).
107    pub fn as_tuple(&self) -> (u64, u32) {
108        (self.physical_ms, self.logical)
109    }
110}
111
112// Keep the old name as a type alias for migration clarity in other modules.
113// All code will use HybridClock directly — this alias exists only for documentation.
114pub type LamportClock = HybridClock;
115
/// Unit tests for `HybridClock`.
///
/// Tests that must hit a specific `merge`/`tick` branch use `u64::MAX` as a
/// far-future physical time. Small fixtures such as 100 or 500 ms are always
/// behind the real wall clock, so with them only the "wall clock wins" branch
/// ever runs.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn hlc_monotonic() {
        let mut clock = HybridClock::new("node-a");
        let (p1, l1) = clock.tick();
        let (p2, l2) = clock.tick();
        // Each tick either moves physical forward or bumps logical, so the
        // pair must strictly increase; `>=` would also accept a no-op tick.
        assert!(
            (p2, l2) > (p1, l1),
            "clock must be strictly monotonic: ({p1},{l1}) -> ({p2},{l2})"
        );
    }

    #[test]
    fn hlc_merge_advances() {
        let mut local = HybridClock::with_values("node-a", 100, 0);
        let remote = HybridClock::with_values("node-b", 200, 5);
        let (p, l) = local.merge(&remote);
        // Physical should be at least 200 (remote's physical)
        assert!(p >= 200, "merge should advance physical to at least remote");
        // If wall clock is > 200, logical resets. If == 200, logical = 5 + 1.
        // Either way, the clock advanced past the remote.
        assert!(
            (p, l) > (200, 5),
            "merged clock should be ahead of remote: ({p},{l}) vs (200,5)"
        );
    }

    #[test]
    fn hlc_merge_local_ahead() {
        let mut local = HybridClock::with_values("node-a", 500, 10);
        let remote = HybridClock::with_values("node-b", 100, 0);
        let (p, l) = local.merge(&remote);
        assert!(p >= 500, "local was ahead — physical should stay >= 500");
        // Whether the wall clock or the local value wins, the merged
        // timestamp must be strictly later than the pre-merge local one.
        assert!(
            (p, l) > (500, 10),
            "merged clock should be ahead of previous local: ({p},{l}) vs (500,10)"
        );
    }

    #[test]
    fn hlc_merge_local_physical_wins() {
        // Local physical is beyond any real wall clock, so it alone is the max.
        let mut local = HybridClock::with_values("node-a", u64::MAX, 10);
        let remote = HybridClock::with_values("node-b", 100, 0);
        assert_eq!(local.merge(&remote), (u64::MAX, 11));
    }

    #[test]
    fn hlc_merge_remote_physical_wins() {
        // Remote physical is beyond any real wall clock, so it alone is the max.
        let mut local = HybridClock::with_values("node-a", 100, 7);
        let remote = HybridClock::with_values("node-b", u64::MAX, 5);
        assert_eq!(local.merge(&remote), (u64::MAX, 6));
    }

    #[test]
    fn hlc_merge_tied_physical() {
        // Both sides share the max physical: logical = max(3, 9) + 1.
        let mut local = HybridClock::with_values("node-a", u64::MAX, 3);
        let remote = HybridClock::with_values("node-b", u64::MAX, 9);
        assert_eq!(local.merge(&remote), (u64::MAX, 10));
    }

    #[test]
    fn hlc_tiebreak_deterministic() {
        let a = HybridClock::with_values("alpha", 100, 5);
        let b = HybridClock::with_values("beta", 100, 5);
        // Same physical + logical → lower id ("alpha") wins
        assert_eq!(a.cmp_order(&b), std::cmp::Ordering::Greater);
        assert_eq!(b.cmp_order(&a), std::cmp::Ordering::Less);
    }

    #[test]
    fn hlc_physical_beats_logical() {
        let a = HybridClock::with_values("node-a", 200, 0);
        let b = HybridClock::with_values("node-b", 100, 999);
        // a has higher physical — wins regardless of logical
        assert_eq!(a.cmp_order(&b), std::cmp::Ordering::Greater);
    }

    #[test]
    fn hlc_logical_breaks_physical_tie() {
        let a = HybridClock::with_values("node-a", 100, 10);
        let b = HybridClock::with_values("node-b", 100, 5);
        // Same physical → higher logical wins
        assert_eq!(a.cmp_order(&b), std::cmp::Ordering::Greater);
    }

    #[test]
    fn hlc_serialization_roundtrip() {
        let clock = HybridClock::with_values("node-x", 1711234567890, 42);
        let bytes = rmp_serde::to_vec(&clock).unwrap();
        let decoded: HybridClock = rmp_serde::from_slice(&bytes).unwrap();
        assert_eq!(clock, decoded);
    }

    #[test]
    fn hlc_logical_saturates() {
        let mut clock = HybridClock::with_values("node-a", u64::MAX, u32::MAX);
        let (p, l) = clock.tick();
        // If wall clock < u64::MAX (which it is), logical saturates
        assert_eq!(p, u64::MAX);
        assert_eq!(l, u32::MAX);
    }

    #[test]
    fn hlc_with_values_constructor() {
        let clock = HybridClock::with_values("test", 42, 7);
        assert_eq!(clock.id, "test");
        assert_eq!(clock.physical_ms, 42);
        assert_eq!(clock.logical, 7);
    }

    #[test]
    fn hlc_as_tuple() {
        let clock = HybridClock::with_values("test", 100, 5);
        assert_eq!(clock.as_tuple(), (100, 5));
    }
}