1use rkyv::{Archive, Deserialize, Serialize};
2use std::fmt;
3
/// Four-byte magic value; the bytes are ASCII `ZAM!` (`0x5A 0x41 0x4D 0x21`).
///
/// NOTE(review): presumably written at the start of a WAL file or record so
/// readers can recognise the format — confirm against the reader/writer code.
pub const WAL_MAGIC: [u8; 4] = *b"ZAM!";

/// Format version number `1`.
///
/// NOTE(review): judging by the sibling constant's name this is the
/// unencrypted layout — confirm.
pub const WAL_VERSION: u8 = 1;

/// Format version number `2`; the name marks it as the encrypted variant.
pub const WAL_VERSION_ENCRYPTED: u8 = 2;
9
10#[derive(
11 Archive,
12 Deserialize,
13 Serialize,
14 Debug,
15 Clone,
16 Copy,
17 PartialEq,
18 Eq,
19 PartialOrd,
20 Ord,
21 Hash,
22 Default,
23)]
24#[archive(check_bytes)]
25pub struct Hlc {
26 pub physical: u64,
27 pub logical: u32,
28}
29
30impl Hlc {
31 pub fn new(physical: u64, logical: u32) -> Self {
32 Self { physical, logical }
33 }
34
35 pub fn tick(&mut self, now_ms: u64) {
36 if now_ms > self.physical {
37 self.physical = now_ms;
38 self.logical = 0;
39 } else {
40 self.logical += 1;
41 }
42 }
43
44 pub fn sync(&mut self, now_ms: u64, remote: &Hlc) {
45 let max_phys = now_ms.max(self.physical).max(remote.physical);
46 if max_phys == self.physical && max_phys == remote.physical {
47 self.logical = self.logical.max(remote.logical) + 1;
48 } else if max_phys == self.physical {
49 self.logical += 1;
50 } else if max_phys == remote.physical {
51 self.physical = remote.physical;
52 self.logical = remote.logical + 1;
53 } else {
54 self.physical = max_phys;
55 self.logical = 0;
56 }
57 }
58}
59
60#[derive(
61 Archive,
62 Deserialize,
63 Serialize,
64 Debug,
65 Clone,
66 Copy,
67 PartialEq,
68 Eq,
69 PartialOrd,
70 Ord,
71 Hash,
72 Default,
73)]
74#[archive(check_bytes)]
75#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
76pub struct NodeId(pub u32);
77
78#[derive(
79 Archive,
80 Deserialize,
81 Serialize,
82 Debug,
83 Clone,
84 Copy,
85 PartialEq,
86 Eq,
87 PartialOrd,
88 Ord,
89 Hash,
90 Default,
91)]
92#[archive(check_bytes)]
93#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
94pub struct SequenceNumber(pub u64);
95
96impl SequenceNumber {
97 pub const ZERO: Self = Self(0);
98
99 pub fn next(&self) -> Self {
100 Self(self.0 + 1)
101 }
102}
103
104impl fmt::Display for SequenceNumber {
105 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106 write!(f, "{}", self.0)
107 }
108}
109
110#[derive(Archive, Deserialize, Serialize, Debug, Clone)]
111#[archive(check_bytes)]
112pub struct Event {
113 pub origin_node: NodeId,
114 pub seq: SequenceNumber,
115 pub hlc: Hlc,
116 pub event_type: u32,
117 pub payload: Vec<u8>,
118}
119
120#[derive(Debug, Clone)]
121pub struct Chunk {
122 pub seq: SequenceNumber,
123 pub data: Vec<u8>,
124 pub crc: u32,
125}
126
#[cfg(test)]
mod tests {
    use super::*;

    /// Applies `sync(now_ms, &remote)` to a copy of `local` and returns the result.
    fn synced(mut local: Hlc, now_ms: u64, remote: Hlc) -> Hlc {
        local.sync(now_ms, &remote);
        local
    }

    // A wall-clock reading past `physical` is adopted and resets `logical`.
    #[test]
    fn test_hlc_tick_advances_physical_clock() {
        let mut clock = Hlc::new(100, 5);
        clock.tick(200);
        assert_eq!(clock, Hlc::new(200, 0));
    }

    // Repeated ticks at an unchanged wall reading only bump `logical`.
    #[test]
    fn test_hlc_tick_same_wall_increments_logical() {
        let mut clock = Hlc::new(100, 0);
        clock.tick(100);
        assert_eq!(clock, Hlc::new(100, 1));
        clock.tick(100);
        assert_eq!(clock.logical, 2);
    }

    // A wall clock that jumps backwards must not pull the HLC backwards.
    #[test]
    fn test_hlc_tick_clock_rollback_does_not_regress() {
        let mut clock = Hlc::new(1000, 0);
        clock.tick(50);
        assert_eq!(clock.physical, 1000, "physical must not decrease");
        assert_eq!(clock.logical, 1, "logical increments when wall is behind physical");
    }

    // Every tick yields a strictly larger timestamp, whatever the wall clock does.
    #[test]
    fn test_hlc_tick_sequence_is_strictly_monotonic() {
        let wall_readings = [100u64, 100, 100, 200, 200, 50, 300, 300, 300, 400];
        let mut clock = Hlc::default();
        for now in wall_readings {
            let before = clock;
            clock.tick(now);
            assert!(clock > before, "HLC must be strictly monotonic at wall={now}");
        }
    }

    // Remote physical is the maximum: adopt it and step past its logical counter.
    #[test]
    fn test_hlc_sync_remote_physical_ahead() {
        let merged = synced(Hlc::new(100, 0), 100, Hlc::new(200, 5));
        assert_eq!(merged, Hlc::new(200, 6));
    }

    // Local physical is the maximum: the remote's logical counter is ignored.
    #[test]
    fn test_hlc_sync_local_physical_ahead() {
        let merged = synced(Hlc::new(500, 3), 100, Hlc::new(100, 99));
        assert_eq!(merged, Hlc::new(500, 4));
    }

    // Wall clock strictly ahead of both sides: adopt it and reset `logical`.
    #[test]
    fn test_hlc_sync_wall_clock_ahead_of_both() {
        let merged = synced(Hlc::new(100, 0), 500, Hlc::new(100, 0));
        assert_eq!(merged, Hlc::new(500, 0));
    }

    // Equal physical components: step past the larger logical counter.
    #[test]
    fn test_hlc_sync_tie_uses_max_logical() {
        let merged = synced(Hlc::new(100, 3), 100, Hlc::new(100, 7));
        assert_eq!(merged, Hlc::new(100, 8));
    }

    #[test]
    fn test_hlc_sync_always_strictly_ahead_of_remote() {
        let remote = Hlc::new(999, 42);
        let merged = synced(Hlc::default(), 1, remote);
        assert!(merged > remote, "local HLC must be strictly ahead of remote after sync");
    }

    // Derived ordering: physical first, logical as the tie-breaker.
    #[test]
    fn test_hlc_total_order() {
        let (a, b, c) = (Hlc::new(100, 5), Hlc::new(100, 6), Hlc::new(200, 0));
        assert!(a < b, "same physical, lower logical is smaller");
        assert!(b < c, "higher physical wins regardless of logical");
        assert!(a < c);
    }
}