Skip to main content

aida_core/
hlc.rs

// trace:ARCH-distributed-hlc | ai:claude
//! Hybrid Logical Clock (HLC) implementation for distributed timestamp ordering.
//!
//! HLC combines wall clock time with a logical counter to provide timestamps that:
//! - Stay close to wall time (unlike pure Lamport clocks)
//! - Preserve causality (unlike raw wall clocks under clock skew)
//! - Are totally ordered by (wall_time, counter, node_id)
//!
//! Reference: "Logical Physical Clocks and Consistent Snapshots in Globally
//! Distributed Databases" (Kulkarni et al., 2014)
11
12use chrono::{DateTime, Utc};
13use serde::{Deserialize, Serialize};
14use std::sync::Mutex;
15
16/// A Hybrid Logical Clock timestamp.
17///
18/// Ordering: (wall_time, counter, node_id) — fully deterministic.
19/// Two HLC timestamps from different nodes are never equal.
20#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
21pub struct HlcTimestamp {
22    /// Wall clock component (milliseconds since Unix epoch)
23    pub wall_time_ms: i64,
24    /// Logical counter — incremented when wall clock hasn't advanced
25    pub counter: u32,
26    /// Node that generated this timestamp (0 for centralized mode)
27    pub node_id: u32,
28}
29
30impl HlcTimestamp {
31    /// Create a timestamp from components.
32    pub fn new(wall_time_ms: i64, counter: u32, node_id: u32) -> Self {
33        Self {
34            wall_time_ms,
35            counter,
36            node_id,
37        }
38    }
39
40    /// Convert to a chrono DateTime (loses counter and node_id precision).
41    pub fn to_datetime(&self) -> DateTime<Utc> {
42        DateTime::from_timestamp_millis(self.wall_time_ms)
43            .unwrap_or_else(|| Utc::now())
44    }
45
46    /// Create from a chrono DateTime (sets counter=0, node_id=0).
47    pub fn from_datetime(dt: DateTime<Utc>) -> Self {
48        Self {
49            wall_time_ms: dt.timestamp_millis(),
50            counter: 0,
51            node_id: 0,
52        }
53    }
54}
55
56impl PartialOrd for HlcTimestamp {
57    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
58        Some(self.cmp(other))
59    }
60}
61
62impl Ord for HlcTimestamp {
63    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
64        self.wall_time_ms
65            .cmp(&other.wall_time_ms)
66            .then(self.counter.cmp(&other.counter))
67            .then(self.node_id.cmp(&other.node_id))
68    }
69}
70
71impl std::fmt::Display for HlcTimestamp {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        write!(
74            f,
75            "{}:{}:{}",
76            self.wall_time_ms, self.counter, self.node_id
77        )
78    }
79}
80
/// The HLC clock instance. Thread-safe via internal Mutex.
///
/// Usage:
/// ```
/// use aida_core::hlc::Hlc;
///
/// let clock = Hlc::new(7); // node_id = 7
/// let ts1 = clock.now();
/// let ts2 = clock.now();
/// assert!(ts2 > ts1);
/// ```
#[derive(Debug)]
pub struct Hlc {
    /// Identifier copied into every timestamp this clock produces.
    node_id: u32,
    /// Most recently issued wall time and counter, guarded by a mutex so the
    /// clock can be used through a shared reference.
    state: Mutex<HlcState>,
}

