Skip to main content

soe_protocol/channel/
input.rs

//! The reliable data input channel: validates, orders and reassembles incoming
//! reliable data, decrypts/unbundles it, and emits acknowledgements.
//!
//! This is an I/O-agnostic component. Incoming packets are fed via
//! [`ReliableDataInputChannel::handle_reliable_data`] /
//! [`ReliableDataInputChannel::handle_reliable_data_fragment`], and the channel
//! accumulates outgoing acknowledgement packets (drained via
//! [`ReliableDataInputChannel::take_outgoing`]) and decoded application data
//! (drained via [`ReliableDataInputChannel::take_app_data`]). Time is supplied by
//! the caller as [`Instant`] values.
11
12use std::time::{Duration, Instant};
13
14use bytes::{Bytes, BytesMut};
15
16use crate::constants::MULTI_DATA_INDICATOR;
17use crate::io::BinaryReader;
18use crate::protocol::OpCode;
19use crate::rc4::Rc4KeyState;
20use crate::varint::data_bundle;
21
22use super::true_incoming_sequence;
23
/// Byte width of the big-endian `u32` total-length prefix that leads a master
/// fragment.
const FRAGMENT_COMPLETE_LENGTH_SIZE: usize = 4;

/// Ceiling (64 KiB) on the capacity reserved up front for a reassembly buffer.
///
/// The reservation is derived from the total length a master fragment reports about
/// itself, which is untrusted. The buffer still grows to hold whatever data really
/// arrives; the ceiling only stops a tiny hostile packet from triggering an enormous
/// allocation.
const MAX_FRAGMENT_PREALLOC: usize = 1 << 16;
32
/// Signals that incoming reliable data was malformed. The session should respond by
/// terminating the connection as [`DisconnectReason::CorruptPacket`](crate::protocol::DisconnectReason::CorruptPacket).
///
/// Implements [`std::error::Error`] so it can be propagated with `?` into generic
/// error types such as `Box<dyn std::error::Error>`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CorruptData;

impl std::fmt::Display for CorruptData {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("incoming reliable data was malformed")
    }
}

impl std::error::Error for CorruptData {}
37
/// Counters the input channel maintains as reliable data arrives.
#[derive(Clone, Debug, Default)]
pub struct DataInputStats {
    /// Every reliable data packet seen so far, duplicates included.
    pub total_received: u64,
    /// How many of the received reliable data packets were duplicates.
    pub duplicate_count: u64,
    /// How many reliable data packets arrived ahead of the expected sequence.
    pub out_of_order_count: u64,
    /// Application bytes delivered, not counting indicators or padding.
    pub total_received_bytes: u64,
    /// How many acknowledgement packets have been emitted.
    pub acknowledge_count: u64,
}
52
/// Tunables that govern how the input channel behaves.
#[derive(Clone, Debug)]
pub struct InputConfig {
    /// Upper limit on how many out-of-order reliable data packets may be stashed.
    pub max_queued_incoming: u16,
    /// When `true`, each data packet receives its own acknowledgement.
    pub acknowledge_all_data: bool,
    /// Acknowledgement window consulted when deciding to send an `AcknowledgeAll`.
    pub data_ack_window: u16,
    /// Longest time incoming reliable data sequences may go unacknowledged.
    pub max_ack_delay: Duration,
}

