nomad_protocol/sync/
receiver.rs1use super::message::{MessageError, SyncMessage};
6
7#[derive(Debug, Clone, PartialEq, Eq)]
9pub enum ReceiveResult {
10 NewState {
12 sender_version: u64,
14 acked_version: u64,
16 base_version: u64,
18 },
19 AckOnly {
21 sender_version: u64,
23 acked_version: u64,
25 },
26 Duplicate {
28 version: u64,
30 },
31 Stale {
33 received: u64,
35 current: u64,
37 },
38}
39
40#[derive(Debug, Clone)]
44pub struct SyncReceiver {
45 highest_received: u64,
47
48 last_acked_to_peer: u64,
50}
51
52impl SyncReceiver {
53 pub fn new() -> Self {
55 Self {
56 highest_received: 0,
57 last_acked_to_peer: 0,
58 }
59 }
60
61 pub fn highest_received(&self) -> u64 {
63 self.highest_received
64 }
65
66 pub fn last_acked_to_peer(&self) -> u64 {
68 self.last_acked_to_peer
69 }
70
71 pub fn needs_ack(&self) -> bool {
73 self.highest_received > self.last_acked_to_peer
74 }
75
76 pub fn mark_acked(&mut self, version: u64) {
78 if version > self.last_acked_to_peer {
79 self.last_acked_to_peer = version;
80 }
81 }
82
83 pub fn receive_raw(&mut self, data: &[u8]) -> Result<(ReceiveResult, SyncMessage), MessageError> {
85 let msg = SyncMessage::decode(data)?;
86 let result = self.receive(&msg);
87 Ok((result, msg))
88 }
89
90 pub fn receive(&mut self, msg: &SyncMessage) -> ReceiveResult {
92 let sender_version = msg.sender_state_num;
93
94 if sender_version < self.highest_received {
96 return ReceiveResult::Stale {
97 received: sender_version,
98 current: self.highest_received,
99 };
100 }
101
102 if sender_version == self.highest_received && sender_version > 0 {
103 return ReceiveResult::Duplicate {
104 version: sender_version,
105 };
106 }
107
108 self.highest_received = sender_version;
110
111 if msg.is_ack_only() {
112 ReceiveResult::AckOnly {
113 sender_version,
114 acked_version: msg.acked_state_num,
115 }
116 } else {
117 ReceiveResult::NewState {
118 sender_version,
119 acked_version: msg.acked_state_num,
120 base_version: msg.base_state_num,
121 }
122 }
123 }
124
125 pub fn reset(&mut self) {
127 self.highest_received = 0;
128 self.last_acked_to_peer = 0;
129 }
130}
131
132impl Default for SyncReceiver {
133 fn default() -> Self {
134 Self::new()
135 }
136}
137
138#[derive(Debug, Clone)]
143pub struct OrderedReceiver {
144 inner: SyncReceiver,
146
147 received_bitmap: u64,
152}
153
154const WINDOW_SIZE: u64 = 64;
156
157impl OrderedReceiver {
158 pub fn new() -> Self {
160 Self {
161 inner: SyncReceiver::new(),
162 received_bitmap: 0,
163 }
164 }
165
166 pub fn highest_received(&self) -> u64 {
168 self.inner.highest_received
169 }
170
171 pub fn needs_ack(&self) -> bool {
173 self.inner.needs_ack()
174 }
175
176 pub fn mark_acked(&mut self, version: u64) {
178 self.inner.mark_acked(version);
179 }
180
181 fn version_to_bit_index(&self, version: u64) -> Option<usize> {
184 if version == 0 || version > self.inner.highest_received {
185 return None;
186 }
187
188 let offset = self.inner.highest_received - version;
189 if offset >= WINDOW_SIZE {
190 return None; }
192
193 Some((63 - offset) as usize)
197 }
198
199 pub fn has_received(&self, version: u64) -> bool {
201 if version > self.inner.highest_received {
202 return false;
203 }
204
205 if version == 0 {
206 return true; }
208
209 let offset = self.inner.highest_received - version;
211 if offset >= WINDOW_SIZE {
212 return true;
213 }
214
215 match self.version_to_bit_index(version) {
216 Some(bit_index) => (self.received_bitmap & (1u64 << bit_index)) != 0,
217 None => true, }
219 }
220
221 pub fn receive(&mut self, msg: &SyncMessage) -> ReceiveResult {
223 let sender_version = msg.sender_state_num;
224
225 if self.has_received(sender_version) && sender_version > 0 {
227 return ReceiveResult::Duplicate {
228 version: sender_version,
229 };
230 }
231
232 if sender_version > self.inner.highest_received {
234 let shift = sender_version - self.inner.highest_received;
236 if shift >= WINDOW_SIZE {
237 self.received_bitmap = 1u64 << 63;
239 } else {
240 self.received_bitmap >>= shift;
242 self.received_bitmap |= 1u64 << 63;
243 }
244 } else if sender_version > 0 {
245 if let Some(bit_index) = self.version_to_bit_index(sender_version) {
247 self.received_bitmap |= 1u64 << bit_index;
248 }
249 }
250
251 self.inner.receive(msg)
253 }
254
255 pub fn reset(&mut self) {
257 self.inner.reset();
258 self.received_bitmap = 0;
259 }
260}
261
262impl Default for OrderedReceiver {
263 fn default() -> Self {
264 Self::new()
265 }
266}
267
268#[cfg(test)]
269mod tests {
270 use super::*;
271
272 fn create_state_msg(version: u64) -> SyncMessage {
273 SyncMessage::new(version, 0, 0, vec![1, 2, 3])
274 }
275
276 fn create_ack_msg(sender_version: u64, acked_version: u64) -> SyncMessage {
277 SyncMessage::ack_only(sender_version, acked_version)
278 }
279
280 mod sync_receiver {
281 use super::*;
282
283 #[test]
284 fn test_new_receiver() {
285 let receiver = SyncReceiver::new();
286 assert_eq!(receiver.highest_received(), 0);
287 assert!(!receiver.needs_ack());
288 }
289
290 #[test]
291 fn test_receive_new_state() {
292 let mut receiver = SyncReceiver::new();
293
294 let result = receiver.receive(&create_state_msg(1));
295
296 assert!(matches!(result, ReceiveResult::NewState { sender_version: 1, .. }));
297 assert_eq!(receiver.highest_received(), 1);
298 assert!(receiver.needs_ack());
299 }
300
301 #[test]
302 fn test_receive_ack_only() {
303 let mut receiver = SyncReceiver::new();
304
305 let result = receiver.receive(&create_ack_msg(1, 5));
306
307 assert!(matches!(
308 result,
309 ReceiveResult::AckOnly { sender_version: 1, acked_version: 5 }
310 ));
311 }
312
313 #[test]
314 fn test_duplicate_detection() {
315 let mut receiver = SyncReceiver::new();
316
317 receiver.receive(&create_state_msg(5));
318 let result = receiver.receive(&create_state_msg(5));
319
320 assert!(matches!(result, ReceiveResult::Duplicate { version: 5 }));
321 }
322
323 #[test]
324 fn test_stale_detection() {
325 let mut receiver = SyncReceiver::new();
326
327 receiver.receive(&create_state_msg(10));
328 let result = receiver.receive(&create_state_msg(5));
329
330 assert!(matches!(
331 result,
332 ReceiveResult::Stale { received: 5, current: 10 }
333 ));
334 }
335
336 #[test]
337 fn test_needs_ack() {
338 let mut receiver = SyncReceiver::new();
339
340 assert!(!receiver.needs_ack());
341
342 receiver.receive(&create_state_msg(1));
343 assert!(receiver.needs_ack());
344
345 receiver.mark_acked(1);
346 assert!(!receiver.needs_ack());
347
348 receiver.receive(&create_state_msg(2));
349 assert!(receiver.needs_ack());
350 }
351
352 #[test]
353 fn test_reset() {
354 let mut receiver = SyncReceiver::new();
355 receiver.receive(&create_state_msg(5));
356 receiver.mark_acked(5);
357
358 receiver.reset();
359
360 assert_eq!(receiver.highest_received(), 0);
361 assert_eq!(receiver.last_acked_to_peer(), 0);
362 }
363 }
364
365 mod ordered_receiver {
366 use super::*;
367
368 #[test]
369 fn test_out_of_order_duplicate() {
370 let mut receiver = OrderedReceiver::new();
371
372 receiver.receive(&create_state_msg(1));
374 receiver.receive(&create_state_msg(2));
375 receiver.receive(&create_state_msg(3));
376
377 let result = receiver.receive(&create_state_msg(2));
379 assert!(matches!(result, ReceiveResult::Duplicate { version: 2 }));
380 }
381
382 #[test]
383 fn test_has_received() {
384 let mut receiver = OrderedReceiver::new();
385
386 receiver.receive(&create_state_msg(5));
387 receiver.receive(&create_state_msg(10));
388 receiver.receive(&create_state_msg(7)); assert!(receiver.has_received(5));
391 assert!(receiver.has_received(7));
392 assert!(receiver.has_received(10));
393 assert!(!receiver.has_received(6));
394 assert!(!receiver.has_received(8));
395 }
396
397 #[test]
398 fn test_window_sliding() {
399 let mut receiver = OrderedReceiver::new();
400
401 receiver.receive(&create_state_msg(1));
403 assert!(receiver.has_received(1));
404
405 receiver.receive(&create_state_msg(100));
407
408 assert!(receiver.has_received(1));
410 }
411
412 #[test]
413 fn test_reset() {
414 let mut receiver = OrderedReceiver::new();
415 receiver.receive(&create_state_msg(5));
416
417 receiver.reset();
418
419 assert_eq!(receiver.highest_received(), 0);
420 assert!(!receiver.has_received(5));
421 }
422 }
423}