soe-protocol 1.0.1

Sony Online Entertainment protocol implementation in Rust.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
//! The reliable data input channel: validates, orders and reassembles incoming
//! reliable data, decrypts/unbundles it, and emits acknowledgements.
//!
//! This is an I/O-agnostic component. Incoming packets are fed via
//! [`ReliableDataInputChannel::handle_reliable_data`] /
//! [`ReliableDataInputChannel::handle_reliable_data_fragment`], and the channel
//! accumulates outgoing acknowledgement packets (drained via
//! [`ReliableDataInputChannel::take_outgoing`]) and decoded application data
//! (drained via [`ReliableDataInputChannel::take_app_data`]). Time is supplied by
//! the caller as [`Instant`] values.

use std::time::{Duration, Instant};

use bytes::{Bytes, BytesMut};

use crate::constants::MULTI_DATA_INDICATOR;
use crate::io::BinaryReader;
use crate::protocol::OpCode;
use crate::rc4::Rc4KeyState;
use crate::varint::data_bundle;

use super::true_incoming_sequence;

/// Byte width of the big-endian `u32` total-length prefix that opens a master
/// fragment.
const FRAGMENT_COMPLETE_LENGTH_SIZE: usize = std::mem::size_of::<u32>();

/// Ceiling on the capacity reserved up front for a reassembly buffer, whatever total
/// length a master fragment claims. The buffer can still grow as real data arrives;
/// the cap only stops a tiny hostile packet from triggering a huge allocation.
const MAX_FRAGMENT_PREALLOC: usize = 1 << 16;

/// Marker error reported when incoming reliable data is malformed. The session is
/// expected to react by terminating the connection with
/// [`DisconnectReason::CorruptPacket`](crate::protocol::DisconnectReason::CorruptPacket).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CorruptData;

/// Counters collected by the input channel while it receives reliable data.
#[derive(Clone, Debug, Default)]
pub struct DataInputStats {
    /// Every reliable data packet seen, duplicates included.
    pub total_received: u64,
    /// How many of the received packets were duplicates.
    pub duplicate_count: u64,
    /// How many packets arrived ahead of the expected sequence.
    pub out_of_order_count: u64,
    /// Application bytes delivered (indicators and padding are not counted).
    pub total_received_bytes: u64,
    /// How many acknowledgement packets were emitted.
    pub acknowledge_count: u64,
}

/// Settings that govern how the input channel buffers and acknowledges data.
#[derive(Clone, Debug)]
pub struct InputConfig {
    /// Upper limit on out-of-order reliable data packets held in the stash.
    pub max_queued_incoming: u16,
    /// When `true`, each data packet is acknowledged on its own.
    pub acknowledge_all_data: bool,
    /// Acknowledgement window; an `AcknowledgeAll` becomes due once half of it has
    /// been received since the previous one.
    pub data_ack_window: u16,
    /// Longest time incoming reliable data may go unacknowledged.
    pub max_ack_delay: Duration,
}

impl Default for InputConfig {
    /// Defaults: 256 stash slots, cumulative acknowledgements, a window of 32 and a
    /// 2ms maximum acknowledgement delay.
    fn default() -> Self {
        let max_ack_delay = Duration::from_millis(2);
        InputConfig {
            max_queued_incoming: 256,
            acknowledge_all_data: false,
            data_ack_window: 32,
            max_ack_delay,
        }
    }
}

/// A contextual packet the channel wishes to send (without OP code or CRC framing,
/// which the session layer applies). For this channel it is always an acknowledgement
/// carrying a single sequence number.
///
/// Values are queued by the channel and handed to the caller through
/// [`ReliableDataInputChannel::take_outgoing`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct OutgoingContextual {
    /// The OP code of the packet ([`OpCode::Acknowledge`] or [`OpCode::AcknowledgeAll`]).
    pub op_code: OpCode,
    /// The acknowledged sequence number, as the 16-bit value carried on the wire.
    pub sequence: u16,
}

/// An out-of-order reliable data packet held until the window reaches its sequence.
#[derive(Debug)]
struct Stashed {
    /// The packet body with the leading 2-byte sequence number removed.
    data: Bytes,
    /// `true` if the packet arrived through the fragment handler rather than as a
    /// complete reliable data packet.
    is_fragment: bool,
}

/// Handles reliable data packets, extracting the proxied application data in order.
#[derive(Debug)]
pub struct ReliableDataInputChannel {
    /// Behavioural settings supplied at construction.
    config: InputConfig,
    /// RC4 key state used to decrypt application data; `None` passes data through
    /// unmodified.
    cipher: Option<Rc4KeyState>,

    /// The next reliable data sequence we expect to receive.
    window_start_sequence: i64,

    /// Buffer accumulating fragments of the current data unit.
    current_buffer: Option<BytesMut>,
    /// The expected total length of the current data unit.
    expected_data_length: usize,

