1use std::time::{Duration, Instant};
7
8use crate::mmp::report::SenderReport;
9use crate::mmp::{
10 COLD_START_SAMPLES, DEFAULT_COLD_START_INTERVAL_MS, MAX_REPORT_INTERVAL_MS,
11 MIN_REPORT_INTERVAL_MS,
12};
13
/// Sender-side bookkeeping used to build periodic sender reports.
///
/// Tracks cumulative totals, the packets recorded since the last report
/// (the "interval"), the report pacing interval and the send-failure
/// backoff state.
#[derive(Debug)]
pub struct SenderState {
    /// Number of packets recorded since this state was created.
    cumulative_packets_sent: u64,
    /// Number of bytes recorded since this state was created.
    cumulative_bytes_sent: u64,

    /// Counter of the first packet recorded in the current interval.
    interval_start_counter: u64,
    /// Timestamp of the first packet recorded in the current interval.
    interval_start_timestamp: u32,
    /// Bytes recorded in the current interval; reset when a report is built.
    interval_bytes_sent: u32,
    /// Counter of the most recently recorded packet.
    last_counter: u64,
    /// Timestamp of the most recently recorded packet.
    last_timestamp: u32,
    /// `true` once at least one packet was recorded since the last report.
    interval_has_data: bool,

    /// When the last report was built; `None` until the first report.
    last_report_time: Option<Instant>,
    /// Base interval between reports, before send-failure backoff is applied.
    report_interval: Duration,

    /// Send failures recorded since the last recorded success.
    consecutive_send_failures: u32,

