like_a_clockwork/
lamport.rs1use serde::{Deserialize, Serialize};
2use std::cmp::Ordering;
3use std::fmt;
4use std::str::FromStr;
5
6#[derive(Debug, Clone, Serialize, Deserialize)]
7pub struct LamportClock {
8 node_id: String,
9 time: u64,
10}
11
12#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
13pub struct LamportTimestamp {
14 node_id: String,
15 time: u64,
16}
17
18#[derive(Debug, thiserror::Error)]
19pub enum LamportTimestampError {
20 #[error("missing ':' delimiter")]
21 MissingDelimiter,
22 #[error("invalid time value: {0}")]
23 InvalidTime(String),
24 #[error("empty node id")]
25 EmptyNodeId,
26}
27
28impl LamportClock {
31 pub fn new(node_id: &str) -> Self {
32 assert!(!node_id.is_empty(), "node_id must not be empty");
33 assert!(!node_id.contains(':'), "node_id must not contain ':'");
34 Self {
35 node_id: node_id.to_string(),
36 time: 0,
37 }
38 }
39
40 pub fn tick(&mut self) -> u64 {
41 self.time += 1;
42 self.time
43 }
44
45 pub fn send(&mut self) -> LamportTimestamp {
46 self.tick();
47 LamportTimestamp {
48 node_id: self.node_id.clone(),
49 time: self.time,
50 }
51 }
52
53 pub fn receive(&mut self, timestamp: &LamportTimestamp) -> u64 {
54 self.time = self.time.max(timestamp.time) + 1;
55 self.time
56 }
57
58 pub fn time(&self) -> u64 {
59 self.time
60 }
61
62 pub fn node_id(&self) -> &str {
63 &self.node_id
64 }
65}
66
67impl LamportTimestamp {
70 pub fn node_id(&self) -> &str {
71 &self.node_id
72 }
73
74 pub fn time(&self) -> u64 {
75 self.time
76 }
77}
78
79impl fmt::Display for LamportTimestamp {
80 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
81 write!(f, "{}:{}", self.node_id, self.time)
82 }
83}
84
85impl FromStr for LamportTimestamp {
86 type Err = LamportTimestampError;
87
88 fn from_str(s: &str) -> Result<Self, Self::Err> {
89 let colon_pos = s.rfind(':').ok_or(LamportTimestampError::MissingDelimiter)?;
90 let node_id = &s[..colon_pos];
91 let time_str = &s[colon_pos + 1..];
92
93 if node_id.is_empty() {
94 return Err(LamportTimestampError::EmptyNodeId);
95 }
96
97 let time = time_str
98 .parse::<u64>()
99 .map_err(|_| LamportTimestampError::InvalidTime(time_str.to_string()))?;
100
101 Ok(Self {
102 node_id: node_id.to_string(),
103 time,
104 })
105 }
106}
107
108impl PartialOrd for LamportTimestamp {
109 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
110 Some(self.cmp(other))
111 }
112}
113
114impl Ord for LamportTimestamp {
115 fn cmp(&self, other: &Self) -> Ordering {
116 self.time
117 .cmp(&other.time)
118 .then_with(|| self.node_id.cmp(&other.node_id))
119 }
120}
121
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a timestamp directly from its parts, without going through a clock.
    fn timestamp(node_id: &str, time: u64) -> LamportTimestamp {
        LamportTimestamp {
            node_id: node_id.to_string(),
            time,
        }
    }

    /// Returns a fresh clock for `node_id` that has been ticked `ticks` times.
    fn ticked_clock(node_id: &str, ticks: u64) -> LamportClock {
        let mut clock = LamportClock::new(node_id);
        for _ in 0..ticks {
            clock.tick();
        }
        clock
    }

    #[test]
    fn new_creates_clock_with_zero_time() {
        let clock = LamportClock::new("node-1");
        assert_eq!(clock.time(), 0);
    }

    #[test]
    fn new_preserves_node_id() {
        let clock = LamportClock::new("order-service");
        assert_eq!(clock.node_id(), "order-service");
    }

    // `expected` pins the panic to the intended assertion, so an unrelated
    // panic cannot make these tests pass by accident.
    #[test]
    #[should_panic(expected = "node_id must not be empty")]
    fn new_rejects_empty_node_id() {
        let _ = LamportClock::new("");
    }

    #[test]
    #[should_panic(expected = "node_id must not contain ':'")]
    fn new_rejects_node_id_with_colon() {
        let _ = LamportClock::new("bad:id");
    }

    #[test]
    fn timestamp_accessors_return_correct_values() {
        let mut clock = LamportClock::new("svc");
        let ts = clock.send();
        assert_eq!(ts.node_id(), "svc");
        assert_eq!(ts.time(), 1);
    }

    #[test]
    fn tick_returns_one_on_first_call() {
        let mut clock = LamportClock::new("a");
        assert_eq!(clock.tick(), 1);
    }

    #[test]
    fn tick_increments_monotonically() {
        let mut clock = LamportClock::new("a");
        assert_eq!(clock.tick(), 1);
        assert_eq!(clock.tick(), 2);
        assert_eq!(clock.tick(), 3);
    }

    #[test]
    fn send_increments_and_returns_timestamp() {
        let mut clock = LamportClock::new("node");
        let ts = clock.send();
        assert_eq!(ts.time(), 1);
        assert_eq!(clock.time(), 1);
    }

    #[test]
    fn send_timestamp_contains_correct_node_id() {
        let mut clock = LamportClock::new("order-service");
        let ts = clock.send();
        assert_eq!(ts.node_id(), "order-service");
    }

    #[test]
    fn receive_advances_past_remote_timestamp() {
        let mut clock_a = ticked_clock("a", 5);
        let mut clock_b = LamportClock::new("b");

        // Five ticks plus the send put `a` at 6; `b` must land on max(0, 6) + 1.
        let ts = clock_a.send();
        let new_time = clock_b.receive(&ts);

        assert_eq!(new_time, 7);
        assert_eq!(clock_b.time(), 7);
    }

    #[test]
    fn receive_with_lower_remote_still_increments() {
        let mut clock = ticked_clock("a", 10);

        // Local time (10) dominates the remote time (3): max(10, 3) + 1.
        let new_time = clock.receive(&timestamp("b", 3));
        assert_eq!(new_time, 11);
    }

    #[test]
    fn receive_with_equal_remote_increments() {
        let mut clock = ticked_clock("a", 5);

        // Equal local and remote times: max(5, 5) + 1.
        let new_time = clock.receive(&timestamp("b", 5));
        assert_eq!(new_time, 6);
    }

    #[test]
    fn timestamp_display_format() {
        let ts = timestamp("order-service", 42);
        assert_eq!(ts.to_string(), "order-service:42");
    }

    #[test]
    fn timestamp_parse_roundtrip() {
        let ts = timestamp("order-service", 42);
        let parsed: LamportTimestamp = ts.to_string().parse().unwrap();
        assert_eq!(parsed, ts);
    }

    #[test]
    fn timestamp_parse_missing_delimiter() {
        let result = "no-delimiter".parse::<LamportTimestamp>();
        assert!(matches!(
            result,
            Err(LamportTimestampError::MissingDelimiter)
        ));
    }

    #[test]
    fn timestamp_parse_invalid_time() {
        let result = "node:abc".parse::<LamportTimestamp>();
        // The error must carry the text that failed to parse.
        assert!(matches!(
            result,
            Err(LamportTimestampError::InvalidTime(ref raw)) if raw == "abc"
        ));
    }

    #[test]
    fn timestamp_parse_empty_node_id() {
        let result = ":42".parse::<LamportTimestamp>();
        assert!(matches!(result, Err(LamportTimestampError::EmptyNodeId)));
    }

    #[test]
    fn timestamp_ordering_by_time() {
        assert!(timestamp("a", 1) < timestamp("a", 2));
    }

    #[test]
    fn timestamp_ordering_tiebreak_by_node_id() {
        assert!(timestamp("a", 5) < timestamp("b", 5));
    }

    #[test]
    fn serde_json_roundtrip_clock() {
        let clock = LamportClock::new("serde-node");
        let json = serde_json::to_string(&clock).unwrap();
        let deserialized: LamportClock = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.node_id(), "serde-node");
        assert_eq!(deserialized.time(), 0);
    }

    #[test]
    fn serde_json_roundtrip_timestamp() {
        let ts = timestamp("ts-node", 99);
        let json = serde_json::to_string(&ts).unwrap();
        let deserialized: LamportTimestamp = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized, ts);
    }
}