    /// The last sequence we acknowledged via `AcknowledgeAll`.
    last_ack_all_sequence: i64,
    /// When the last `AcknowledgeAll` was sent (initially the construction time).
    last_ack_all_time: Instant,

    /// Out-of-order packets awaiting their turn, indexed by sequence modulo
    /// `max_queued_incoming`.
    stash: Vec<Option<Stashed>>,
    /// Counters describing the traffic seen so far.
    stats: DataInputStats,

    /// Acknowledgements waiting to be drained via `take_outgoing`.
    outgoing: Vec<OutgoingContextual>,
    /// Decoded application data waiting to be drained via `take_app_data`.
    app_data: Vec<Bytes>,
}

impl ReliableDataInputChannel {
    /// Creates a new input channel. `cipher` is the initial RC4 key state; pass
    /// `Some(..)` to enable RC4 decryption of the proxied application data, or
    /// `None` to pass it through unencrypted. `now` seeds the acknowledgement timer.
    ///
    /// A `max_queued_incoming` of zero is raised to one. Stash slots are selected
    /// with `sequence.rem_euclid(max_queued_incoming)`, which panics on a zero
    /// modulus, so the channel always keeps at least one slot.
    pub fn new(mut config: InputConfig, cipher: Option<Rc4KeyState>, now: Instant) -> Self {
        // Guard against a zero-sized stash: it would make every slot computation a
        // division by zero on the first packet received.
        config.max_queued_incoming = config.max_queued_incoming.max(1);

        let capacity = usize::from(config.max_queued_incoming);
        // `Stashed` is not `Clone`, so the empty slots are produced one at a time.
        let stash = std::iter::repeat_with(|| None).take(capacity).collect();

        Self {
            config,
            cipher,
            window_start_sequence: 0,
            current_buffer: None,
            expected_data_length: 0,
            // Nothing has been acknowledged yet.
            last_ack_all_sequence: -1,
            last_ack_all_time: now,
            stash,
            stats: DataInputStats::default(),
            outgoing: Vec::new(),
            app_data: Vec::new(),
        }
    }

    /// Returns the gathered input statistics.
    ///
    /// The counters are borrowed, not reset; they keep accumulating for the lifetime
    /// of the channel.
    pub fn stats(&self) -> &DataInputStats {
        &self.stats
    }

    /// Drains the outgoing acknowledgement packets accumulated so far.
    ///
    /// The internal queue is left empty, so each packet is returned exactly once, in
    /// the order it was queued.
    pub fn take_outgoing(&mut self) -> Vec<OutgoingContextual> {
        std::mem::take(&mut self.outgoing)
    }

    /// Drains the decoded application data buffers accumulated so far.
    ///
    /// The internal queue is left empty, so each buffer is returned exactly once, in
    /// the order it was decoded.
    pub fn take_app_data(&mut self) -> Vec<Bytes> {
        std::mem::take(&mut self.app_data)
    }

    /// The configured stash size, widened to `i64` for sequence arithmetic.
    fn max_queued(&self) -> i64 {
        i64::from(self.config.max_queued_incoming)
    }

    /// Periodic housekeeping: queues a cumulative `AcknowledgeAll` once one is due.
    ///
    /// Nothing is sent when every packet is acknowledged individually, or when the
    /// newest received sequence has already been covered by a previous
    /// `AcknowledgeAll`. Otherwise one is sent if more than `max_ack_delay` has passed
    /// since the last one, or if at least half of `data_ack_window` has been received
    /// since then.
    pub fn run_tick(&mut self, now: Instant) {
        let newest_received = self.window_start_sequence - 1;

        if self.config.acknowledge_all_data {
            return;
        }
        if newest_received <= self.last_ack_all_sequence {
            return;
        }

        let half_window = (self.config.data_ack_window / 2) as i64;
        let delay_exceeded =
            now.duration_since(self.last_ack_all_time) > self.config.max_ack_delay;
        let half_window_received = newest_received >= self.last_ack_all_sequence + half_window;

        if delay_exceeded || half_window_received {
            self.send_ack_all(newest_received, now);
        }
    }

    /// Accepts an [`OpCode::ReliableData`] packet whose OP code has already been
    /// removed. In-order data is decoded at once and any stashed successors are then
    /// released; out-of-order, duplicate or out-of-window data yields `Ok(())` without
    /// further processing here.
    ///
    /// # Errors
    ///
    /// Returns [`CorruptData`] if a stashed fragment released by this packet is
    /// malformed.
    pub fn handle_reliable_data(&mut self, data: Bytes, now: Instant) -> Result<(), CorruptData> {
        let process_now = self.preprocess(&data, false, now);
        if process_now {
            // Drop the 2-byte sequence number before decoding the payload.
            let payload = data.slice(2..);
            self.process_data(payload);
            self.window_start_sequence += 1;
            return self.consume_stashed();
        }
        Ok(())
    }

