1use bincode::Options;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::time::{Duration, Instant};
7use thiserror::Error;
8
/// Upper bound on `total_fragments` for a single message.
///
/// Enforced by `Fragment::new`, `Fragment::new_with_fec`, `Fragment::validate`
/// and `Fragmenter::fragment`.
pub const MAX_FRAGMENTS_PER_MESSAGE: u32 = 9_500;

/// Maximum message size, in bytes, accepted by a `Fragmenter` built with
/// `Fragmenter::new` (200 * 32 KiB).
pub const MAX_MESSAGE_SIZE: usize = 200 * 32 * 1024;

/// Default value of `ReassemblerConfig::max_buffer_bytes` (300 MiB).
pub const DEFAULT_MAX_BUFFER_BYTES: usize = 300 * 1024 * 1024;

/// Default value of `ReassemblerConfig::max_concurrent_messages`.
pub const DEFAULT_MAX_CONCURRENT_MESSAGES: usize = 50;
/// Default value of `ReassemblerConfig::stale_timeout`, in seconds.
pub const DEFAULT_STALE_TIMEOUT_SECS: u64 = 120;
/// Byte limit handed to bincode when `Fragment::from_bytes` deserializes a
/// fragment (33 KiB).
pub const MAX_FRAGMENT_DESERIALIZE_SIZE: u64 = 33 * 1024;
/// Fixed per-fragment overhead in bytes: subtracted from the chunk size by
/// `Fragmenter::fragment` / `Fragmenter::usable_payload_size` and added to the
/// payload length by `Fragment::size`.
pub const FRAGMENT_OVERHEAD: usize = 21;
/// 30 KiB. Not referenced elsewhere in this part of the file.
/// NOTE(review): presumably the payload capacity of a single SURB reply —
/// confirm against the callers that use it.
pub const SURB_PAYLOAD_SIZE: usize = 30 * 1024;
26
/// Errors produced while fragmenting, (de)serializing or reassembling messages.
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum FragmentationError {
    /// The message handed to `Fragmenter::fragment` is longer than the
    /// fragmenter's configured maximum.
    #[error("Message too large: {size} bytes exceeds max {max} bytes")]
    MessageTooLarge { size: usize, max: usize },

    /// A fragment's `sequence` is not strictly below its `total_fragments`.
    #[error("Invalid sequence {got} >= total_fragments {total}")]
    InvalidSequence { got: u32, total: u32 },

    /// A fragment count exceeds `MAX_FRAGMENTS_PER_MESSAGE`.
    #[error("Too many fragments: {total} exceeds max {max}")]
    TooManyFragments { total: u32, max: u32 },

    /// A fragment's `total_fragments` differs from the value recorded when the
    /// reassembly buffer for its message was created.
    #[error("Inconsistent metadata: expected total={expected}, got total={got}")]
    InconsistentMetadata { expected: u32, got: u32 },

    /// bincode failed to serialize or deserialize a fragment; carries the
    /// underlying error rendered as text.
    #[error("Serialization error: {0}")]
    SerializationError(String),

    /// `Fragmenter::fragment` was called with an empty payload.
    #[error("Empty message")]
    EmptyMessage,

    /// The requested chunk size leaves no room for payload once
    /// `FRAGMENT_OVERHEAD` is subtracted; carries the rejected chunk size.
    #[error("Invalid chunk size: {0}")]
    InvalidChunkSize(usize),

    /// An internal invariant was violated (e.g. a buffer disappeared between
    /// two steps of `Reassembler::add_fragment`).
    #[error("Internal logic error: {0}")]
    InternalError(String),

    /// A fragment arrived for a sequence number that is already buffered, but
    /// its payload differs from the stored one.
    #[error("Duplicate fragment seq={sequence} for message {message_id} with different data")]
    DuplicateDataMismatch { message_id: u64, sequence: u32 },

    /// FEC metadata is out of range for the fragment, or disagrees with the
    /// FEC metadata already recorded for the same message.
    #[error("Invalid FEC metadata: {reason}")]
    InvalidFec { reason: String },

    /// The FEC decoder could not reconstruct the message from the buffered
    /// shards; carries the decoder's error rendered as text.
    #[error("FEC decode failed: {0}")]
    FecDecodeFailed(String),
}
62
/// One piece of a fragmented message.
///
/// Serialized with bincode via `Fragment::to_bytes` / `Fragment::from_bytes`.
/// NOTE(review): with serde-derived bincode encoding the field order below is
/// part of the wire format — confirm before reordering fields.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Fragment {
    /// Identifier shared by every fragment of the same message.
    pub message_id: u64,
    /// Number of fragments that make up the message; at most
    /// `MAX_FRAGMENTS_PER_MESSAGE`.
    pub total_fragments: u32,
    /// Zero-based position of this fragment; must be `< total_fragments`.
    pub sequence: u32,
    /// Payload bytes carried by this fragment.
    pub data: Vec<u8>,
    /// FEC metadata, or `None` for messages sent without FEC.
    pub fec: Option<super::fec::FecInfo>,
}
74
75impl Fragment {
76 pub fn new(
77 message_id: u64,
78 total_fragments: u32,
79 sequence: u32,
80 data: Vec<u8>,
81 ) -> Result<Self, FragmentationError> {
82 if total_fragments > MAX_FRAGMENTS_PER_MESSAGE {
83 return Err(FragmentationError::TooManyFragments {
84 total: total_fragments,
85 max: MAX_FRAGMENTS_PER_MESSAGE,
86 });
87 }
88 if sequence >= total_fragments {
89 return Err(FragmentationError::InvalidSequence {
90 got: sequence,
91 total: total_fragments,
92 });
93 }
94 Ok(Self {
95 message_id,
96 total_fragments,
97 sequence,
98 data,
99 fec: None,
100 })
101 }
102
103 pub fn new_with_fec(
104 message_id: u64,
105 total_fragments: u32,
106 sequence: u32,
107 data: Vec<u8>,
108 fec_info: super::fec::FecInfo,
109 ) -> Result<Self, FragmentationError> {
110 if total_fragments > MAX_FRAGMENTS_PER_MESSAGE {
111 return Err(FragmentationError::TooManyFragments {
112 total: total_fragments,
113 max: MAX_FRAGMENTS_PER_MESSAGE,
114 });
115 }
116 if sequence >= total_fragments {
117 return Err(FragmentationError::InvalidSequence {
118 got: sequence,
119 total: total_fragments,
120 });
121 }
122 if fec_info.data_shard_count == 0 || fec_info.data_shard_count > total_fragments {
123 return Err(FragmentationError::InvalidFec {
124 reason: format!(
125 "data_shard_count {} must be in 1..={}",
126 fec_info.data_shard_count, total_fragments
127 ),
128 });
129 }
130 Ok(Self {
131 message_id,
132 total_fragments,
133 sequence,
134 data,
135 fec: Some(fec_info),
136 })
137 }
138
139 pub fn to_bytes(&self) -> Result<Vec<u8>, FragmentationError> {
140 bincode::serialize(self).map_err(|e| FragmentationError::SerializationError(e.to_string()))
141 }
142
143 pub fn from_bytes(bytes: &[u8]) -> Result<Self, FragmentationError> {
144 let frag: Fragment = bincode::DefaultOptions::new()
145 .with_limit(MAX_FRAGMENT_DESERIALIZE_SIZE)
146 .with_fixint_encoding()
147 .allow_trailing_bytes()
148 .deserialize(bytes)
149 .map_err(|e| FragmentationError::SerializationError(e.to_string()))?;
150 frag.validate()?;
151 Ok(frag)
152 }
153
154 pub fn validate(&self) -> Result<(), FragmentationError> {
155 if self.total_fragments > MAX_FRAGMENTS_PER_MESSAGE {
156 return Err(FragmentationError::TooManyFragments {
157 total: self.total_fragments,
158 max: MAX_FRAGMENTS_PER_MESSAGE,
159 });
160 }
161 if self.sequence >= self.total_fragments {
162 return Err(FragmentationError::InvalidSequence {
163 got: self.sequence,
164 total: self.total_fragments,
165 });
166 }
167 if let Some(ref fec_info) = self.fec {
168 if fec_info.data_shard_count == 0 || fec_info.data_shard_count > self.total_fragments {
169 return Err(FragmentationError::InvalidFec {
170 reason: format!(
171 "data_shard_count {} must be in 1..={}",
172 fec_info.data_shard_count, self.total_fragments
173 ),
174 });
175 }
176 }
177 Ok(())
178 }
179
180 #[must_use]
181 pub fn size(&self) -> usize {
182 let fec_overhead = if self.fec.is_some() { 13 } else { 1 };
183 FRAGMENT_OVERHEAD + self.data.len() + fec_overhead
184 }
185}
186
/// Splits messages into [`Fragment`]s.
#[derive(Debug, Clone)]
pub struct Fragmenter {
    /// Largest message, in bytes, that `fragment` will accept.
    max_message_size: usize,
}
191
192impl Default for Fragmenter {
193 fn default() -> Self {
194 Self::new()
195 }
196}
197
198impl Fragmenter {
199 #[must_use]
200 pub fn new() -> Self {
201 Self {
202 max_message_size: MAX_MESSAGE_SIZE,
203 }
204 }
205
206 #[must_use]
207 pub fn with_max_size(max_message_size: usize) -> Self {
208 Self { max_message_size }
209 }
210
211 pub fn fragment(
212 &self,
213 message_id: u64,
214 data: &[u8],
215 max_chunk_size: usize,
216 ) -> Result<Vec<Fragment>, FragmentationError> {
217 if data.is_empty() {
218 return Err(FragmentationError::EmptyMessage);
219 }
220
221 if data.len() > self.max_message_size {
222 return Err(FragmentationError::MessageTooLarge {
223 size: data.len(),
224 max: self.max_message_size,
225 });
226 }
227
228 let usable_payload = max_chunk_size.saturating_sub(FRAGMENT_OVERHEAD);
229 if usable_payload == 0 {
230 return Err(FragmentationError::InvalidChunkSize(max_chunk_size));
231 }
232
233 let total_fragments = data.len().div_ceil(usable_payload);
234
235 if total_fragments > MAX_FRAGMENTS_PER_MESSAGE as usize {
236 return Err(FragmentationError::TooManyFragments {
237 total: total_fragments as u32,
238 max: MAX_FRAGMENTS_PER_MESSAGE,
239 });
240 }
241
242 let total_fragments = total_fragments as u32;
243 let mut fragments = Vec::with_capacity(total_fragments as usize);
244
245 for (seq, chunk) in data.chunks(usable_payload).enumerate() {
246 fragments.push(Fragment {
247 message_id,
248 total_fragments,
249 sequence: seq as u32,
250 data: chunk.to_vec(),
251 fec: None,
252 });
253 }
254
255 Ok(fragments)
256 }
257
258 #[must_use]
259 pub fn usable_payload_size(max_packet_size: usize) -> usize {
260 max_packet_size.saturating_sub(FRAGMENT_OVERHEAD)
261 }
262}
263
/// Fragments collected so far for one in-flight message.
#[derive(Debug)]
struct ReassemblyBuffer {
    /// Buffered fragments keyed by their `sequence`.
    fragments: HashMap<u32, Fragment>,
    /// `total_fragments` taken from the fragment that created the buffer;
    /// every later fragment must carry the same value.
    expected_total: u32,
    /// Number of distinct sequences stored in `fragments`.
    received_count: u32,
    /// Sum of `Fragment::size()` over the stored fragments.
    buffered_bytes: usize,
    /// When the buffer was created; `Reassembler::prune_stale` compares
    /// against this.
    created_at: Instant,
    /// When `add` last ran for this buffer; the reassembler evicts the buffer
    /// with the smallest value when it needs room.
    last_activity: Instant,
    /// FEC metadata for the message, if any fragment seen so far carried it.
    /// When set, the message is complete once `data_shard_count` fragments
    /// have been received.
    fec_info: Option<super::fec::FecInfo>,
}
275
276impl ReassemblyBuffer {
277 fn new(first_fragment: &Fragment) -> Self {
278 let now = Instant::now();
279 Self {
280 fragments: HashMap::with_capacity(first_fragment.total_fragments as usize),
281 expected_total: first_fragment.total_fragments,
282 received_count: 0,
283 buffered_bytes: 0,
284 created_at: now,
285 last_activity: now,
286 fec_info: first_fragment.fec.clone(),
287 }
288 }
289
290 fn add(&mut self, fragment: Fragment) -> Result<bool, FragmentationError> {
293 if fragment.total_fragments != self.expected_total {
294 return Err(FragmentationError::InconsistentMetadata {
295 expected: self.expected_total,
296 got: fragment.total_fragments,
297 });
298 }
299
300 if let Some(ref incoming_fec) = fragment.fec {
301 match &self.fec_info {
302 Some(existing_fec) => {
303 if incoming_fec != existing_fec {
304 return Err(FragmentationError::InvalidFec {
305 reason: format!(
306 "FEC mismatch: buffer has D={} len={}, fragment has D={} len={}",
307 existing_fec.data_shard_count,
308 existing_fec.original_data_len,
309 incoming_fec.data_shard_count,
310 incoming_fec.original_data_len,
311 ),
312 });
313 }
314 }
315 None => {
316 self.fec_info = Some(incoming_fec.clone());
317 }
318 }
319 }
320
321 self.last_activity = Instant::now();
322
323 if let Some(existing) = self.fragments.get(&fragment.sequence) {
324 if existing.data != fragment.data {
325 return Err(FragmentationError::DuplicateDataMismatch {
326 message_id: fragment.message_id,
327 sequence: fragment.sequence,
328 });
329 }
330 return Ok(false);
331 }
332
333 self.buffered_bytes += fragment.size();
334 self.received_count += 1;
335 self.fragments.insert(fragment.sequence, fragment);
336
337 Ok(true)
338 }
339
340 fn is_complete(&self) -> bool {
341 match &self.fec_info {
342 Some(fec) => self.received_count >= fec.data_shard_count,
343 None => self.received_count == self.expected_total,
344 }
345 }
346
347 fn has_sequence(&self, sequence: u32) -> bool {
348 self.fragments.contains_key(&sequence)
349 }
350
351 fn assemble(mut self) -> Result<Vec<u8>, FragmentationError> {
352 match &self.fec_info {
353 None => {
354 let mut result = Vec::new();
355 for seq in 0..self.expected_total {
356 if let Some(frag) = self.fragments.remove(&seq) {
357 result.extend(frag.data);
358 }
359 }
360 Ok(result)
361 }
362 Some(fec) => {
363 let d = fec.data_shard_count as usize;
364 let total = self.expected_total as usize;
365 let original_len = fec.original_data_len;
366
367 let mut shards: Vec<Option<Vec<u8>>> = (0..total)
368 .map(|seq| self.fragments.remove(&(seq as u32)).map(|f| f.data))
369 .collect();
370
371 super::fec::decode_shards(&mut shards, d, original_len)
372 .map_err(|e| FragmentationError::FecDecodeFailed(e.to_string()))
373 }
374 }
375 }
376}
377
/// Resource limits for a [`Reassembler`].
#[derive(Debug, Clone)]
pub struct ReassemblerConfig {
    /// Budget, in bytes, for all buffered fragments across all messages;
    /// buffers are evicted when a new fragment would exceed it.
    pub max_buffer_bytes: usize,
    /// Maximum number of messages buffered at the same time.
    pub max_concurrent_messages: usize,
    /// Age used by `Reassembler::prune_stale_default` to decide which buffers
    /// to drop.
    pub stale_timeout: Duration,
}
384
385impl Default for ReassemblerConfig {
386 fn default() -> Self {
387 Self {
388 max_buffer_bytes: DEFAULT_MAX_BUFFER_BYTES,
389 max_concurrent_messages: DEFAULT_MAX_CONCURRENT_MESSAGES,
390 stale_timeout: Duration::from_secs(DEFAULT_STALE_TIMEOUT_SECS),
391 }
392 }
393}
394
/// Collects fragments of multiple messages and returns each message once
/// enough of its fragments have arrived.
#[derive(Debug)]
pub struct Reassembler {
    /// In-flight messages keyed by `message_id`.
    buffers: HashMap<u64, ReassemblyBuffer>,
    /// Limits applied when buffering and pruning.
    config: ReassemblerConfig,
    /// Running total of `buffered_bytes` over all entries in `buffers`.
    total_buffered_bytes: usize,
}
403
404impl Reassembler {
405 #[must_use]
406 pub fn new(config: ReassemblerConfig) -> Self {
407 Self {
408 buffers: HashMap::new(),
409 config,
410 total_buffered_bytes: 0,
411 }
412 }
413
414 pub fn add_fragment(
416 &mut self,
417 fragment: Fragment,
418 ) -> Result<Option<Vec<u8>>, FragmentationError> {
419 fragment.validate()?;
420
421 let message_id = fragment.message_id;
422 let fragment_size = fragment.size();
423
424 if let Some(buffer) = self.buffers.get(&message_id) {
426 if buffer.has_sequence(fragment.sequence) {
427 let buffer = self.buffers.get_mut(&message_id).ok_or_else(|| {
428 FragmentationError::InternalError(
429 "Buffer vanished during duplicate check".to_string(),
430 )
431 })?;
432 buffer.add(fragment)?;
433 return Ok(None);
434 }
435 }
436
437 self.ensure_capacity(fragment_size)?;
438
439 let buffer = self
440 .buffers
441 .entry(message_id)
442 .or_insert_with(|| ReassemblyBuffer::new(&fragment));
443
444 let is_new = buffer.add(fragment)?;
445
446 if is_new {
447 self.total_buffered_bytes += fragment_size;
448 }
449
450 let is_complete = buffer.is_complete();
451
452 if is_complete {
453 let buffer = self.buffers.remove(&message_id).ok_or_else(|| {
454 FragmentationError::InternalError("Buffer vanished during processing".to_string())
455 })?;
456
457 self.total_buffered_bytes = self
458 .total_buffered_bytes
459 .saturating_sub(buffer.buffered_bytes);
460 Ok(Some(buffer.assemble()?))
461 } else {
462 Ok(None)
463 }
464 }
465
466 fn ensure_capacity(&mut self, needed_bytes: usize) -> Result<(), FragmentationError> {
467 while (self.total_buffered_bytes + needed_bytes > self.config.max_buffer_bytes
468 || self.buffers.len() >= self.config.max_concurrent_messages)
469 && !self.buffers.is_empty()
470 {
471 self.evict_oldest();
472 }
473 Ok(())
474 }
475
476 fn evict_oldest(&mut self) {
477 if let Some((&oldest_id, _)) = self.buffers.iter().min_by_key(|(_, buf)| buf.last_activity)
478 {
479 if let Some(buffer) = self.buffers.remove(&oldest_id) {
480 self.total_buffered_bytes = self
481 .total_buffered_bytes
482 .saturating_sub(buffer.buffered_bytes);
483 tracing::debug!(
484 message_id = oldest_id,
485 bytes = buffer.buffered_bytes,
486 "Evicted stale reassembly buffer"
487 );
488 }
489 }
490 }
491
492 pub fn prune_stale(&mut self, timeout: Duration) -> usize {
493 let now = Instant::now();
494 let stale_ids: Vec<u64> = self
495 .buffers
496 .iter()
497 .filter(|(_, buf)| now.duration_since(buf.created_at) > timeout)
498 .map(|(&id, _)| id)
499 .collect();
500
501 let count = stale_ids.len();
502 for id in stale_ids {
503 if let Some(buffer) = self.buffers.remove(&id) {
504 self.total_buffered_bytes = self
505 .total_buffered_bytes
506 .saturating_sub(buffer.buffered_bytes);
507 tracing::debug!(
508 message_id = id,
509 age_secs = now.duration_since(buffer.created_at).as_secs(),
510 "Pruned stale reassembly buffer"
511 );
512 }
513 }
514 count
515 }
516
517 pub fn prune_stale_default(&mut self) -> usize {
518 self.prune_stale(self.config.stale_timeout)
519 }
520
521 #[must_use]
522 pub fn buffered_bytes(&self) -> usize {
523 self.total_buffered_bytes
524 }
525
526 #[must_use]
527 pub fn pending_count(&self) -> usize {
528 self.buffers.len()
529 }
530
531 #[must_use]
532 pub fn has_message(&self, message_id: u64) -> bool {
533 self.buffers.contains_key(&message_id)
534 }
535
536 #[must_use]
537 pub fn message_progress(&self, message_id: u64) -> Option<(u32, u32)> {
538 self.buffers
539 .get(&message_id)
540 .map(|buf| (buf.received_count, buf.expected_total))
541 }
542}
543
#[cfg(test)]
mod tests {
    use super::super::fec::{encode_parity_shards, pad_to_uniform, FecInfo};
    use super::*;

