1use rkyv::{Archive, Deserialize, Serialize};
2use std::fmt;
3
4pub const WAL_MAGIC: [u8; 4] = [0x5A, 0x41, 0x4D, 0x21];
5pub const WAL_VERSION: u8 = 1;
6pub const WAL_VERSION_ENCRYPTED: u8 = 2;
9
10#[derive(
11 Archive,
12 Deserialize,
13 Serialize,
14 Debug,
15 Clone,
16 Copy,
17 PartialEq,
18 Eq,
19 PartialOrd,
20 Ord,
21 Hash,
22 Default,
23)]
24#[archive(check_bytes)]
25pub struct Hlc {
26 pub physical: u64,
27 pub logical: u32,
28}
29
30impl Hlc {
31 pub fn new(physical: u64, logical: u32) -> Self {
32 Self { physical, logical }
33 }
34
35 pub fn tick(&mut self, now_ms: u64) {
36 if now_ms > self.physical {
37 self.physical = now_ms;
38 self.logical = 0;
39 } else {
40 self.logical += 1;
41 }
42 }
43
44 pub fn sync(&mut self, now_ms: u64, remote: &Hlc) {
45 let max_phys = now_ms.max(self.physical).max(remote.physical);
46 if max_phys == self.physical && max_phys == remote.physical {
47 self.logical = self.logical.max(remote.logical) + 1;
48 } else if max_phys == self.physical {
49 self.logical += 1;
50 } else if max_phys == remote.physical {
51 self.physical = remote.physical;
52 self.logical = remote.logical + 1;
53 } else {
54 self.physical = max_phys;
55 self.logical = 0;
56 }
57 }
58}
59
60#[derive(
61 Archive,
62 Deserialize,
63 Serialize,
64 Debug,
65 Clone,
66 Copy,
67 PartialEq,
68 Eq,
69 PartialOrd,
70 Ord,
71 Hash,
72 Default,
73)]
74#[archive(check_bytes)]
75#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
76pub struct NodeId(pub u32);
77
78#[derive(
79 Archive,
80 Deserialize,
81 Serialize,
82 Debug,
83 Clone,
84 Copy,
85 PartialEq,
86 Eq,
87 PartialOrd,
88 Ord,
89 Hash,
90 Default,
91)]
92#[archive(check_bytes)]
93#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
94pub struct SequenceNumber(pub u64);
95
96impl SequenceNumber {
97 pub const ZERO: Self = Self(0);
98
99 pub fn next(&self) -> Self {
100 Self(self.0 + 1)
101 }
102}
103
104impl fmt::Display for SequenceNumber {
105 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106 write!(f, "{}", self.0)
107 }
108}
109
110#[derive(Archive, Deserialize, Serialize, Debug, Clone)]
111#[archive(check_bytes)]
112pub struct Event {
113 pub origin_node: NodeId,
114 pub seq: SequenceNumber,
115 pub hlc: Hlc,
116 pub event_type: u32,
117 pub payload: Vec<u8>,
118}
119
120#[derive(Debug, Clone)]
121pub struct Chunk {
122 pub seq: SequenceNumber,
123 pub data: Vec<u8>,
124 pub crc: u32,
125}
126
127#[cfg(test)]
128mod tests {
129 use super::*;
130
131 #[test]
132 fn test_hlc_tick_advances_physical_clock() {
133 let mut h = Hlc::new(100, 5);
134 h.tick(200);
135 assert_eq!(h.physical, 200);
136 assert_eq!(h.logical, 0);
137 }
138
139 #[test]
140 fn test_hlc_tick_same_wall_increments_logical() {
141 let mut h = Hlc::new(100, 0);
142 h.tick(100);
143 assert_eq!(h.physical, 100);
144 assert_eq!(h.logical, 1);
145 h.tick(100);
146 assert_eq!(h.logical, 2);
147 }
148
149 #[test]
150 fn test_hlc_tick_clock_rollback_does_not_regress() {
151 let mut h = Hlc::new(1000, 0);
153 h.tick(50);
154 assert_eq!(h.physical, 1000, "physical must not decrease");
155 assert_eq!(
156 h.logical, 1,
157 "logical increments when wall is behind physical"
158 );
159 }
160
161 #[test]
162 fn test_hlc_tick_sequence_is_strictly_monotonic() {
163 let mut h = Hlc::default();
164 let mut prev = h;
165 for now in [100u64, 100, 100, 200, 200, 50, 300, 300, 300, 400] {
167 h.tick(now);
168 assert!(h > prev, "HLC must be strictly monotonic at wall={now}");
169 prev = h;
170 }
171 }
172
173 #[test]
174 fn test_hlc_sync_remote_physical_ahead() {
175 let mut local = Hlc::new(100, 0);
176 let remote = Hlc::new(200, 5);
177 local.sync(100, &remote);
178 assert_eq!(local.physical, 200);
180 assert_eq!(local.logical, 6); }
182
183 #[test]
184 fn test_hlc_sync_local_physical_ahead() {
185 let mut local = Hlc::new(500, 3);
186 let remote = Hlc::new(100, 99);
187 local.sync(100, &remote);
188 assert_eq!(local.physical, 500);
190 assert_eq!(local.logical, 4); }
192
193 #[test]
194 fn test_hlc_sync_wall_clock_ahead_of_both() {
195 let mut local = Hlc::new(100, 0);
196 let remote = Hlc::new(100, 0);
197 local.sync(500, &remote);
198 assert_eq!(local.physical, 500);
200 assert_eq!(local.logical, 0); }
202
203 #[test]
204 fn test_hlc_sync_tie_uses_max_logical() {
205 let mut local = Hlc::new(100, 3);
206 let remote = Hlc::new(100, 7);
207 local.sync(100, &remote);
208 assert_eq!(local.physical, 100);
210 assert_eq!(local.logical, 8); }
212
213 #[test]
214 fn test_hlc_sync_always_strictly_ahead_of_remote() {
215 let mut h = Hlc::default();
216 let remote = Hlc::new(999, 42);
217 h.sync(1, &remote);
218 assert!(
219 h > remote,
220 "local HLC must be strictly ahead of remote after sync"
221 );
222 }
223
224 #[test]
225 fn test_hlc_total_order() {
226 let a = Hlc::new(100, 5);
227 let b = Hlc::new(100, 6);
228 let c = Hlc::new(200, 0);
229 assert!(a < b, "same physical, lower logical is smaller");
230 assert!(b < c, "higher physical wins regardless of logical");
231 assert!(a < c);
232 }
233}