    /// Accepts an [`OpCode::ReliableDataFragment`] packet whose OP code has already
    /// been removed. An in-order fragment is appended to the reassembly buffer, the
    /// buffer is delivered if it is now complete, and any stashed successors are then
    /// released.
    ///
    /// # Errors
    ///
    /// Returns [`CorruptData`] if this fragment, or a stashed fragment it releases,
    /// is malformed.
    pub fn handle_reliable_data_fragment(
        &mut self,
        data: Bytes,
        now: Instant,
    ) -> Result<(), CorruptData> {
        let process_now = self.preprocess(&data, true, now);
        if process_now {
            // Everything after the 2-byte sequence number is fragment content.
            let fragment = &data[2..];
            self.write_immediate_fragment(fragment)?;
            self.window_start_sequence += 1;
            self.try_process_current_buffer();
            return self.consume_stashed();
        }
        Ok(())
    }

    /// Queues an outgoing contextual packet with the given OP code and sequence; it
    /// stays queued until the caller drains it with `take_outgoing`.
    fn emit(&mut self, op_code: OpCode, sequence: u16) {
        self.outgoing.push(OutgoingContextual { op_code, sequence });
    }

    /// Queues an `AcknowledgeAll` for `sequence` and records it as the most recent
    /// cumulative acknowledgement (both its sequence and the time it was issued).
    fn send_ack_all(&mut self, sequence: i64, now: Instant) {
        // Only the low 16 bits travel in the packet; the full value is kept locally
        // so later comparisons keep working after the 16-bit sequence wraps.
        let wire_sequence = sequence as u16;

        self.last_ack_all_time = now;
        self.last_ack_all_sequence = sequence;
        self.stats.acknowledge_count += 1;
        self.emit(OpCode::AcknowledgeAll, wire_sequence);
    }

    /// Counts, validates and routes an incoming reliable data packet.
    ///
    /// Returns `true` only when the packet carries the next expected sequence, in
    /// which case the caller processes it straight away. A packet ahead of the window
    /// start is acknowledged individually and parked in the stash (unless its slot is
    /// already taken, which is counted as a duplicate); invalid packets are dropped.
    fn preprocess(&mut self, data: &Bytes, is_fragment: bool, now: Instant) -> bool {
        self.stats.total_received += 1;

        let Some((sequence, packet_sequence)) = self.is_valid_reliable_data(data, now) else {
            return false;
        };

        let in_order = sequence == self.window_start_sequence;

        // Individual acks go out in ack-all-data mode, and always for packets that
        // arrived ahead of the expected sequence.
        if self.config.acknowledge_all_data || !in_order {
            self.emit(OpCode::Acknowledge, packet_sequence);
        }
        if in_order {
            return true;
        }

        // Ahead of the window start: park it until its turn comes.
        self.stats.out_of_order_count += 1;
        let slot_index = sequence.rem_euclid(self.max_queued()) as usize;
        let slot = &mut self.stash[slot_index];
        if slot.is_none() {
            *slot = Some(Stashed {
                data: data.slice(2..),
                is_fragment,
            });
        } else {
            self.stats.duplicate_count += 1;
        }
        false
    }

    /// Checks whether the given data is within the current window. Returns the true
    /// sequence and embedded packet sequence if it should be processed/stashed.
    ///
    /// Returns `None` when the packet is too short to hold a sequence number, lies
    /// beyond the receive window, or has already been processed. An already-processed
    /// packet is counted as a duplicate and, at most once per `max_ack_delay`,
    /// answered with a fresh `AcknowledgeAll` so that a remote which missed the
    /// previous acknowledgement stops retransmitting.
    fn is_valid_reliable_data(&mut self, data: &[u8], now: Instant) -> Option<(i64, u16)> {
        // The body must at least carry the big-endian u16 sequence number.
        if data.len() < 2 {
            return None;
        }
        let packet_sequence = u16::from_be_bytes([data[0], data[1]]);
        let sequence = true_incoming_sequence(
            packet_sequence,
            self.window_start_sequence,
            self.max_queued(),
        );

        // Too far ahead of our window; drop it.
        if sequence > self.window_start_sequence + self.max_queued() {
            return None;
        }

        // Inside the window.
        if sequence >= self.window_start_sequence {
            return Some((sequence, packet_sequence));
        }

        // Already processed: nudge the remote, but not too frequently. The re-ack is
        // sent only once the throttle interval has elapsed since the last
        // `AcknowledgeAll`. The previous `<` comparison did the opposite: it re-acked
        // only right after an ack-all and never once the delay had passed, so a lost
        // acknowledgement was never repeated.
        if now.duration_since(self.last_ack_all_time) >= self.config.max_ack_delay {
            self.send_ack_all(self.window_start_sequence - 1, now);
        }
        self.stats.duplicate_count += 1;
        None
    }