    /// Returns `len` bytes following the repeating pattern 0, 1, ..., 255.
    fn patterned_bytes(len: usize) -> Vec<u8> {
        (0..len).map(|i| (i % 256) as u8).collect()
    }

    /// Feeds every fragment into `reassembler` (panicking on any error) and
    /// returns the last completed message, if any.
    fn feed_all(
        reassembler: &mut Reassembler,
        fragments: impl IntoIterator<Item = Fragment>,
    ) -> Option<Vec<u8>> {
        let mut completed = None;
        for frag in fragments {
            if let Some(data) = reassembler.add_fragment(frag).unwrap() {
                completed = Some(data);
            }
        }
        completed
    }

    /// Builds data + parity fragments for `data`, split into shards of
    /// `shard_size` bytes with `parity_count` parity shards appended.
    fn make_fec_fragments(
        message_id: u64,
        data: &[u8],
        shard_size: usize,
        parity_count: usize,
    ) -> Vec<Fragment> {
        let chunks: Vec<Vec<u8>> = data.chunks(shard_size).map(|c| c.to_vec()).collect();
        let (padded, _) = pad_to_uniform(&chunks).unwrap();
        let d = padded.len();

        let parity = encode_parity_shards(&padded, parity_count).unwrap();
        let total = (d + parity_count) as u32;

        let fec_info = FecInfo {
            data_shard_count: d as u32,
            original_data_len: data.len() as u64,
        };

        // Data shards first, parity shards after, numbered consecutively.
        padded
            .into_iter()
            .chain(parity)
            .enumerate()
            .map(|(seq, shard)| Fragment {
                message_id,
                total_fragments: total,
                sequence: seq as u32,
                data: shard,
                fec: Some(fec_info.clone()),
            })
            .collect()
    }

