1use chrono::{DateTime, Utc};
13use serde::{Deserialize, Serialize};
14use std::sync::Mutex;
15
16#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
21pub struct HlcTimestamp {
22 pub wall_time_ms: i64,
24 pub counter: u32,
26 pub node_id: u32,
28}
29
30impl HlcTimestamp {
31 pub fn new(wall_time_ms: i64, counter: u32, node_id: u32) -> Self {
33 Self {
34 wall_time_ms,
35 counter,
36 node_id,
37 }
38 }
39
40 pub fn to_datetime(&self) -> DateTime<Utc> {
42 DateTime::from_timestamp_millis(self.wall_time_ms)
43 .unwrap_or_else(|| Utc::now())
44 }
45
46 pub fn from_datetime(dt: DateTime<Utc>) -> Self {
48 Self {
49 wall_time_ms: dt.timestamp_millis(),
50 counter: 0,
51 node_id: 0,
52 }
53 }
54}
55
56impl PartialOrd for HlcTimestamp {
57 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
58 Some(self.cmp(other))
59 }
60}
61
62impl Ord for HlcTimestamp {
63 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
64 self.wall_time_ms
65 .cmp(&other.wall_time_ms)
66 .then(self.counter.cmp(&other.counter))
67 .then(self.node_id.cmp(&other.node_id))
68 }
69}
70
71impl std::fmt::Display for HlcTimestamp {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 write!(
74 f,
75 "{}:{}:{}",
76 self.wall_time_ms, self.counter, self.node_id
77 )
78 }
79}
80
81pub struct Hlc {
93 node_id: u32,
94 state: Mutex<HlcState>,
95}
96
97struct HlcState {
98 last_wall_time_ms: i64,
99 last_counter: u32,
100}
101
102impl Hlc {
103 pub fn new(node_id: u32) -> Self {
106 Self {
107 node_id,
108 state: Mutex::new(HlcState {
109 last_wall_time_ms: 0,
110 last_counter: 0,
111 }),
112 }
113 }
114
115 pub fn now(&self) -> HlcTimestamp {
118 let physical = Utc::now().timestamp_millis();
119 let mut state = self.state.lock().unwrap();
120
121 if physical > state.last_wall_time_ms {
122 state.last_wall_time_ms = physical;
124 state.last_counter = 0;
125 } else {
126 state.last_counter += 1;
128 }
129
130 HlcTimestamp {
131 wall_time_ms: state.last_wall_time_ms,
132 counter: state.last_counter,
133 node_id: self.node_id,
134 }
135 }
136
137 pub fn receive(&self, remote: &HlcTimestamp) -> HlcTimestamp {
141 let physical = Utc::now().timestamp_millis();
142 let mut state = self.state.lock().unwrap();
143
144 if physical > state.last_wall_time_ms && physical > remote.wall_time_ms {
145 state.last_wall_time_ms = physical;
147 state.last_counter = 0;
148 } else if remote.wall_time_ms > state.last_wall_time_ms {
149 state.last_wall_time_ms = remote.wall_time_ms;
151 state.last_counter = remote.counter + 1;
152 } else if state.last_wall_time_ms > remote.wall_time_ms {
153 state.last_counter += 1;
155 } else {
156 state.last_counter = std::cmp::max(state.last_counter, remote.counter) + 1;
158 }
159
160 HlcTimestamp {
161 wall_time_ms: state.last_wall_time_ms,
162 counter: state.last_counter,
163 node_id: self.node_id,
164 }
165 }
166}
167
168#[cfg(test)]
169mod tests {
170 use super::*;
171
172 #[test]
173 fn test_monotonic_ordering() {
174 let clock = Hlc::new(7);
175 let ts1 = clock.now();
176 let ts2 = clock.now();
177 let ts3 = clock.now();
178 assert!(ts2 > ts1);
179 assert!(ts3 > ts2);
180 }
181
182 #[test]
183 fn test_receive_advances_past_remote() {
184 let clock_a = Hlc::new(7);
185 let clock_b = Hlc::new(11);
186
187 let ts_a = clock_a.now();
188 let ts_b = clock_b.receive(&ts_a);
189
190 assert!(ts_b > ts_a);
192 assert_eq!(ts_b.node_id, 11);
194 }
195
196 #[test]
197 fn test_total_ordering_different_nodes() {
198 let ts_a = HlcTimestamp::new(1000, 0, 7);
200 let ts_b = HlcTimestamp::new(1000, 0, 11);
201 assert_ne!(ts_a, ts_b);
203 assert!(ts_a < ts_b); }
206
207 #[test]
208 fn test_display() {
209 let ts = HlcTimestamp::new(1710000000000, 5, 7);
210 assert_eq!(ts.to_string(), "1710000000000:5:7");
211 }
212
213 #[test]
214 fn test_datetime_roundtrip() {
215 let now = Utc::now();
216 let ts = HlcTimestamp::from_datetime(now);
217 let back = ts.to_datetime();
218 assert!((now - back).num_milliseconds().abs() <= 1);
220 }
221
222 #[test]
223 fn test_centralized_mode() {
224 let clock = Hlc::new(0); let ts = clock.now();
226 assert_eq!(ts.node_id, 0);
227 }
228}