Skip to main content

crdt_kit/
clock.rs

1//! Hybrid Logical Clock (HLC) for causal ordering.
2//!
3//! HLC combines physical time with a logical counter to provide:
4//! - **Monotonic timestamps** even when the physical clock goes backward
5//! - **Causal ordering** without full vector clocks
6//! - **Fixed size** (12 bytes) regardless of the number of nodes
7//!
8//! This is ideal for edge/IoT where clocks are not perfectly synchronized.
9//!
10//! # Example
11//!
12//! ```
13//! use crdt_kit::clock::{HybridClock, HybridTimestamp};
14//!
15//! let mut clock = HybridClock::new(1); // node_id = 1
16//!
17//! // Generate a timestamp for a local event
18//! let ts1 = clock.now();
19//! let ts2 = clock.now();
20//! assert!(ts2 > ts1);
21//!
22//! // Receive a timestamp from a remote node
23//! let remote_ts = HybridTimestamp { physical: ts2.physical + 1000, logical: 0, node_id: 2 };
24//! let ts3 = clock.receive(&remote_ts);
25//! assert!(ts3 > remote_ts);
26//! ```
27
28use core::cmp;
29
30/// A timestamp from a Hybrid Logical Clock.
31///
32/// Consists of:
33/// - `physical`: milliseconds since Unix epoch (or any monotonic source)
34/// - `logical`: counter for events within the same physical millisecond
35/// - `node_id`: tiebreaker to ensure total ordering across nodes
36///
37/// Total size: 12 bytes (u64 + u16 + u16).
38#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
39#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
40pub struct HybridTimestamp {
41    /// Physical time component (milliseconds).
42    pub physical: u64,
43    /// Logical counter for same-millisecond ordering.
44    pub logical: u16,
45    /// Node identifier for deterministic tiebreaking.
46    pub node_id: u16,
47}
48
49impl HybridTimestamp {
50    /// Create a zero timestamp.
51    pub fn zero() -> Self {
52        Self {
53            physical: 0,
54            logical: 0,
55            node_id: 0,
56        }
57    }
58
59    /// Pack into a u128 for efficient comparison and storage.
60    /// Layout: [physical: 64 bits][logical: 16 bits][node_id: 16 bits][reserved: 32 bits]
61    pub fn to_u128(&self) -> u128 {
62        ((self.physical as u128) << 64)
63            | ((self.logical as u128) << 48)
64            | ((self.node_id as u128) << 32)
65    }
66}
67
68impl Ord for HybridTimestamp {
69    fn cmp(&self, other: &Self) -> cmp::Ordering {
70        self.physical
71            .cmp(&other.physical)
72            .then(self.logical.cmp(&other.logical))
73            .then(self.node_id.cmp(&other.node_id))
74    }
75}
76
77impl PartialOrd for HybridTimestamp {
78    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
79        Some(self.cmp(other))
80    }
81}
82
83/// A Hybrid Logical Clock instance for a single node.
84///
85/// Call [`now`](HybridClock::now) to generate timestamps for local events.
86/// Call [`receive`](HybridClock::receive) when processing a remote timestamp.
87pub struct HybridClock {
88    node_id: u16,
89    last: HybridTimestamp,
90    /// Function to get the current physical time in milliseconds.
91    /// On `std`, this defaults to `SystemTime`. On `no_std`, you provide it.
92    physical_time_fn: fn() -> u64,
93}
94
95#[cfg(feature = "std")]
96fn system_time_ms() -> u64 {
97    std::time::SystemTime::now()
98        .duration_since(std::time::UNIX_EPOCH)
99        .unwrap_or_default()
100        .as_millis() as u64
101}
102
103#[cfg(not(feature = "std"))]
104fn fallback_time_ms() -> u64 {
105    0 // In no_std, user must provide a time source
106}
107
108impl HybridClock {
109    /// Create a new clock for the given node.
110    ///
111    /// On `std` targets, uses `SystemTime` for physical time.
112    /// On `no_std` targets, use [`with_time_source`](Self::with_time_source).
113    pub fn new(node_id: u16) -> Self {
114        Self {
115            node_id,
116            last: HybridTimestamp::zero(),
117            #[cfg(feature = "std")]
118            physical_time_fn: system_time_ms,
119            #[cfg(not(feature = "std"))]
120            physical_time_fn: fallback_time_ms,
121        }
122    }
123
124    /// Create a clock with a custom physical time source.
125    /// The function should return milliseconds (monotonic if possible).
126    pub fn with_time_source(node_id: u16, time_fn: fn() -> u64) -> Self {
127        Self {
128            node_id,
129            last: HybridTimestamp::zero(),
130            physical_time_fn: time_fn,
131        }
132    }
133
134    /// Generate a timestamp for a local event.
135    ///
136    /// Guarantees monotonically increasing timestamps even if the
137    /// physical clock goes backward.
138    pub fn now(&mut self) -> HybridTimestamp {
139        let pt = (self.physical_time_fn)();
140
141        if pt > self.last.physical {
142            self.last = HybridTimestamp {
143                physical: pt,
144                logical: 0,
145                node_id: self.node_id,
146            };
147        } else {
148            self.last = HybridTimestamp {
149                physical: self.last.physical,
150                logical: self.last.logical + 1,
151                node_id: self.node_id,
152            };
153        }
154
155        self.last
156    }
157
158    /// Update the clock upon receiving a remote timestamp.
159    ///
160    /// Returns a new timestamp that is strictly greater than both
161    /// the local clock and the received timestamp.
162    pub fn receive(&mut self, remote: &HybridTimestamp) -> HybridTimestamp {
163        let pt = (self.physical_time_fn)();
164        let max_pt = cmp::max(cmp::max(pt, self.last.physical), remote.physical);
165
166        let logical = if max_pt == self.last.physical && max_pt == remote.physical {
167            cmp::max(self.last.logical, remote.logical) + 1
168        } else if max_pt == self.last.physical {
169            self.last.logical + 1
170        } else if max_pt == remote.physical {
171            remote.logical + 1
172        } else {
173            0
174        };
175
176        self.last = HybridTimestamp {
177            physical: max_pt,
178            logical,
179            node_id: self.node_id,
180        };
181
182        self.last
183    }
184
185    /// Get the node ID of this clock.
186    pub fn node_id(&self) -> u16 {
187        self.node_id
188    }
189
190    /// Get the last generated timestamp.
191    pub fn last_timestamp(&self) -> HybridTimestamp {
192        self.last
193    }
194}
195
196#[cfg(test)]
197mod tests {
198    use super::*;
199    use core::sync::atomic::{AtomicU64, Ordering};
200
201    static MOCK_TIME: AtomicU64 = AtomicU64::new(1000);
202
203    fn mock_time() -> u64 {
204        MOCK_TIME.load(Ordering::SeqCst)
205    }
206
207    fn set_mock_time(ms: u64) {
208        MOCK_TIME.store(ms, Ordering::SeqCst);
209    }
210
211    #[test]
212    fn monotonic_within_same_ms() {
213        set_mock_time(5000);
214        let mut clock = HybridClock::with_time_source(1, mock_time);
215
216        let ts1 = clock.now();
217        let ts2 = clock.now();
218        let ts3 = clock.now();
219
220        assert!(ts1 < ts2);
221        assert!(ts2 < ts3);
222        assert_eq!(ts1.physical, 5000);
223        assert_eq!(ts1.logical, 0);
224        assert_eq!(ts2.logical, 1);
225        assert_eq!(ts3.logical, 2);
226    }
227
228    #[test]
229    fn physical_time_advance_resets_logical() {
230        set_mock_time(1000);
231        let mut clock = HybridClock::with_time_source(1, mock_time);
232
233        let ts1 = clock.now();
234        assert_eq!(ts1.logical, 0);
235
236        let _ts2 = clock.now();
237
238        set_mock_time(2000);
239        let ts3 = clock.now();
240        assert_eq!(ts3.physical, 2000);
241        assert_eq!(ts3.logical, 0);
242    }
243
244    #[test]
245    fn receive_advances_clock() {
246        set_mock_time(1000);
247        let mut clock = HybridClock::with_time_source(1, mock_time);
248
249        // Remote is ahead
250        let remote = HybridTimestamp {
251            physical: 5000,
252            logical: 3,
253            node_id: 2,
254        };
255
256        let ts = clock.receive(&remote);
257        assert!(ts > remote);
258        assert_eq!(ts.physical, 5000);
259        assert_eq!(ts.logical, 4); // remote.logical + 1
260    }
261
262    #[test]
263    fn receive_same_physical_time() {
264        set_mock_time(5000);
265        let mut clock = HybridClock::with_time_source(1, mock_time);
266
267        let _local = clock.now(); // physical=5000, logical=0
268
269        let remote = HybridTimestamp {
270            physical: 5000,
271            logical: 5,
272            node_id: 2,
273        };
274
275        let ts = clock.receive(&remote);
276        assert!(ts > remote);
277        assert_eq!(ts.physical, 5000);
278        assert_eq!(ts.logical, 6); // max(0, 5) + 1
279    }
280
281    #[test]
282    fn ordering_is_total() {
283        let a = HybridTimestamp {
284            physical: 1000,
285            logical: 0,
286            node_id: 1,
287        };
288        let b = HybridTimestamp {
289            physical: 1000,
290            logical: 0,
291            node_id: 2,
292        };
293        let c = HybridTimestamp {
294            physical: 1000,
295            logical: 1,
296            node_id: 1,
297        };
298
299        assert!(a < b); // same physical+logical, node_id tiebreak
300        assert!(a < c); // same physical, logical tiebreak
301        assert!(b < c); // logical > node_id in precedence
302    }
303
304    #[test]
305    fn to_u128_preserves_ordering() {
306        let a = HybridTimestamp {
307            physical: 1000,
308            logical: 5,
309            node_id: 1,
310        };
311        let b = HybridTimestamp {
312            physical: 1000,
313            logical: 6,
314            node_id: 1,
315        };
316        let c = HybridTimestamp {
317            physical: 1001,
318            logical: 0,
319            node_id: 1,
320        };
321
322        assert!(a.to_u128() < b.to_u128());
323        assert!(b.to_u128() < c.to_u128());
324    }
325}