    #[test]
    fn test_fragment_roundtrip() {
        let frag = Fragment::new(12345, 10, 5, vec![1, 2, 3, 4, 5]).unwrap();
        let bytes = frag.to_bytes().unwrap();
        let recovered = Fragment::from_bytes(&bytes).unwrap();
        assert_eq!(frag, recovered);
    }

    #[test]
    fn test_fragment_validation() {
        assert!(Fragment::new(1, 10, 10, vec![]).is_err());
        assert!(Fragment::new(1, 10, 15, vec![]).is_err());
        assert!(Fragment::new(1, MAX_FRAGMENTS_PER_MESSAGE + 1, 0, vec![]).is_err());
    }

    #[test]
    fn test_fragmenter_basic() {
        let fragmenter = Fragmenter::new();
        let data = patterned_bytes(10_000);
        let fragments = fragmenter.fragment(1, &data, 1000).unwrap();

        assert!(fragments.len() > 1);
        assert!(fragments.iter().all(|f| f.message_id == 1));
        assert!(fragments
            .iter()
            .all(|f| f.total_fragments == fragments.len() as u32));

        for (i, f) in fragments.iter().enumerate() {
            assert_eq!(f.sequence, i as u32);
        }
    }

    #[test]
    fn test_fragmenter_empty_message() {
        let fragmenter = Fragmenter::new();
        assert!(matches!(
            fragmenter.fragment(1, &[], 1000),
            Err(FragmentationError::EmptyMessage)
        ));
    }

