Skip to main content

soe_protocol/channel/
output.rs

1//! The reliable data output channel: converts application data into ordered,
2//! fragmented reliable data packets, and resends them until acknowledged.
3//!
4//! This is a port of the reference implementation's simplified
5//! `ReliableDataOutputChannel2`, which trades the original's multi-packet bundling
6//! for a much simpler (and less bug-prone) go-back-N style window.
7//!
8//! Like the input channel, this is an I/O-agnostic component: enqueued data is fragmented
9//! into outgoing packets which accumulate in an internal queue. Calling
10//! [`ReliableDataOutputChannel::run_tick`] moves due packets into the outgoing buffer
11//! (drained via [`ReliableDataOutputChannel::take_outgoing`]). Acknowledgements are
12//! fed back in via [`ReliableDataOutputChannel::notify_of_acknowledge`] /
13//! [`ReliableDataOutputChannel::notify_of_acknowledge_all`]. Time is supplied by the
14//! caller as [`Instant`] values.
15
16use std::collections::VecDeque;
17use std::time::{Duration, Instant};
18
19use bytes::{BufMut, Bytes, BytesMut};
20
21use crate::protocol::OpCode;
22use crate::rc4::Rc4KeyState;
23
24use super::true_incoming_sequence;
25
/// Byte width of the `u16` sequence number that prefixes every reliable data packet.
const SEQUENCE_SIZE: usize = std::mem::size_of::<u16>();
/// Byte width of the `u32` total-length field that a master fragment carries after
/// its sequence number.
const FRAGMENT_LENGTH_SIZE: usize = std::mem::size_of::<u32>();
30
/// Weight applied to RTTVAR when deriving the adaptive retransmit timeout (RTO).
///
/// The channel uses the Jacobson/Karels SRTT/RTTVAR estimator (SIGCOMM '88,
/// "Congestion Avoidance and Control"), as later standardized for TCP in RFC 6298:
/// `RTO = SRTT + max(RTO_GRANULARITY, RTT_K * RTTVAR)`, clamped to
/// `RTO_MIN..=RTO_MAX`.
const RTT_K: u32 = 4;
/// Minimum value of the variance term. Without it a perfectly steady RTT drives
/// RTTVAR towards zero, the RTO collapses onto the RTT itself, and the timer fires
/// just as acknowledgements arrive.
const RTO_GRANULARITY: Duration = Duration::from_millis(100);
/// Smallest RTO the estimator may produce; prevents spurious resends on very
/// low-RTT links.
const RTO_MIN: Duration = Duration::from_millis(200);
/// Largest RTO the estimator may produce, and the cap for exponential backoff.
const RTO_MAX: Duration = Duration::from_millis(8_000);
43
/// Counters describing the traffic an output channel has produced and the
/// acknowledgements it has received.
#[derive(Clone, Debug, Default)]
pub struct DataOutputStats {
    /// Number of reliable data packets handed to the outgoing buffer; every re-send
    /// counts again.
    pub total_sent: u64,
    /// Number of those dispatches that were re-sends of a previously sent packet.
    pub total_resent: u64,
    /// Number of acknowledgement notifications received, single and ack-all alike.
    pub incoming_acknowledge_count: u64,
    /// Number of queued packets actually released by an acknowledgement (an ack-all
    /// may release several at once).
    pub actual_acknowledge_count: u64,
}
56
/// Tunables for a reliable data output channel.
#[derive(Clone, Debug)]
pub struct OutputConfig {
    /// Upper bound, in bytes, on the data portion (sequence + data) of one reliable
    /// data packet: the remote UDP length minus the OP code and CRC.
    pub max_data_length: usize,
    /// Size of the send window: how many packets from the front of the queue may be
    /// dispatched before acknowledgements must free up room.
    pub max_queued_outgoing: usize,
    /// The INITIAL retransmit timeout, in force until a round-trip time has been
    /// measured. After that the channel derives an adaptive RTO from the measured RTT
    /// (see `RTT_K`/`RTO_MIN`/`RTO_MAX`), so this value only governs the very first
    /// window.
    pub ack_wait: Duration,
}

