nomad_protocol/sync/
receiver.rs

1//! Receiver-side sync logic
2//!
3//! Handles incoming sync messages and manages duplicate detection.
4
5use super::message::{MessageError, SyncMessage};
6
7/// Result of receiving a sync message
8#[derive(Debug, Clone, PartialEq, Eq)]
9pub enum ReceiveResult {
10    /// New state update received
11    NewState {
12        /// The sender's version
13        sender_version: u64,
14        /// Version acknowledged by sender
15        acked_version: u64,
16        /// Base version for the diff
17        base_version: u64,
18    },
19    /// Ack-only message (no state change)
20    AckOnly {
21        /// The sender's version
22        sender_version: u64,
23        /// Version acknowledged by sender
24        acked_version: u64,
25    },
26    /// Duplicate message (already seen this version)
27    Duplicate {
28        /// The version that was duplicated
29        version: u64,
30    },
31    /// Old message (version lower than already received)
32    Stale {
33        /// The stale version received
34        received: u64,
35        /// Our current peer version
36        current: u64,
37    },
38}
39
40/// Receiver for incoming sync messages
41///
42/// Tracks received versions and detects duplicates/stale messages.
43#[derive(Debug, Clone)]
44pub struct SyncReceiver {
45    /// Highest version received from peer
46    highest_received: u64,
47
48    /// Version we've acknowledged to peer
49    last_acked_to_peer: u64,
50}
51
52impl SyncReceiver {
53    /// Create a new receiver
54    pub fn new() -> Self {
55        Self {
56            highest_received: 0,
57            last_acked_to_peer: 0,
58        }
59    }
60
61    /// Get highest version received from peer
62    pub fn highest_received(&self) -> u64 {
63        self.highest_received
64    }
65
66    /// Get last version we acknowledged to peer
67    pub fn last_acked_to_peer(&self) -> u64 {
68        self.last_acked_to_peer
69    }
70
71    /// Check if we need to send an ack
72    pub fn needs_ack(&self) -> bool {
73        self.highest_received > self.last_acked_to_peer
74    }
75
76    /// Mark that we've sent an ack for the given version
77    pub fn mark_acked(&mut self, version: u64) {
78        if version > self.last_acked_to_peer {
79            self.last_acked_to_peer = version;
80        }
81    }
82
83    /// Process a raw message from wire format
84    pub fn receive_raw(&mut self, data: &[u8]) -> Result<(ReceiveResult, SyncMessage), MessageError> {
85        let msg = SyncMessage::decode(data)?;
86        let result = self.receive(&msg);
87        Ok((result, msg))
88    }
89
90    /// Process an already-decoded message
91    pub fn receive(&mut self, msg: &SyncMessage) -> ReceiveResult {
92        let sender_version = msg.sender_state_num;
93
94        // Check for stale/duplicate
95        if sender_version < self.highest_received {
96            return ReceiveResult::Stale {
97                received: sender_version,
98                current: self.highest_received,
99            };
100        }
101
102        if sender_version == self.highest_received && sender_version > 0 {
103            return ReceiveResult::Duplicate {
104                version: sender_version,
105            };
106        }
107
108        // New message
109        self.highest_received = sender_version;
110
111        if msg.is_ack_only() {
112            ReceiveResult::AckOnly {
113                sender_version,
114                acked_version: msg.acked_state_num,
115            }
116        } else {
117            ReceiveResult::NewState {
118                sender_version,
119                acked_version: msg.acked_state_num,
120                base_version: msg.base_state_num,
121            }
122        }
123    }
124
125    /// Reset receiver state
126    pub fn reset(&mut self) {
127        self.highest_received = 0;
128        self.last_acked_to_peer = 0;
129    }
130}
131
132impl Default for SyncReceiver {
133    fn default() -> Self {
134        Self::new()
135    }
136}
137
138/// Receiver with history for out-of-order message handling
139///
140/// Maintains a sliding window of received versions for detecting
141/// duplicates even when messages arrive out of order.
142#[derive(Debug, Clone)]
143pub struct OrderedReceiver {
144    /// Base receiver
145    inner: SyncReceiver,
146
147    /// Bitmap of received versions (sliding window)
148    /// The highest bit (bit 63) always represents highest_received.
149    /// Bit i represents version: highest_received - (63 - i)
150    /// So bit 63 = highest_received, bit 62 = highest_received - 1, etc.
151    received_bitmap: u64,
152}
153
154/// Size of the duplicate detection window
155const WINDOW_SIZE: u64 = 64;
156
157impl OrderedReceiver {
158    /// Create a new ordered receiver
159    pub fn new() -> Self {
160        Self {
161            inner: SyncReceiver::new(),
162            received_bitmap: 0,
163        }
164    }
165
166    /// Get highest version received from peer
167    pub fn highest_received(&self) -> u64 {
168        self.inner.highest_received
169    }
170
171    /// Check if we need to send an ack
172    pub fn needs_ack(&self) -> bool {
173        self.inner.needs_ack()
174    }
175
176    /// Mark that we've sent an ack
177    pub fn mark_acked(&mut self, version: u64) {
178        self.inner.mark_acked(version);
179    }
180
181    /// Convert version to bit index in the bitmap
182    /// Returns None if version is outside the window
183    fn version_to_bit_index(&self, version: u64) -> Option<usize> {
184        if version == 0 || version > self.inner.highest_received {
185            return None;
186        }
187
188        let offset = self.inner.highest_received - version;
189        if offset >= WINDOW_SIZE {
190            return None; // Too old
191        }
192
193        // bit 63 = highest_received (offset 0)
194        // bit 62 = highest_received - 1 (offset 1)
195        // ...
196        Some((63 - offset) as usize)
197    }
198
199    /// Check if a version has been received
200    pub fn has_received(&self, version: u64) -> bool {
201        if version > self.inner.highest_received {
202            return false;
203        }
204
205        if version == 0 {
206            return true; // Version 0 is initial state
207        }
208
209        // If too old for window, assume received
210        let offset = self.inner.highest_received - version;
211        if offset >= WINDOW_SIZE {
212            return true;
213        }
214
215        match self.version_to_bit_index(version) {
216            Some(bit_index) => (self.received_bitmap & (1u64 << bit_index)) != 0,
217            None => true, // Out of window, assume received
218        }
219    }
220
221    /// Process an already-decoded message
222    pub fn receive(&mut self, msg: &SyncMessage) -> ReceiveResult {
223        let sender_version = msg.sender_state_num;
224
225        // Check if we've already received this version
226        if self.has_received(sender_version) && sender_version > 0 {
227            return ReceiveResult::Duplicate {
228                version: sender_version,
229            };
230        }
231
232        // Update bitmap
233        if sender_version > self.inner.highest_received {
234            // Shift bitmap for new highest
235            let shift = sender_version - self.inner.highest_received;
236            if shift >= WINDOW_SIZE {
237                // Complete reset, only new version is set
238                self.received_bitmap = 1u64 << 63;
239            } else {
240                // Shift existing bits down and set new highest
241                self.received_bitmap >>= shift;
242                self.received_bitmap |= 1u64 << 63;
243            }
244        } else if sender_version > 0 {
245            // Mark in existing bitmap (out of order arrival)
246            if let Some(bit_index) = self.version_to_bit_index(sender_version) {
247                self.received_bitmap |= 1u64 << bit_index;
248            }
249        }
250
251        // Update inner receiver (updates highest_received)
252        self.inner.receive(msg)
253    }
254
255    /// Reset receiver state
256    pub fn reset(&mut self) {
257        self.inner.reset();
258        self.received_bitmap = 0;
259    }
260}
261
262impl Default for OrderedReceiver {
263    fn default() -> Self {
264        Self::new()
265    }
266}
267
268#[cfg(test)]
269mod tests {
270    use super::*;
271
272    fn create_state_msg(version: u64) -> SyncMessage {
273        SyncMessage::new(version, 0, 0, vec![1, 2, 3])
274    }
275
276    fn create_ack_msg(sender_version: u64, acked_version: u64) -> SyncMessage {
277        SyncMessage::ack_only(sender_version, acked_version)
278    }
279
280    mod sync_receiver {
281        use super::*;
282
283        #[test]
284        fn test_new_receiver() {
285            let receiver = SyncReceiver::new();
286            assert_eq!(receiver.highest_received(), 0);
287            assert!(!receiver.needs_ack());
288        }
289
290        #[test]
291        fn test_receive_new_state() {
292            let mut receiver = SyncReceiver::new();
293
294            let result = receiver.receive(&create_state_msg(1));
295
296            assert!(matches!(result, ReceiveResult::NewState { sender_version: 1, .. }));
297            assert_eq!(receiver.highest_received(), 1);
298            assert!(receiver.needs_ack());
299        }
300
301        #[test]
302        fn test_receive_ack_only() {
303            let mut receiver = SyncReceiver::new();
304
305            let result = receiver.receive(&create_ack_msg(1, 5));
306
307            assert!(matches!(
308                result,
309                ReceiveResult::AckOnly { sender_version: 1, acked_version: 5 }
310            ));
311        }
312
313        #[test]
314        fn test_duplicate_detection() {
315            let mut receiver = SyncReceiver::new();
316
317            receiver.receive(&create_state_msg(5));
318            let result = receiver.receive(&create_state_msg(5));
319
320            assert!(matches!(result, ReceiveResult::Duplicate { version: 5 }));
321        }
322
323        #[test]
324        fn test_stale_detection() {
325            let mut receiver = SyncReceiver::new();
326
327            receiver.receive(&create_state_msg(10));
328            let result = receiver.receive(&create_state_msg(5));
329
330            assert!(matches!(
331                result,
332                ReceiveResult::Stale { received: 5, current: 10 }
333            ));
334        }
335
336        #[test]
337        fn test_needs_ack() {
338            let mut receiver = SyncReceiver::new();
339
340            assert!(!receiver.needs_ack());
341
342            receiver.receive(&create_state_msg(1));
343            assert!(receiver.needs_ack());
344
345            receiver.mark_acked(1);
346            assert!(!receiver.needs_ack());
347
348            receiver.receive(&create_state_msg(2));
349            assert!(receiver.needs_ack());
350        }
351
352        #[test]
353        fn test_reset() {
354            let mut receiver = SyncReceiver::new();
355            receiver.receive(&create_state_msg(5));
356            receiver.mark_acked(5);
357
358            receiver.reset();
359
360            assert_eq!(receiver.highest_received(), 0);
361            assert_eq!(receiver.last_acked_to_peer(), 0);
362        }
363    }
364
365    mod ordered_receiver {
366        use super::*;
367
368        #[test]
369        fn test_out_of_order_duplicate() {
370            let mut receiver = OrderedReceiver::new();
371
372            // Receive 1, 2, 3
373            receiver.receive(&create_state_msg(1));
374            receiver.receive(&create_state_msg(2));
375            receiver.receive(&create_state_msg(3));
376
377            // Receive 2 again (out of order duplicate)
378            let result = receiver.receive(&create_state_msg(2));
379            assert!(matches!(result, ReceiveResult::Duplicate { version: 2 }));
380        }
381
382        #[test]
383        fn test_has_received() {
384            let mut receiver = OrderedReceiver::new();
385
386            receiver.receive(&create_state_msg(5));
387            receiver.receive(&create_state_msg(10));
388            receiver.receive(&create_state_msg(7)); // Out of order
389
390            assert!(receiver.has_received(5));
391            assert!(receiver.has_received(7));
392            assert!(receiver.has_received(10));
393            assert!(!receiver.has_received(6));
394            assert!(!receiver.has_received(8));
395        }
396
397        #[test]
398        fn test_window_sliding() {
399            let mut receiver = OrderedReceiver::new();
400
401            // Receive version 1
402            receiver.receive(&create_state_msg(1));
403            assert!(receiver.has_received(1));
404
405            // Receive version 100 (slides window past 1)
406            receiver.receive(&create_state_msg(100));
407
408            // Version 1 should still be considered received (too old, assume true)
409            assert!(receiver.has_received(1));
410        }
411
412        #[test]
413        fn test_reset() {
414            let mut receiver = OrderedReceiver::new();
415            receiver.receive(&create_state_msg(5));
416
417            receiver.reset();
418
419            assert_eq!(receiver.highest_received(), 0);
420            assert!(!receiver.has_received(5));
421        }
422    }
423}