    #[test]
    fn test_fragmenter_message_too_large() {
        let fragmenter = Fragmenter::with_max_size(1000);
        let data = vec![0u8; 2000];
        assert!(matches!(
            fragmenter.fragment(1, &data, 100),
            Err(FragmentationError::MessageTooLarge { .. })
        ));
    }

    #[test]
    fn test_reassembler_in_order() {
        let fragmenter = Fragmenter::new();
        let mut reassembler = Reassembler::new(ReassemblerConfig::default());

        let original = patterned_bytes(5000);
        let fragments = fragmenter.fragment(42, &original, 500).unwrap();
        let last_index = fragments.len() - 1;

        for (i, frag) in fragments.into_iter().enumerate() {
            let result = reassembler.add_fragment(frag).unwrap();
            if i < last_index {
                assert!(result.is_none(), "message completed early at fragment {}", i);
            } else {
                // The final fragment must complete the message with the exact payload.
                assert_eq!(result.as_deref(), Some(original.as_slice()));
            }
        }
        assert_eq!(reassembler.pending_count(), 0);
    }

    #[test]
    fn test_reassembler_out_of_order() {
        let fragmenter = Fragmenter::new();
        let mut reassembler = Reassembler::new(ReassemblerConfig::default());

        let original = patterned_bytes(5000);
        let mut fragments = fragmenter.fragment(42, &original, 500).unwrap();
        fragments.reverse();

        let result = feed_all(&mut reassembler, fragments);
        assert_eq!(result.unwrap(), original);
    }