impl Default for OutputConfig {
    /// Defaults: 508-byte data portion, a 196-packet window and a 500ms initial
    /// retransmit timeout.
    fn default() -> Self {
        let ack_wait = Duration::from_millis(500);
        OutputConfig {
            max_data_length: 508,
            max_queued_outgoing: 196,
            ack_wait,
        }
    }
}
83
84/// A reliable data packet the channel wishes to send (without OP code or CRC
85/// framing, which the session layer applies).
86#[derive(Debug, Clone, PartialEq, Eq)]
87pub struct OutgoingReliable {
88    /// The OP code of the packet ([`OpCode::ReliableData`] or
89    /// [`OpCode::ReliableDataFragment`]).
90    pub op_code: OpCode,
91    /// The packet payload: a big-endian `u16` sequence, an optional big-endian `u32`
92    /// total-length prefix (master fragments only), and the data chunk.
93    pub payload: Bytes,
94}
95
96#[derive(Debug)]
97struct StashedOutputPacket {
98    is_fragment: bool,
99    data: Bytes,
100    sent: bool,
101    /// When this packet was most recently (re)sent, for RTT measurement and per-packet
102    /// retransmit timing. `None` until first dispatched.
103    sent_at: Option<Instant>,
104    /// Set once the packet has been retransmitted: its RTT becomes ambiguous, so it must
105    /// not be used as a round-trip sample (Karn's algorithm).
106    resent: bool,
107}
108
109/// Converts application data into ordered, fragmented reliable data packets.
110#[derive(Debug)]
111pub struct ReliableDataOutputChannel {
112    config: OutputConfig,
113    cipher: Option<Rc4KeyState>,
114
115    dispatch_queue: VecDeque<(i64, StashedOutputPacket)>,
116
117    /// The total number of sequences that have been output.
118    total_sequence: i64,
119    /// The maximum sequence number that the client is known to have received.
120    max_client_sequence: i64,
121    /// The index into `dispatch_queue` of the next packet to dispatch.
122    current_dispatch_index: usize,
123
124    /// Smoothed round-trip time estimate (`None` until the first RTT sample).
125    srtt: Option<Duration>,
126    /// Round-trip time variation estimate (the RTTVAR term of the Jacobson/Karels estimator).
127    rttvar: Duration,
128    /// Current retransmit timeout: adaptive once RTT is known, else `config.ack_wait`.
129    rto: Duration,
130
131    outgoing: Vec<OutgoingReliable>,
132    stats: DataOutputStats,
133}
134
135impl ReliableDataOutputChannel {
136    /// Creates a new output channel. `cipher` is the initial RC4 key state; pass
137    /// `Some(..)` to enable RC4 encryption of the proxied application data, or `None`
138    /// to pass it through unencrypted.
139    pub fn new(config: OutputConfig, cipher: Option<Rc4KeyState>, _now: Instant) -> Self {
140        let initial_rto = config.ack_wait;
141        Self {
142            config,
143            cipher,
144            dispatch_queue: VecDeque::new(),
145            total_sequence: 0,
146            max_client_sequence: 0,
147            current_dispatch_index: 0,
148            srtt: None,
149            rttvar: Duration::ZERO,
150            rto: initial_rto,
151            outgoing: Vec::new(),
152            stats: DataOutputStats::default(),
153        }
154    }
155
156    /// Returns the gathered output statistics.
157    pub fn stats(&self) -> &DataOutputStats {
158        &self.stats
159    }
160
161    /// Drains the outgoing reliable data packets accumulated so far.
162    pub fn take_outgoing(&mut self) -> Vec<OutgoingReliable> {
163        std::mem::take(&mut self.outgoing)
164    }
165
166    /// Returns the number of reliable data packets currently awaiting acknowledgement.
167    pub fn queued_len(&self) -> usize {
168        self.dispatch_queue.len()
169    }
170
171    /// Sets the maximum length of the data portion (sequence + data) of a single
172    /// packet. Should not be called after data has been enqueued.
173    pub fn set_max_data_length(&mut self, max_data_length: usize) {
174        self.config.max_data_length = max_data_length;
175    }
176
177    fn max_chunk(&self) -> usize {
178        self.config.max_data_length - SEQUENCE_SIZE
179    }
180
181    /// Enqueues application data to be sent on the reliable channel. The data is
182    /// fragmented as required to fit within the configured maximum packet length.
183    pub fn enqueue_data(&mut self, data: &[u8]) {
184        if data.is_empty() {
185            return;
186        }
187
188        let mut remaining: Bytes = match &mut self.cipher {
189            Some(_) => self.encrypt(data),
190            None => Bytes::copy_from_slice(data),
191        };
192
193        let is_fragment = remaining.len() > self.max_chunk();
194        self.stash_fragment(&mut remaining, true, is_fragment);
195        while !remaining.is_empty() {
196            self.stash_fragment(&mut remaining, false, true);
197        }
198    }
199
200    /// Runs a tick of the output channel, moving due packets into the outgoing
201    /// buffer. If the oldest in-flight packet has gone unacknowledged for longer than
202    /// the current (adaptive) retransmit timeout, dispatch restarts from the front of
203    /// the window (go-back-N) and the RTO is backed off exponentially (Karn).
204    pub fn run_tick(&mut self, now: Instant) {
205        // Retransmission timeout is keyed off the OLDEST in-flight packet's own send time
206        // (not a single global timer), so a long quiet period before the first ack can't
207        // make every tick resend the whole window.
208        let timed_out = match self.dispatch_queue.front() {
209            Some((_, front)) if front.sent => front
210                .sent_at
211                .is_some_and(|sent_at| now.duration_since(sent_at) > self.rto),
212            _ => false,
213        };
214        if timed_out {
215            self.current_dispatch_index = 0;
216            // Karn's exponential backoff; a fresh (unambiguous) RTT sample resets this.
217            self.rto = (self.rto * 2).min(RTO_MAX);
218        }
219
220        let max_index = self
221            .dispatch_queue
222            .len()
223            .min(self.config.max_queued_outgoing);
224
225        while self.current_dispatch_index < max_index {
226            let (_, packet) = &mut self.dispatch_queue[self.current_dispatch_index];
227            let op_code = if packet.is_fragment {
228                OpCode::ReliableDataFragment
229            } else {
230                OpCode::ReliableData
231            };
232
233            self.stats.total_sent += 1;
234            if packet.sent {
235                self.stats.total_resent += 1;
236                // A retransmitted packet's RTT is ambiguous; exclude it from sampling.
237                packet.resent = true;
238            }
239            packet.sent = true;
240            packet.sent_at = Some(now);
241
242            let payload = packet.data.clone();
243            self.outgoing.push(OutgoingReliable { op_code, payload });
244            self.current_dispatch_index += 1;
245        }
246    }
247
248    /// Folds a fresh, unambiguous round-trip sample into the smoothed RTT/variance
249    /// estimates and recomputes the adaptive retransmit timeout (Jacobson/Karels estimator).
250    fn update_rto(&mut self, sample: Duration) {
251        match self.srtt {
252            None => {
253                self.srtt = Some(sample);
254                self.rttvar = sample / 2;
255            }
256            Some(srtt) => {
257                let diff = srtt.abs_diff(sample);
258                // RTTVAR = 3/4 * RTTVAR + 1/4 * |SRTT - sample|
259                self.rttvar = (self.rttvar * 3 + diff) / 4;
260                // SRTT = 7/8 * SRTT + 1/8 * sample
261                self.srtt = Some((srtt * 7 + sample) / 8);
262            }
263        }
264        let srtt = self.srtt.unwrap_or(sample);
265        let rto = srtt + std::cmp::max(RTO_GRANULARITY, self.rttvar * RTT_K);
266        self.rto = rto.clamp(RTO_MIN, RTO_MAX);
267    }
268
269    /// Notifies the channel that the remote has acknowledged a single sequence.
270    pub fn notify_of_acknowledge(&mut self, sequence: u16, now: Instant) {
271        let seq = self.true_incoming(sequence);
272        self.stats.incoming_acknowledge_count += 1;
273
274        if let Some(pos) = self.dispatch_queue.iter().position(|(s, _)| *s == seq) {
275            let (_, pkt) = &self.dispatch_queue[pos];
276            let sample = (pkt.sent && !pkt.resent)
277                .then(|| pkt.sent_at.map(|sent_at| now.duration_since(sent_at)))
278                .flatten();
279            self.dispatch_queue.remove(pos);
280            self.current_dispatch_index = self.current_dispatch_index.saturating_sub(1);
281            self.stats.actual_acknowledge_count += 1;
282            if let Some(sample) = sample {
283                self.update_rto(sample);
284            }
285        }
286
287        if seq > self.max_client_sequence {
288            self.max_client_sequence = seq;
289        }
290    }
291
292    /// Notifies the channel that the remote has acknowledged all sequences up to and
293    /// including the given one.
294    pub fn notify_of_acknowledge_all(&mut self, sequence: u16, now: Instant) {
295        let seq = self.true_incoming(sequence);
296        self.stats.incoming_acknowledge_count += 1;
297
298        let mut sample: Option<Duration> = None;
299        loop {
300            let (pop, this_sample) = match self.dispatch_queue.front() {
301                Some((s, pkt)) if *s <= seq => {
302                    let smp = (pkt.sent && !pkt.resent)
303                        .then(|| pkt.sent_at.map(|sent_at| now.duration_since(sent_at)))
304                        .flatten();
305                    (true, smp)
306                }
307                _ => (false, None),
308            };
309            if !pop {
310                break;
311            }
312            // Keep the freshest (most recently sent) unambiguous sample in this batch.
313            if this_sample.is_some() {
314                sample = this_sample;
315            }
316            self.dispatch_queue.pop_front();
317            self.current_dispatch_index = self.current_dispatch_index.saturating_sub(1);
318            self.stats.actual_acknowledge_count += 1;
319        }
320
321        if let Some(sample) = sample {
322            self.update_rto(sample);
323        }
324
325        if seq > self.max_client_sequence {
326            self.max_client_sequence = seq;
327        }
328    }
329
330    fn stash_fragment(&mut self, data: &mut Bytes, is_master: bool, is_fragment: bool) {
331        let mut amount = data.len().min(self.max_chunk());
332
333        let mut buf = BytesMut::with_capacity(SEQUENCE_SIZE + FRAGMENT_LENGTH_SIZE + amount);
334        buf.put_u16(self.total_sequence as u16);
335
336        if is_master && is_fragment {
337            buf.put_u32(data.len() as u32);
338            amount -= FRAGMENT_LENGTH_SIZE;
339        }
340
341        buf.extend_from_slice(&data[..amount]);
342
343        self.dispatch_queue.push_back((
344            self.total_sequence,
345            StashedOutputPacket {
346                is_fragment,
347                data: buf.freeze(),
348                sent: false,
349                sent_at: None,
350                resent: false,
351            },
352        ));
353
354        self.total_sequence += 1;
355        *data = data.slice(amount..);
356    }
357
358    /// Encrypts `data` with the channel's RC4 cipher. A leading zero byte is
359    /// prepended when the ciphertext itself begins with a zero, mirroring the input
360    /// channel's padding-strip logic.
361    fn encrypt(&mut self, data: &[u8]) -> Bytes {
362        let cipher = self
363            .cipher
364            .as_mut()
365            .expect("encrypt called without a cipher");
366
367        let mut buf = BytesMut::with_capacity(data.len() + 1);
368        buf.put_u8(0);
369        buf.extend_from_slice(data);
370        cipher.transform_in_place(&mut buf[1..]);
371
372        let frozen = buf.freeze();
373        if frozen[1] == 0 {
374            frozen
375        } else {
376            frozen.slice(1..)
377        }
378    }
379
380    fn true_incoming(&self, packet_sequence: u16) -> i64 {
381        true_incoming_sequence(
382            packet_sequence,
383            self.max_client_sequence,
384            self.config.max_queued_outgoing as i64,
385        )
386    }
387}
388
389#[cfg(test)]
390mod tests {
391    use super::*;
392
393    const MAX_DATA_LENGTH: usize = 506; // 512 (udp) - 2 (op) - 2 (seq) - 2 (crc)
394    const FRAGMENT_WINDOW_SIZE: usize = 8;
395
396    struct Clock {
397        now: Instant,
398    }
399
400    impl Clock {
401        fn new() -> Self {
402            Self {
403                now: Instant::now(),
404            }
405        }
406        fn advance(&mut self, by: Duration) -> Instant {
407            self.now += by;
408            self.now
409        }
410    }
411
412    fn new_channel(clock: &Clock) -> ReliableDataOutputChannel {
413        let config = OutputConfig {
414            max_data_length: MAX_DATA_LENGTH + SEQUENCE_SIZE,
415            max_queued_outgoing: FRAGMENT_WINDOW_SIZE,
416            ack_wait: Duration::from_millis(500),
417        };
418        ReliableDataOutputChannel::new(config, None, clock.now)
419    }
420
421    /// A deterministic pseudo-random byte buffer.
422    fn generate_packet(size: usize) -> Vec<u8> {
423        let mut state: u32 = 0x1234_5678 ^ size as u32;
424        (0..size)
425            .map(|_| {
426                state = state.wrapping_mul(1_664_525).wrapping_add(1_013_904_223);
427                (state >> 24) as u8
428            })
429            .collect()
430    }
431
432    /// Asserts that the data carried by `packets` (stripping the sequence and, for
433    /// the first packet if `expect_master_fragment`, the length prefix) concatenates
434    /// to exactly `buffer`.
435    fn assert_packets_equal_buffer(
436        packets: &[OutgoingReliable],
437        buffer: &[u8],
438        mut expect_master_fragment: bool,
439    ) {
440        let mut position = 0;
441        for packet in packets {
442            let data_offset = SEQUENCE_SIZE
443                + if expect_master_fragment {
444                    FRAGMENT_LENGTH_SIZE
445                } else {
446                    0
447                };
448            expect_master_fragment = false;
449
450            let data = &packet.payload[data_offset..];
451            assert!(
452                position + data.len() <= buffer.len(),
453                "received more data than expected"
454            );
455            assert_eq!(&buffer[position..position + data.len()], data);
456            position += data.len();
457        }
458        assert_eq!(position, buffer.len(), "did not receive the whole buffer");
459    }
460
461    #[test]
462    fn repeats_data_on_ack_failure() {
463        let mut clock = Clock::new();
464        let mut ch = new_channel(&clock);
465
466        let fragment_count = 4;
467        let packet_length = MAX_DATA_LENGTH - 4 + MAX_DATA_LENGTH * (fragment_count - 1);
468        let packet = generate_packet(packet_length);
469
470        ch.enqueue_data(&packet);
471        ch.run_tick(clock.advance(Duration::from_millis(1)));
472        assert_packets_equal_buffer(&ch.take_outgoing(), &packet, true);
473
474        // Don't acknowledge; after the ack wait elapses the data is resent in full.
475        ch.run_tick(clock.advance(Duration::from_millis(600)));
476        assert_packets_equal_buffer(&ch.take_outgoing(), &packet, true);
477    }
478
479    #[test]
480    fn repeats_data_from_arbitrary_position_on_ack_delay() {
481        let mut clock = Clock::new();
482        let mut ch = new_channel(&clock);
483
484        let fragment_count = 4;
485        let packet_length = MAX_DATA_LENGTH - 4 + MAX_DATA_LENGTH * (fragment_count - 1);
486        let packet = generate_packet(packet_length);
487
488        ch.enqueue_data(&packet);
489        ch.run_tick(clock.advance(Duration::from_millis(1)));
490        assert_packets_equal_buffer(&ch.take_outgoing(), &packet, true);
491
492        ch.notify_of_acknowledge_all(1, clock.advance(Duration::from_millis(1)));
493
494        ch.run_tick(clock.advance(Duration::from_millis(600)));
495        // The master fragment (MAX-4) and the next fragment (MAX) were acknowledged.
496        let expected_consumed = MAX_DATA_LENGTH - 4 + MAX_DATA_LENGTH;
497        assert_packets_equal_buffer(&ch.take_outgoing(), &packet[expected_consumed..], false);
498    }
499
500    #[test]
501    fn repeats_full_window_from_arbitrary_position_on_ack_delay() {
502        let mut clock = Clock::new();
503        let mut ch = new_channel(&clock);
504
505        let fragment_count = FRAGMENT_WINDOW_SIZE * 2;
506        let packet_length = MAX_DATA_LENGTH - 4 + MAX_DATA_LENGTH * (fragment_count - 1);
507        let packet = generate_packet(packet_length);
508
509        ch.enqueue_data(&packet);
510        ch.run_tick(clock.advance(Duration::from_millis(1)));
511
512        // Only a full window of packets is sent initially.
513        let expected_receive_length =
514            MAX_DATA_LENGTH - 4 + MAX_DATA_LENGTH * (FRAGMENT_WINDOW_SIZE - 1);
515        assert_packets_equal_buffer(
516            &ch.take_outgoing(),
517            &packet[..expected_receive_length],
518            true,
519        );
520
521        ch.notify_of_acknowledge_all(
522            (FRAGMENT_WINDOW_SIZE - 2) as u16,
523            clock.advance(Duration::from_millis(1)),
524        );
525        ch.run_tick(clock.advance(Duration::from_millis(600)));
526
527        let expected_consumed = MAX_DATA_LENGTH - 4 + MAX_DATA_LENGTH * (FRAGMENT_WINDOW_SIZE - 2);
528        let expected_repeat_length = MAX_DATA_LENGTH * FRAGMENT_WINDOW_SIZE;
529        assert_packets_equal_buffer(
530            &ch.take_outgoing(),
531            &packet[expected_consumed..expected_consumed + expected_repeat_length],
532            false,
533        );
534    }
535
536    #[test]
537    fn single_small_packet_is_not_fragmented() {
538        let mut clock = Clock::new();
539        let mut ch = new_channel(&clock);
540
541        let data = generate_packet(32);
542        ch.enqueue_data(&data);
543        ch.run_tick(clock.advance(Duration::from_millis(1)));
544
545        let outgoing = ch.take_outgoing();
546        assert_eq!(outgoing.len(), 1);
547        assert_eq!(outgoing[0].op_code, OpCode::ReliableData);
548        // No length prefix: payload is [seq u16][data].
549        assert_eq!(&outgoing[0].payload[SEQUENCE_SIZE..], &data[..]);
550    }
551
552    #[test]
553    fn single_ack_removes_specific_packet() {
554        let mut clock = Clock::new();
555        let mut ch = new_channel(&clock);
556
557        let packet_length = MAX_DATA_LENGTH - 4 + MAX_DATA_LENGTH * 3;
558        let packet = generate_packet(packet_length);
559        ch.enqueue_data(&packet);
560        assert_eq!(ch.queued_len(), 4);
561
562        ch.run_tick(clock.advance(Duration::from_millis(1)));
563        let _ = ch.take_outgoing();
564
565        ch.notify_of_acknowledge(2, clock.advance(Duration::from_millis(1)));
566        assert_eq!(ch.queued_len(), 3);
567        assert_eq!(ch.stats().actual_acknowledge_count, 1);
568    }
569
570    /// Across consecutive ticks WITHOUT acknowledgement, the number of unacknowledged
571    /// packets in flight must never exceed `max_queued_outgoing`. (Regression: the window
572    /// ceiling was computed relative to the already-advanced dispatch index, so each tick
573    /// admitted another full window -> unbounded in-flight growth -> client RCVBUF overflow.)
574    #[test]
575    fn window_does_not_grow_across_ticks_without_ack() {
576        let mut clock = Clock::new();
577        let mut ch = new_channel(&clock);
578
579        // Enqueue far more than one window's worth of fragments.
580        let fragment_count = FRAGMENT_WINDOW_SIZE * 4;
581        let packet_length = MAX_DATA_LENGTH - 4 + MAX_DATA_LENGTH * (fragment_count - 1);
582        let packet = generate_packet(packet_length);
583        ch.enqueue_data(&packet);
584
585        // Tick 1: a full window goes out.
586        ch.run_tick(clock.advance(Duration::from_millis(1)));
587        let mut in_flight = ch.take_outgoing().len();
588        assert_eq!(
589            in_flight, FRAGMENT_WINDOW_SIZE,
590            "first tick should send exactly one window"
591        );
592
593        // Several more ticks, no ack, well within ack_wait: nothing new may be sent
594        // because the window is still full of unacknowledged packets.
595        for _ in 0..5 {
596            ch.run_tick(clock.advance(Duration::from_millis(10)));
597            in_flight += ch.take_outgoing().len();
598            assert!(
599                in_flight <= FRAGMENT_WINDOW_SIZE,
600                "in-flight unacked packets ({in_flight}) exceeded the window ({FRAGMENT_WINDOW_SIZE})",
601            );
602        }
603    }
604
605    /// Once a real round-trip time has been measured, the retransmit timeout must adapt
606    /// upward so a quiet gap LONGER than the initial `ack_wait` no longer triggers a
607    /// spurious resend. A fixed RTO (== ack_wait) would resend here; the adaptive RTO
608    /// (SRTT + 4*RTTVAR after a ~500ms sample => ~1.5s) must not.
609    #[test]
610    fn adaptive_rto_suppresses_resend_after_learning_high_rtt() {
611        let mut clock = Clock::new();
612        let mut ch = new_channel(&clock); // ack_wait = 500ms
613
614        // Send one window.
615        let fragment_count = FRAGMENT_WINDOW_SIZE + 4;
616        let packet_length = MAX_DATA_LENGTH - 4 + MAX_DATA_LENGTH * (fragment_count - 1);
617        let packet = generate_packet(packet_length);
618        ch.enqueue_data(&packet);
619        ch.run_tick(clock.advance(Duration::from_millis(1)));
620        let _ = ch.take_outgoing();
621
622        // Acknowledge the whole first window after a ~500ms round trip: this feeds the
623        // RTO estimator a 500ms sample, raising the RTO well above the 500ms ack_wait.
624        ch.notify_of_acknowledge_all(
625            (FRAGMENT_WINDOW_SIZE - 1) as u16,
626            clock.advance(Duration::from_millis(500)),
627        );
628
629        // The newly admitted window is now in flight. Advance 600ms (> the old fixed
630        // ack_wait) with no further ack. With a fixed RTO this resends the window; with
631        // the adaptive RTO (~1.5s) it must NOT.
632        ch.run_tick(clock.advance(Duration::from_millis(1)));
633        let _ = ch.take_outgoing();
634        ch.run_tick(clock.advance(Duration::from_millis(600)));
635        let resent = ch.take_outgoing();
636
637        assert!(
638            resent.is_empty(),
639            "adaptive RTO must not resend within the learned RTT, but resent {} packets",
640            resent.len()
641        );
642        assert_eq!(
643            ch.stats().total_resent,
644            0,
645            "no packet should have been retransmitted after the RTO adapted to the RTT"
646        );
647    }
648
649    /// End-to-end drain at RTT ~= the initial `ack_wait`: the channel must deliver every
650    /// packet while keeping in-flight within ~1 window and the on-wire datagram count close
651    /// to the unique count (no resend storm). Models a delayed pipe with cumulative acks.
652    #[test]
653    fn adaptive_rto_bounds_inflight_at_high_rtt() {
654        let mut clock = Clock::new();
655        let mut ch = new_channel(&clock);
656
657        let one_way = Duration::from_millis(250); // RTT ~= ack_wait (500ms)
658        let tick = Duration::from_millis(5);
659
660        let fragment_count = 30;
661        let packet_length = MAX_DATA_LENGTH - 4 + MAX_DATA_LENGTH * (fragment_count - 1);
662        let packet = generate_packet(packet_length);
663        ch.enqueue_data(&packet);
664        let unique = ch.queued_len();
665
666        let mut to_client: Vec<(Instant, u16)> = Vec::new();
667        let mut to_server: Vec<(Instant, u16)> = Vec::new();
668        let mut received = vec![false; unique];
669
670        let mut total_on_wire = 0usize;
671        let mut highest_sent: i64 = -1;
672        let mut last_ack: i64 = -1;
673        let mut max_in_flight: i64 = 0;
674
675        for _ in 0..800 {
676            let now = clock.advance(tick);
677
678            // Deliver acks that have arrived back at the server.
679            to_server.retain(|&(at, ack)| {
680                if at <= now {
681                    ch.notify_of_acknowledge_all(ack, now);
682                    last_ack = last_ack.max(ack as i64);
683                    false
684                } else {
685                    true
686                }
687            });
688
689            // Deliver datagrams that have arrived at the client; ack the highest
690            // contiguous sequence seen so far.
691            let mut delivered_any = false;
692            to_client.retain(|&(at, seq)| {
693                if at <= now {
694                    received[seq as usize] = true;
695                    delivered_any = true;
696                    false
697                } else {
698                    true
699                }
700            });
701            if delivered_any {
702                let mut hw: i64 = -1;
703                for (seq, got) in received.iter().enumerate() {
704                    if *got {
705                        hw = seq as i64;
706                    } else {
707                        break;
708                    }
709                }
710                if hw >= 0 {
711                    to_server.push((now + one_way, hw as u16));
712                }
713            }
714
715            ch.run_tick(now);
716            for out in ch.take_outgoing() {
717                let seq = u16::from_be_bytes([out.payload[0], out.payload[1]]);
718                total_on_wire += 1;
719                highest_sent = highest_sent.max(seq as i64);
720                to_client.push((now + one_way, seq));
721            }
722
723            max_in_flight = max_in_flight.max(highest_sent - last_ack);
724
725            if last_ack >= 0 && last_ack as usize + 1 == unique {
726                break;
727            }
728        }
729
730        assert!(
731            last_ack >= 0 && last_ack as usize + 1 == unique,
732            "channel did not drain all {unique} packets (acked through {last_ack})"
733        );
734        assert!(
735            max_in_flight <= FRAGMENT_WINDOW_SIZE as i64 + 2,
736            "in-flight ({max_in_flight}) far exceeded the window ({FRAGMENT_WINDOW_SIZE}) -> resend storm",
737        );
738        assert!(
739            total_on_wire <= unique + unique / 4,
740            "sent {total_on_wire} datagrams for {unique} unique packets (>1.25x = resend storm)",
741        );
742    }
743}