Skip to main content

zamsync_core/
event.rs

1use rkyv::{Archive, Deserialize, Serialize};
2use std::fmt;
3
4pub const WAL_MAGIC: [u8; 4] = [0x5A, 0x41, 0x4D, 0x21];
5pub const WAL_VERSION: u8 = 1;
6/// WAL records with this version byte have their payload encrypted with
7/// ChaCha20-Poly1305. Format: [nonce: 12][ciphertext][tag: 16].
8pub const WAL_VERSION_ENCRYPTED: u8 = 2;
9
10#[derive(
11    Archive,
12    Deserialize,
13    Serialize,
14    Debug,
15    Clone,
16    Copy,
17    PartialEq,
18    Eq,
19    PartialOrd,
20    Ord,
21    Hash,
22    Default,
23)]
24#[archive(check_bytes)]
25pub struct Hlc {
26    pub physical: u64,
27    pub logical: u32,
28}
29
30impl Hlc {
31    pub fn new(physical: u64, logical: u32) -> Self {
32        Self { physical, logical }
33    }
34
35    pub fn tick(&mut self, now_ms: u64) {
36        if now_ms > self.physical {
37            self.physical = now_ms;
38            self.logical = 0;
39        } else {
40            self.logical += 1;
41        }
42    }
43
44    pub fn sync(&mut self, now_ms: u64, remote: &Hlc) {
45        let max_phys = now_ms.max(self.physical).max(remote.physical);
46        if max_phys == self.physical && max_phys == remote.physical {
47            self.logical = self.logical.max(remote.logical) + 1;
48        } else if max_phys == self.physical {
49            self.logical += 1;
50        } else if max_phys == remote.physical {
51            self.physical = remote.physical;
52            self.logical = remote.logical + 1;
53        } else {
54            self.physical = max_phys;
55            self.logical = 0;
56        }
57    }
58}
59
60#[derive(
61    Archive,
62    Deserialize,
63    Serialize,
64    Debug,
65    Clone,
66    Copy,
67    PartialEq,
68    Eq,
69    PartialOrd,
70    Ord,
71    Hash,
72    Default,
73)]
74#[archive(check_bytes)]
75#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
76pub struct NodeId(pub u32);
77
78#[derive(
79    Archive,
80    Deserialize,
81    Serialize,
82    Debug,
83    Clone,
84    Copy,
85    PartialEq,
86    Eq,
87    PartialOrd,
88    Ord,
89    Hash,
90    Default,
91)]
92#[archive(check_bytes)]
93#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
94pub struct SequenceNumber(pub u64);
95
96impl SequenceNumber {
97    pub const ZERO: Self = Self(0);
98
99    pub fn next(&self) -> Self {
100        Self(self.0 + 1)
101    }
102}
103
104impl fmt::Display for SequenceNumber {
105    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106        write!(f, "{}", self.0)
107    }
108}
109
110#[derive(Archive, Deserialize, Serialize, Debug, Clone)]
111#[archive(check_bytes)]
112pub struct Event {
113    pub origin_node: NodeId,
114    pub seq: SequenceNumber,
115    pub hlc: Hlc,
116    pub event_type: u32,
117    pub payload: Vec<u8>,
118}
119
120#[derive(Debug, Clone)]
121pub struct Chunk {
122    pub seq: SequenceNumber,
123    pub data: Vec<u8>,
124    pub crc: u32,
125}
126
127#[cfg(test)]
128mod tests {
129    use super::*;
130
131    #[test]
132    fn test_hlc_tick_advances_physical_clock() {
133        let mut h = Hlc::new(100, 5);
134        h.tick(200);
135        assert_eq!(h.physical, 200);
136        assert_eq!(h.logical, 0);
137    }
138
139    #[test]
140    fn test_hlc_tick_same_wall_increments_logical() {
141        let mut h = Hlc::new(100, 0);
142        h.tick(100);
143        assert_eq!(h.physical, 100);
144        assert_eq!(h.logical, 1);
145        h.tick(100);
146        assert_eq!(h.logical, 2);
147    }
148
149    #[test]
150    fn test_hlc_tick_clock_rollback_does_not_regress() {
151        // Wall clock rolls back (e.g. NTP correction) -- HLC must not go backward.
152        let mut h = Hlc::new(1000, 0);
153        h.tick(50);
154        assert_eq!(h.physical, 1000, "physical must not decrease");
155        assert_eq!(h.logical, 1, "logical increments when wall is behind physical");
156    }
157
158    #[test]
159    fn test_hlc_tick_sequence_is_strictly_monotonic() {
160        let mut h = Hlc::default();
161        let mut prev = h;
162        // Interleave advancing and stale wall-clock readings.
163        for now in [100u64, 100, 100, 200, 200, 50, 300, 300, 300, 400] {
164            h.tick(now);
165            assert!(h > prev, "HLC must be strictly monotonic at wall={now}");
166            prev = h;
167        }
168    }
169
170    #[test]
171    fn test_hlc_sync_remote_physical_ahead() {
172        let mut local = Hlc::new(100, 0);
173        let remote = Hlc::new(200, 5);
174        local.sync(100, &remote);
175        // max_phys = 200 (remote wins)
176        assert_eq!(local.physical, 200);
177        assert_eq!(local.logical, 6); // remote.logical + 1
178    }
179
180    #[test]
181    fn test_hlc_sync_local_physical_ahead() {
182        let mut local = Hlc::new(500, 3);
183        let remote = Hlc::new(100, 99);
184        local.sync(100, &remote);
185        // max_phys = 500 (local wins)
186        assert_eq!(local.physical, 500);
187        assert_eq!(local.logical, 4); // self.logical + 1
188    }
189
190    #[test]
191    fn test_hlc_sync_wall_clock_ahead_of_both() {
192        let mut local = Hlc::new(100, 0);
193        let remote = Hlc::new(100, 0);
194        local.sync(500, &remote);
195        // max_phys = 500 (wall clock is ahead of both local and remote)
196        assert_eq!(local.physical, 500);
197        assert_eq!(local.logical, 0); // fresh physical, logical resets
198    }
199
200    #[test]
201    fn test_hlc_sync_tie_uses_max_logical() {
202        let mut local = Hlc::new(100, 3);
203        let remote = Hlc::new(100, 7);
204        local.sync(100, &remote);
205        // max_phys = 100 == local.physical == remote.physical
206        assert_eq!(local.physical, 100);
207        assert_eq!(local.logical, 8); // max(3, 7) + 1
208    }
209
210    #[test]
211    fn test_hlc_sync_always_strictly_ahead_of_remote() {
212        let mut h = Hlc::default();
213        let remote = Hlc::new(999, 42);
214        h.sync(1, &remote);
215        assert!(h > remote, "local HLC must be strictly ahead of remote after sync");
216    }
217
218    #[test]
219    fn test_hlc_total_order() {
220        let a = Hlc::new(100, 5);
221        let b = Hlc::new(100, 6);
222        let c = Hlc::new(200, 0);
223        assert!(a < b, "same physical, lower logical is smaller");
224        assert!(b < c, "higher physical wins regardless of logical");
225        assert!(a < c);
226    }
227}