    #[test]
    fn test_reassembler_duplicate_handling() {
        let fragmenter = Fragmenter::new();
        let mut reassembler = Reassembler::new(ReassemblerConfig::default());

        let original = patterned_bytes(1000);
        let fragments = fragmenter.fragment(42, &original, 500).unwrap();

        let first = fragments[0].clone();
        assert!(reassembler.add_fragment(first.clone()).unwrap().is_none());
        assert!(reassembler.add_fragment(first.clone()).unwrap().is_none());
        assert!(reassembler.add_fragment(first).unwrap().is_none());

        let result = feed_all(&mut reassembler, fragments.into_iter().skip(1));
        assert_eq!(result.unwrap(), original);
    }

    #[test]
    fn test_reassembler_inconsistent_metadata() {
        let mut reassembler = Reassembler::new(ReassemblerConfig::default());

        let frag1 = Fragment::new(100, 5, 0, vec![1, 2, 3]).unwrap();
        reassembler.add_fragment(frag1).unwrap();

        let frag2 = Fragment {
            message_id: 100,
            total_fragments: 10,
            sequence: 1,
            data: vec![4, 5, 6],
            fec: None,
        };

        assert!(matches!(
            reassembler.add_fragment(frag2),
            Err(FragmentationError::InconsistentMetadata {
                expected: 5,
                got: 10
            })
        ));
    }

