1use serde::{Deserialize, Serialize};
2use std::time::SystemTime;
3
4#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
21pub struct HybridClock {
22 pub id: String,
24 pub physical_ms: u64,
26 pub logical: u32,
28}
29
30fn current_time_ms() -> u64 {
32 SystemTime::now()
33 .duration_since(SystemTime::UNIX_EPOCH)
34 .map(|d| d.as_millis() as u64)
35 .unwrap_or(0)
36}
37
38impl HybridClock {
39 pub fn new(id: impl Into<String>) -> Self {
41 Self {
42 id: id.into(),
43 physical_ms: current_time_ms(),
44 logical: 0,
45 }
46 }
47
48 pub fn with_values(id: impl Into<String>, physical_ms: u64, logical: u32) -> Self {
50 Self {
51 id: id.into(),
52 physical_ms,
53 logical,
54 }
55 }
56
57 pub fn tick(&mut self) -> (u64, u32) {
60 let wall = current_time_ms();
61 if wall > self.physical_ms {
62 self.physical_ms = wall;
63 self.logical = 0;
64 } else {
65 self.logical = self.logical.saturating_add(1);
66 }
67 (self.physical_ms, self.logical)
68 }
69
70 pub fn merge(&mut self, remote: &HybridClock) -> (u64, u32) {
75 let wall = current_time_ms();
76 let new_physical = self.physical_ms.max(remote.physical_ms).max(wall);
77
78 if new_physical > self.physical_ms && new_physical > remote.physical_ms {
79 self.logical = 0;
81 } else if new_physical == self.physical_ms && new_physical == remote.physical_ms {
82 self.logical = self.logical.max(remote.logical).saturating_add(1);
84 } else if new_physical == self.physical_ms {
85 self.logical = self.logical.saturating_add(1);
87 } else {
88 self.logical = remote.logical.saturating_add(1);
90 }
91
92 self.physical_ms = new_physical;
93 (self.physical_ms, self.logical)
94 }
95
96 pub fn cmp_order(&self, other: &HybridClock) -> std::cmp::Ordering {
100 self.physical_ms
101 .cmp(&other.physical_ms)
102 .then_with(|| self.logical.cmp(&other.logical))
103 .then_with(|| other.id.cmp(&self.id)) }
105
106 pub fn as_tuple(&self) -> (u64, u32) {
108 (self.physical_ms, self.logical)
109 }
110}
111
112pub type LamportClock = HybridClock;
115
116#[cfg(test)]
117mod tests {
118 use super::*;
119
120 #[test]
121 fn hlc_monotonic() {
122 let mut clock = HybridClock::new("node-a");
123 let (p1, l1) = clock.tick();
124 let (p2, l2) = clock.tick();
125 assert!(
126 (p2, l2) >= (p1, l1),
127 "clock must be monotonic: ({p1},{l1}) -> ({p2},{l2})"
128 );
129 }
130
131 #[test]
132 fn hlc_merge_advances() {
133 let mut local = HybridClock::with_values("node-a", 100, 0);
134 let remote = HybridClock::with_values("node-b", 200, 5);
135 let (p, l) = local.merge(&remote);
136 assert!(p >= 200, "merge should advance physical to at least remote");
138 assert!(
141 (p, l) > (200, 5),
142 "merged clock should be ahead of remote: ({p},{l}) vs (200,5)"
143 );
144 }
145
146 #[test]
147 fn hlc_merge_local_ahead() {
148 let mut local = HybridClock::with_values("node-a", 500, 10);
149 let remote = HybridClock::with_values("node-b", 100, 0);
150 let (p, _l) = local.merge(&remote);
151 assert!(p >= 500, "local was ahead — physical should stay >= 500");
152 }
153
154 #[test]
155 fn hlc_tiebreak_deterministic() {
156 let a = HybridClock::with_values("alpha", 100, 5);
157 let b = HybridClock::with_values("beta", 100, 5);
158 assert_eq!(a.cmp_order(&b), std::cmp::Ordering::Greater);
160 assert_eq!(b.cmp_order(&a), std::cmp::Ordering::Less);
161 }
162
163 #[test]
164 fn hlc_physical_beats_logical() {
165 let a = HybridClock::with_values("node-a", 200, 0);
166 let b = HybridClock::with_values("node-b", 100, 999);
167 assert_eq!(a.cmp_order(&b), std::cmp::Ordering::Greater);
169 }
170
171 #[test]
172 fn hlc_logical_breaks_physical_tie() {
173 let a = HybridClock::with_values("node-a", 100, 10);
174 let b = HybridClock::with_values("node-b", 100, 5);
175 assert_eq!(a.cmp_order(&b), std::cmp::Ordering::Greater);
177 }
178
179 #[test]
180 fn hlc_serialization_roundtrip() {
181 let clock = HybridClock::with_values("node-x", 1711234567890, 42);
182 let bytes = rmp_serde::to_vec(&clock).unwrap();
183 let decoded: HybridClock = rmp_serde::from_slice(&bytes).unwrap();
184 assert_eq!(clock, decoded);
185 }
186
187 #[test]
188 fn hlc_logical_saturates() {
189 let mut clock = HybridClock::with_values("node-a", u64::MAX, u32::MAX);
190 let (p, l) = clock.tick();
191 assert_eq!(p, u64::MAX);
193 assert_eq!(l, u32::MAX);
194 }
195
196 #[test]
197 fn hlc_with_values_constructor() {
198 let clock = HybridClock::with_values("test", 42, 7);
199 assert_eq!(clock.id, "test");
200 assert_eq!(clock.physical_ms, 42);
201 assert_eq!(clock.logical, 7);
202 }
203
204 #[test]
205 fn hlc_as_tuple() {
206 let clock = HybridClock::with_values("test", 100, 5);
207 assert_eq!(clock.as_tuple(), (100, 5));
208 }
209}