nomad_protocol/sync/
sender.rs1use std::time::{Duration, Instant};
6
7use super::message::SyncMessage;
8
/// Collection interval that [`SyncSender::new`] configures: 8 ms.
///
/// A pending non-ack message is held back until this much time has passed
/// since it was first queued.
pub const DEFAULT_COLLECTION_INTERVAL: Duration = Duration::from_millis(8);

/// Delayed-ACK timeout that [`SyncSender::new`] configures: 100 ms.
///
/// A pending ack-only message is held back until this much time has passed
/// since the ACK timer was started.
pub const DEFAULT_DELAYED_ACK_TIMEOUT: Duration = Duration::from_millis(100);
14
15#[derive(Debug)]
17pub struct SyncSender {
18 min_send_interval: Duration,
20
21 collection_interval: Duration,
23
24 delayed_ack_timeout: Duration,
26
27 last_send_time: Option<Instant>,
29
30 pending_since: Option<Instant>,
32
33 ack_pending_since: Option<Instant>,
35
36 pending_message: Option<SyncMessage>,
38}
39
40impl SyncSender {
41 pub fn new() -> Self {
43 Self {
44 min_send_interval: Duration::from_millis(20), collection_interval: DEFAULT_COLLECTION_INTERVAL,
46 delayed_ack_timeout: DEFAULT_DELAYED_ACK_TIMEOUT,
47 last_send_time: None,
48 pending_since: None,
49 ack_pending_since: None,
50 pending_message: None,
51 }
52 }
53
54 pub fn with_intervals(
56 min_send_interval: Duration,
57 collection_interval: Duration,
58 delayed_ack_timeout: Duration,
59 ) -> Self {
60 Self {
61 min_send_interval,
62 collection_interval,
63 delayed_ack_timeout,
64 last_send_time: None,
65 pending_since: None,
66 ack_pending_since: None,
67 pending_message: None,
68 }
69 }
70
71 pub fn queue_message(&mut self, msg: SyncMessage) {
75 let now = Instant::now();
76
77 if msg.is_ack_only() {
78 if self.ack_pending_since.is_none() {
80 self.ack_pending_since = Some(now);
81 }
82 } else {
83 if self.pending_since.is_none() {
85 self.pending_since = Some(now);
86 }
87 self.ack_pending_since = None;
89 }
90
91 self.pending_message = Some(msg);
93 }
94
95 pub fn should_send(&self) -> bool {
97 self.should_send_at(Instant::now())
98 }
99
100 pub fn should_send_at(&self, now: Instant) -> bool {
102 let Some(msg) = self.pending_message.as_ref() else {
103 return false;
104 };
105
106 if self.last_send_time.is_some_and(|last| now.duration_since(last) < self.min_send_interval) {
108 return false;
109 }
110
111 if msg.is_ack_only() {
112 self.ack_pending_since
114 .is_some_and(|since| now.duration_since(since) >= self.delayed_ack_timeout)
115 } else {
116 self.pending_since
118 .is_none_or(|since| now.duration_since(since) >= self.collection_interval)
119 }
120 }
121
122 pub fn take_if_ready(&mut self) -> Option<SyncMessage> {
124 self.take_if_ready_at(Instant::now())
125 }
126
127 pub fn take_if_ready_at(&mut self, now: Instant) -> Option<SyncMessage> {
129 if self.should_send_at(now) {
130 self.take_message_at(now)
131 } else {
132 None
133 }
134 }
135
136 pub fn take_message(&mut self) -> Option<SyncMessage> {
138 self.take_message_at(Instant::now())
139 }
140
141 fn take_message_at(&mut self, now: Instant) -> Option<SyncMessage> {
143 if let Some(msg) = self.pending_message.take() {
144 self.last_send_time = Some(now);
145 self.pending_since = None;
146 self.ack_pending_since = None;
147 Some(msg)
148 } else {
149 None
150 }
151 }
152
153 pub fn time_until_send(&self) -> Option<Duration> {
155 self.time_until_send_at(Instant::now())
156 }
157
158 pub fn time_until_send_at(&self, now: Instant) -> Option<Duration> {
160 let msg = self.pending_message.as_ref()?;
161
162 let pacing_remaining = self.last_send_time.map_or(Duration::ZERO, |last| {
164 let elapsed = now.duration_since(last);
165 self.min_send_interval.saturating_sub(elapsed)
166 });
167
168 let batch_remaining = if msg.is_ack_only() {
170 self.ack_pending_since.map_or(Duration::ZERO, |since| {
171 let elapsed = now.duration_since(since);
172 self.delayed_ack_timeout.saturating_sub(elapsed)
173 })
174 } else {
175 self.pending_since.map_or(Duration::ZERO, |since| {
176 let elapsed = now.duration_since(since);
177 self.collection_interval.saturating_sub(elapsed)
178 })
179 };
180
181 Some(pacing_remaining.max(batch_remaining))
182 }
183
184 pub fn has_pending(&self) -> bool {
186 self.pending_message.is_some()
187 }
188
189 pub fn pending_message(&self) -> Option<&SyncMessage> {
191 self.pending_message.as_ref()
192 }
193
194 pub fn cancel_pending(&mut self) {
196 self.pending_message = None;
197 self.pending_since = None;
198 self.ack_pending_since = None;
199 }
200
201 pub fn mark_ack_needed(&mut self) {
203 if self.ack_pending_since.is_none() && self.pending_message.is_none() {
204 self.ack_pending_since = Some(Instant::now());
205 }
206 }
207
208 pub fn reset(&mut self) {
210 self.last_send_time = None;
211 self.pending_since = None;
212 self.ack_pending_since = None;
213 self.pending_message = None;
214 }
215}
216
217impl Default for SyncSender {
218 fn default() -> Self {
219 Self::new()
220 }
221}
222
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a non-ack message whose `sender_state_num` is `version`.
    fn create_state_msg(version: u64) -> SyncMessage {
        SyncMessage::new(version, 0, 0, vec![1, 2, 3])
    }

    /// Builds an ack-only message for `version`.
    fn create_ack_msg(version: u64) -> SyncMessage {
        SyncMessage::ack_only(version, version)
    }

    #[test]
    fn test_new_sender() {
        let sender = SyncSender::new();
        assert!(!sender.has_pending());
        assert!(!sender.should_send());
    }

    #[test]
    fn test_queue_state_message() {
        let mut sender = SyncSender::new();

        sender.queue_message(create_state_msg(1));

        assert!(sender.has_pending());
        assert_eq!(sender.pending_message().unwrap().sender_state_num, 1);
    }

    #[test]
    fn test_collection_interval() {
        let mut sender = SyncSender::with_intervals(
            Duration::from_millis(0),
            Duration::from_millis(10),
            Duration::from_millis(100),
        );

        // `queue_message` stamps the message with its own `Instant::now()`.
        // Bracketing the call gives one instant that is certainly not after
        // the stamp and one that is certainly not before it, so neither
        // assertion depends on how quickly this thread is scheduled.
        let before = Instant::now();
        sender.queue_message(create_state_msg(1));
        let after = Instant::now();

        // No time has elapsed on the collection window yet.
        assert!(!sender.should_send_at(before));

        // At least the full collection interval has elapsed.
        assert!(sender.should_send_at(after + Duration::from_millis(10)));
    }

    #[test]
    fn test_delayed_ack() {
        let mut sender = SyncSender::with_intervals(
            Duration::from_millis(0),
            Duration::from_millis(10),
            Duration::from_millis(50),
        );

        // Same bracketing as in `test_collection_interval`.
        let before = Instant::now();
        sender.queue_message(create_ack_msg(1));
        let after = Instant::now();

        // The delayed-ACK timer has only just started.
        assert!(!sender.should_send_at(before));

        // At least the full delayed-ACK timeout has elapsed.
        assert!(sender.should_send_at(after + Duration::from_millis(50)));
    }

    #[test]
    fn test_pacing() {
        let mut sender = SyncSender::with_intervals(
            Duration::from_millis(20),
            Duration::from_millis(0),
            Duration::from_millis(0),
        );

        let start = Instant::now();

        // Nothing has been sent yet, so the first message is ready at once.
        sender.queue_message(create_state_msg(1));
        assert!(sender.should_send_at(start));

        sender.take_message_at(start);

        sender.queue_message(create_state_msg(2));

        // Only 10 ms after the previous send: pacing holds the message back.
        assert!(!sender.should_send_at(start + Duration::from_millis(10)));

        // More than the 20 ms minimum interval after the previous send.
        assert!(sender.should_send_at(start + Duration::from_millis(21)));
    }

    #[test]
    fn test_take_if_ready() {
        let mut sender = SyncSender::with_intervals(
            Duration::from_millis(0),
            Duration::from_millis(0),
            Duration::from_millis(0),
        );

        sender.queue_message(create_state_msg(1));

        let msg = sender.take_if_ready();
        assert!(msg.is_some());
        assert_eq!(msg.unwrap().sender_state_num, 1);
        assert!(!sender.has_pending());
    }

    #[test]
    fn test_time_until_send() {
        let mut sender = SyncSender::with_intervals(
            Duration::from_millis(20),
            Duration::from_millis(10),
            Duration::from_millis(100),
        );

        let start = Instant::now();
        sender.queue_message(create_state_msg(1));

        // Nothing was sent before, so only the collection interval applies.
        let wait = sender.time_until_send_at(start);
        assert!(wait.is_some());
        assert!(wait.unwrap() <= Duration::from_millis(10));
    }

    #[test]
    fn test_message_replacement() {
        let mut sender = SyncSender::new();

        sender.queue_message(create_state_msg(1));
        sender.queue_message(create_state_msg(2));

        // The later message replaces the earlier one.
        assert_eq!(sender.pending_message().unwrap().sender_state_num, 2);
    }

    #[test]
    fn test_cancel_pending() {
        let mut sender = SyncSender::new();

        sender.queue_message(create_state_msg(1));
        assert!(sender.has_pending());

        sender.cancel_pending();
        assert!(!sender.has_pending());
    }

    #[test]
    fn test_reset() {
        let mut sender = SyncSender::new();
        let start = Instant::now();

        sender.queue_message(create_state_msg(1));
        sender.take_message_at(start);

        sender.queue_message(create_state_msg(2));

        sender.reset();

        assert!(!sender.has_pending());
        assert!(sender.last_send_time.is_none());
    }
}