    #[test]
    fn test_reassembler_memory_limit() {
        let config = ReassemblerConfig {
            max_buffer_bytes: 1000,
            max_concurrent_messages: 2,
            stale_timeout: Duration::from_secs(60),
        };
        let mut reassembler = Reassembler::new(config);

        for message_id in 1..=3u64 {
            let frag = Fragment::new(message_id, 5, 0, vec![0u8; 200]).unwrap();
            reassembler.add_fragment(frag).unwrap();
        }

        // The third message pushes out the least recently active one.
        assert!(!reassembler.has_message(1));
        assert!(reassembler.has_message(2));
        assert!(reassembler.has_message(3));
    }

    #[test]
    fn test_reassembler_prune_stale() {
        let config = ReassemblerConfig {
            max_buffer_bytes: 10_000,
            max_concurrent_messages: 100,
            stale_timeout: Duration::from_millis(50),
        };
        let mut reassembler = Reassembler::new(config);

        let frag = Fragment::new(999, 10, 0, vec![1, 2, 3]).unwrap();
        reassembler.add_fragment(frag).unwrap();
        assert_eq!(reassembler.pending_count(), 1);

        std::thread::sleep(Duration::from_millis(100));
        let pruned = reassembler.prune_stale(Duration::from_millis(50));
        assert_eq!(pruned, 1);
        assert_eq!(reassembler.pending_count(), 0);
    }

