nomad_protocol/sync/
tracker.rs

1//! Sync state tracker
2//!
3//! Tracks local and remote state versions for synchronization.
4//! Each endpoint maintains its own tracker instance.
5
6use super::message::SyncMessage;
7
/// Per-endpoint synchronization state.
///
/// Each endpoint keeps one tracker holding four version counters:
/// - `current_num`: version of the current local state
/// - `last_sent_num`: version of the last local state sent to the peer
/// - `last_acked`: highest local version the peer has acknowledged
/// - `peer_state_num`: highest version received from the peer
///
/// [`Default`] yields a tracker with every counter at zero. Two trackers
/// compare equal exactly when all four counters match.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SyncTracker {
    /// Version of the current local state (monotonic).
    current_num: u64,
    /// Version of the last local state that was sent.
    last_sent_num: u64,
    /// Highest local version acknowledged by the peer.
    last_acked: u64,
    /// Highest version received from the peer.
    peer_state_num: u64,
}
26
27impl SyncTracker {
28    /// Create a new sync tracker
29    pub fn new() -> Self {
30        Self::default()
31    }
32
33    /// Create a tracker with initial state
34    pub fn with_initial_version(version: u64) -> Self {
35        Self {
36            current_num: version,
37            last_sent_num: 0,
38            last_acked: 0,
39            peer_state_num: 0,
40        }
41    }
42
43    /// Get current local state version
44    pub fn current_version(&self) -> u64 {
45        self.current_num
46    }
47
48    /// Get last sent version
49    pub fn last_sent_version(&self) -> u64 {
50        self.last_sent_num
51    }
52
53    /// Get highest version acked by peer
54    pub fn last_acked_version(&self) -> u64 {
55        self.last_acked
56    }
57
58    /// Get highest version received from peer
59    pub fn peer_version(&self) -> u64 {
60        self.peer_state_num
61    }
62
63    /// Check if we have pending updates to send
64    pub fn has_pending_updates(&self) -> bool {
65        self.current_num > self.last_sent_num
66    }
67
68    /// Check if we need to send an ack
69    pub fn needs_ack(&self) -> bool {
70        self.peer_state_num > self.last_acked
71    }
72
73    /// Check if the state is in sync with peer
74    pub fn is_synchronized(&self) -> bool {
75        self.last_acked == self.current_num && !self.needs_ack()
76    }
77
78    /// Bump local state version (call when local state changes)
79    pub fn bump_version(&mut self) -> u64 {
80        self.current_num += 1;
81        self.current_num
82    }
83
84    /// Record that we sent a message
85    ///
86    /// Returns the version number that was marked as sent.
87    pub fn record_sent(&mut self, sent_version: u64) {
88        if sent_version > self.last_sent_num {
89            self.last_sent_num = sent_version;
90        }
91    }
92
93    /// Process an incoming sync message
94    ///
95    /// Updates:
96    /// - `peer_state_num` from the sender's current version
97    /// - `last_acked` from the sender's ack field
98    ///
99    /// Returns `true` if the message contained new state (not just an ack).
100    pub fn process_incoming(&mut self, msg: &SyncMessage) -> bool {
101        // Update what peer has acked about our state
102        if msg.acked_state_num > self.last_acked {
103            self.last_acked = msg.acked_state_num;
104        }
105
106        // Update peer's state version if this is newer
107        let is_new_state = msg.sender_state_num > self.peer_state_num;
108        if is_new_state {
109            self.peer_state_num = msg.sender_state_num;
110        }
111
112        is_new_state && !msg.is_ack_only()
113    }
114
115    /// Create a sync message with current state info
116    ///
117    /// The caller should fill in the diff payload.
118    pub fn create_message(&self, diff: Vec<u8>, base_state_num: u64) -> SyncMessage {
119        SyncMessage::new(
120            self.current_num,
121            self.peer_state_num,
122            base_state_num,
123            diff,
124        )
125    }
126
127    /// Create an ack-only message
128    pub fn create_ack(&self) -> SyncMessage {
129        SyncMessage::ack_only(self.current_num, self.peer_state_num)
130    }
131
132    /// Reset the tracker to initial state
133    pub fn reset(&mut self) {
134        *self = Self::default();
135    }
136
137    /// Get the base state number that should be used for diff computation
138    ///
139    /// This is the last version we know the peer has acknowledged.
140    pub fn diff_base_version(&self) -> u64 {
141        self.last_acked
142    }
143}
144
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh tracker starts with every counter at zero.
    #[test]
    fn test_new_tracker() {
        let tracker = SyncTracker::new();
        assert_eq!(tracker.current_version(), 0);
        assert_eq!(tracker.last_sent_version(), 0);
        assert_eq!(tracker.last_acked_version(), 0);
        assert_eq!(tracker.peer_version(), 0);
    }

    /// Each bump returns the next version and is reflected by the getter.
    #[test]
    fn test_bump_version() {
        let mut tracker = SyncTracker::new();

        assert_eq!(tracker.bump_version(), 1);
        assert_eq!(tracker.bump_version(), 2);
        assert_eq!(tracker.bump_version(), 3);
        assert_eq!(tracker.current_version(), 3);
    }

    /// Updates are pending exactly while the local version is ahead of the
    /// last sent version.
    #[test]
    fn test_has_pending_updates() {
        let mut tracker = SyncTracker::new();

        assert!(!tracker.has_pending_updates());

        tracker.bump_version();
        assert!(tracker.has_pending_updates());

        tracker.record_sent(1);
        assert!(!tracker.has_pending_updates());

        tracker.bump_version();
        assert!(tracker.has_pending_updates());
    }

    /// The sent marker only moves forward.
    #[test]
    fn test_record_sent_ignores_older_version() {
        let mut tracker = SyncTracker::new();
        tracker.bump_version();
        tracker.bump_version();
        tracker.bump_version(); // version = 3

        tracker.record_sent(3);
        tracker.record_sent(1);

        assert_eq!(tracker.last_sent_version(), 3);
        assert!(!tracker.has_pending_updates());
    }

    /// A message with a newer peer version and a payload is new state and
    /// updates both the peer version and the acked version.
    #[test]
    fn test_process_incoming() {
        let mut tracker = SyncTracker::new();
        tracker.bump_version(); // version = 1

        // Peer is at its version 5 and acks our version 1.
        let msg = SyncMessage::new(5, 1, 4, vec![1, 2, 3]);
        let has_new_state = tracker.process_incoming(&msg);

        assert!(has_new_state);
        assert_eq!(tracker.peer_version(), 5);
        assert_eq!(tracker.last_acked_version(), 1);
    }

    /// Duplicate or older messages neither report new state nor move any
    /// counter backwards.
    #[test]
    fn test_process_incoming_ignores_stale_message() {
        let mut tracker = SyncTracker::new();
        tracker.bump_version(); // version = 1

        let msg = SyncMessage::new(5, 1, 4, vec![1, 2, 3]);
        assert!(tracker.process_incoming(&msg));

        // Same message delivered twice.
        assert!(!tracker.process_incoming(&msg));

        // Older peer version carrying an older ack.
        let stale = SyncMessage::new(3, 0, 2, vec![9]);
        assert!(!tracker.process_incoming(&stale));

        assert_eq!(tracker.peer_version(), 5);
        assert_eq!(tracker.last_acked_version(), 1);
    }

    /// An ack-only message updates the counters but is not new state.
    #[test]
    fn test_process_ack_only() {
        let mut tracker = SyncTracker::new();
        tracker.bump_version();
        tracker.bump_version(); // version = 2

        // Peer sends ack-only at its version 3, acking our version 2.
        let msg = SyncMessage::ack_only(3, 2);
        let has_new_state = tracker.process_incoming(&msg);

        // Not "new state", even though the peer version advanced.
        assert!(!has_new_state);
        assert_eq!(tracker.peer_version(), 3);
        assert_eq!(tracker.last_acked_version(), 2);
    }

    /// Receiving peer state makes the tracker report an outstanding ack.
    #[test]
    fn test_needs_ack() {
        let mut tracker = SyncTracker::new();

        assert!(!tracker.needs_ack());

        let msg = SyncMessage::new(5, 0, 0, vec![1, 2, 3]);
        tracker.process_incoming(&msg);

        assert!(tracker.needs_ack());
        assert_eq!(tracker.peer_version(), 5);
    }

    /// Outgoing messages carry our version, the peer version as ack, and the
    /// caller's diff.
    #[test]
    fn test_create_message() {
        let mut tracker = SyncTracker::new();
        tracker.bump_version(); // version = 1

        // Simulate having received peer version 3.
        let incoming = SyncMessage::new(3, 0, 0, vec![]);
        tracker.process_incoming(&incoming);

        let msg = tracker.create_message(vec![10, 20, 30], 0);
        assert_eq!(msg.sender_state_num, 1);
        assert_eq!(msg.acked_state_num, 3); // Acking peer's version
        assert_eq!(msg.diff, vec![10, 20, 30]);
    }

    /// Ack-only messages carry our version and the peer version as ack.
    #[test]
    fn test_create_ack() {
        let mut tracker = SyncTracker::new();
        tracker.bump_version(); // version = 1

        // Simulate having received peer version 5.
        let incoming = SyncMessage::new(5, 0, 0, vec![1]);
        tracker.process_incoming(&incoming);

        let ack = tracker.create_ack();
        assert!(ack.is_ack_only());
        assert_eq!(ack.sender_state_num, 1);
        assert_eq!(ack.acked_state_num, 5);
    }

    /// Walks one full cycle: in sync, local change, send, peer ack.
    #[test]
    fn test_is_synchronized() {
        let mut tracker = SyncTracker::new();

        // Initially synchronized (both at 0).
        assert!(tracker.is_synchronized());

        // A local change breaks synchronization.
        tracker.bump_version();
        assert!(!tracker.is_synchronized());

        // Send version 1 and receive the peer's ack for it.
        tracker.record_sent(1);
        let ack = SyncMessage::new(1, 1, 0, vec![]);
        tracker.process_incoming(&ack);

        assert!(tracker.is_synchronized());
    }

    /// The diff base follows what the peer has acked of our state.
    #[test]
    fn test_diff_base_version() {
        let mut tracker = SyncTracker::new();

        // Nothing acked yet, so diffs start from version 0.
        assert_eq!(tracker.diff_base_version(), 0);

        // Produce local versions 1..=5 so the ack below refers to state that
        // actually exists on this side.
        for _ in 0..5 {
            tracker.bump_version();
        }

        // Peer, at its own version 10, acknowledges our version 5.
        let msg = SyncMessage::new(10, 5, 0, vec![]);
        tracker.process_incoming(&msg);

        // The base is our acked version (5), not the peer's version (10).
        assert_eq!(tracker.diff_base_version(), 5);
        assert_eq!(tracker.peer_version(), 10);
    }

    /// Only the local version is seeded; everything else starts at zero, so
    /// the initial state counts as pending.
    #[test]
    fn test_with_initial_version() {
        let tracker = SyncTracker::with_initial_version(100);
        assert_eq!(tracker.current_version(), 100);
        assert_eq!(tracker.last_sent_version(), 0);
        assert_eq!(tracker.last_acked_version(), 0);
        assert_eq!(tracker.peer_version(), 0);
        assert!(tracker.has_pending_updates());
    }

    /// Reset returns every counter to zero regardless of prior activity.
    #[test]
    fn test_reset() {
        let mut tracker = SyncTracker::new();
        tracker.bump_version();
        tracker.bump_version();
        tracker.record_sent(2);

        let msg = SyncMessage::new(5, 2, 0, vec![1]);
        tracker.process_incoming(&msg);

        tracker.reset();

        assert_eq!(tracker.current_version(), 0);
        assert_eq!(tracker.last_sent_version(), 0);
        assert_eq!(tracker.last_acked_version(), 0);
        assert_eq!(tracker.peer_version(), 0);
    }
}