    /// Number of SRTT samples fed into the report-interval update so far.
    srtt_sample_count: u32,
}
46
47impl SenderState {
48 pub fn new() -> Self {
49 Self::new_with_cold_start(DEFAULT_COLD_START_INTERVAL_MS)
50 }
51
52 pub fn new_with_cold_start(cold_start_ms: u64) -> Self {
57 Self {
58 cumulative_packets_sent: 0,
59 cumulative_bytes_sent: 0,
60 interval_start_counter: 0,
61 interval_start_timestamp: 0,
62 interval_bytes_sent: 0,
63 last_counter: 0,
64 last_timestamp: 0,
65 interval_has_data: false,
66 last_report_time: None,
67 report_interval: Duration::from_millis(cold_start_ms),
68 consecutive_send_failures: 0,
69 srtt_sample_count: 0,
70 }
71 }
72
73 pub fn record_sent(&mut self, counter: u64, timestamp: u32, bytes: usize) {
79 if !self.interval_has_data {
80 self.interval_start_counter = counter;
81 self.interval_start_timestamp = timestamp;
82 self.interval_has_data = true;
83 }
84 self.last_counter = counter;
85 self.last_timestamp = timestamp;
86 self.interval_bytes_sent = self.interval_bytes_sent.saturating_add(bytes as u32);
87 self.cumulative_packets_sent += 1;
88 self.cumulative_bytes_sent += bytes as u64;
89 }
90
91 pub fn build_report(&mut self, now: Instant) -> Option<SenderReport> {
95 if !self.interval_has_data {
96 return None;
97 }
98
99 let report = SenderReport {
100 interval_start_counter: self.interval_start_counter,
101 interval_end_counter: self.last_counter,
102 interval_start_timestamp: self.interval_start_timestamp,
103 interval_end_timestamp: self.last_timestamp,
104 interval_bytes_sent: self.interval_bytes_sent,
105 cumulative_packets_sent: self.cumulative_packets_sent,
106 cumulative_bytes_sent: self.cumulative_bytes_sent,
107 };
108
109 self.interval_has_data = false;
111 self.interval_bytes_sent = 0;
112 self.last_report_time = Some(now);
113
114 Some(report)
115 }
116
117 pub fn should_send_report(&self, now: Instant) -> bool {
122 if !self.interval_has_data {
123 return false;
124 }
125 match self.last_report_time {
126 None => true, Some(last) => {
128 let effective = self
129 .report_interval
130 .mul_f64(self.send_failure_backoff_multiplier());
131 now.duration_since(last) >= effective
132 }
133 }
134 }
135
136 pub fn record_send_failure(&mut self) -> u32 {
138 self.consecutive_send_failures += 1;
139 self.consecutive_send_failures
140 }
141
142 pub fn record_send_success(&mut self) -> u32 {
144 let prev = self.consecutive_send_failures;
145 self.consecutive_send_failures = 0;
146 prev
147 }
148
149 pub fn send_failure_backoff_multiplier(&self) -> f64 {
154 if self.consecutive_send_failures == 0 {
155 1.0
156 } else {
157 2.0_f64.powi(self.consecutive_send_failures.min(5) as i32)
158 }
159 }
160
161 pub fn update_report_interval_from_srtt(&mut self, srtt_us: i64) {
168 self.srtt_sample_count = self.srtt_sample_count.saturating_add(1);
169 let floor = if self.srtt_sample_count <= COLD_START_SAMPLES {
170 DEFAULT_COLD_START_INTERVAL_MS
171 } else {
172 MIN_REPORT_INTERVAL_MS
173 };
174 self.update_report_interval_with_bounds(srtt_us, floor, MAX_REPORT_INTERVAL_MS);
175 }
176
177 pub fn update_report_interval_with_bounds(&mut self, srtt_us: i64, min_ms: u64, max_ms: u64) {
182 if srtt_us <= 0 {
183 return;
184 }
185 let interval_us = (srtt_us * 2) as u64;
186 let interval_ms = (interval_us / 1000).clamp(min_ms, max_ms);
187 self.report_interval = Duration::from_millis(interval_ms);
188 }
189
190 pub fn cumulative_packets_sent(&self) -> u64 {
193 self.cumulative_packets_sent
194 }
195
196 pub fn cumulative_bytes_sent(&self) -> u64 {
197 self.cumulative_bytes_sent
198 }
199
200 pub fn report_interval(&self) -> Duration {
201 self.report_interval
202 }
203
204 pub fn consecutive_send_failures(&self) -> u32 {
205 self.consecutive_send_failures
206 }
207}
208
209impl Default for SenderState {
210 fn default() -> Self {
211 Self::new()
212 }
213}
214
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a state that already built one report at `t0` and has a second
    /// packet pending, so only the pacing check decides whether a report is due.
    fn reported_with_pending_packet(t0: Instant) -> SenderState {
        let mut state = SenderState::new();
        state.record_sent(1, 100, 500);
        let _ = state.build_report(t0);
        state.record_sent(2, 200, 500);
        state
    }

    /// Records `count` consecutive send failures on `state`.
    fn fail_times(state: &mut SenderState, count: u32) {
        for _ in 0..count {
            state.record_send_failure();
        }
    }

    #[test]
    fn test_new_sender_state() {
        let state = SenderState::new();
        let totals = (state.cumulative_packets_sent(), state.cumulative_bytes_sent());
        assert_eq!(totals, (0, 0));
    }

    #[test]
    fn test_record_sent() {
        let mut state = SenderState::new();
        for &(counter, timestamp, len) in [(1u64, 100u32, 500usize), (2, 200, 600)].iter() {
            state.record_sent(counter, timestamp, len);
        }
        assert_eq!(state.cumulative_packets_sent(), 2);
        assert_eq!(state.cumulative_bytes_sent(), 1100);
    }

    #[test]
    fn test_build_report_empty() {
        // Nothing recorded yet, so there is nothing to report.
        let mut state = SenderState::new();
        let report = state.build_report(Instant::now());
        assert!(report.is_none());
    }

    #[test]
    fn test_build_report() {
        let mut state = SenderState::new();
        for &(counter, timestamp, len) in
            [(10u64, 1000u32, 500usize), (11, 1100, 600), (12, 1200, 400)].iter()
        {
            state.record_sent(counter, timestamp, len);
        }

        let report = state.build_report(Instant::now()).unwrap();
        assert_eq!(
            (report.interval_start_counter, report.interval_end_counter),
            (10, 12)
        );
        assert_eq!(
            (report.interval_start_timestamp, report.interval_end_timestamp),
            (1000, 1200)
        );
        assert_eq!(report.interval_bytes_sent, 1500);
        assert_eq!(report.cumulative_packets_sent, 3);
        assert_eq!(report.cumulative_bytes_sent, 1500);
    }

    #[test]
    fn test_build_report_resets_interval() {
        let mut state = SenderState::new();
        state.record_sent(1, 100, 500);
        let _ = state.build_report(Instant::now());

        // The interval was closed by the first report.
        assert!(state.build_report(Instant::now()).is_none());

        state.record_sent(2, 200, 300);
        let second = state.build_report(Instant::now()).unwrap();
        // Interval values restart, cumulative values keep growing.
        assert_eq!(second.interval_start_counter, 2);
        assert_eq!(second.interval_bytes_sent, 300);
        assert_eq!(second.cumulative_packets_sent, 2);
        assert_eq!(second.cumulative_bytes_sent, 800);
    }

    #[test]
    fn test_should_send_report_no_data() {
        let state = SenderState::new();
        let due = state.should_send_report(Instant::now());
        assert!(!due);
    }

    #[test]
    fn test_should_send_report_first_time() {
        let mut state = SenderState::new();
        state.record_sent(1, 100, 500);
        let due = state.should_send_report(Instant::now());
        assert!(due);
    }

    #[test]
    fn test_should_send_report_respects_interval() {
        let t0 = Instant::now();
        let state = reported_with_pending_packet(t0);

        // No time has passed since the report.
        assert!(!state.should_send_report(t0));

        let just_after = t0 + state.report_interval() + Duration::from_millis(1);
        assert!(state.should_send_report(just_after));
    }

    #[test]
    fn test_update_report_interval_cold_start() {
        let mut state = SenderState::new();
        // (srtt in microseconds, expected interval in milliseconds)
        let cases = [(50_000i64, 200u64), (500_000, 1000)];
        for &(srtt_us, expected_ms) in cases.iter() {
            state.update_report_interval_from_srtt(srtt_us);
            assert_eq!(state.report_interval(), Duration::from_millis(expected_ms));
        }
    }

    #[test]
    fn test_update_report_interval_after_cold_start() {
        let mut state = SenderState::new();
        let mut fed = 0;
        while fed < COLD_START_SAMPLES {
            state.update_report_interval_from_srtt(500_000);
            fed += 1;
        }

        // (srtt in microseconds, bound the interval is expected to hit)
        let cases = [
            (50_000i64, MIN_REPORT_INTERVAL_MS),
            (3_000_000, MAX_REPORT_INTERVAL_MS),
        ];
        for &(srtt_us, expected_ms) in cases.iter() {
            state.update_report_interval_from_srtt(srtt_us);
            assert_eq!(state.report_interval(), Duration::from_millis(expected_ms));
        }
    }

    #[test]
    fn test_backoff_multiplier_progression() {
        let mut state = SenderState::new();

        assert_eq!(state.send_failure_backoff_multiplier(), 1.0);
        assert_eq!(state.consecutive_send_failures(), 0);

        let mut failures = 0u32;
        for &multiplier in [2.0, 4.0, 8.0, 16.0, 32.0].iter() {
            failures += 1;
            assert_eq!(state.record_send_failure(), failures);
            assert_eq!(state.send_failure_backoff_multiplier(), multiplier);
        }

        // Further failures stay at the cap.
        for _ in 0..2 {
            state.record_send_failure();
            assert_eq!(state.send_failure_backoff_multiplier(), 32.0);
        }
    }

    #[test]
    fn test_backoff_reset_on_success() {
        let mut state = SenderState::new();

        fail_times(&mut state, 3);
        assert_eq!(state.consecutive_send_failures(), 3);
        assert_eq!(state.send_failure_backoff_multiplier(), 8.0);

        let before_reset = state.record_send_success();
        assert_eq!(before_reset, 3);
        assert_eq!(state.consecutive_send_failures(), 0);
        assert_eq!(state.send_failure_backoff_multiplier(), 1.0);
    }

    #[test]
    fn test_backoff_success_with_no_prior_failures() {
        let mut state = SenderState::new();

        let before_reset = state.record_send_success();
        assert_eq!(before_reset, 0);
        assert_eq!(state.consecutive_send_failures(), 0);
    }

    #[test]
    fn test_should_send_report_respects_backoff() {
        let t0 = Instant::now();
        let mut state = reported_with_pending_packet(t0);
        fail_times(&mut state, 1);

        let base = state.report_interval();
        let slack = Duration::from_millis(1);

        // One failure doubles the wait: the plain interval is not enough.
        assert!(!state.should_send_report(t0 + base + slack));
        assert!(state.should_send_report(t0 + base * 2 + slack));
    }
}