    #[test]
    fn test_duplicate_sequence_same_data_accepted() {
        let mut reassembler = Reassembler::new(ReassemblerConfig::default());

        let frag = Fragment::new(1, 3, 0, vec![1, 2, 3]).unwrap();
        assert!(reassembler.add_fragment(frag.clone()).unwrap().is_none());
        assert!(reassembler.add_fragment(frag).unwrap().is_none());
        assert_eq!(reassembler.message_progress(1), Some((1, 3)));
    }

    #[test]
    fn test_duplicate_sequence_different_data_rejected() {
        let mut reassembler = Reassembler::new(ReassemblerConfig::default());

        let frag1 = Fragment::new(1, 3, 0, vec![1, 2, 3]).unwrap();
        reassembler.add_fragment(frag1).unwrap();

        let frag2 = Fragment {
            message_id: 1,
            total_fragments: 3,
            sequence: 0,
            data: vec![4, 5, 6],
            fec: None,
        };
        assert!(matches!(
            reassembler.add_fragment(frag2),
            Err(FragmentationError::DuplicateDataMismatch {
                message_id: 1,
                sequence: 0,
            })
        ));
    }

    #[test]
    fn test_duplicate_does_not_evict() {
        let config = ReassemblerConfig {
            max_buffer_bytes: 1000,
            max_concurrent_messages: 2,
            stale_timeout: Duration::from_secs(60),
        };
        let mut reassembler = Reassembler::new(config);

        let frag1 = Fragment::new(1, 5, 0, vec![0u8; 200]).unwrap();
        reassembler.add_fragment(frag1).unwrap();

        let frag2 = Fragment::new(2, 5, 0, vec![0u8; 200]).unwrap();
        reassembler.add_fragment(frag2).unwrap();

        let dup = Fragment::new(1, 5, 0, vec![0u8; 200]).unwrap();
        reassembler.add_fragment(dup).unwrap();

        assert!(
            reassembler.has_message(1),
            "Message 1 should still be present"
        );
        assert!(
            reassembler.has_message(2),
            "Message 2 should still be present"
        );
    }

    #[test]
    fn test_fec_all_data_present_fast_path() {
        let mut reassembler = Reassembler::new(ReassemblerConfig::default());
        let original = patterned_bytes(300);
        // 3 data shards + 2 parity shards.
        let fragments = make_fec_fragments(1, &original, 100, 2);

        let result = feed_all(&mut reassembler, fragments);
        assert_eq!(result.unwrap(), original);
    }

    #[test]
    fn test_fec_drop_one_data_shard_rs_recovery() {
        let mut reassembler = Reassembler::new(ReassemblerConfig::default());
        let original = patterned_bytes(300);
        // 3 data shards + 2 parity shards; lose one data shard.
        let mut fragments = make_fec_fragments(1, &original, 100, 2);
        fragments.remove(1);

        let result = feed_all(&mut reassembler, fragments);
        assert_eq!(result.unwrap(), original);
    }

    #[test]
    fn test_fec_drop_all_parity_fast_path() {
        let mut reassembler = Reassembler::new(ReassemblerConfig::default());
        let original = patterned_bytes(300);
        // 3 data shards + 2 parity shards; deliver only the data shards.
        let fragments = make_fec_fragments(1, &original, 100, 2);

        let result = feed_all(&mut reassembler, fragments.into_iter().take(3));
        assert_eq!(result.unwrap(), original);
    }

