nomad_protocol/sync/
tracker.rs1use super::message::SyncMessage;
7
8#[derive(Debug, Clone, Default)]
16pub struct SyncTracker {
17 current_num: u64,
19 last_sent_num: u64,
21 last_acked: u64,
23 peer_state_num: u64,
25}
26
27impl SyncTracker {
28 pub fn new() -> Self {
30 Self::default()
31 }
32
33 pub fn with_initial_version(version: u64) -> Self {
35 Self {
36 current_num: version,
37 last_sent_num: 0,
38 last_acked: 0,
39 peer_state_num: 0,
40 }
41 }
42
43 pub fn current_version(&self) -> u64 {
45 self.current_num
46 }
47
48 pub fn last_sent_version(&self) -> u64 {
50 self.last_sent_num
51 }
52
53 pub fn last_acked_version(&self) -> u64 {
55 self.last_acked
56 }
57
58 pub fn peer_version(&self) -> u64 {
60 self.peer_state_num
61 }
62
63 pub fn has_pending_updates(&self) -> bool {
65 self.current_num > self.last_sent_num
66 }
67
68 pub fn needs_ack(&self) -> bool {
70 self.peer_state_num > self.last_acked
71 }
72
73 pub fn is_synchronized(&self) -> bool {
75 self.last_acked == self.current_num && !self.needs_ack()
76 }
77
78 pub fn bump_version(&mut self) -> u64 {
80 self.current_num += 1;
81 self.current_num
82 }
83
84 pub fn record_sent(&mut self, sent_version: u64) {
88 if sent_version > self.last_sent_num {
89 self.last_sent_num = sent_version;
90 }
91 }
92
93 pub fn process_incoming(&mut self, msg: &SyncMessage) -> bool {
101 if msg.acked_state_num > self.last_acked {
103 self.last_acked = msg.acked_state_num;
104 }
105
106 let is_new_state = msg.sender_state_num > self.peer_state_num;
108 if is_new_state {
109 self.peer_state_num = msg.sender_state_num;
110 }
111
112 is_new_state && !msg.is_ack_only()
113 }
114
115 pub fn create_message(&self, diff: Vec<u8>, base_state_num: u64) -> SyncMessage {
119 SyncMessage::new(
120 self.current_num,
121 self.peer_state_num,
122 base_state_num,
123 diff,
124 )
125 }
126
127 pub fn create_ack(&self) -> SyncMessage {
129 SyncMessage::ack_only(self.current_num, self.peer_state_num)
130 }
131
132 pub fn reset(&mut self) {
134 *self = Self::default();
135 }
136
137 pub fn diff_base_version(&self) -> u64 {
141 self.last_acked
142 }
143}
144
145#[cfg(test)]
146mod tests {
147 use super::*;
148
149 #[test]
150 fn test_new_tracker() {
151 let tracker = SyncTracker::new();
152 assert_eq!(tracker.current_version(), 0);
153 assert_eq!(tracker.last_sent_version(), 0);
154 assert_eq!(tracker.last_acked_version(), 0);
155 assert_eq!(tracker.peer_version(), 0);
156 }
157
158 #[test]
159 fn test_bump_version() {
160 let mut tracker = SyncTracker::new();
161
162 assert_eq!(tracker.bump_version(), 1);
163 assert_eq!(tracker.bump_version(), 2);
164 assert_eq!(tracker.bump_version(), 3);
165 assert_eq!(tracker.current_version(), 3);
166 }
167
168 #[test]
169 fn test_has_pending_updates() {
170 let mut tracker = SyncTracker::new();
171
172 assert!(!tracker.has_pending_updates());
173
174 tracker.bump_version();
175 assert!(tracker.has_pending_updates());
176
177 tracker.record_sent(1);
178 assert!(!tracker.has_pending_updates());
179
180 tracker.bump_version();
181 assert!(tracker.has_pending_updates());
182 }
183
184 #[test]
185 fn test_process_incoming() {
186 let mut tracker = SyncTracker::new();
187 tracker.bump_version(); let msg = SyncMessage::new(5, 1, 4, vec![1, 2, 3]);
191 let has_new_state = tracker.process_incoming(&msg);
192
193 assert!(has_new_state);
194 assert_eq!(tracker.peer_version(), 5);
195 assert_eq!(tracker.last_acked_version(), 1);
196 }
197
198 #[test]
199 fn test_process_ack_only() {
200 let mut tracker = SyncTracker::new();
201 tracker.bump_version();
202 tracker.bump_version(); let msg = SyncMessage::ack_only(3, 2);
206 let has_new_state = tracker.process_incoming(&msg);
207
208 assert!(!has_new_state);
210 assert_eq!(tracker.peer_version(), 3);
211 assert_eq!(tracker.last_acked_version(), 2);
212 }
213
214 #[test]
215 fn test_needs_ack() {
216 let mut tracker = SyncTracker::new();
217
218 assert!(!tracker.needs_ack());
219
220 let msg = SyncMessage::new(5, 0, 0, vec![1, 2, 3]);
222 tracker.process_incoming(&msg);
223
224 assert!(tracker.needs_ack());
226 assert_eq!(tracker.peer_version(), 5);
227 }
228
229 #[test]
230 fn test_create_message() {
231 let mut tracker = SyncTracker::new();
232 tracker.bump_version(); let incoming = SyncMessage::new(3, 0, 0, vec![]);
236 tracker.process_incoming(&incoming);
237
238 let msg = tracker.create_message(vec![10, 20, 30], 0);
239 assert_eq!(msg.sender_state_num, 1);
240 assert_eq!(msg.acked_state_num, 3); assert_eq!(msg.diff, vec![10, 20, 30]);
242 }
243
244 #[test]
245 fn test_create_ack() {
246 let mut tracker = SyncTracker::new();
247 tracker.bump_version(); let incoming = SyncMessage::new(5, 0, 0, vec![1]);
251 tracker.process_incoming(&incoming);
252
253 let ack = tracker.create_ack();
254 assert!(ack.is_ack_only());
255 assert_eq!(ack.sender_state_num, 1);
256 assert_eq!(ack.acked_state_num, 5);
257 }
258
259 #[test]
260 fn test_is_synchronized() {
261 let mut tracker = SyncTracker::new();
262
263 assert!(tracker.is_synchronized());
265
266 tracker.bump_version();
268 assert!(!tracker.is_synchronized());
269
270 tracker.record_sent(1);
272 let ack = SyncMessage::new(1, 1, 0, vec![]);
273 tracker.process_incoming(&ack);
274
275 assert!(tracker.is_synchronized());
277 }
278
279 #[test]
280 fn test_diff_base_version() {
281 let mut tracker = SyncTracker::new();
282
283 assert_eq!(tracker.diff_base_version(), 0);
284
285 let msg = SyncMessage::new(10, 5, 0, vec![]);
287 tracker.process_incoming(&msg);
288
289 assert_eq!(tracker.diff_base_version(), 5);
293 }
294
295 #[test]
296 fn test_with_initial_version() {
297 let tracker = SyncTracker::with_initial_version(100);
298 assert_eq!(tracker.current_version(), 100);
299 assert_eq!(tracker.last_sent_version(), 0);
300 }
301
302 #[test]
303 fn test_reset() {
304 let mut tracker = SyncTracker::new();
305 tracker.bump_version();
306 tracker.bump_version();
307 tracker.record_sent(2);
308
309 let msg = SyncMessage::new(5, 2, 0, vec![1]);
310 tracker.process_incoming(&msg);
311
312 tracker.reset();
313
314 assert_eq!(tracker.current_version(), 0);
315 assert_eq!(tracker.last_sent_version(), 0);
316 assert_eq!(tracker.last_acked_version(), 0);
317 assert_eq!(tracker.peer_version(), 0);
318 }
319}