// like_a_clockwork/lamport.rs

1use serde::{Deserialize, Serialize};
2use std::cmp::Ordering;
3use std::fmt;
4use std::str::FromStr;
5
6#[derive(Debug, Clone, Serialize, Deserialize)]
7pub struct LamportClock {
8    node_id: String,
9    time: u64,
10}
11
12#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
13pub struct LamportTimestamp {
14    node_id: String,
15    time: u64,
16}
17
18#[derive(Debug, thiserror::Error)]
19pub enum LamportTimestampError {
20    #[error("missing ':' delimiter")]
21    MissingDelimiter,
22    #[error("invalid time value: {0}")]
23    InvalidTime(String),
24    #[error("empty node id")]
25    EmptyNodeId,
26}
27
// --- LamportClock ---
29
30impl LamportClock {
31    pub fn new(node_id: &str) -> Self {
32        assert!(!node_id.is_empty(), "node_id must not be empty");
33        assert!(!node_id.contains(':'), "node_id must not contain ':'");
34        Self {
35            node_id: node_id.to_string(),
36            time: 0,
37        }
38    }
39
40    pub fn tick(&mut self) -> u64 {
41        self.time += 1;
42        self.time
43    }
44
45    pub fn send(&mut self) -> LamportTimestamp {
46        self.tick();
47        LamportTimestamp {
48            node_id: self.node_id.clone(),
49            time: self.time,
50        }
51    }
52
53    pub fn receive(&mut self, timestamp: &LamportTimestamp) -> u64 {
54        self.time = self.time.max(timestamp.time) + 1;
55        self.time
56    }
57
58    pub fn time(&self) -> u64 {
59        self.time
60    }
61
62    pub fn node_id(&self) -> &str {
63        &self.node_id
64    }
65}
66
// --- LamportTimestamp ---
68
69impl LamportTimestamp {
70    pub fn node_id(&self) -> &str {
71        &self.node_id
72    }
73
74    pub fn time(&self) -> u64 {
75        self.time
76    }
77}
78
79impl fmt::Display for LamportTimestamp {
80    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
81        write!(f, "{}:{}", self.node_id, self.time)
82    }
83}
84
85impl FromStr for LamportTimestamp {
86    type Err = LamportTimestampError;
87
88    fn from_str(s: &str) -> Result<Self, Self::Err> {
89        let colon_pos = s.rfind(':').ok_or(LamportTimestampError::MissingDelimiter)?;
90        let node_id = &s[..colon_pos];
91        let time_str = &s[colon_pos + 1..];
92
93        if node_id.is_empty() {
94            return Err(LamportTimestampError::EmptyNodeId);
95        }
96
97        let time = time_str
98            .parse::<u64>()
99            .map_err(|_| LamportTimestampError::InvalidTime(time_str.to_string()))?;
100
101        Ok(Self {
102            node_id: node_id.to_string(),
103            time,
104        })
105    }
106}
107
108impl PartialOrd for LamportTimestamp {
109    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
110        Some(self.cmp(other))
111    }
112}
113
114impl Ord for LamportTimestamp {
115    fn cmp(&self, other: &Self) -> Ordering {
116        self.time
117            .cmp(&other.time)
118            .then_with(|| self.node_id.cmp(&other.node_id))
119    }
120}
121
// --- Tests ---
123
/// Unit tests for `LamportClock` and `LamportTimestamp`.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a timestamp directly from its parts, without going through a clock.
    fn timestamp(node_id: &str, time: u64) -> LamportTimestamp {
        LamportTimestamp {
            node_id: node_id.to_string(),
            time,
        }
    }

    /// Builds a clock for `node_id` that has already been ticked `ticks` times.
    fn clock_at(node_id: &str, ticks: u64) -> LamportClock {
        let mut clock = LamportClock::new(node_id);
        for _ in 0..ticks {
            clock.tick();
        }
        clock
    }

    #[test]
    fn new_creates_clock_with_zero_time() {
        let clock = LamportClock::new("node-1");
        assert_eq!(clock.time(), 0);
    }

    #[test]
    fn new_preserves_node_id() {
        let clock = LamportClock::new("order-service");
        assert_eq!(clock.node_id(), "order-service");
    }

    // `expected` pins the panic to the intended assertion; a bare
    // `#[should_panic]` would also pass on any unrelated panic.
    #[test]
    #[should_panic(expected = "node_id must not be empty")]
    fn new_rejects_empty_node_id() {
        LamportClock::new("");
    }

    #[test]
    #[should_panic(expected = "node_id must not contain ':'")]
    fn new_rejects_node_id_with_colon() {
        LamportClock::new("bad:id");
    }

    #[test]
    fn timestamp_accessors_return_correct_values() {
        let mut clock = LamportClock::new("svc");
        let ts = clock.send();
        assert_eq!(ts.node_id(), "svc");
        assert_eq!(ts.time(), 1);
    }

    #[test]
    fn tick_returns_one_on_first_call() {
        let mut clock = LamportClock::new("a");
        assert_eq!(clock.tick(), 1);
    }

    #[test]
    fn tick_increments_monotonically() {
        let mut clock = LamportClock::new("a");
        assert_eq!(clock.tick(), 1);
        assert_eq!(clock.tick(), 2);
        assert_eq!(clock.tick(), 3);
    }

    #[test]
    fn send_increments_and_returns_timestamp() {
        let mut clock = LamportClock::new("node");
        let ts = clock.send();
        assert_eq!(ts.time(), 1);
        assert_eq!(clock.time(), 1);
    }

    #[test]
    fn send_timestamp_contains_correct_node_id() {
        let mut clock = LamportClock::new("order-service");
        let ts = clock.send();
        assert_eq!(ts.node_id(), "order-service");
    }

    #[test]
    fn receive_advances_past_remote_timestamp() {
        let mut clock_a = clock_at("a", 5);
        let mut clock_b = LamportClock::new("b");

        let ts = clock_a.send(); // time=6

        let new_time = clock_b.receive(&ts);
        assert_eq!(new_time, 7); // max(0,6)+1
        assert_eq!(clock_b.time(), 7);
    }

    #[test]
    fn receive_with_lower_remote_still_increments() {
        let mut clock = clock_at("a", 10);

        let new_time = clock.receive(&timestamp("b", 3));
        assert_eq!(new_time, 11); // max(10,3)+1
    }

    #[test]
    fn receive_with_equal_remote_increments() {
        let mut clock = clock_at("a", 5);

        let new_time = clock.receive(&timestamp("b", 5));
        assert_eq!(new_time, 6); // max(5,5)+1
    }

    #[test]
    fn timestamp_display_format() {
        let ts = timestamp("order-service", 42);
        assert_eq!(ts.to_string(), "order-service:42");
    }

    #[test]
    fn timestamp_parse_roundtrip() {
        let ts = timestamp("order-service", 42);
        let s = ts.to_string();
        let parsed: LamportTimestamp = s.parse().unwrap();
        assert_eq!(parsed, ts);
    }

    #[test]
    fn timestamp_parse_missing_delimiter() {
        let result = "no-delimiter".parse::<LamportTimestamp>();
        assert!(matches!(
            result,
            Err(LamportTimestampError::MissingDelimiter)
        ));
    }

    #[test]
    fn timestamp_parse_invalid_time() {
        let result = "node:abc".parse::<LamportTimestamp>();
        // The error carries the text that failed to parse.
        assert!(matches!(
            result,
            Err(LamportTimestampError::InvalidTime(ref text)) if text == "abc"
        ));
    }

    #[test]
    fn timestamp_parse_empty_node_id() {
        let result = ":42".parse::<LamportTimestamp>();
        assert!(matches!(result, Err(LamportTimestampError::EmptyNodeId)));
    }

    #[test]
    fn timestamp_ordering_by_time() {
        assert!(timestamp("a", 1) < timestamp("a", 2));
    }

    #[test]
    fn timestamp_ordering_tiebreak_by_node_id() {
        assert!(timestamp("a", 5) < timestamp("b", 5));
    }

    #[test]
    fn serde_json_roundtrip_clock() {
        let clock = LamportClock::new("serde-node");
        let json = serde_json::to_string(&clock).unwrap();
        let deserialized: LamportClock = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.node_id(), "serde-node");
        assert_eq!(deserialized.time(), 0);
    }

    #[test]
    fn serde_json_roundtrip_timestamp() {
        let ts = timestamp("ts-node", 99);
        let json = serde_json::to_string(&ts).unwrap();
        let deserialized: LamportTimestamp = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized, ts);
    }
}
331}