    /// Feeds one fragment into the reassembly buffer.
    ///
    /// While a buffer is in progress the fragment is appended as-is. Otherwise the
    /// fragment is treated as a master fragment: its 4-byte big-endian total length
    /// is recorded and a new buffer is started with the bytes that follow the prefix.
    ///
    /// # Errors
    ///
    /// Returns [`CorruptData`] if a master fragment is shorter than its length prefix.
    fn write_immediate_fragment(&mut self, data: &[u8]) -> Result<(), CorruptData> {
        if let Some(reassembly) = self.current_buffer.as_mut() {
            reassembly.extend_from_slice(data);
            return Ok(());
        }

        // Master fragment: refuse anything too short to hold the length prefix
        // instead of indexing past the end.
        if data.len() < FRAGMENT_COMPLETE_LENGTH_SIZE {
            return Err(CorruptData);
        }
        let (prefix, payload) = data.split_at(FRAGMENT_COMPLETE_LENGTH_SIZE);
        let declared = u32::from_be_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]);
        let total_length = declared as usize;

        // Reserve no more than the cap up front; the buffer grows on demand, so a
        // small packet cannot force an allocation matching a huge claimed length.
        let mut reassembly = BytesMut::with_capacity(total_length.min(MAX_FRAGMENT_PREALLOC));
        reassembly.extend_from_slice(payload);

        self.expected_data_length = total_length;
        self.current_buffer = Some(reassembly);
        Ok(())
    }

    /// Capacity currently reserved by the in-progress reassembly buffer, or `None`
    /// when no reassembly is under way. Test-only: lets tests check that a hostile
    /// master fragment cannot force a huge up-front allocation.
    #[cfg(test)]
    fn current_buffer_capacity(&self) -> Option<usize> {
        self.current_buffer.as_ref().map(BytesMut::capacity)
    }

    /// Delivers the reassembly buffer once it holds at least the expected number of
    /// bytes, then resets the reassembly state. Does nothing while no buffer exists
    /// or the buffer is still short.
    fn try_process_current_buffer(&mut self) {
        let Some(pending) = self.current_buffer.as_ref() else {
            return;
        };
        if pending.len() < self.expected_data_length {
            return;
        }

        if let Some(complete) = self.current_buffer.take() {
            self.process_data(complete.freeze());
            self.expected_data_length = 0;
        }
    }

    fn consume_stashed(&mut self) -> Result<(), CorruptData> {
        loop {
            let spot = self.window_start_sequence.rem_euclid(self.max_queued()) as usize;
            let Some(item) = self.stash[spot].take() else {
                break;
            };

            if item.is_fragment {
                self.write_immediate_fragment(&item.data)?;
                self.try_process_current_buffer();
            } else {
                self.process_data(item.data);
            }

            self.window_start_sequence += 1;
        }
        Ok(())
    }

    /// Decodes one complete reliable data unit.
    ///
    /// A unit longer than two bytes that starts with `MULTI_DATA_INDICATOR` is a
    /// bundle: each length-prefixed chunk after the indicator is handled on its own.
    /// Reading stops quietly at the first length or chunk that cannot be read.
    /// Any other unit is handled as a single piece.
    fn process_data(&mut self, data: Bytes) {
        let is_bundle = data.len() > 2 && data[0..2] == MULTI_DATA_INDICATOR;
        if !is_bundle {
            self.decrypt_and_handle(data);
            return;
        }

        let mut reader = BinaryReader::new(&data);
        // Step over the multi-data indicator.
        if reader.skip(2).is_err() {
            return;
        }

        while reader.remaining() > 0 {
            let Ok(raw_len) = data_bundle::read(&mut reader) else {
                break;
            };
            let len = raw_len as usize;
            let start = reader.offset();
            // Skipping validates that the whole chunk is present before slicing.
            if reader.skip(len).is_err() {
                break;
            }
            self.decrypt_and_handle(data.slice(start..start + len));
        }
    }

    /// Decrypts one piece of application data when a cipher is configured, then
    /// counts its bytes and queues it for the caller. Without a cipher the data is
    /// queued untouched.
    fn decrypt_and_handle(&mut self, data: Bytes) {
        let processed = if let Some(cipher) = self.cipher.as_mut() {
            // Encrypted data may be padded with one leading 0x00 byte; drop it.
            let has_padding = data.len() > 1 && data[0] == 0;
            let ciphertext = if has_padding { data.slice(1..) } else { data };

            let mut plain = BytesMut::from(&ciphertext[..]);
            cipher.transform_in_place(&mut plain);
            plain.freeze()
        } else {
            data
        };

        self.stats.total_received_bytes += processed.len() as u64;
        self.app_data.push(processed);
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Deterministic test clock: every call to `tick` moves time forward by exactly
    /// one millisecond, so ack logic configured with a zero delay always fires.
    struct Clock {
        now: Instant,
    }

    impl Clock {
        /// Starts the clock at the current instant.
        fn new() -> Self {
            let now = Instant::now();
            Clock { now }
        }

        /// Advances the clock by 1ms and returns the new reading.
        fn tick(&mut self) -> Instant {
            let advanced = self.now + Duration::from_millis(1);
            self.now = advanced;
            advanced
        }
    }

    /// Test configuration: library defaults, except that the ack delay is zero and
    /// the per-packet acknowledgement mode is chosen by the caller.
    fn config(acknowledge_all_data: bool) -> InputConfig {
        InputConfig {
            max_ack_delay: Duration::ZERO,
            acknowledge_all_data,
            ..Default::default()
        }
    }

    /// Builds the body of a reliable data or fragment packet: the big-endian
    /// sequence, an optional big-endian total length, then `data_len` deterministic
    /// payload bytes derived from the sequence. Returns `(packet, payload)`.
    fn data_fragment(
        sequence: u16,
        complete_len: Option<u32>,
        data_len: usize,
    ) -> (Vec<u8>, Vec<u8>) {
        let seed = sequence as u8;
        let mut payload = Vec::with_capacity(data_len);
        for index in 0..data_len {
            payload.push((index as u8).wrapping_mul(7).wrapping_add(seed));
        }

        let mut packet = Vec::new();
        packet.extend_from_slice(&sequence.to_be_bytes());
        if let Some(length_prefix) = complete_len.map(u32::to_be_bytes) {
            packet.extend_from_slice(&length_prefix);
        }
        packet.extend_from_slice(&payload);

        (packet, payload)
    }

    /// Runs a tick, collects any newly-emitted acks into `pending`, then asserts the
    /// front of `pending` matches the expected op/seq and pops it.
    fn assert_pop_ack(
        ch: &mut ReliableDataInputChannel,
        clock: &mut Clock,
        pending: &mut Vec<OutgoingContextual>,
        expected_sequence: u16,
        expect_all: bool,
    ) {
        ch.run_tick(clock.tick());
        pending.extend(ch.take_outgoing());
        assert!(!pending.is_empty(), "expected an ack to be pending");
        let ack = pending.remove(0);
        let expected_op = if expect_all {
            OpCode::AcknowledgeAll
        } else {
            OpCode::Acknowledge
        };
        assert_eq!(ack.op_code, expected_op);
        assert_eq!(ack.sequence, expected_sequence);
    }

    /// Payload size, in bytes, of each packet or fragment built by the tests.
    const DATA_LENGTH: usize = 16;

    /// Creates an unencrypted channel using the test configuration, with its
    /// acknowledgement timer started at the clock's current reading.
    fn new_channel(clock: &Clock, ack_all: bool) -> ReliableDataInputChannel {
        ReliableDataInputChannel::new(config(ack_all), None, clock.now)
    }

    /// Scenario: three fragments of one data unit arrive in order. Each must be
    /// acknowledged (individually in ack-all-data mode, cumulatively otherwise) and
    /// the stitched data must appear only after the final fragment.
    fn run_sequential_fragment_insert(ack_all: bool) {
        let mut clock = Clock::new();
        let mut ch = new_channel(&clock, ack_all);
        let mut pending: Vec<OutgoingContextual> = Vec::new();

        // Fragment 0 is the master fragment and announces the total length.
        let (f0, d0) = data_fragment(0, Some((DATA_LENGTH * 3) as u32), DATA_LENGTH);
        let (f1, d1) = data_fragment(1, None, DATA_LENGTH);
        let (f2, d2) = data_fragment(2, None, DATA_LENGTH);

        ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f0), clock.tick())
            .unwrap();
        assert_pop_ack(&mut ch, &mut clock, &mut pending, 0, !ack_all);
        assert!(ch.take_app_data().is_empty());

        ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f1), clock.tick())
            .unwrap();
        assert_pop_ack(&mut ch, &mut clock, &mut pending, 1, !ack_all);
        assert!(ch.take_app_data().is_empty());

        // The last fragment completes the unit.
        ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f2), clock.tick())
            .unwrap();
        assert_pop_ack(&mut ch, &mut clock, &mut pending, 2, !ack_all);
        let app = ch.take_app_data();
        assert_eq!(app.len(), 1);

        let stitched = &app[0];
        assert_eq!(&stitched[0..DATA_LENGTH], &d0[..]);
        assert_eq!(&stitched[DATA_LENGTH..DATA_LENGTH * 2], &d1[..]);
        assert_eq!(&stitched[DATA_LENGTH * 2..], &d2[..]);
        assert!(pending.is_empty(), "no superfluous acks");
    }

    /// Runs the in-order fragment scenario in both acknowledgement modes.
    #[test]
    fn sequential_fragment_insert() {
        for ack_all in [true, false] {
            run_sequential_fragment_insert(ack_all);
        }
    }

    /// Scenario: the last fragment of a three-fragment unit arrives first, followed
    /// by the master fragment and then the middle one. The early fragment must be
    /// acknowledged individually and stashed, and the unit must be stitched in
    /// sequence order once the gap is filled.
    fn run_non_sequential_fragment_insert(ack_all: bool) {
        let mut clock = Clock::new();
        let mut ch = new_channel(&clock, ack_all);
        let mut pending: Vec<OutgoingContextual> = Vec::new();

        let (f0, d0) = data_fragment(0, Some((DATA_LENGTH * 3) as u32), DATA_LENGTH);
        let (f1, d1) = data_fragment(1, None, DATA_LENGTH);
        let (f2, d2) = data_fragment(2, None, DATA_LENGTH);

        // Ahead of the window: always acknowledged individually, in either mode.
        ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f2), clock.tick())
            .unwrap();
        assert_pop_ack(&mut ch, &mut clock, &mut pending, 2, false);

        ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f0), clock.tick())
            .unwrap();
        assert_pop_ack(&mut ch, &mut clock, &mut pending, 0, !ack_all);
        assert!(ch.take_app_data().is_empty());

        // Filling the gap releases the stashed fragment 2, so the cumulative ack
        // covers sequence 2 while the individual ack is for sequence 1.
        ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f1), clock.tick())
            .unwrap();
        assert_pop_ack(
            &mut ch,
            &mut clock,
            &mut pending,
            if ack_all { 1 } else { 2 },
            !ack_all,
        );
        let app = ch.take_app_data();
        assert_eq!(app.len(), 1);

        let stitched = &app[0];
        assert_eq!(&stitched[0..DATA_LENGTH], &d0[..]);
        assert_eq!(&stitched[DATA_LENGTH..DATA_LENGTH * 2], &d1[..]);
        assert_eq!(&stitched[DATA_LENGTH * 2..], &d2[..]);
        assert!(pending.is_empty(), "no superfluous acks");
    }

    /// Runs the out-of-order fragment scenario in both acknowledgement modes.
    #[test]
    fn non_sequential_fragment_insert() {
        for ack_all in [true, false] {
            run_non_sequential_fragment_insert(ack_all);
        }
    }

    /// Scenario: three unfragmented packets arrive as 0, 2, 1. Packet 0 is delivered
    /// at once; packet 2 is stashed until packet 1 arrives, after which both are
    /// delivered in sequence order.
    fn run_non_fragment_insert(ack_all: bool) {
        let mut clock = Clock::new();
        let mut ch = new_channel(&clock, ack_all);
        let mut pending: Vec<OutgoingContextual> = Vec::new();

        let (p0, d0) = data_fragment(0, None, DATA_LENGTH);
        let (p1, d1) = data_fragment(1, None, DATA_LENGTH);
        let (p2, d2) = data_fragment(2, None, DATA_LENGTH);

        ch.handle_reliable_data(Bytes::copy_from_slice(&p0), clock.tick())
            .unwrap();
        assert_pop_ack(&mut ch, &mut clock, &mut pending, 0, !ack_all);
        let app = ch.take_app_data();
        assert_eq!(app, vec![d0]);

        // Ahead of the window: always acknowledged individually, in either mode.
        ch.handle_reliable_data(Bytes::copy_from_slice(&p2), clock.tick())
            .unwrap();
        assert_pop_ack(&mut ch, &mut clock, &mut pending, 2, false);

        ch.handle_reliable_data(Bytes::copy_from_slice(&p1), clock.tick())
            .unwrap();
        assert_pop_ack(
            &mut ch,
            &mut clock,
            &mut pending,
            if ack_all { 1 } else { 2 },
            !ack_all,
        );
        let app = ch.take_app_data();
        assert_eq!(app, vec![d1, d2]);
        assert!(pending.is_empty(), "no superfluous acks");
    }

    /// Runs the unfragmented out-of-order scenario in both acknowledgement modes.
    #[test]
    fn non_fragment_insert() {
        for ack_all in [true, false] {
            run_non_fragment_insert(ack_all);
        }
    }

    /// Scenario: two consecutive two-fragment units. The first arrives in order; the
    /// second arrives with its trailing fragment before its master fragment. Each
    /// unit must be delivered separately and stitched in sequence order.
    fn run_fragmented_insert_of_two_datas(ack_all: bool) {
        let mut clock = Clock::new();
        let mut ch = new_channel(&clock, ack_all);
        let mut pending: Vec<OutgoingContextual> = Vec::new();

        // Sequences 0 and 2 are master fragments; 1 and 3 complete them.
        let (f0, d0) = data_fragment(0, Some((DATA_LENGTH * 2) as u32), DATA_LENGTH);
        let (f1, d1) = data_fragment(1, None, DATA_LENGTH);
        let (f2, d2) = data_fragment(2, Some((DATA_LENGTH * 2) as u32), DATA_LENGTH);
        let (f3, d3) = data_fragment(3, None, DATA_LENGTH);

        ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f0), clock.tick())
            .unwrap();
        assert_pop_ack(&mut ch, &mut clock, &mut pending, 0, !ack_all);
        assert!(ch.take_app_data().is_empty());

        ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f1), clock.tick())
            .unwrap();
        assert_pop_ack(&mut ch, &mut clock, &mut pending, 1, !ack_all);
        let app = ch.take_app_data();
        assert_eq!(app.len(), 1);
        assert_eq!(&app[0][..DATA_LENGTH], &d0[..]);
        assert_eq!(&app[0][DATA_LENGTH..], &d1[..]);

        // The trailing fragment of the second unit arrives early and is stashed.
        ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f3), clock.tick())
            .unwrap();
        assert_pop_ack(&mut ch, &mut clock, &mut pending, 3, false);
        assert!(ch.take_app_data().is_empty());

        ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f2), clock.tick())
            .unwrap();
        assert_pop_ack(
            &mut ch,
            &mut clock,
            &mut pending,
            if ack_all { 2 } else { 3 },
            !ack_all,
        );
        let app = ch.take_app_data();
        assert_eq!(app.len(), 1);
        assert_eq!(&app[0][..DATA_LENGTH], &d2[..]);
        assert_eq!(&app[0][DATA_LENGTH..], &d3[..]);
    }

    /// Runs the two-unit fragment scenario in both acknowledgement modes.
    #[test]
    fn fragmented_insert_of_two_datas() {
        for ack_all in [true, false] {
            run_fragmented_insert_of_two_datas(ack_all);
        }
    }

    /// Scenario: both fragments of a unit (sequences 1 and 2) arrive before the
    /// unfragmented packet with sequence 0. Nothing may be delivered until packet 0
    /// arrives, at which point packet 0 and the reassembled unit are delivered in
    /// order.
    fn run_sequence_waiting_on_data(ack_all: bool) {
        let mut clock = Clock::new();
        let mut ch = new_channel(&clock, ack_all);
        let mut pending: Vec<OutgoingContextual> = Vec::new();

        let (p0, d0) = data_fragment(0, None, DATA_LENGTH);
        let (f1, d1) = data_fragment(1, Some((DATA_LENGTH * 2) as u32), DATA_LENGTH);
        let (f2, d2) = data_fragment(2, None, DATA_LENGTH);

        // Both fragments are ahead of the window, so each gets an individual ack.
        ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f1), clock.tick())
            .unwrap();
        ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f2), clock.tick())
            .unwrap();
        assert_pop_ack(&mut ch, &mut clock, &mut pending, 1, false);
        assert_pop_ack(&mut ch, &mut clock, &mut pending, 2, false);

        ch.handle_reliable_data(Bytes::copy_from_slice(&p0), clock.tick())
            .unwrap();
        assert_pop_ack(
            &mut ch,
            &mut clock,
            &mut pending,
            if ack_all { 0 } else { 2 },
            !ack_all,
        );

        let app = ch.take_app_data();
        assert_eq!(app.len(), 2);
        assert_eq!(app[0], d0);
        assert_eq!(&app[1][..DATA_LENGTH], &d1[..]);
        assert_eq!(&app[1][DATA_LENGTH..], &d2[..]);
    }

    /// Runs the "fragments before the leading packet" scenario in both
    /// acknowledgement modes.
    #[test]
    fn sequence_waiting_on_data() {
        for ack_all in [true, false] {
            run_sequence_waiting_on_data(ack_all);
        }
    }

    /// Scenario: a multi-data bundle holding two one-byte chunks is received twice
    /// (sequences 0 and 1). Each reception must yield the two chunks as separate
    /// application data buffers.
    fn run_multi_data(ack_all: bool) {
        let mut clock = Clock::new();
        let mut ch = new_channel(&clock, ack_all);
        let mut pending: Vec<OutgoingContextual> = Vec::new();

        // [seq u16][00 19][len=1][2][len=1][4]. A data-bundle length of 1 encodes
        // as a single 0x01 byte.
        let mut multi = vec![0u8, 0]; // sequence 0
        multi.extend_from_slice(&MULTI_DATA_INDICATOR);
        multi.extend_from_slice(&[1, 2]); // length 1, data byte 2
        multi.extend_from_slice(&[1, 4]); // length 1, data byte 4

        ch.handle_reliable_data(Bytes::copy_from_slice(&multi), clock.tick())
            .unwrap();
        assert_pop_ack(&mut ch, &mut clock, &mut pending, 0, !ack_all);
        assert_eq!(ch.take_app_data(), vec![vec![2u8], vec![4u8]]);

        multi[1] = 0x01; // increment sequence to 1
        ch.handle_reliable_data(Bytes::copy_from_slice(&multi), clock.tick())
            .unwrap();
        assert_pop_ack(&mut ch, &mut clock, &mut pending, 1, !ack_all);
        assert_eq!(ch.take_app_data(), vec![vec![2u8], vec![4u8]]);
    }

    /// Runs the multi-data bundle scenario in both acknowledgement modes.
    #[test]
    fn multi_data() {
        for ack_all in [true, false] {
            run_multi_data(ack_all);
        }
    }

    /// Regression test for the long-connection sequence-wraparound bug: once the
    /// window advances past the 16-bit boundary, the ack-all throttle must keep
    /// working. Previously `last_ack_all_sequence` stored only the truncated 16-bit
    /// wire value while `to_ack` was the full sequence, so after wraparound the two
    /// could never compare equal and the channel emitted a redundant `AcknowledgeAll`
    /// on every single tick for the rest of the connection.
    #[test]
    fn ack_all_throttled_after_sequence_wraparound() {
        let mut clock = Clock::new();
        // Not ack-all-per-packet mode, so ack-alls come from run_tick.
        let mut ch = new_channel(&clock, false);

        // Feed enough in-order reliable packets to push the window past 2^16.
        let total: u32 = 65_540;
        for i in 0..total {
            // Only the low 16 bits of the sequence are carried in the packet.
            let (pkt, _) = data_fragment((i & 0xFFFF) as u16, None, DATA_LENGTH);
            ch.handle_reliable_data(Bytes::copy_from_slice(&pkt), clock.tick())
                .unwrap();
            // Drop accumulated acks/app-data to keep memory bounded.
            ch.take_outgoing();
            ch.take_app_data();
        }

        // First tick after the data: a single ack-all for the current window is
        // expected (and arms the throttle).
        ch.run_tick(clock.tick());
        let first = ch.take_outgoing();
        assert_eq!(first.len(), 1, "expected exactly one ack-all");
        assert_eq!(first[0].op_code, OpCode::AcknowledgeAll);
        assert_eq!(first[0].sequence, ((total - 1) & 0xFFFF) as u16);

        // No new data has arrived, so subsequent ticks must NOT re-emit an ack-all.
        // (max_ack_delay is ZERO in the test config, so any emission here is the bug.)
        for _ in 0..5 {
            ch.run_tick(clock.tick());
            assert!(
                ch.take_outgoing().is_empty(),
                "ack-all throttle broke after wraparound: redundant ack emitted"
            );
        }
    }

    /// A master fragment must carry a 4-byte total-length prefix. A shorter one used
    /// to be indexed unconditionally (`data[0..4]`), panicking the whole driver on a
    /// hostile 1–3 byte fragment (a remote denial-of-service). It must now be reported
    /// as corrupt instead.
    #[test]
    fn short_master_fragment_is_rejected_without_panic() {
        let mut clock = Clock::new();
        let mut ch = new_channel(&clock, false);

        // Sequence 0 (the master fragment) with only 2 of the required 4 length bytes.
        let mut pkt = Vec::new();
        pkt.extend_from_slice(&0u16.to_be_bytes());
        pkt.extend_from_slice(&[0xAB, 0xCD]);

        // The handler must return the error rather than panic.
        let result = ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&pkt), clock.tick());
        assert_eq!(result, Err(CorruptData));
    }

    /// A master fragment self-reports its total length. A malicious one can claim a
    /// huge length (up to 4 GiB) while carrying only a few payload bytes; the channel
    /// must not pre-allocate a matching buffer. This completes quickly only because
    /// the up-front allocation is capped; an uncapped `with_capacity(u32::MAX)` would
    /// attempt a multi-gigabyte allocation here.
    #[test]
    fn huge_claimed_length_master_fragment_does_not_preallocate() {
        let mut clock = Clock::new();
        let mut ch = new_channel(&clock, false);

        // Claims u32::MAX total bytes but carries only DATA_LENGTH of payload.
        let (pkt, _) = data_fragment(0, Some(u32::MAX), DATA_LENGTH);
        // Must not panic/OOM, and a single fragment is valid so far (more awaited).
        let result = ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&pkt), clock.tick());
        assert_eq!(result, Ok(()));
        // The reassembly buffer must NOT have pre-allocated the claimed 4 GiB; it is
        // capped regardless of the self-reported length. (Without the cap this is
        // ~u32::MAX bytes — the actual amplification attack.)
        assert!(
            ch.current_buffer_capacity().unwrap() <= MAX_FRAGMENT_PREALLOC,
            "reassembly buffer pre-allocated more than the cap from a hostile length"
        );
        // The (incomplete) reassembly has produced no application data yet.
        assert!(ch.take_app_data().is_empty());
    }
}