Skip to main content

crdt_kit/
clock.rs

1//! Hybrid Logical Clock (HLC) for causal ordering.
2//!
3//! HLC combines physical time with a logical counter to provide:
4//! - **Monotonic timestamps** even when the physical clock goes backward
5//! - **Causal ordering** without full vector clocks
6//! - **Fixed size** (12 bytes) regardless of the number of nodes
7//!
8//! This is ideal for edge/IoT where clocks are not perfectly synchronized.
9//!
10//! # Example
11//!
12//! ```
13//! use crdt_kit::clock::{HybridClock, HybridTimestamp};
14//!
15//! let mut clock = HybridClock::new(1); // node_id = 1
16//!
17//! // Generate a timestamp for a local event
18//! let ts1 = clock.now();
19//! let ts2 = clock.now();
20//! assert!(ts2 > ts1);
21//!
22//! // Receive a timestamp from a remote node
23//! let remote_ts = HybridTimestamp { physical: ts2.physical + 1000, logical: 0, node_id: 2 };
24//! let ts3 = clock.receive(&remote_ts);
25//! assert!(ts3 > remote_ts);
26//! ```
27
28use core::cmp;
29
30use crate::NodeId;
31
32/// A timestamp from a Hybrid Logical Clock.
33///
34/// Consists of:
35/// - `physical`: milliseconds since Unix epoch (or any monotonic source)
36/// - `logical`: counter for events within the same physical millisecond
37/// - `node_id`: tiebreaker to ensure total ordering across nodes (lower 16 bits of [`NodeId`])
38///
39/// Total size: 12 bytes (u64 + u16 + u16).
40///
41/// **Note:** `node_id` uses only 16 bits for compact timestamps. When
42/// constructing from a [`NodeId`] (u64), only the lower 16 bits are used
43/// for tiebreaking. This is sufficient for up to 65 535 concurrent nodes.
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
45#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
46pub struct HybridTimestamp {
47    /// Physical time component (milliseconds).
48    pub physical: u64,
49    /// Logical counter for same-millisecond ordering.
50    pub logical: u16,
51    /// Node identifier for deterministic tiebreaking (lower 16 bits of [`NodeId`]).
52    pub node_id: u16,
53}
54
55impl HybridTimestamp {
56    /// Create a zero timestamp.
57    pub fn zero() -> Self {
58        Self {
59            physical: 0,
60            logical: 0,
61            node_id: 0,
62        }
63    }
64
65    /// Pack into a u128 for efficient comparison and storage.
66    /// Layout: [physical: 64 bits][logical: 16 bits][node_id: 16 bits][reserved: 32 bits]
67    pub fn to_u128(&self) -> u128 {
68        ((self.physical as u128) << 64)
69            | ((self.logical as u128) << 48)
70            | ((self.node_id as u128) << 32)
71    }
72}
73
74impl Ord for HybridTimestamp {
75    fn cmp(&self, other: &Self) -> cmp::Ordering {
76        self.physical
77            .cmp(&other.physical)
78            .then(self.logical.cmp(&other.logical))
79            .then(self.node_id.cmp(&other.node_id))
80    }
81}
82
83impl PartialOrd for HybridTimestamp {
84    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
85        Some(self.cmp(other))
86    }
87}
88
89/// A Hybrid Logical Clock instance for a single node.
90///
91/// Call [`now`](HybridClock::now) to generate timestamps for local events.
92/// Call [`receive`](HybridClock::receive) when processing a remote timestamp.
93///
94/// Accepts a [`NodeId`] (u64) on construction. Only the lower 16 bits are
95/// embedded in generated timestamps for compact tiebreaking.
96#[derive(Debug, Clone)]
97pub struct HybridClock {
98    node_id: u16,
99    last: HybridTimestamp,
100    /// Function to get the current physical time in milliseconds.
101    /// On `std`, this defaults to `SystemTime`. On `no_std`, you provide it.
102    physical_time_fn: fn() -> u64,
103}
104
105#[cfg(feature = "std")]
106fn system_time_ms() -> u64 {
107    std::time::SystemTime::now()
108        .duration_since(std::time::UNIX_EPOCH)
109        .unwrap_or_default()
110        .as_millis() as u64
111}
112
113#[cfg(not(feature = "std"))]
114fn fallback_time_ms() -> u64 {
115    0 // In no_std, user must provide a time source
116}
117
118impl HybridClock {
119    /// Create a new clock for the given node.
120    ///
121    /// Accepts a [`NodeId`] (u64). Only the lower 16 bits are embedded in
122    /// generated timestamps for compact tiebreaking.
123    ///
124    /// On `std` targets, uses `SystemTime` for physical time.
125    /// On `no_std` targets, use [`with_time_source`](Self::with_time_source).
126    pub fn new(node_id: NodeId) -> Self {
127        Self {
128            node_id: node_id as u16,
129            last: HybridTimestamp::zero(),
130            #[cfg(feature = "std")]
131            physical_time_fn: system_time_ms,
132            #[cfg(not(feature = "std"))]
133            physical_time_fn: fallback_time_ms,
134        }
135    }
136
137    /// Create a clock with a custom physical time source.
138    ///
139    /// Accepts a [`NodeId`] (u64). Only the lower 16 bits are embedded in
140    /// generated timestamps. The function should return milliseconds
141    /// (monotonic if possible).
142    pub fn with_time_source(node_id: NodeId, time_fn: fn() -> u64) -> Self {
143        Self {
144            node_id: node_id as u16,
145            last: HybridTimestamp::zero(),
146            physical_time_fn: time_fn,
147        }
148    }
149
150    /// Generate a timestamp for a local event.
151    ///
152    /// Guarantees monotonically increasing timestamps even if the
153    /// physical clock goes backward.
154    pub fn now(&mut self) -> HybridTimestamp {
155        let pt = (self.physical_time_fn)();
156
157        if pt > self.last.physical {
158            self.last = HybridTimestamp {
159                physical: pt,
160                logical: 0,
161                node_id: self.node_id,
162            };
163        } else {
164            self.last = HybridTimestamp {
165                physical: self.last.physical,
166                logical: self.last.logical + 1,
167                node_id: self.node_id,
168            };
169        }
170
171        self.last
172    }
173
174    /// Update the clock upon receiving a remote timestamp.
175    ///
176    /// Returns a new timestamp that is strictly greater than both
177    /// the local clock and the received timestamp.
178    pub fn receive(&mut self, remote: &HybridTimestamp) -> HybridTimestamp {
179        let pt = (self.physical_time_fn)();
180        let max_pt = cmp::max(cmp::max(pt, self.last.physical), remote.physical);
181
182        let logical = if max_pt == self.last.physical && max_pt == remote.physical {
183            cmp::max(self.last.logical, remote.logical) + 1
184        } else if max_pt == self.last.physical {
185            self.last.logical + 1
186        } else if max_pt == remote.physical {
187            remote.logical + 1
188        } else {
189            0
190        };
191
192        self.last = HybridTimestamp {
193            physical: max_pt,
194            logical,
195            node_id: self.node_id,
196        };
197
198        self.last
199    }
200
201    /// Get the node ID of this clock.
202    pub fn node_id(&self) -> u16 {
203        self.node_id
204    }
205
206    /// Get the last generated timestamp.
207    pub fn last_timestamp(&self) -> HybridTimestamp {
208        self.last
209    }
210}
211
212#[cfg(test)]
213mod tests {
214    use super::*;
215    use core::sync::atomic::{AtomicU64, Ordering};
216
217    static MOCK_TIME: AtomicU64 = AtomicU64::new(1000);
218
219    fn mock_time() -> u64 {
220        MOCK_TIME.load(Ordering::SeqCst)
221    }
222
223    fn set_mock_time(ms: u64) {
224        MOCK_TIME.store(ms, Ordering::SeqCst);
225    }
226
227    #[test]
228    fn monotonic_within_same_ms() {
229        set_mock_time(5000);
230        let mut clock = HybridClock::with_time_source(1, mock_time);
231
232        let ts1 = clock.now();
233        let ts2 = clock.now();
234        let ts3 = clock.now();
235
236        assert!(ts1 < ts2);
237        assert!(ts2 < ts3);
238        assert_eq!(ts1.physical, 5000);
239        assert_eq!(ts1.logical, 0);
240        assert_eq!(ts2.logical, 1);
241        assert_eq!(ts3.logical, 2);
242    }
243
244    #[test]
245    fn physical_time_advance_resets_logical() {
246        set_mock_time(1000);
247        let mut clock = HybridClock::with_time_source(1, mock_time);
248
249        let ts1 = clock.now();
250        assert_eq!(ts1.logical, 0);
251
252        let _ts2 = clock.now();
253
254        set_mock_time(2000);
255        let ts3 = clock.now();
256        assert_eq!(ts3.physical, 2000);
257        assert_eq!(ts3.logical, 0);
258    }
259
260    #[test]
261    fn receive_advances_clock() {
262        set_mock_time(1000);
263        let mut clock = HybridClock::with_time_source(1, mock_time);
264
265        // Remote is ahead
266        let remote = HybridTimestamp {
267            physical: 5000,
268            logical: 3,
269            node_id: 2,
270        };
271
272        let ts = clock.receive(&remote);
273        assert!(ts > remote);
274        assert_eq!(ts.physical, 5000);
275        assert_eq!(ts.logical, 4); // remote.logical + 1
276    }
277
278    #[test]
279    fn receive_same_physical_time() {
280        set_mock_time(5000);
281        let mut clock = HybridClock::with_time_source(1, mock_time);
282
283        let _local = clock.now(); // physical=5000, logical=0
284
285        let remote = HybridTimestamp {
286            physical: 5000,
287            logical: 5,
288            node_id: 2,
289        };
290
291        let ts = clock.receive(&remote);
292        assert!(ts > remote);
293        assert_eq!(ts.physical, 5000);
294        assert_eq!(ts.logical, 6); // max(0, 5) + 1
295    }
296
297    #[test]
298    fn ordering_is_total() {
299        let a = HybridTimestamp {
300            physical: 1000,
301            logical: 0,
302            node_id: 1,
303        };
304        let b = HybridTimestamp {
305            physical: 1000,
306            logical: 0,
307            node_id: 2,
308        };
309        let c = HybridTimestamp {
310            physical: 1000,
311            logical: 1,
312            node_id: 1,
313        };
314
315        assert!(a < b); // same physical+logical, node_id tiebreak
316        assert!(a < c); // same physical, logical tiebreak
317        assert!(b < c); // logical > node_id in precedence
318    }
319
320    #[test]
321    fn to_u128_preserves_ordering() {
322        let a = HybridTimestamp {
323            physical: 1000,
324            logical: 5,
325            node_id: 1,
326        };
327        let b = HybridTimestamp {
328            physical: 1000,
329            logical: 6,
330            node_id: 1,
331        };
332        let c = HybridTimestamp {
333            physical: 1001,
334            logical: 0,
335            node_id: 1,
336        };
337
338        assert!(a.to_u128() < b.to_u128());
339        assert!(b.to_u128() < c.to_u128());
340    }
341}