1use std::time::{Duration, Instant};
13
14use bytes::{Bytes, BytesMut};
15
16use crate::constants::MULTI_DATA_INDICATOR;
17use crate::io::BinaryReader;
18use crate::protocol::OpCode;
19use crate::rc4::Rc4KeyState;
20use crate::varint::data_bundle;
21
22use super::true_incoming_sequence;
23
24const FRAGMENT_COMPLETE_LENGTH_SIZE: usize = 4;
26
27const MAX_FRAGMENT_PREALLOC: usize = 64 * 1024;
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq)]
36pub struct CorruptData;
37
38#[derive(Debug, Default, Clone)]
40pub struct DataInputStats {
41 pub total_received: u64,
43 pub duplicate_count: u64,
45 pub out_of_order_count: u64,
47 pub total_received_bytes: u64,
49 pub acknowledge_count: u64,
51}
52
53#[derive(Debug, Clone)]
55pub struct InputConfig {
56 pub max_queued_incoming: u16,
58 pub acknowledge_all_data: bool,
60 pub data_ack_window: u16,
62 pub max_ack_delay: Duration,
64}
65
66impl Default for InputConfig {
67 fn default() -> Self {
68 Self {
69 max_queued_incoming: 256,
70 acknowledge_all_data: false,
71 data_ack_window: 32,
72 max_ack_delay: Duration::from_millis(2),
73 }
74 }
75}
76
77#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81pub struct OutgoingContextual {
82 pub op_code: OpCode,
84 pub sequence: u16,
86}
87
88#[derive(Debug)]
89struct Stashed {
90 data: Bytes,
91 is_fragment: bool,
92}
93
94#[derive(Debug)]
96pub struct ReliableDataInputChannel {
97 config: InputConfig,
98 cipher: Option<Rc4KeyState>,
99
100 window_start_sequence: i64,
102
103 current_buffer: Option<BytesMut>,
105 expected_data_length: usize,
107
108 last_ack_all_sequence: i64,
110 last_ack_all_time: Instant,
111
112 stash: Vec<Option<Stashed>>,
113 stats: DataInputStats,
114
115 outgoing: Vec<OutgoingContextual>,
116 app_data: Vec<Bytes>,
117}
118
119impl ReliableDataInputChannel {
120 pub fn new(config: InputConfig, cipher: Option<Rc4KeyState>, now: Instant) -> Self {
124 let capacity = config.max_queued_incoming as usize;
125 let stash = std::iter::repeat_with(|| None).take(capacity).collect();
126
127 Self {
128 config,
129 cipher,
130 window_start_sequence: 0,
131 current_buffer: None,
132 expected_data_length: 0,
133 last_ack_all_sequence: -1,
134 last_ack_all_time: now,
135 stash,
136 stats: DataInputStats::default(),
137 outgoing: Vec::new(),
138 app_data: Vec::new(),
139 }
140 }
141
142 pub fn stats(&self) -> &DataInputStats {
144 &self.stats
145 }
146
147 pub fn take_outgoing(&mut self) -> Vec<OutgoingContextual> {
149 std::mem::take(&mut self.outgoing)
150 }
151
152 pub fn take_app_data(&mut self) -> Vec<Bytes> {
154 std::mem::take(&mut self.app_data)
155 }
156
157 fn max_queued(&self) -> i64 {
158 self.config.max_queued_incoming as i64
159 }
160
161 pub fn run_tick(&mut self, now: Instant) {
163 let to_ack = self.window_start_sequence - 1;
164
165 if self.config.acknowledge_all_data || to_ack <= self.last_ack_all_sequence {
168 return;
169 }
170
171 let need_ack = now.duration_since(self.last_ack_all_time) > self.config.max_ack_delay
172 || to_ack >= self.last_ack_all_sequence + (self.config.data_ack_window / 2) as i64;
173
174 if need_ack {
175 self.send_ack_all(to_ack, now);
176 }
177 }
178
179 pub fn handle_reliable_data(&mut self, data: Bytes, now: Instant) -> Result<(), CorruptData> {
182 if !self.preprocess(&data, false, now) {
183 return Ok(());
184 }
185 self.process_data(data.slice(2..));
186 self.window_start_sequence += 1;
187 self.consume_stashed()
188 }
189
190 pub fn handle_reliable_data_fragment(
194 &mut self,
195 data: Bytes,
196 now: Instant,
197 ) -> Result<(), CorruptData> {
198 if !self.preprocess(&data, true, now) {
199 return Ok(());
200 }
201 self.write_immediate_fragment(&data[2..])?;
202 self.window_start_sequence += 1;
203 self.try_process_current_buffer();
204 self.consume_stashed()
205 }
206
207 fn emit(&mut self, op_code: OpCode, sequence: u16) {
208 self.outgoing.push(OutgoingContextual { op_code, sequence });
209 }
210
211 fn send_ack_all(&mut self, sequence: i64, now: Instant) {
212 self.emit(OpCode::AcknowledgeAll, sequence as u16);
213 self.stats.acknowledge_count += 1;
214 self.last_ack_all_sequence = sequence;
215 self.last_ack_all_time = now;
216 }
217
218 fn preprocess(&mut self, data: &Bytes, is_fragment: bool, now: Instant) -> bool {
221 self.stats.total_received += 1;
222
223 let (sequence, packet_sequence) = match self.is_valid_reliable_data(data, now) {
224 Some(v) => v,
225 None => return false,
226 };
227
228 let ahead = sequence != self.window_start_sequence;
229
230 if self.config.acknowledge_all_data || ahead {
232 self.emit(OpCode::Acknowledge, packet_sequence);
233 }
234
235 if !ahead {
236 return true;
237 }
238
239 self.stats.out_of_order_count += 1;
241 let spot = sequence.rem_euclid(self.max_queued()) as usize;
242 if self.stash[spot].is_some() {
243 self.stats.duplicate_count += 1;
244 return false;
245 }
246
247 self.stash[spot] = Some(Stashed {
248 data: data.slice(2..),
249 is_fragment,
250 });
251 false
252 }
253
254 fn is_valid_reliable_data(&mut self, data: &[u8], now: Instant) -> Option<(i64, u16)> {
257 if data.len() < 2 {
258 return None;
259 }
260 let packet_sequence = u16::from_be_bytes([data[0], data[1]]);
261 let sequence = true_incoming_sequence(
262 packet_sequence,
263 self.window_start_sequence,
264 self.max_queued(),
265 );
266
267 if sequence > self.window_start_sequence + self.max_queued() {
269 return None;
270 }
271
272 if sequence >= self.window_start_sequence {
274 return Some((sequence, packet_sequence));
275 }
276
277 if now.duration_since(self.last_ack_all_time) < self.config.max_ack_delay {
279 self.send_ack_all(self.window_start_sequence - 1, now);
280 }
281 self.stats.duplicate_count += 1;
282 None
283 }
284
285 fn write_immediate_fragment(&mut self, data: &[u8]) -> Result<(), CorruptData> {
289 if let Some(buf) = &mut self.current_buffer {
290 buf.extend_from_slice(data);
291 } else {
292 if data.len() < FRAGMENT_COMPLETE_LENGTH_SIZE {
295 return Err(CorruptData);
296 }
297 let expected = u32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
298 self.expected_data_length = expected;
299 let mut buf = BytesMut::with_capacity(expected.min(MAX_FRAGMENT_PREALLOC));
303 buf.extend_from_slice(&data[FRAGMENT_COMPLETE_LENGTH_SIZE..]);
304 self.current_buffer = Some(buf);
305 }
306 Ok(())
307 }
308
309 #[cfg(test)]
313 fn current_buffer_capacity(&self) -> Option<usize> {
314 self.current_buffer.as_ref().map(|b| b.capacity())
315 }
316
317 fn try_process_current_buffer(&mut self) {
318 let ready =
319 matches!(&self.current_buffer, Some(buf) if buf.len() >= self.expected_data_length);
320 if !ready {
321 return;
322 }
323 let buf = self.current_buffer.take().unwrap();
324 self.process_data(buf.freeze());
325 self.expected_data_length = 0;
326 }
327
328 fn consume_stashed(&mut self) -> Result<(), CorruptData> {
329 loop {
330 let spot = self.window_start_sequence.rem_euclid(self.max_queued()) as usize;
331 let Some(item) = self.stash[spot].take() else {
332 break;
333 };
334
335 if item.is_fragment {
336 self.write_immediate_fragment(&item.data)?;
337 self.try_process_current_buffer();
338 } else {
339 self.process_data(item.data);
340 }
341
342 self.window_start_sequence += 1;
343 }
344 Ok(())
345 }
346
347 fn process_data(&mut self, data: Bytes) {
348 if data.len() > 2 && data[0..2] == MULTI_DATA_INDICATOR {
349 let mut reader = BinaryReader::new(&data);
350 if reader.skip(2).is_err() {
352 return;
353 }
354 while reader.remaining() > 0 {
355 let len = match data_bundle::read(&mut reader) {
356 Ok(l) => l as usize,
357 Err(_) => break,
358 };
359 let start = reader.offset();
360 if reader.skip(len).is_err() {
361 break;
362 }
363 let chunk = data.slice(start..start + len);
364 self.decrypt_and_handle(chunk);
365 }
366 } else {
367 self.decrypt_and_handle(data);
368 }
369 }
370
371 fn decrypt_and_handle(&mut self, data: Bytes) {
372 let processed = match &mut self.cipher {
373 Some(cipher) => {
374 let d = if data.len() > 1 && data[0] == 0 {
376 data.slice(1..)
377 } else {
378 data
379 };
380 let mut buf = BytesMut::from(&d[..]);
381 cipher.transform_in_place(&mut buf);
382 buf.freeze()
383 }
384 None => data,
385 };
386
387 self.stats.total_received_bytes += processed.len() as u64;
388 self.app_data.push(processed);
389 }
390}
391
392#[cfg(test)]
393mod tests {
394 use super::*;
395
396 struct Clock {
399 now: Instant,
400 }
401
402 impl Clock {
403 fn new() -> Self {
404 Self {
405 now: Instant::now(),
406 }
407 }
408 fn tick(&mut self) -> Instant {
409 self.now += Duration::from_millis(1);
410 self.now
411 }
412 }
413
414 fn config(acknowledge_all_data: bool) -> InputConfig {
415 InputConfig {
416 acknowledge_all_data,
417 max_ack_delay: Duration::ZERO,
418 ..InputConfig::default()
419 }
420 }
421
422 fn data_fragment(
425 sequence: u16,
426 complete_len: Option<u32>,
427 data_len: usize,
428 ) -> (Vec<u8>, Vec<u8>) {
429 let data: Vec<u8> = (0..data_len)
430 .map(|i| (i as u8).wrapping_mul(7).wrapping_add(sequence as u8))
431 .collect();
432 let mut buf = Vec::new();
433 buf.extend_from_slice(&sequence.to_be_bytes());
434 if let Some(cl) = complete_len {
435 buf.extend_from_slice(&cl.to_be_bytes());
436 }
437 buf.extend_from_slice(&data);
438 (buf, data)
439 }
440
441 fn assert_pop_ack(
444 ch: &mut ReliableDataInputChannel,
445 clock: &mut Clock,
446 pending: &mut Vec<OutgoingContextual>,
447 expected_sequence: u16,
448 expect_all: bool,
449 ) {
450 ch.run_tick(clock.tick());
451 pending.extend(ch.take_outgoing());
452 assert!(!pending.is_empty(), "expected an ack to be pending");
453 let ack = pending.remove(0);
454 let expected_op = if expect_all {
455 OpCode::AcknowledgeAll
456 } else {
457 OpCode::Acknowledge
458 };
459 assert_eq!(ack.op_code, expected_op);
460 assert_eq!(ack.sequence, expected_sequence);
461 }
462
463 const DATA_LENGTH: usize = 16;
464
465 fn new_channel(clock: &Clock, ack_all: bool) -> ReliableDataInputChannel {
466 ReliableDataInputChannel::new(config(ack_all), None, clock.now)
467 }
468
469 fn run_sequential_fragment_insert(ack_all: bool) {
470 let mut clock = Clock::new();
471 let mut ch = new_channel(&clock, ack_all);
472 let mut pending: Vec<OutgoingContextual> = Vec::new();
473
474 let (f0, d0) = data_fragment(0, Some((DATA_LENGTH * 3) as u32), DATA_LENGTH);
475 let (f1, d1) = data_fragment(1, None, DATA_LENGTH);
476 let (f2, d2) = data_fragment(2, None, DATA_LENGTH);
477
478 ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f0), clock.tick())
479 .unwrap();
480 assert_pop_ack(&mut ch, &mut clock, &mut pending, 0, !ack_all);
481 assert!(ch.take_app_data().is_empty());
482
483 ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f1), clock.tick())
484 .unwrap();
485 assert_pop_ack(&mut ch, &mut clock, &mut pending, 1, !ack_all);
486 assert!(ch.take_app_data().is_empty());
487
488 ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f2), clock.tick())
489 .unwrap();
490 assert_pop_ack(&mut ch, &mut clock, &mut pending, 2, !ack_all);
491 let app = ch.take_app_data();
492 assert_eq!(app.len(), 1);
493
494 let stitched = &app[0];
495 assert_eq!(&stitched[0..DATA_LENGTH], &d0[..]);
496 assert_eq!(&stitched[DATA_LENGTH..DATA_LENGTH * 2], &d1[..]);
497 assert_eq!(&stitched[DATA_LENGTH * 2..], &d2[..]);
498 assert!(pending.is_empty(), "no superfluous acks");
499 }
500
501 #[test]
502 fn sequential_fragment_insert() {
503 run_sequential_fragment_insert(true);
504 run_sequential_fragment_insert(false);
505 }
506
507 fn run_non_sequential_fragment_insert(ack_all: bool) {
508 let mut clock = Clock::new();
509 let mut ch = new_channel(&clock, ack_all);
510 let mut pending: Vec<OutgoingContextual> = Vec::new();
511
512 let (f0, d0) = data_fragment(0, Some((DATA_LENGTH * 3) as u32), DATA_LENGTH);
513 let (f1, d1) = data_fragment(1, None, DATA_LENGTH);
514 let (f2, d2) = data_fragment(2, None, DATA_LENGTH);
515
516 ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f2), clock.tick())
517 .unwrap();
518 assert_pop_ack(&mut ch, &mut clock, &mut pending, 2, false);
519
520 ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f0), clock.tick())
521 .unwrap();
522 assert_pop_ack(&mut ch, &mut clock, &mut pending, 0, !ack_all);
523 assert!(ch.take_app_data().is_empty());
524
525 ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f1), clock.tick())
526 .unwrap();
527 assert_pop_ack(
528 &mut ch,
529 &mut clock,
530 &mut pending,
531 if ack_all { 1 } else { 2 },
532 !ack_all,
533 );
534 let app = ch.take_app_data();
535 assert_eq!(app.len(), 1);
536
537 let stitched = &app[0];
538 assert_eq!(&stitched[0..DATA_LENGTH], &d0[..]);
539 assert_eq!(&stitched[DATA_LENGTH..DATA_LENGTH * 2], &d1[..]);
540 assert_eq!(&stitched[DATA_LENGTH * 2..], &d2[..]);
541 assert!(pending.is_empty(), "no superfluous acks");
542 }
543
544 #[test]
545 fn non_sequential_fragment_insert() {
546 run_non_sequential_fragment_insert(true);
547 run_non_sequential_fragment_insert(false);
548 }
549
550 fn run_non_fragment_insert(ack_all: bool) {
551 let mut clock = Clock::new();
552 let mut ch = new_channel(&clock, ack_all);
553 let mut pending: Vec<OutgoingContextual> = Vec::new();
554
555 let (p0, d0) = data_fragment(0, None, DATA_LENGTH);
556 let (p1, d1) = data_fragment(1, None, DATA_LENGTH);
557 let (p2, d2) = data_fragment(2, None, DATA_LENGTH);
558
559 ch.handle_reliable_data(Bytes::copy_from_slice(&p0), clock.tick())
560 .unwrap();
561 assert_pop_ack(&mut ch, &mut clock, &mut pending, 0, !ack_all);
562 let app = ch.take_app_data();
563 assert_eq!(app, vec![d0]);
564
565 ch.handle_reliable_data(Bytes::copy_from_slice(&p2), clock.tick())
566 .unwrap();
567 assert_pop_ack(&mut ch, &mut clock, &mut pending, 2, false);
568
569 ch.handle_reliable_data(Bytes::copy_from_slice(&p1), clock.tick())
570 .unwrap();
571 assert_pop_ack(
572 &mut ch,
573 &mut clock,
574 &mut pending,
575 if ack_all { 1 } else { 2 },
576 !ack_all,
577 );
578 let app = ch.take_app_data();
579 assert_eq!(app, vec![d1, d2]);
580 assert!(pending.is_empty(), "no superfluous acks");
581 }
582
583 #[test]
584 fn non_fragment_insert() {
585 run_non_fragment_insert(true);
586 run_non_fragment_insert(false);
587 }
588
589 fn run_fragmented_insert_of_two_datas(ack_all: bool) {
590 let mut clock = Clock::new();
591 let mut ch = new_channel(&clock, ack_all);
592 let mut pending: Vec<OutgoingContextual> = Vec::new();
593
594 let (f0, d0) = data_fragment(0, Some((DATA_LENGTH * 2) as u32), DATA_LENGTH);
595 let (f1, d1) = data_fragment(1, None, DATA_LENGTH);
596 let (f2, d2) = data_fragment(2, Some((DATA_LENGTH * 2) as u32), DATA_LENGTH);
597 let (f3, d3) = data_fragment(3, None, DATA_LENGTH);
598
599 ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f0), clock.tick())
600 .unwrap();
601 assert_pop_ack(&mut ch, &mut clock, &mut pending, 0, !ack_all);
602 assert!(ch.take_app_data().is_empty());
603
604 ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f1), clock.tick())
605 .unwrap();
606 assert_pop_ack(&mut ch, &mut clock, &mut pending, 1, !ack_all);
607 let app = ch.take_app_data();
608 assert_eq!(app.len(), 1);
609 assert_eq!(&app[0][..DATA_LENGTH], &d0[..]);
610 assert_eq!(&app[0][DATA_LENGTH..], &d1[..]);
611
612 ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f3), clock.tick())
613 .unwrap();
614 assert_pop_ack(&mut ch, &mut clock, &mut pending, 3, false);
615 assert!(ch.take_app_data().is_empty());
616
617 ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f2), clock.tick())
618 .unwrap();
619 assert_pop_ack(
620 &mut ch,
621 &mut clock,
622 &mut pending,
623 if ack_all { 2 } else { 3 },
624 !ack_all,
625 );
626 let app = ch.take_app_data();
627 assert_eq!(app.len(), 1);
628 assert_eq!(&app[0][..DATA_LENGTH], &d2[..]);
629 assert_eq!(&app[0][DATA_LENGTH..], &d3[..]);
630 }
631
632 #[test]
633 fn fragmented_insert_of_two_datas() {
634 run_fragmented_insert_of_two_datas(true);
635 run_fragmented_insert_of_two_datas(false);
636 }
637
638 fn run_sequence_waiting_on_data(ack_all: bool) {
639 let mut clock = Clock::new();
640 let mut ch = new_channel(&clock, ack_all);
641 let mut pending: Vec<OutgoingContextual> = Vec::new();
642
643 let (p0, d0) = data_fragment(0, None, DATA_LENGTH);
644 let (f1, d1) = data_fragment(1, Some((DATA_LENGTH * 2) as u32), DATA_LENGTH);
645 let (f2, d2) = data_fragment(2, None, DATA_LENGTH);
646
647 ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f1), clock.tick())
648 .unwrap();
649 ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&f2), clock.tick())
650 .unwrap();
651 assert_pop_ack(&mut ch, &mut clock, &mut pending, 1, false);
652 assert_pop_ack(&mut ch, &mut clock, &mut pending, 2, false);
653
654 ch.handle_reliable_data(Bytes::copy_from_slice(&p0), clock.tick())
655 .unwrap();
656 assert_pop_ack(
657 &mut ch,
658 &mut clock,
659 &mut pending,
660 if ack_all { 0 } else { 2 },
661 !ack_all,
662 );
663
664 let app = ch.take_app_data();
665 assert_eq!(app.len(), 2);
666 assert_eq!(app[0], d0);
667 assert_eq!(&app[1][..DATA_LENGTH], &d1[..]);
668 assert_eq!(&app[1][DATA_LENGTH..], &d2[..]);
669 }
670
671 #[test]
672 fn sequence_waiting_on_data() {
673 run_sequence_waiting_on_data(true);
674 run_sequence_waiting_on_data(false);
675 }
676
677 fn run_multi_data(ack_all: bool) {
678 let mut clock = Clock::new();
679 let mut ch = new_channel(&clock, ack_all);
680 let mut pending: Vec<OutgoingContextual> = Vec::new();
681
682 let mut multi = vec![0u8, 0]; multi.extend_from_slice(&MULTI_DATA_INDICATOR);
686 multi.extend_from_slice(&[1, 2]); multi.extend_from_slice(&[1, 4]); ch.handle_reliable_data(Bytes::copy_from_slice(&multi), clock.tick())
690 .unwrap();
691 assert_pop_ack(&mut ch, &mut clock, &mut pending, 0, !ack_all);
692 assert_eq!(ch.take_app_data(), vec![vec![2u8], vec![4u8]]);
693
694 multi[1] = 0x01; ch.handle_reliable_data(Bytes::copy_from_slice(&multi), clock.tick())
696 .unwrap();
697 assert_pop_ack(&mut ch, &mut clock, &mut pending, 1, !ack_all);
698 assert_eq!(ch.take_app_data(), vec![vec![2u8], vec![4u8]]);
699 }
700
701 #[test]
702 fn multi_data() {
703 run_multi_data(true);
704 run_multi_data(false);
705 }
706
707 #[test]
714 fn ack_all_throttled_after_sequence_wraparound() {
715 let mut clock = Clock::new();
716 let mut ch = new_channel(&clock, false);
718
719 let total: u32 = 65_540;
721 for i in 0..total {
722 let (pkt, _) = data_fragment((i & 0xFFFF) as u16, None, DATA_LENGTH);
723 ch.handle_reliable_data(Bytes::copy_from_slice(&pkt), clock.tick())
724 .unwrap();
725 ch.take_outgoing();
727 ch.take_app_data();
728 }
729
730 ch.run_tick(clock.tick());
733 let first = ch.take_outgoing();
734 assert_eq!(first.len(), 1, "expected exactly one ack-all");
735 assert_eq!(first[0].op_code, OpCode::AcknowledgeAll);
736 assert_eq!(first[0].sequence, ((total - 1) & 0xFFFF) as u16);
737
738 for _ in 0..5 {
741 ch.run_tick(clock.tick());
742 assert!(
743 ch.take_outgoing().is_empty(),
744 "ack-all throttle broke after wraparound: redundant ack emitted"
745 );
746 }
747 }
748
749 #[test]
754 fn short_master_fragment_is_rejected_without_panic() {
755 let mut clock = Clock::new();
756 let mut ch = new_channel(&clock, false);
757
758 let mut pkt = Vec::new();
760 pkt.extend_from_slice(&0u16.to_be_bytes());
761 pkt.extend_from_slice(&[0xAB, 0xCD]);
762
763 let result = ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&pkt), clock.tick());
764 assert_eq!(result, Err(CorruptData));
765 }
766
767 #[test]
773 fn huge_claimed_length_master_fragment_does_not_preallocate() {
774 let mut clock = Clock::new();
775 let mut ch = new_channel(&clock, false);
776
777 let (pkt, _) = data_fragment(0, Some(u32::MAX), DATA_LENGTH);
778 let result = ch.handle_reliable_data_fragment(Bytes::copy_from_slice(&pkt), clock.tick());
780 assert_eq!(result, Ok(()));
781 assert!(
785 ch.current_buffer_capacity().unwrap() <= MAX_FRAGMENT_PREALLOC,
786 "reassembly buffer pre-allocated more than the cap from a hostile length"
787 );
788 assert!(ch.take_app_data().is_empty());
790 }
791}