/// Mutable part of the clock: the last wall time and counter handed out.
#[derive(Debug)]
struct HlcState {
    /// Wall-time component (ms since Unix epoch) of the last issued timestamp.
    last_wall_time_ms: i64,
    /// Logical counter of the last issued timestamp.
    last_counter: u32,
}
101
102impl Hlc {
103    /// Create a new HLC for the given node.
104    /// Use node_id=0 for centralized (single-server) mode.
105    pub fn new(node_id: u32) -> Self {
106        Self {
107            node_id,
108            state: Mutex::new(HlcState {
109                last_wall_time_ms: 0,
110                last_counter: 0,
111            }),
112        }
113    }
114
115    /// Generate a new timestamp guaranteed to be greater than any previous
116    /// timestamp from this clock or any received timestamp.
117    pub fn now(&self) -> HlcTimestamp {
118        let physical = Utc::now().timestamp_millis();
119        let mut state = self.state.lock().unwrap();
120
121        if physical > state.last_wall_time_ms {
122            // Wall clock advanced — reset counter
123            state.last_wall_time_ms = physical;
124            state.last_counter = 0;
125        } else {
126            // Wall clock hasn't advanced (same ms or skew) — increment counter
127            state.last_counter += 1;
128        }
129
130        HlcTimestamp {
131            wall_time_ms: state.last_wall_time_ms,
132            counter: state.last_counter,
133            node_id: self.node_id,
134        }
135    }
136
137    /// Update the clock after receiving a remote timestamp.
138    /// Returns a new timestamp that is causally after both the local clock
139    /// and the received timestamp.
140    pub fn receive(&self, remote: &HlcTimestamp) -> HlcTimestamp {
141        let physical = Utc::now().timestamp_millis();
142        let mut state = self.state.lock().unwrap();
143
144        if physical > state.last_wall_time_ms && physical > remote.wall_time_ms {
145            // Physical clock is ahead of everything — use it
146            state.last_wall_time_ms = physical;
147            state.last_counter = 0;
148        } else if remote.wall_time_ms > state.last_wall_time_ms {
149            // Remote is ahead — adopt its wall time
150            state.last_wall_time_ms = remote.wall_time_ms;
151            state.last_counter = remote.counter + 1;
152        } else if state.last_wall_time_ms > remote.wall_time_ms {
153            // We're ahead — just increment our counter
154            state.last_counter += 1;
155        } else {
156            // Same wall time — take max counter + 1
157            state.last_counter = std::cmp::max(state.last_counter, remote.counter) + 1;
158        }
159
160        HlcTimestamp {
161            wall_time_ms: state.last_wall_time_ms,
162            counter: state.last_counter,
163            node_id: self.node_id,
164        }
165    }
166}
167
#[cfg(test)]
mod tests {
    //! Unit tests for `HlcTimestamp` ordering/formatting and for the `Hlc`
    //! clock's `now` / `receive` behaviour.

    use super::*;

    #[test]
    fn test_monotonic_ordering() {
        let clock = Hlc::new(7);
        let ts1 = clock.now();
        let ts2 = clock.now();
        let ts3 = clock.now();
        assert!(ts2 > ts1);
        assert!(ts3 > ts2);
    }

    #[test]
    fn test_receive_advances_past_remote() {
        let clock_a = Hlc::new(7);
        let clock_b = Hlc::new(11);

        let ts_a = clock_a.now();
        let ts_b = clock_b.receive(&ts_a);

        // ts_b must be after ts_a
        assert!(ts_b > ts_a);
        // ts_b should have node_id 11
        assert_eq!(ts_b.node_id, 11);
    }

    #[test]
    fn test_receive_remote_ahead_adopts_remote_wall_time() {
        let clock = Hlc::new(11);
        // One hour in the future, so the local physical clock cannot catch up
        // while the test runs.
        let remote = HlcTimestamp::new(Utc::now().timestamp_millis() + 3_600_000, 5, 7);

        let ts = clock.receive(&remote);
        assert_eq!(ts.wall_time_ms, remote.wall_time_ms);
        assert_eq!(ts.counter, 6);
        assert!(ts > remote);

        // Later local events must still order after the adopted timestamp.
        let next = clock.now();
        assert!(next > ts);
    }

    #[test]
    fn test_receive_remote_behind_keeps_local_time() {
        let clock = Hlc::new(11);
        let before = clock.now();
        // Far in the past relative to any real wall clock.
        let stale = HlcTimestamp::new(1, 9, 7);

        let ts = clock.receive(&stale);
        assert!(ts > before);
        assert!(ts > stale);
        assert!(ts.wall_time_ms >= before.wall_time_ms);
        assert_eq!(ts.node_id, 11);
    }

    #[test]
    fn test_total_ordering_different_nodes() {
        // Two timestamps with same wall_time and counter but different node_id
        let ts_a = HlcTimestamp::new(1000, 0, 7);
        let ts_b = HlcTimestamp::new(1000, 0, 11);
        // They should not be equal
        assert_ne!(ts_a, ts_b);
        // One must be less than the other (deterministic)
        assert!(ts_a < ts_b); // node 7 < node 11
    }

    #[test]
    fn test_display() {
        let ts = HlcTimestamp::new(1710000000000, 5, 7);
        assert_eq!(ts.to_string(), "1710000000000:5:7");
    }

    #[test]
    fn test_datetime_roundtrip() {
        let now = Utc::now();
        let ts = HlcTimestamp::from_datetime(now);
        let back = ts.to_datetime();
        // Within 1ms due to millisecond truncation
        assert!((now - back).num_milliseconds().abs() <= 1);
    }

    #[test]
    fn test_centralized_mode() {
        let clock = Hlc::new(0); // centralized mode
        let ts = clock.now();
        assert_eq!(ts.node_id, 0);
    }
}