impl Default for InputConfig {
    /// A 256-slot stash, batched acknowledgements, a 32-packet ack window and a
    /// 2 ms maximum acknowledgement delay.
    fn default() -> Self {
        let max_ack_delay = Duration::from_millis(2);
        InputConfig {
            max_ack_delay,
            data_ack_window: 32,
            acknowledge_all_data: false,
            max_queued_incoming: 256,
        }
    }
}
76
77/// A contextual packet the channel wishes to send (without OP code or CRC framing,
78/// which the session layer applies). For this channel it is always an acknowledgement
79/// carrying a single sequence number.
80#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81pub struct OutgoingContextual {
82    /// The OP code of the packet ([`OpCode::Acknowledge`] or [`OpCode::AcknowledgeAll`]).
83    pub op_code: OpCode,
84    /// The acknowledged sequence number.
85    pub sequence: u16,
86}
87
88#[derive(Debug)]
89struct Stashed {
90    data: Bytes,
91    is_fragment: bool,
92}
93
94/// Handles reliable data packets, extracting the proxied application data in order.
95#[derive(Debug)]
96pub struct ReliableDataInputChannel {
97    config: InputConfig,
98    cipher: Option<Rc4KeyState>,
99
100    /// The next reliable data sequence we expect to receive.
101    window_start_sequence: i64,
102
103    /// Buffer accumulating fragments of the current data unit.
104    current_buffer: Option<BytesMut>,
105    /// The expected total length of the current data unit.
106    expected_data_length: usize,
107
108    /// The last sequence we acknowledged via `AcknowledgeAll`.
109    last_ack_all_sequence: i64,
110    last_ack_all_time: Instant,
111
112    stash: Vec<Option<Stashed>>,
113    stats: DataInputStats,
114
115    outgoing: Vec<OutgoingContextual>,
116    app_data: Vec<Bytes>,
117}
118
119impl ReliableDataInputChannel {
120    /// Creates a new input channel. `cipher` is the initial RC4 key state; pass
121    /// `Some(..)` to enable RC4 decryption of the proxied application data, or
122    /// `None` to pass it through unencrypted.
123    pub fn new(config: InputConfig, cipher: Option<Rc4KeyState>, now: Instant) -> Self {
124        let capacity = config.max_queued_incoming as usize;
125        let stash = std::iter::repeat_with(|| None).take(capacity).collect();
126
127        Self {
128            config,
129            cipher,
130            window_start_sequence: 0,
131            current_buffer: None,
132            expected_data_length: 0,
133            last_ack_all_sequence: -1,
134            last_ack_all_time: now,
135            stash,
136            stats: DataInputStats::default(),
137            outgoing: Vec::new(),
138            app_data: Vec::new(),
139        }
140    }
141
142    /// Returns the gathered input statistics.
143    pub fn stats(&self) -> &DataInputStats {
144        &self.stats
145    }
146
147    /// Drains the outgoing acknowledgement packets accumulated so far.
148    pub fn take_outgoing(&mut self) -> Vec<OutgoingContextual> {
149        std::mem::take(&mut self.outgoing)
150    }
151
152    /// Drains the decoded application data buffers accumulated so far.
153    pub fn take_app_data(&mut self) -> Vec<Bytes> {
154        std::mem::take(&mut self.app_data)
155    }
156
157    fn max_queued(&self) -> i64 {
158        self.config.max_queued_incoming as i64
159    }
160
161    /// Runs periodic channel logic: emits a buffered `AcknowledgeAll` when due.
162    pub fn run_tick(&mut self, now: Instant) {
163        let to_ack = self.window_start_sequence - 1;
164
165        // No need to ack-all if acking everything individually, or we've already
166        // acked up to the current window start.
167        if self.config.acknowledge_all_data || to_ack <= self.last_ack_all_sequence {
168            return;
169        }
170
171        let need_ack = now.duration_since(self.last_ack_all_time) > self.config.max_ack_delay
172            || to_ack >= self.last_ack_all_sequence + (self.config.data_ack_window / 2) as i64;
173
174        if need_ack {
175            self.send_ack_all(to_ack, now);
176        }
177    }
178
179    /// Handles a [`OpCode::ReliableData`] packet (OP code already stripped). Returns
180    /// `Err` if the data (or any stashed fragment it releases) is malformed.
181    pub fn handle_reliable_data(&mut self, data: Bytes, now: Instant) -> Result<(), CorruptData> {
182        if !self.preprocess(&data, false, now) {
183            return Ok(());
184        }
185        self.process_data(data.slice(2..));
186        self.window_start_sequence += 1;
187        self.consume_stashed()
188    }
189
190    /// Handles a [`OpCode::ReliableDataFragment`] packet (OP code already stripped).
191    /// Returns `Err` if the fragment (or any stashed fragment it releases) is
192    /// malformed.
193    pub fn handle_reliable_data_fragment(
194        &mut self,
195        data: Bytes,
196        now: Instant,
197    ) -> Result<(), CorruptData> {
198        if !self.preprocess(&data, true, now) {
199            return Ok(());
200        }
201        self.write_immediate_fragment(&data[2..])?;
202        self.window_start_sequence += 1;
203        self.try_process_current_buffer();
204        self.consume_stashed()
205    }
206
207    fn emit(&mut self, op_code: OpCode, sequence: u16) {
208        self.outgoing.push(OutgoingContextual { op_code, sequence });
209    }
210
211    fn send_ack_all(&mut self, sequence: i64, now: Instant) {
212        self.emit(OpCode::AcknowledgeAll, sequence as u16);
213        self.stats.acknowledge_count += 1;
214        self.last_ack_all_sequence = sequence;
215        self.last_ack_all_time = now;
216    }
217
218    /// Validates and, if necessary, stashes incoming reliable data. Returns `true`
219    /// if the data (with its sequence stripped) should be processed immediately.
220    fn preprocess(&mut self, data: &Bytes, is_fragment: bool, now: Instant) -> bool {
221        self.stats.total_received += 1;
222
223        let (sequence, packet_sequence) = match self.is_valid_reliable_data(data, now) {
224            Some(v) => v,
225            None => return false,
226        };
227
228        let ahead = sequence != self.window_start_sequence;
229
230        // Ack now if in ack-all mode or this is ahead of our expectations.
231        if self.config.acknowledge_all_data || ahead {
232            self.emit(OpCode::Acknowledge, packet_sequence);
233        }
234
235        if !ahead {
236            return true;
237        }
238
239        // Out of order: stash it.
240        self.stats.out_of_order_count += 1;
241        let spot = sequence.rem_euclid(self.max_queued()) as usize;
242        if self.stash[spot].is_some() {
243            self.stats.duplicate_count += 1;
244            return false;
245        }
246
247        self.stash[spot] = Some(Stashed {
248            data: data.slice(2..),
249            is_fragment,
250        });
251        false
252    }
253
254    /// Checks whether the given data is within the current window. Returns the true
255    /// sequence and embedded packet sequence if it should be processed/stashed.
256    fn is_valid_reliable_data(&mut self, data: &[u8], now: Instant) -> Option<(i64, u16)> {
257        if data.len() < 2 {
258            return None;
259        }
260        let packet_sequence = u16::from_be_bytes([data[0], data[1]]);
261        let sequence = true_incoming_sequence(
262            packet_sequence,
263            self.window_start_sequence,
264            self.max_queued(),
265        );
266
267        // Too far ahead of our window; drop it.
268        if sequence > self.window_start_sequence + self.max_queued() {
269            return None;
270        }
271
272        // Inside the window.
273        if sequence >= self.window_start_sequence {
274            return Some((sequence, packet_sequence));
275        }
276
277        // Already processed: nudge the remote, but not too frequently.
278        if now.duration_since(self.last_ack_all_time) < self.config.max_ack_delay {
279            self.send_ack_all(self.window_start_sequence - 1, now);
280        }
281        self.stats.duplicate_count += 1;
282        None
283    }
284
285    /// Appends fragment data to the current buffer, allocating it (and reading the
286    /// total length prefix) if this is the master fragment. Returns `Err` if a master
287    /// fragment is too short to contain its length prefix.
288    fn write_immediate_fragment(&mut self, data: &[u8]) -> Result<(), CorruptData> {
289        if let Some(buf) = &mut self.current_buffer {
290            buf.extend_from_slice(data);
291        } else {
292            // A master fragment must carry a 4-byte total-length prefix. A shorter one
293            // is malformed (and previously panicked when indexing the prefix).
294            if data.len() < FRAGMENT_COMPLETE_LENGTH_SIZE {
295                return Err(CorruptData);
296            }
297            let expected = u32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
298            self.expected_data_length = expected;
299            // Cap the up-front allocation: the buffer still grows to fit data that
300            // actually arrives, but a tiny packet can't claim (e.g.) 4 GiB and force a
301            // matching allocation.
302            let mut buf = BytesMut::with_capacity(expected.min(MAX_FRAGMENT_PREALLOC));
303            buf.extend_from_slice(&data[FRAGMENT_COMPLETE_LENGTH_SIZE..]);
304            self.current_buffer = Some(buf);
305        }
306        Ok(())
307    }
308
309    /// The pre-allocated capacity of the in-progress reassembly buffer, if any.
310    /// Used by tests to assert that a hostile master fragment can't force a huge
311    /// up-front allocation.
312    #[cfg(test)]
313    fn current_buffer_capacity(&self) -> Option<usize> {
314        self.current_buffer.as_ref().map(|b| b.capacity())
315    }
316
317    fn try_process_current_buffer(&mut self) {
318        let ready =
319            matches!(&self.current_buffer, Some(buf) if buf.len() >= self.expected_data_length);
320        if !ready {
321            return;
322        }
323        let buf = self.current_buffer.take().unwrap();
324        self.process_data(buf.freeze());
325        self.expected_data_length = 0;
326    }
327
328    fn consume_stashed(&mut self) -> Result<(), CorruptData> {
329        loop {
330            let spot = self.window_start_sequence.rem_euclid(self.max_queued()) as usize;
331            let Some(item) = self.stash[spot].take() else {
332                break;
333            };
334
335            if item.is_fragment {
336                self.write_immediate_fragment(&item.data)?;
337                self.try_process_current_buffer();
338            } else {
339                self.process_data(item.data);
340            }
341
342            self.window_start_sequence += 1;
343        }
344        Ok(())
345    }
346
347    fn process_data(&mut self, data: Bytes) {
348        if data.len() > 2 && data[0..2] == MULTI_DATA_INDICATOR {
349            let mut reader = BinaryReader::new(&data);
350            // Skip the multi-data indicator.
351            if reader.skip(2).is_err() {
352                return;
353            }
354            while reader.remaining() > 0 {
355                let len = match data_bundle::read(&mut reader) {
356                    Ok(l) => l as usize,
357                    Err(_) => break,
358                };
359                let start = reader.offset();
360                if reader.skip(len).is_err() {
361                    break;
362                }
363                let chunk = data.slice(start..start + len);
364                self.decrypt_and_handle(chunk);
365            }
366        } else {
367            self.decrypt_and_handle(data);
368        }
369    }
370
371    fn decrypt_and_handle(&mut self, data: Bytes) {
372        let processed = match &mut self.cipher {
373            Some(cipher) => {
374                // A single leading 0x00 byte may pad encrypted data; ignore it.
375                let d = if data.len() > 1 && data[0] == 0 {
376                    data.slice(1..)
377                } else {
378                    data
379                };
380                let mut buf = BytesMut::from(&d[..]);
381                cipher.transform_in_place(&mut buf);
382                buf.freeze()
383            }
384            None => data,
385        };
386
387        self.stats.total_received_bytes += processed.len() as u64;
388        self.app_data.push(processed);
389    }
390}
391
392#[cfg(test)]
393mod tests {
394    use super::*;
395
396    /// A monotonic test clock that advances by 1ms on each read, so that
397    /// zero-delay ack logic fires deterministically.
398    struct Clock {
399        now: Instant,
400    }
401
402    impl Clock {
403        fn new() -> Self {
404            Self {
405                now: Instant::now(),
406            }
407        }
408        fn tick(&mut self) -> Instant {
409            self.now += Duration::from_millis(1);
410            self.now
411        }
412    }
413
414    fn config(acknowledge_all_data: bool) -> InputConfig {
415        InputConfig {
416            acknowledge_all_data,
417            max_ack_delay: Duration::ZERO,
418            ..InputConfig::default()
419        }
420    }
421
422    /// Builds a reliable data/fragment packet body (sequence + optional complete
423    /// length + data), returning (packet, data).
424    fn data_fragment(
425        sequence: u16,
426        complete_len: Option<u32>,
427        data_len: usize,
428    ) -> (Vec<u8>, Vec<u8>) {
429        let data: Vec<u8> = (0..data_len)
430            .map(|i| (i as u8).wrapping_mul(7).wrapping_add(sequence as u8))
431            .collect();
432        let mut buf = Vec::new();
433        buf.extend_from_slice(&sequence.to_be_bytes());
434        if let Some(cl) = complete_len {
435            buf.extend_from_slice(&cl.to_be_bytes());
436        }
437        buf.extend_from_slice(&data);
438        (buf, data)
439    }
440
441    /// Runs a tick, collects any newly-emitted acks into `pending`, then asserts the
442    /// front of `pending` matches the expected op/seq and pops it.
443    fn assert_pop_ack(
444        ch: &mut ReliableDataInputChannel,
445        clock: &mut Clock,
446        pending: &mut Vec<OutgoingContextual>,
447        expected_sequence: u16,
448        expect_all: bool,
449    ) {
450        ch.run_tick(clock.tick());
451        pending.extend(ch.take_outgoing());
452        assert!(!pending.is_empty(), "expected an ack to be pending");
453        let ack = pending.remove(0);
454        let expected_op = if expect_all {
455            OpCode::AcknowledgeAll
456        } else {
457            OpCode::Acknowledge
458        };
459        assert_eq!(ack.op_code, expected_op);
460        assert_eq!(ack.sequence, expected_sequence);
461    }
462
463    const DATA_LENGTH: usize = 16;
464
465    fn new_channel(clock: &Clock, ack_all: bool) -> ReliableDataInputChannel {
466        ReliableDataInputChannel::new(config(ack_all), None, clock.now)
467    }
468
469    fn run_sequential_fragment_insert(ack_all: bool) {
470        let mut clock = Clock::new();
471        let mut ch = new_channel(&clock, ack_all);
472        let mut pending: Vec<OutgoingContextual> = Vec::new();
473
474        let (f0, d0) = data_fragment(0, Some((DATA_LENGTH * 3) as u32), DATA_LENGTH);
475        let (f1, d1) = data_fragment(1, None, DATA_LENGTH);
476        let (f2, d2) = data_fragment(2, None, DATA_LENGTH);
477
478        ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f0), clock.tick())
479            .unwrap();
480        assert_pop_ack(&mut ch, &mut clock, &mut pending, 0, !ack_all);
481        assert!(ch.take_app_data().is_empty());
482
483        ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f1), clock.tick())
484            .unwrap();
485        assert_pop_ack(&mut ch, &mut clock, &mut pending, 1, !ack_all);
486        assert!(ch.take_app_data().is_empty());
487
488        ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f2), clock.tick())
489            .unwrap();
490        assert_pop_ack(&mut ch, &mut clock, &mut pending, 2, !ack_all);
491        let app = ch.take_app_data();
492        assert_eq!(app.len(), 1);
493
494        let stitched = &app[0];
495        assert_eq!(&stitched[0..DATA_LENGTH], &d0[..]);
496        assert_eq!(&stitched[DATA_LENGTH..DATA_LENGTH * 2], &d1[..]);
497        assert_eq!(&stitched[DATA_LENGTH * 2..], &d2[..]);
498        assert!(pending.is_empty(), "no superfluous acks");
499    }
500
501    #[test]
502    fn sequential_fragment_insert() {
503        run_sequential_fragment_insert(true);
504        run_sequential_fragment_insert(false);
505    }
506
507    fn run_non_sequential_fragment_insert(ack_all: bool) {
508        let mut clock = Clock::new();
509        let mut ch = new_channel(&clock, ack_all);
510        let mut pending: Vec<OutgoingContextual> = Vec::new();
511
512        let (f0, d0) = data_fragment(0, Some((DATA_LENGTH * 3) as u32), DATA_LENGTH);
513        let (f1, d1) = data_fragment(1, None, DATA_LENGTH);
514        let (f2, d2) = data_fragment(2, None, DATA_LENGTH);
515
516        ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f2), clock.tick())
517            .unwrap();
518        assert_pop_ack(&mut ch, &mut clock, &mut pending, 2, false);
519
520        ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f0), clock.tick())
521            .unwrap();
522        assert_pop_ack(&mut ch, &mut clock, &mut pending, 0, !ack_all);
523        assert!(ch.take_app_data().is_empty());
524
525        ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f1), clock.tick())
526            .unwrap();
527        assert_pop_ack(
528            &mut ch,
529            &mut clock,
530            &mut pending,
531            if ack_all { 1 } else { 2 },
532            !ack_all,
533        );
534        let app = ch.take_app_data();
535        assert_eq!(app.len(), 1);
536
537        let stitched = &app[0];
538        assert_eq!(&stitched[0..DATA_LENGTH], &d0[..]);
539        assert_eq!(&stitched[DATA_LENGTH..DATA_LENGTH * 2], &d1[..]);
540        assert_eq!(&stitched[DATA_LENGTH * 2..], &d2[..]);
541        assert!(pending.is_empty(), "no superfluous acks");
542    }
543
544    #[test]
545    fn non_sequential_fragment_insert() {
546        run_non_sequential_fragment_insert(true);
547        run_non_sequential_fragment_insert(false);
548    }
549
550    fn run_non_fragment_insert(ack_all: bool) {
551        let mut clock = Clock::new();
552        let mut ch = new_channel(&clock, ack_all);
553        let mut pending: Vec<OutgoingContextual> = Vec::new();
554
555        let (p0, d0) = data_fragment(0, None, DATA_LENGTH);
556        let (p1, d1) = data_fragment(1, None, DATA_LENGTH);
557        let (p2, d2) = data_fragment(2, None, DATA_LENGTH);
558
559        ch.handle_reliable_data(Bytes::copy_from_slice(&p0), clock.tick())
560            .unwrap();
561        assert_pop_ack(&mut ch, &mut clock, &mut pending, 0, !ack_all);
562        let app = ch.take_app_data();
563        assert_eq!(app, vec![d0]);
564
565        ch.handle_reliable_data(Bytes::copy_from_slice(&p2), clock.tick())
566            .unwrap();
567        assert_pop_ack(&mut ch, &mut clock, &mut pending, 2, false);
568
569        ch.handle_reliable_data(Bytes::copy_from_slice(&p1), clock.tick())
570            .unwrap();
571        assert_pop_ack(
572            &mut ch,
573            &mut clock,
574            &mut pending,
575            if ack_all { 1 } else { 2 },
576            !ack_all,
577        );
578        let app = ch.take_app_data();
579        assert_eq!(app, vec![d1, d2]);
580        assert!(pending.is_empty(), "no superfluous acks");
581    }
582
583    #[test]
584    fn non_fragment_insert() {
585        run_non_fragment_insert(true);
586        run_non_fragment_insert(false);
587    }
588
589    fn run_fragmented_insert_of_two_datas(ack_all: bool) {
590        let mut clock = Clock::new();
591        let mut ch = new_channel(&clock, ack_all);
592        let mut pending: Vec<OutgoingContextual> = Vec::new();
593
594        let (f0, d0) = data_fragment(0, Some((DATA_LENGTH * 2) as u32), DATA_LENGTH);
595        let (f1, d1) = data_fragment(1, None, DATA_LENGTH);
596        let (f2, d2) = data_fragment(2, Some((DATA_LENGTH * 2) as u32), DATA_LENGTH);
597        let (f3, d3) = data_fragment(3, None, DATA_LENGTH);
598
599        ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f0), clock.tick())
600            .unwrap();
601        assert_pop_ack(&mut ch, &mut clock, &mut pending, 0, !ack_all);
602        assert!(ch.take_app_data().is_empty());
603
604        ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f1), clock.tick())
605            .unwrap();
606        assert_pop_ack(&mut ch, &mut clock, &mut pending, 1, !ack_all);
607        let app = ch.take_app_data();
608        assert_eq!(app.len(), 1);
609        assert_eq!(&app[0][..DATA_LENGTH], &d0[..]);
610        assert_eq!(&app[0][DATA_LENGTH..], &d1[..]);
611
612        ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f3), clock.tick())
613            .unwrap();
614        assert_pop_ack(&mut ch, &mut clock, &mut pending, 3, false);
615        assert!(ch.take_app_data().is_empty());
616
617        ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f2), clock.tick())
618            .unwrap();
619        assert_pop_ack(
620            &mut ch,
621            &mut clock,
622            &mut pending,
623            if ack_all { 2 } else { 3 },
624            !ack_all,
625        );
626        let app = ch.take_app_data();
627        assert_eq!(app.len(), 1);
628        assert_eq!(&app[0][..DATA_LENGTH], &d2[..]);
629        assert_eq!(&app[0][DATA_LENGTH..], &d3[..]);
630    }
631
632    #[test]
633    fn fragmented_insert_of_two_datas() {
634        run_fragmented_insert_of_two_datas(true);
635        run_fragmented_insert_of_two_datas(false);
636    }
637
638    fn run_sequence_waiting_on_data(ack_all: bool) {
639        let mut clock = Clock::new();
640        let mut ch = new_channel(&clock, ack_all);
641        let mut pending: Vec<OutgoingContextual> = Vec::new();
642
643        let (p0, d0) = data_fragment(0, None, DATA_LENGTH);
644        let (f1, d1) = data_fragment(1, Some((DATA_LENGTH * 2) as u32), DATA_LENGTH);
645        let (f2, d2) = data_fragment(2, None, DATA_LENGTH);
646
647        ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f1), clock.tick())
648            .unwrap();
649        ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f2), clock.tick())
650            .unwrap();
651        assert_pop_ack(&mut ch, &mut clock, &mut pending, 1, false);
652        assert_pop_ack(&mut ch, &mut clock, &mut pending, 2, false);
653
654        ch.handle_reliable_data(Bytes::copy_from_slice(&p0), clock.tick())
655            .unwrap();
656        assert_pop_ack(
657            &mut ch,
658            &mut clock,
659            &mut pending,
660            if ack_all { 0 } else { 2 },
661            !ack_all,
662        );
663
664        let app = ch.take_app_data();
665        assert_eq!(app.len(), 2);
666        assert_eq!(app[0], d0);
667        assert_eq!(&app[1][..DATA_LENGTH], &d1[..]);
668        assert_eq!(&app[1][DATA_LENGTH..], &d2[..]);
669    }
670
671    #[test]
672    fn sequence_waiting_on_data() {
673        run_sequence_waiting_on_data(true);
674        run_sequence_waiting_on_data(false);
675    }
676
677    fn run_multi_data(ack_all: bool) {
678        let mut clock = Clock::new();
679        let mut ch = new_channel(&clock, ack_all);
680        let mut pending: Vec<OutgoingContextual> = Vec::new();
681
682        // [seq u16][00 19][len=1][2][len=1][4]. A data-bundle length of 1 encodes
683        // as a single 0x01 byte.
684        let mut multi = vec![0u8, 0]; // sequence 0
685        multi.extend_from_slice(&MULTI_DATA_INDICATOR);
686        multi.extend_from_slice(&[1, 2]); // length 1, data byte 2
687        multi.extend_from_slice(&[1, 4]); // length 1, data byte 4
688
689        ch.handle_reliable_data(Bytes::copy_from_slice(&multi), clock.tick())
690            .unwrap();
691        assert_pop_ack(&mut ch, &mut clock, &mut pending, 0, !ack_all);
692        assert_eq!(ch.take_app_data(), vec![vec![2u8], vec![4u8]]);
693
694        multi[1] = 0x01; // increment sequence to 1
695        ch.handle_reliable_data(Bytes::copy_from_slice(&multi), clock.tick())
696            .unwrap();
697        assert_pop_ack(&mut ch, &mut clock, &mut pending, 1, !ack_all);
698        assert_eq!(ch.take_app_data(), vec![vec![2u8], vec![4u8]]);
699    }
700
701    #[test]
702    fn multi_data() {
703        run_multi_data(true);
704        run_multi_data(false);
705    }
706
707    /// Regression test for the long-connection sequence-wraparound bug: once the
708    /// window advances past the 16-bit boundary, the ack-all throttle must keep
709    /// working. Previously `last_ack_all_sequence` stored only the truncated 16-bit
710    /// wire value while `to_ack` was the full sequence, so after wraparound the two
711    /// could never compare equal and the channel emitted a redundant `AcknowledgeAll`
712    /// on every single tick for the rest of the connection.
713    #[test]
714    fn ack_all_throttled_after_sequence_wraparound() {
715        let mut clock = Clock::new();
716        // Not ack-all-per-packet mode, so ack-alls come from run_tick.
717        let mut ch = new_channel(&clock, false);
718
719        // Feed enough in-order reliable packets to push the window past 2^16.
720        let total: u32 = 65_540;
721        for i in 0..total {
722            let (pkt, _) = data_fragment((i & 0xFFFF) as u16, None, DATA_LENGTH);
723            ch.handle_reliable_data(Bytes::copy_from_slice(&pkt), clock.tick())
724                .unwrap();
725            // Drop accumulated acks/app-data to keep memory bounded.
726            ch.take_outgoing();
727            ch.take_app_data();
728        }
729
730        // First tick after the data: a single ack-all for the current window is
731        // expected (and arms the throttle).
732        ch.run_tick(clock.tick());
733        let first = ch.take_outgoing();
734        assert_eq!(first.len(), 1, "expected exactly one ack-all");
735        assert_eq!(first[0].op_code, OpCode::AcknowledgeAll);
736        assert_eq!(first[0].sequence, ((total - 1) & 0xFFFF) as u16);
737
738        // No new data has arrived, so subsequent ticks must NOT re-emit an ack-all.
739        // (max_ack_delay is ZERO in the test config, so any emission here is the bug.)
740        for _ in 0..5 {
741            ch.run_tick(clock.tick());
742            assert!(
743                ch.take_outgoing().is_empty(),
744                "ack-all throttle broke after wraparound: redundant ack emitted"
745            );
746        }
747    }
748
749    /// A master fragment must carry a 4-byte total-length prefix. A shorter one used
750    /// to be indexed unconditionally (`data[0..4]`), panicking the whole driver on a
751    /// hostile 1–3 byte fragment (a remote denial-of-service). It must now be reported
752    /// as corrupt instead.
753    #[test]
754    fn short_master_fragment_is_rejected_without_panic() {
755        let mut clock = Clock::new();
756        let mut ch = new_channel(&clock, false);
757
758        // Sequence 0 (the master fragment) with only 2 of the required 4 length bytes.
759        let mut pkt = Vec::new();
760        pkt.extend_from_slice(&0u16.to_be_bytes());
761        pkt.extend_from_slice(&[0xAB, 0xCD]);
762
763        let result = ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&pkt), clock.tick());
764        assert_eq!(result, Err(CorruptData));
765    }
766
767    /// A master fragment self-reports its total length. A malicious one can claim a
768    /// huge length (up to 4 GiB) while carrying only a few payload bytes; the channel
769    /// must not pre-allocate a matching buffer. This completes quickly only because
770    /// the up-front allocation is capped; an uncapped `with_capacity(u32::MAX)` would
771    /// attempt a multi-gigabyte allocation here.
772    #[test]
773    fn huge_claimed_length_master_fragment_does_not_preallocate() {
774        let mut clock = Clock::new();
775        let mut ch = new_channel(&clock, false);
776
777        let (pkt, _) = data_fragment(0, Some(u32::MAX), DATA_LENGTH);
778        // Must not panic/OOM, and a single fragment is valid so far (more awaited).
779        let result = ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&pkt), clock.tick());
780        assert_eq!(result, Ok(()));
781        // The reassembly buffer must NOT have pre-allocated the claimed 4 GiB; it is
782        // capped regardless of the self-reported length. (Without the cap this is
783        // ~u32::MAX bytes — the actual amplification attack.)
784        assert!(
785            ch.current_buffer_capacity().unwrap() <= MAX_FRAGMENT_PREALLOC,
786            "reassembly buffer pre-allocated more than the cap from a hostile length"
787        );
788        // The (incomplete) reassembly has produced no application data yet.
789        assert!(ch.take_app_data().is_empty());
790    }
791}