    #[test]
    fn test_fec_drop_p_shards_rs_recovers() {
        let mut reassembler = Reassembler::new(ReassemblerConfig::default());
        let original = patterned_bytes(500);
        // 5 data shards + 3 parity shards; lose as many shards as there is parity.
        let mut fragments = make_fec_fragments(1, &original, 100, 3);
        fragments.remove(6);
        fragments.remove(3);
        fragments.remove(0);

        let result = feed_all(&mut reassembler, fragments);
        assert_eq!(result.unwrap(), original);
    }

    #[test]
    fn test_fec_drop_too_many_incomplete() {
        let mut reassembler = Reassembler::new(ReassemblerConfig::default());
        let original = patterned_bytes(300);
        // 3 data shards + 2 parity shards; losing three leaves too few to decode.
        let mut fragments = make_fec_fragments(1, &original, 100, 2);
        fragments.remove(4);
        fragments.remove(2);
        fragments.remove(0);

        let result = feed_all(&mut reassembler, fragments);
        assert!(result.is_none());
        assert_eq!(reassembler.pending_count(), 1);
    }

    #[test]
    fn test_fec_single_fragment_d1_p1() {
        let mut reassembler = Reassembler::new(ReassemblerConfig::default());
        let original = b"Small message".to_vec();
        // 1 data shard + 1 parity shard.
        let fragments = make_fec_fragments(1, &original, original.len(), 1);
        assert_eq!(fragments.len(), 2);

        // The parity shard alone must be enough to recover the message.
        let parity = fragments[1].clone();
        let result = reassembler.add_fragment(parity).unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap(), original);
    }

    #[test]
    fn test_fec_backward_compat_no_fec() {
        let mut reassembler = Reassembler::new(ReassemblerConfig::default());
        let original = patterned_bytes(1000);

        let fragmenter = Fragmenter::new();
        let fragments = fragmenter.fragment(42, &original, 500).unwrap();
        assert!(fragments.iter().all(|frag| frag.fec.is_none()));

        let result = feed_all(&mut reassembler, fragments);
        assert_eq!(result.unwrap(), original);
    }

    #[test]
    fn test_fec_consistency_validation() {
        let mut reassembler = Reassembler::new(ReassemblerConfig::default());

        let fec1 = FecInfo {
            data_shard_count: 3,
            original_data_len: 300,
        };
        let fec2 = FecInfo {
            data_shard_count: 5,
            original_data_len: 500,
        };

        let frag1 = Fragment {
            message_id: 1,
            total_fragments: 5,
            sequence: 0,
            data: vec![0u8; 100],
            fec: Some(fec1),
        };
        reassembler.add_fragment(frag1).unwrap();

        let frag2 = Fragment {
            message_id: 1,
            total_fragments: 5,
            sequence: 1,
            data: vec![0u8; 100],
            fec: Some(fec2),
        };

        assert!(matches!(
            reassembler.add_fragment(frag2),
            Err(FragmentationError::InvalidFec { .. })
        ));
    }

    #[test]
    fn test_fec_mixed_data_parity_drops() {
        let mut reassembler = Reassembler::new(ReassemblerConfig::default());
        let original = patterned_bytes(1000);
        // 5 data shards + 3 parity shards; lose two data shards and one parity shard.
        let mut fragments = make_fec_fragments(1, &original, 200, 3);
        fragments.remove(6);
        fragments.remove(2);
        fragments.remove(0);

        let result = feed_all(&mut reassembler, fragments);
        assert_eq!(result.unwrap(), original);
    }

    #[test]
    fn test_fec_info_inconsistency_rejected() {
        let mut reassembler = Reassembler::new(ReassemblerConfig::default());

        let fec_info_a = FecInfo {
            data_shard_count: 3,
            original_data_len: 600,
        };
        let fec_info_b = FecInfo {
            data_shard_count: 5,
            original_data_len: 600,
        };

        let frag1 = Fragment {
            message_id: 42,
            total_fragments: 4,
            sequence: 0,
            data: vec![1u8; 200],
            fec: Some(fec_info_a),
        };
        reassembler.add_fragment(frag1).unwrap();

        let frag2 = Fragment {
            message_id: 42,
            total_fragments: 4,
            sequence: 1,
            data: vec![2u8; 200],
            fec: Some(fec_info_b),
        };
        let result = reassembler.add_fragment(frag2);
        assert!(
            result.is_err(),
            "expected error on inconsistent FEC metadata, got Ok"
        );
    }
}