nomad_protocol/sync/
sender.rs

1//! Sender-side sync logic
2//!
3//! Manages outbound sync messages with pacing and batching.
4
5use std::time::{Duration, Instant};
6
7use super::message::SyncMessage;
8
9/// Default collection interval for batching rapid state changes
10pub const DEFAULT_COLLECTION_INTERVAL: Duration = Duration::from_millis(8);
11
12/// Default delayed ack timeout
13pub const DEFAULT_DELAYED_ACK_TIMEOUT: Duration = Duration::from_millis(100);
14
15/// Sender state for managing outbound sync messages
16#[derive(Debug)]
17pub struct SyncSender {
18    /// Minimum interval between sends (pacing)
19    min_send_interval: Duration,
20
21    /// Collection interval for batching rapid changes
22    collection_interval: Duration,
23
24    /// Delayed ack timeout
25    delayed_ack_timeout: Duration,
26
27    /// Last time we sent a message
28    last_send_time: Option<Instant>,
29
30    /// Time when pending state change was first detected
31    pending_since: Option<Instant>,
32
33    /// Time when ack became pending
34    ack_pending_since: Option<Instant>,
35
36    /// Pending message to send
37    pending_message: Option<SyncMessage>,
38}
39
40impl SyncSender {
41    /// Create a new sender with default settings
42    pub fn new() -> Self {
43        Self {
44            min_send_interval: Duration::from_millis(20), // 50 Hz max
45            collection_interval: DEFAULT_COLLECTION_INTERVAL,
46            delayed_ack_timeout: DEFAULT_DELAYED_ACK_TIMEOUT,
47            last_send_time: None,
48            pending_since: None,
49            ack_pending_since: None,
50            pending_message: None,
51        }
52    }
53
54    /// Create a sender with custom intervals
55    pub fn with_intervals(
56        min_send_interval: Duration,
57        collection_interval: Duration,
58        delayed_ack_timeout: Duration,
59    ) -> Self {
60        Self {
61            min_send_interval,
62            collection_interval,
63            delayed_ack_timeout,
64            last_send_time: None,
65            pending_since: None,
66            ack_pending_since: None,
67            pending_message: None,
68        }
69    }
70
71    /// Queue a message for sending
72    ///
73    /// The message will be held until the pacing interval allows sending.
74    pub fn queue_message(&mut self, msg: SyncMessage) {
75        let now = Instant::now();
76
77        if msg.is_ack_only() {
78            // Track ack pending time
79            if self.ack_pending_since.is_none() {
80                self.ack_pending_since = Some(now);
81            }
82        } else {
83            // Track state change pending time
84            if self.pending_since.is_none() {
85                self.pending_since = Some(now);
86            }
87            // Clear ack pending since we're sending state
88            self.ack_pending_since = None;
89        }
90
91        // Replace any pending message with newer one
92        self.pending_message = Some(msg);
93    }
94
95    /// Check if we should send now
96    pub fn should_send(&self) -> bool {
97        self.should_send_at(Instant::now())
98    }
99
100    /// Check if we should send at a given time
101    pub fn should_send_at(&self, now: Instant) -> bool {
102        let Some(msg) = self.pending_message.as_ref() else {
103            return false;
104        };
105
106        // Check pacing interval
107        if self.last_send_time.is_some_and(|last| now.duration_since(last) < self.min_send_interval) {
108            return false;
109        }
110
111        if msg.is_ack_only() {
112            // Ack-only: wait for delayed ack timeout
113            self.ack_pending_since
114                .is_some_and(|since| now.duration_since(since) >= self.delayed_ack_timeout)
115        } else {
116            // State update: wait for collection interval
117            self.pending_since
118                .is_none_or(|since| now.duration_since(since) >= self.collection_interval)
119        }
120    }
121
122    /// Take the pending message if we should send now
123    pub fn take_if_ready(&mut self) -> Option<SyncMessage> {
124        self.take_if_ready_at(Instant::now())
125    }
126
127    /// Take the pending message if ready at a given time
128    pub fn take_if_ready_at(&mut self, now: Instant) -> Option<SyncMessage> {
129        if self.should_send_at(now) {
130            self.take_message_at(now)
131        } else {
132            None
133        }
134    }
135
136    /// Force-take the pending message (bypass timing checks)
137    pub fn take_message(&mut self) -> Option<SyncMessage> {
138        self.take_message_at(Instant::now())
139    }
140
141    /// Force-take the pending message at a given time
142    fn take_message_at(&mut self, now: Instant) -> Option<SyncMessage> {
143        if let Some(msg) = self.pending_message.take() {
144            self.last_send_time = Some(now);
145            self.pending_since = None;
146            self.ack_pending_since = None;
147            Some(msg)
148        } else {
149            None
150        }
151    }
152
153    /// Get time until next allowed send
154    pub fn time_until_send(&self) -> Option<Duration> {
155        self.time_until_send_at(Instant::now())
156    }
157
158    /// Get time until next allowed send at a given time
159    pub fn time_until_send_at(&self, now: Instant) -> Option<Duration> {
160        let msg = self.pending_message.as_ref()?;
161
162        // Time until pacing allows
163        let pacing_remaining = self.last_send_time.map_or(Duration::ZERO, |last| {
164            let elapsed = now.duration_since(last);
165            self.min_send_interval.saturating_sub(elapsed)
166        });
167
168        // Time until collection/ack timeout
169        let batch_remaining = if msg.is_ack_only() {
170            self.ack_pending_since.map_or(Duration::ZERO, |since| {
171                let elapsed = now.duration_since(since);
172                self.delayed_ack_timeout.saturating_sub(elapsed)
173            })
174        } else {
175            self.pending_since.map_or(Duration::ZERO, |since| {
176                let elapsed = now.duration_since(since);
177                self.collection_interval.saturating_sub(elapsed)
178            })
179        };
180
181        Some(pacing_remaining.max(batch_remaining))
182    }
183
184    /// Check if there's a pending message
185    pub fn has_pending(&self) -> bool {
186        self.pending_message.is_some()
187    }
188
189    /// Get reference to pending message
190    pub fn pending_message(&self) -> Option<&SyncMessage> {
191        self.pending_message.as_ref()
192    }
193
194    /// Cancel pending message
195    pub fn cancel_pending(&mut self) {
196        self.pending_message = None;
197        self.pending_since = None;
198        self.ack_pending_since = None;
199    }
200
201    /// Mark that an ack is needed (triggers delayed ack timer)
202    pub fn mark_ack_needed(&mut self) {
203        if self.ack_pending_since.is_none() && self.pending_message.is_none() {
204            self.ack_pending_since = Some(Instant::now());
205        }
206    }
207
208    /// Reset sender state
209    pub fn reset(&mut self) {
210        self.last_send_time = None;
211        self.pending_since = None;
212        self.ack_pending_since = None;
213        self.pending_message = None;
214    }
215}
216
217impl Default for SyncSender {
218    fn default() -> Self {
219        Self::new()
220    }
221}
222
223#[cfg(test)]
224mod tests {
225    use super::*;
226
227    fn create_state_msg(version: u64) -> SyncMessage {
228        SyncMessage::new(version, 0, 0, vec![1, 2, 3])
229    }
230
231    fn create_ack_msg(version: u64) -> SyncMessage {
232        SyncMessage::ack_only(version, version)
233    }
234
235    #[test]
236    fn test_new_sender() {
237        let sender = SyncSender::new();
238        assert!(!sender.has_pending());
239        assert!(!sender.should_send());
240    }
241
242    #[test]
243    fn test_queue_state_message() {
244        let mut sender = SyncSender::new();
245        let msg = create_state_msg(1);
246
247        sender.queue_message(msg.clone());
248
249        assert!(sender.has_pending());
250        assert_eq!(sender.pending_message().unwrap().sender_state_num, 1);
251    }
252
253    #[test]
254    fn test_collection_interval() {
255        let mut sender = SyncSender::with_intervals(
256            Duration::from_millis(0), // No pacing
257            Duration::from_millis(10), // 10ms collection
258            Duration::from_millis(100),
259        );
260
261        let start = Instant::now();
262        sender.queue_message(create_state_msg(1));
263
264        // Shouldn't send immediately
265        assert!(!sender.should_send_at(start));
266
267        // Should send after collection interval
268        let after_collection = start + Duration::from_millis(11);
269        assert!(sender.should_send_at(after_collection));
270    }
271
272    #[test]
273    fn test_delayed_ack() {
274        let mut sender = SyncSender::with_intervals(
275            Duration::from_millis(0),
276            Duration::from_millis(10),
277            Duration::from_millis(50), // 50ms delayed ack
278        );
279
280        let start = Instant::now();
281        sender.queue_message(create_ack_msg(1));
282
283        // Shouldn't send immediately
284        assert!(!sender.should_send_at(start));
285
286        // Should send after delayed ack timeout
287        let after_timeout = start + Duration::from_millis(51);
288        assert!(sender.should_send_at(after_timeout));
289    }
290
291    #[test]
292    fn test_pacing() {
293        let mut sender = SyncSender::with_intervals(
294            Duration::from_millis(20), // 20ms pacing
295            Duration::from_millis(0),
296            Duration::from_millis(0),
297        );
298
299        let start = Instant::now();
300
301        // First message should send immediately (no last_send_time)
302        sender.queue_message(create_state_msg(1));
303        assert!(sender.should_send_at(start));
304
305        // Take it
306        sender.take_message_at(start);
307
308        // Queue another
309        sender.queue_message(create_state_msg(2));
310
311        // Shouldn't send yet (pacing)
312        assert!(!sender.should_send_at(start + Duration::from_millis(10)));
313
314        // Should send after pacing interval
315        assert!(sender.should_send_at(start + Duration::from_millis(21)));
316    }
317
318    #[test]
319    fn test_take_if_ready() {
320        let mut sender = SyncSender::with_intervals(
321            Duration::from_millis(0),
322            Duration::from_millis(0),
323            Duration::from_millis(0),
324        );
325
326        sender.queue_message(create_state_msg(1));
327
328        let msg = sender.take_if_ready();
329        assert!(msg.is_some());
330        assert_eq!(msg.unwrap().sender_state_num, 1);
331        assert!(!sender.has_pending());
332    }
333
334    #[test]
335    fn test_time_until_send() {
336        let mut sender = SyncSender::with_intervals(
337            Duration::from_millis(20),
338            Duration::from_millis(10),
339            Duration::from_millis(100),
340        );
341
342        let start = Instant::now();
343        sender.queue_message(create_state_msg(1));
344
345        // Should wait for collection interval
346        let wait = sender.time_until_send_at(start);
347        assert!(wait.is_some());
348        assert!(wait.unwrap() <= Duration::from_millis(10));
349    }
350
351    #[test]
352    fn test_message_replacement() {
353        let mut sender = SyncSender::new();
354
355        sender.queue_message(create_state_msg(1));
356        sender.queue_message(create_state_msg(2));
357
358        // Should have replaced with newer message
359        assert_eq!(sender.pending_message().unwrap().sender_state_num, 2);
360    }
361
362    #[test]
363    fn test_cancel_pending() {
364        let mut sender = SyncSender::new();
365
366        sender.queue_message(create_state_msg(1));
367        assert!(sender.has_pending());
368
369        sender.cancel_pending();
370        assert!(!sender.has_pending());
371    }
372
373    #[test]
374    fn test_reset() {
375        let mut sender = SyncSender::new();
376        let start = Instant::now();
377
378        sender.queue_message(create_state_msg(1));
379        sender.take_message_at(start);
380
381        sender.queue_message(create_state_msg(2));
382
383        sender.reset();
384
385        assert!(!sender.has_pending());
386        assert!(sender.last_send_time.is_none());
387    }
388}