Skip to main content

zamsync_core/
event.rs

1use rkyv::{Archive, Deserialize, Serialize};
2use std::fmt;
3
4pub const WAL_MAGIC: [u8; 4] = [0x5A, 0x41, 0x4D, 0x21];
5pub const WAL_VERSION: u8 = 1;
6/// WAL records with this version byte have their payload encrypted with
7/// ChaCha20-Poly1305. Format: [nonce: 12][ciphertext][tag: 16].
8pub const WAL_VERSION_ENCRYPTED: u8 = 2;
9
10#[derive(
11    Archive,
12    Deserialize,
13    Serialize,
14    Debug,
15    Clone,
16    Copy,
17    PartialEq,
18    Eq,
19    PartialOrd,
20    Ord,
21    Hash,
22    Default,
23)]
24#[archive(check_bytes)]
25pub struct Hlc {
26    pub physical: u64,
27    pub logical: u32,
28}
29
30impl Hlc {
31    pub fn new(physical: u64, logical: u32) -> Self {
32        Self { physical, logical }
33    }
34
35    pub fn tick(&mut self, now_ms: u64) {
36        if now_ms > self.physical {
37            self.physical = now_ms;
38            self.logical = 0;
39        } else {
40            self.logical += 1;
41        }
42    }
43
44    pub fn sync(&mut self, now_ms: u64, remote: &Hlc) {
45        let max_phys = now_ms.max(self.physical).max(remote.physical);
46        if max_phys == self.physical && max_phys == remote.physical {
47            self.logical = self.logical.max(remote.logical) + 1;
48        } else if max_phys == self.physical {
49            self.logical += 1;
50        } else if max_phys == remote.physical {
51            self.physical = remote.physical;
52            self.logical = remote.logical + 1;
53        } else {
54            self.physical = max_phys;
55            self.logical = 0;
56        }
57    }
58}
59
60#[derive(
61    Archive,
62    Deserialize,
63    Serialize,
64    Debug,
65    Clone,
66    Copy,
67    PartialEq,
68    Eq,
69    PartialOrd,
70    Ord,
71    Hash,
72    Default,
73)]
74#[archive(check_bytes)]
75#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
76pub struct NodeId(pub u32);
77
78#[derive(
79    Archive,
80    Deserialize,
81    Serialize,
82    Debug,
83    Clone,
84    Copy,
85    PartialEq,
86    Eq,
87    PartialOrd,
88    Ord,
89    Hash,
90    Default,
91)]
92#[archive(check_bytes)]
93#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
94pub struct SequenceNumber(pub u64);
95
96impl SequenceNumber {
97    pub const ZERO: Self = Self(0);
98
99    pub fn next(&self) -> Self {
100        Self(self.0 + 1)
101    }
102}
103
104impl fmt::Display for SequenceNumber {
105    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106        write!(f, "{}", self.0)
107    }
108}
109
110#[derive(Archive, Deserialize, Serialize, Debug, Clone)]
111#[archive(check_bytes)]
112pub struct Event {
113    pub origin_node: NodeId,
114    pub seq: SequenceNumber,
115    pub hlc: Hlc,
116    pub event_type: u32,
117    pub payload: Vec<u8>,
118}
119
120#[derive(Debug, Clone)]
121pub struct Chunk {
122    pub seq: SequenceNumber,
123    pub data: Vec<u8>,
124    pub crc: u32,
125}
126
127#[cfg(test)]
128mod tests {
129    use super::*;
130
131    #[test]
132    fn test_hlc_tick_advances_physical_clock() {
133        let mut h = Hlc::new(100, 5);
134        h.tick(200);
135        assert_eq!(h.physical, 200);
136        assert_eq!(h.logical, 0);
137    }
138
139    #[test]
140    fn test_hlc_tick_same_wall_increments_logical() {
141        let mut h = Hlc::new(100, 0);
142        h.tick(100);
143        assert_eq!(h.physical, 100);
144        assert_eq!(h.logical, 1);
145        h.tick(100);
146        assert_eq!(h.logical, 2);
147    }
148
149    #[test]
150    fn test_hlc_tick_clock_rollback_does_not_regress() {
151        // Wall clock rolls back (e.g. NTP correction) -- HLC must not go backward.
152        let mut h = Hlc::new(1000, 0);
153        h.tick(50);
154        assert_eq!(h.physical, 1000, "physical must not decrease");
155        assert_eq!(
156            h.logical, 1,
157            "logical increments when wall is behind physical"
158        );
159    }
160
161    #[test]
162    fn test_hlc_tick_sequence_is_strictly_monotonic() {
163        let mut h = Hlc::default();
164        let mut prev = h;
165        // Interleave advancing and stale wall-clock readings.
166        for now in [100u64, 100, 100, 200, 200, 50, 300, 300, 300, 400] {
167            h.tick(now);
168            assert!(h > prev, "HLC must be strictly monotonic at wall={now}");
169            prev = h;
170        }
171    }
172
173    #[test]
174    fn test_hlc_sync_remote_physical_ahead() {
175        let mut local = Hlc::new(100, 0);
176        let remote = Hlc::new(200, 5);
177        local.sync(100, &remote);
178        // max_phys = 200 (remote wins)
179        assert_eq!(local.physical, 200);
180        assert_eq!(local.logical, 6); // remote.logical + 1
181    }
182
183    #[test]
184    fn test_hlc_sync_local_physical_ahead() {
185        let mut local = Hlc::new(500, 3);
186        let remote = Hlc::new(100, 99);
187        local.sync(100, &remote);
188        // max_phys = 500 (local wins)
189        assert_eq!(local.physical, 500);
190        assert_eq!(local.logical, 4); // self.logical + 1
191    }
192
193    #[test]
194    fn test_hlc_sync_wall_clock_ahead_of_both() {
195        let mut local = Hlc::new(100, 0);
196        let remote = Hlc::new(100, 0);
197        local.sync(500, &remote);
198        // max_phys = 500 (wall clock is ahead of both local and remote)
199        assert_eq!(local.physical, 500);
200        assert_eq!(local.logical, 0); // fresh physical, logical resets
201    }
202
203    #[test]
204    fn test_hlc_sync_tie_uses_max_logical() {
205        let mut local = Hlc::new(100, 3);
206        let remote = Hlc::new(100, 7);
207        local.sync(100, &remote);
208        // max_phys = 100 == local.physical == remote.physical
209        assert_eq!(local.physical, 100);
210        assert_eq!(local.logical, 8); // max(3, 7) + 1
211    }
212
213    #[test]
214    fn test_hlc_sync_always_strictly_ahead_of_remote() {
215        let mut h = Hlc::default();
216        let remote = Hlc::new(999, 42);
217        h.sync(1, &remote);
218        assert!(
219            h > remote,
220            "local HLC must be strictly ahead of remote after sync"
221        );
222    }
223
224    #[test]
225    fn test_hlc_total_order() {
226        let a = Hlc::new(100, 5);
227        let b = Hlc::new(100, 6);
228        let c = Hlc::new(200, 0);
229        assert!(a < b, "same physical, lower logical is smaller");
230        assert!(b < c, "higher physical wins regardless of logical");
231        assert!(a < c);
232    }
233}