Skip to main content

nox_core/protocol/
fragmentation.rs

1//! Fragmentation and reassembly for messages over unreliable, unordered Sphinx packet streams.
2
3use bincode::Options;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::time::{Duration, Instant};
7use thiserror::Error;
8
9/// Max fragments per message. Set to 9,500 to support ~275 MB via multi-round SURB replenishment.
10pub const MAX_FRAGMENTS_PER_MESSAGE: u32 = 9_500;
11
12/// Maximum forward-path message size (~6 MB). Response-path is not subject to this limit.
13pub const MAX_MESSAGE_SIZE: usize = 200 * 32 * 1024;
14
15/// Default max pending bytes in reassembly buffer (300 MB).
16pub const DEFAULT_MAX_BUFFER_BYTES: usize = 300 * 1024 * 1024;
17
18pub const DEFAULT_MAX_CONCURRENT_MESSAGES: usize = 50;
19pub const DEFAULT_STALE_TIMEOUT_SECS: u64 = 120;
20/// Single fragment: 32KB + overhead.
21pub const MAX_FRAGMENT_DESERIALIZE_SIZE: u64 = 33 * 1024;
22/// Bincode overhead per Fragment (~21 bytes without FEC, ~33 with FEC).
23pub const FRAGMENT_OVERHEAD: usize = 21;
24/// Sphinx body (~31KB) minus `ServiceResponse` wrapper overhead (~50 bytes).
25pub const SURB_PAYLOAD_SIZE: usize = 30 * 1024;
26
27#[derive(Error, Debug, Clone, PartialEq, Eq)]
28pub enum FragmentationError {
29    #[error("Message too large: {size} bytes exceeds max {max} bytes")]
30    MessageTooLarge { size: usize, max: usize },
31
32    #[error("Invalid sequence {got} >= total_fragments {total}")]
33    InvalidSequence { got: u32, total: u32 },
34
35    #[error("Too many fragments: {total} exceeds max {max}")]
36    TooManyFragments { total: u32, max: u32 },
37
38    #[error("Inconsistent metadata: expected total={expected}, got total={got}")]
39    InconsistentMetadata { expected: u32, got: u32 },
40
41    #[error("Serialization error: {0}")]
42    SerializationError(String),
43
44    #[error("Empty message")]
45    EmptyMessage,
46
47    #[error("Invalid chunk size: {0}")]
48    InvalidChunkSize(usize),
49
50    #[error("Internal logic error: {0}")]
51    InternalError(String),
52
53    #[error("Duplicate fragment seq={sequence} for message {message_id} with different data")]
54    DuplicateDataMismatch { message_id: u64, sequence: u32 },
55
56    #[error("Invalid FEC metadata: {reason}")]
57    InvalidFec { reason: String },
58
59    #[error("FEC decode failed: {0}")]
60    FecDecodeFailed(String),
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
64pub struct Fragment {
65    pub message_id: u64,
66    /// D+P when FEC is active.
67    pub total_fragments: u32,
68    /// 0-indexed. With FEC: 0..D-1 are data, D..D+P-1 are parity.
69    pub sequence: u32,
70    pub data: Vec<u8>,
71    /// Present on every FEC-protected fragment; `None` for non-FEC traffic.
72    pub fec: Option<super::fec::FecInfo>,
73}
74
75impl Fragment {
76    pub fn new(
77        message_id: u64,
78        total_fragments: u32,
79        sequence: u32,
80        data: Vec<u8>,
81    ) -> Result<Self, FragmentationError> {
82        if total_fragments > MAX_FRAGMENTS_PER_MESSAGE {
83            return Err(FragmentationError::TooManyFragments {
84                total: total_fragments,
85                max: MAX_FRAGMENTS_PER_MESSAGE,
86            });
87        }
88        if sequence >= total_fragments {
89            return Err(FragmentationError::InvalidSequence {
90                got: sequence,
91                total: total_fragments,
92            });
93        }
94        Ok(Self {
95            message_id,
96            total_fragments,
97            sequence,
98            data,
99            fec: None,
100        })
101    }
102
103    pub fn new_with_fec(
104        message_id: u64,
105        total_fragments: u32,
106        sequence: u32,
107        data: Vec<u8>,
108        fec_info: super::fec::FecInfo,
109    ) -> Result<Self, FragmentationError> {
110        if total_fragments > MAX_FRAGMENTS_PER_MESSAGE {
111            return Err(FragmentationError::TooManyFragments {
112                total: total_fragments,
113                max: MAX_FRAGMENTS_PER_MESSAGE,
114            });
115        }
116        if sequence >= total_fragments {
117            return Err(FragmentationError::InvalidSequence {
118                got: sequence,
119                total: total_fragments,
120            });
121        }
122        if fec_info.data_shard_count == 0 || fec_info.data_shard_count > total_fragments {
123            return Err(FragmentationError::InvalidFec {
124                reason: format!(
125                    "data_shard_count {} must be in 1..={}",
126                    fec_info.data_shard_count, total_fragments
127                ),
128            });
129        }
130        Ok(Self {
131            message_id,
132            total_fragments,
133            sequence,
134            data,
135            fec: Some(fec_info),
136        })
137    }
138
139    pub fn to_bytes(&self) -> Result<Vec<u8>, FragmentationError> {
140        bincode::serialize(self).map_err(|e| FragmentationError::SerializationError(e.to_string()))
141    }
142
143    pub fn from_bytes(bytes: &[u8]) -> Result<Self, FragmentationError> {
144        let frag: Fragment = bincode::DefaultOptions::new()
145            .with_limit(MAX_FRAGMENT_DESERIALIZE_SIZE)
146            .with_fixint_encoding()
147            .allow_trailing_bytes()
148            .deserialize(bytes)
149            .map_err(|e| FragmentationError::SerializationError(e.to_string()))?;
150        frag.validate()?;
151        Ok(frag)
152    }
153
154    pub fn validate(&self) -> Result<(), FragmentationError> {
155        if self.total_fragments > MAX_FRAGMENTS_PER_MESSAGE {
156            return Err(FragmentationError::TooManyFragments {
157                total: self.total_fragments,
158                max: MAX_FRAGMENTS_PER_MESSAGE,
159            });
160        }
161        if self.sequence >= self.total_fragments {
162            return Err(FragmentationError::InvalidSequence {
163                got: self.sequence,
164                total: self.total_fragments,
165            });
166        }
167        if let Some(ref fec_info) = self.fec {
168            if fec_info.data_shard_count == 0 || fec_info.data_shard_count > self.total_fragments {
169                return Err(FragmentationError::InvalidFec {
170                    reason: format!(
171                        "data_shard_count {} must be in 1..={}",
172                        fec_info.data_shard_count, self.total_fragments
173                    ),
174                });
175            }
176        }
177        Ok(())
178    }
179
180    #[must_use]
181    pub fn size(&self) -> usize {
182        let fec_overhead = if self.fec.is_some() { 13 } else { 1 };
183        FRAGMENT_OVERHEAD + self.data.len() + fec_overhead
184    }
185}
186
187#[derive(Debug, Clone)]
188pub struct Fragmenter {
189    max_message_size: usize,
190}
191
192impl Default for Fragmenter {
193    fn default() -> Self {
194        Self::new()
195    }
196}
197
198impl Fragmenter {
199    #[must_use]
200    pub fn new() -> Self {
201        Self {
202            max_message_size: MAX_MESSAGE_SIZE,
203        }
204    }
205
206    #[must_use]
207    pub fn with_max_size(max_message_size: usize) -> Self {
208        Self { max_message_size }
209    }
210
211    pub fn fragment(
212        &self,
213        message_id: u64,
214        data: &[u8],
215        max_chunk_size: usize,
216    ) -> Result<Vec<Fragment>, FragmentationError> {
217        if data.is_empty() {
218            return Err(FragmentationError::EmptyMessage);
219        }
220
221        if data.len() > self.max_message_size {
222            return Err(FragmentationError::MessageTooLarge {
223                size: data.len(),
224                max: self.max_message_size,
225            });
226        }
227
228        let usable_payload = max_chunk_size.saturating_sub(FRAGMENT_OVERHEAD);
229        if usable_payload == 0 {
230            return Err(FragmentationError::InvalidChunkSize(max_chunk_size));
231        }
232
233        let total_fragments = data.len().div_ceil(usable_payload);
234
235        if total_fragments > MAX_FRAGMENTS_PER_MESSAGE as usize {
236            return Err(FragmentationError::TooManyFragments {
237                total: total_fragments as u32,
238                max: MAX_FRAGMENTS_PER_MESSAGE,
239            });
240        }
241
242        let total_fragments = total_fragments as u32;
243        let mut fragments = Vec::with_capacity(total_fragments as usize);
244
245        for (seq, chunk) in data.chunks(usable_payload).enumerate() {
246            fragments.push(Fragment {
247                message_id,
248                total_fragments,
249                sequence: seq as u32,
250                data: chunk.to_vec(),
251                fec: None,
252            });
253        }
254
255        Ok(fragments)
256    }
257
258    #[must_use]
259    pub fn usable_payload_size(max_packet_size: usize) -> usize {
260        max_packet_size.saturating_sub(FRAGMENT_OVERHEAD)
261    }
262}
263
264#[derive(Debug)]
265struct ReassemblyBuffer {
266    fragments: HashMap<u32, Fragment>,
267    expected_total: u32,
268    received_count: u32,
269    buffered_bytes: usize,
270    created_at: Instant,
271    last_activity: Instant,
272    /// When present, completion requires D-of-(D+P) shards instead of all.
273    fec_info: Option<super::fec::FecInfo>,
274}
275
276impl ReassemblyBuffer {
277    fn new(first_fragment: &Fragment) -> Self {
278        let now = Instant::now();
279        Self {
280            fragments: HashMap::with_capacity(first_fragment.total_fragments as usize),
281            expected_total: first_fragment.total_fragments,
282            received_count: 0,
283            buffered_bytes: 0,
284            created_at: now,
285            last_activity: now,
286            fec_info: first_fragment.fec.clone(),
287        }
288    }
289
290    /// Returns true if this was a new (non-duplicate) fragment.
291    /// Rejects duplicate sequences with different data; accepts exact duplicates idempotently.
292    fn add(&mut self, fragment: Fragment) -> Result<bool, FragmentationError> {
293        if fragment.total_fragments != self.expected_total {
294            return Err(FragmentationError::InconsistentMetadata {
295                expected: self.expected_total,
296                got: fragment.total_fragments,
297            });
298        }
299
300        if let Some(ref incoming_fec) = fragment.fec {
301            match &self.fec_info {
302                Some(existing_fec) => {
303                    if incoming_fec != existing_fec {
304                        return Err(FragmentationError::InvalidFec {
305                            reason: format!(
306                                "FEC mismatch: buffer has D={} len={}, fragment has D={} len={}",
307                                existing_fec.data_shard_count,
308                                existing_fec.original_data_len,
309                                incoming_fec.data_shard_count,
310                                incoming_fec.original_data_len,
311                            ),
312                        });
313                    }
314                }
315                None => {
316                    self.fec_info = Some(incoming_fec.clone());
317                }
318            }
319        }
320
321        self.last_activity = Instant::now();
322
323        if let Some(existing) = self.fragments.get(&fragment.sequence) {
324            if existing.data != fragment.data {
325                return Err(FragmentationError::DuplicateDataMismatch {
326                    message_id: fragment.message_id,
327                    sequence: fragment.sequence,
328                });
329            }
330            return Ok(false);
331        }
332
333        self.buffered_bytes += fragment.size();
334        self.received_count += 1;
335        self.fragments.insert(fragment.sequence, fragment);
336
337        Ok(true)
338    }
339
340    fn is_complete(&self) -> bool {
341        match &self.fec_info {
342            Some(fec) => self.received_count >= fec.data_shard_count,
343            None => self.received_count == self.expected_total,
344        }
345    }
346
347    fn has_sequence(&self, sequence: u32) -> bool {
348        self.fragments.contains_key(&sequence)
349    }
350
351    fn assemble(mut self) -> Result<Vec<u8>, FragmentationError> {
352        match &self.fec_info {
353            None => {
354                let mut result = Vec::new();
355                for seq in 0..self.expected_total {
356                    if let Some(frag) = self.fragments.remove(&seq) {
357                        result.extend(frag.data);
358                    }
359                }
360                Ok(result)
361            }
362            Some(fec) => {
363                let d = fec.data_shard_count as usize;
364                let total = self.expected_total as usize;
365                let original_len = fec.original_data_len;
366
367                let mut shards: Vec<Option<Vec<u8>>> = (0..total)
368                    .map(|seq| self.fragments.remove(&(seq as u32)).map(|f| f.data))
369                    .collect();
370
371                super::fec::decode_shards(&mut shards, d, original_len)
372                    .map_err(|e| FragmentationError::FecDecodeFailed(e.to_string()))
373            }
374        }
375    }
376}
377
378#[derive(Debug, Clone)]
379pub struct ReassemblerConfig {
380    pub max_buffer_bytes: usize,
381    pub max_concurrent_messages: usize,
382    pub stale_timeout: Duration,
383}
384
385impl Default for ReassemblerConfig {
386    fn default() -> Self {
387        Self {
388            max_buffer_bytes: DEFAULT_MAX_BUFFER_BYTES,
389            max_concurrent_messages: DEFAULT_MAX_CONCURRENT_MESSAGES,
390            stale_timeout: Duration::from_secs(DEFAULT_STALE_TIMEOUT_SECS),
391        }
392    }
393}
394
395/// Reassembles fragments into complete messages with bounded memory, LRU eviction,
396/// stale pruning, and duplicate integrity checks.
397#[derive(Debug)]
398pub struct Reassembler {
399    buffers: HashMap<u64, ReassemblyBuffer>,
400    config: ReassemblerConfig,
401    total_buffered_bytes: usize,
402}
403
404impl Reassembler {
405    #[must_use]
406    pub fn new(config: ReassemblerConfig) -> Self {
407        Self {
408            buffers: HashMap::new(),
409            config,
410            total_buffered_bytes: 0,
411        }
412    }
413
414    /// Returns `Ok(Some(data))` if this fragment completed a message, `Ok(None)` if more needed.
415    pub fn add_fragment(
416        &mut self,
417        fragment: Fragment,
418    ) -> Result<Option<Vec<u8>>, FragmentationError> {
419        fragment.validate()?;
420
421        let message_id = fragment.message_id;
422        let fragment_size = fragment.size();
423
424        // Early duplicate detection before ensure_capacity() to avoid evicting valid sessions
425        if let Some(buffer) = self.buffers.get(&message_id) {
426            if buffer.has_sequence(fragment.sequence) {
427                let buffer = self.buffers.get_mut(&message_id).ok_or_else(|| {
428                    FragmentationError::InternalError(
429                        "Buffer vanished during duplicate check".to_string(),
430                    )
431                })?;
432                buffer.add(fragment)?;
433                return Ok(None);
434            }
435        }
436
437        self.ensure_capacity(fragment_size)?;
438
439        let buffer = self
440            .buffers
441            .entry(message_id)
442            .or_insert_with(|| ReassemblyBuffer::new(&fragment));
443
444        let is_new = buffer.add(fragment)?;
445
446        if is_new {
447            self.total_buffered_bytes += fragment_size;
448        }
449
450        let is_complete = buffer.is_complete();
451
452        if is_complete {
453            let buffer = self.buffers.remove(&message_id).ok_or_else(|| {
454                FragmentationError::InternalError("Buffer vanished during processing".to_string())
455            })?;
456
457            self.total_buffered_bytes = self
458                .total_buffered_bytes
459                .saturating_sub(buffer.buffered_bytes);
460            Ok(Some(buffer.assemble()?))
461        } else {
462            Ok(None)
463        }
464    }
465
466    fn ensure_capacity(&mut self, needed_bytes: usize) -> Result<(), FragmentationError> {
467        while (self.total_buffered_bytes + needed_bytes > self.config.max_buffer_bytes
468            || self.buffers.len() >= self.config.max_concurrent_messages)
469            && !self.buffers.is_empty()
470        {
471            self.evict_oldest();
472        }
473        Ok(())
474    }
475
476    fn evict_oldest(&mut self) {
477        if let Some((&oldest_id, _)) = self.buffers.iter().min_by_key(|(_, buf)| buf.last_activity)
478        {
479            if let Some(buffer) = self.buffers.remove(&oldest_id) {
480                self.total_buffered_bytes = self
481                    .total_buffered_bytes
482                    .saturating_sub(buffer.buffered_bytes);
483                tracing::debug!(
484                    message_id = oldest_id,
485                    bytes = buffer.buffered_bytes,
486                    "Evicted stale reassembly buffer"
487                );
488            }
489        }
490    }
491
492    pub fn prune_stale(&mut self, timeout: Duration) -> usize {
493        let now = Instant::now();
494        let stale_ids: Vec<u64> = self
495            .buffers
496            .iter()
497            .filter(|(_, buf)| now.duration_since(buf.created_at) > timeout)
498            .map(|(&id, _)| id)
499            .collect();
500
501        let count = stale_ids.len();
502        for id in stale_ids {
503            if let Some(buffer) = self.buffers.remove(&id) {
504                self.total_buffered_bytes = self
505                    .total_buffered_bytes
506                    .saturating_sub(buffer.buffered_bytes);
507                tracing::debug!(
508                    message_id = id,
509                    age_secs = now.duration_since(buffer.created_at).as_secs(),
510                    "Pruned stale reassembly buffer"
511                );
512            }
513        }
514        count
515    }
516
517    pub fn prune_stale_default(&mut self) -> usize {
518        self.prune_stale(self.config.stale_timeout)
519    }
520
521    #[must_use]
522    pub fn buffered_bytes(&self) -> usize {
523        self.total_buffered_bytes
524    }
525
526    #[must_use]
527    pub fn pending_count(&self) -> usize {
528        self.buffers.len()
529    }
530
531    #[must_use]
532    pub fn has_message(&self, message_id: u64) -> bool {
533        self.buffers.contains_key(&message_id)
534    }
535
536    #[must_use]
537    pub fn message_progress(&self, message_id: u64) -> Option<(u32, u32)> {
538        self.buffers
539            .get(&message_id)
540            .map(|buf| (buf.received_count, buf.expected_total))
541    }
542}
543
544#[cfg(test)]
545mod tests {
546    use super::*;
547
548    #[test]
549    fn test_fragment_roundtrip() {
550        let frag = Fragment::new(12345, 10, 5, vec![1, 2, 3, 4, 5]).unwrap();
551        let bytes = frag.to_bytes().unwrap();
552        let recovered = Fragment::from_bytes(&bytes).unwrap();
553        assert_eq!(frag, recovered);
554    }
555
556    #[test]
557    fn test_fragment_validation() {
558        assert!(Fragment::new(1, 10, 10, vec![]).is_err());
559        assert!(Fragment::new(1, 10, 15, vec![]).is_err());
560        assert!(Fragment::new(1, MAX_FRAGMENTS_PER_MESSAGE + 1, 0, vec![]).is_err());
561    }
562
563    #[test]
564    fn test_fragmenter_basic() {
565        let fragmenter = Fragmenter::new();
566        let data: Vec<u8> = (0..10_000).map(|i| (i % 256) as u8).collect();
567        let fragments = fragmenter.fragment(1, &data, 1000).unwrap();
568
569        assert!(fragments.len() > 1);
570        assert!(fragments.iter().all(|f| f.message_id == 1));
571        assert!(fragments
572            .iter()
573            .all(|f| f.total_fragments == fragments.len() as u32));
574
575        for (i, f) in fragments.iter().enumerate() {
576            assert_eq!(f.sequence, i as u32);
577        }
578    }
579
580    #[test]
581    fn test_fragmenter_empty_message() {
582        let fragmenter = Fragmenter::new();
583        assert!(matches!(
584            fragmenter.fragment(1, &[], 1000),
585            Err(FragmentationError::EmptyMessage)
586        ));
587    }
588
589    #[test]
590    fn test_fragmenter_message_too_large() {
591        let fragmenter = Fragmenter::with_max_size(1000);
592        let data = vec![0u8; 2000];
593        assert!(matches!(
594            fragmenter.fragment(1, &data, 100),
595            Err(FragmentationError::MessageTooLarge { .. })
596        ));
597    }
598
599    #[test]
600    fn test_reassembler_in_order() {
601        let fragmenter = Fragmenter::new();
602        let mut reassembler = Reassembler::new(ReassemblerConfig::default());
603
604        let original: Vec<u8> = (0..5000).map(|i| (i % 256) as u8).collect();
605        let fragments = fragmenter.fragment(42, &original, 500).unwrap();
606
607        for (i, frag) in fragments.into_iter().enumerate() {
608            let result = reassembler.add_fragment(frag).unwrap();
609            if i < 10 {
610                assert!(result.is_none());
611            }
612        }
613    }
614
615    #[test]
616    fn test_reassembler_out_of_order() {
617        let fragmenter = Fragmenter::new();
618        let mut reassembler = Reassembler::new(ReassemblerConfig::default());
619
620        let original: Vec<u8> = (0..5000).map(|i| (i % 256) as u8).collect();
621        let mut fragments = fragmenter.fragment(42, &original, 500).unwrap();
622        fragments.reverse();
623
624        let mut result = None;
625        for frag in fragments {
626            if let Some(data) = reassembler.add_fragment(frag).unwrap() {
627                result = Some(data);
628            }
629        }
630
631        assert_eq!(result.unwrap(), original);
632    }
633
634    #[test]
635    fn test_reassembler_duplicate_handling() {
636        let fragmenter = Fragmenter::new();
637        let mut reassembler = Reassembler::new(ReassemblerConfig::default());
638
639        let original: Vec<u8> = (0..1000).map(|i| (i % 256) as u8).collect();
640        let fragments = fragmenter.fragment(42, &original, 500).unwrap();
641
642        let first = fragments[0].clone();
643        assert!(reassembler.add_fragment(first.clone()).unwrap().is_none());
644        assert!(reassembler.add_fragment(first.clone()).unwrap().is_none());
645        assert!(reassembler.add_fragment(first).unwrap().is_none());
646
647        let mut result = None;
648        for frag in fragments.into_iter().skip(1) {
649            if let Some(data) = reassembler.add_fragment(frag).unwrap() {
650                result = Some(data);
651            }
652        }
653
654        assert_eq!(result.unwrap(), original);
655    }
656
657    #[test]
658    fn test_reassembler_inconsistent_metadata() {
659        let mut reassembler = Reassembler::new(ReassemblerConfig::default());
660
661        let frag1 = Fragment::new(100, 5, 0, vec![1, 2, 3]).unwrap();
662        reassembler.add_fragment(frag1).unwrap();
663
664        let frag2 = Fragment {
665            message_id: 100,
666            total_fragments: 10,
667            sequence: 1,
668            data: vec![4, 5, 6],
669            fec: None,
670        };
671
672        assert!(matches!(
673            reassembler.add_fragment(frag2),
674            Err(FragmentationError::InconsistentMetadata {
675                expected: 5,
676                got: 10
677            })
678        ));
679    }
680
681    #[test]
682    fn test_reassembler_memory_limit() {
683        let config = ReassemblerConfig {
684            max_buffer_bytes: 1000,
685            max_concurrent_messages: 2,
686            stale_timeout: Duration::from_secs(60),
687        };
688        let mut reassembler = Reassembler::new(config);
689
690        let frag1 = Fragment::new(1, 5, 0, vec![0u8; 200]).unwrap();
691        reassembler.add_fragment(frag1).unwrap();
692
693        let frag2 = Fragment::new(2, 5, 0, vec![0u8; 200]).unwrap();
694        reassembler.add_fragment(frag2).unwrap();
695
696        let frag3 = Fragment::new(3, 5, 0, vec![0u8; 200]).unwrap();
697        reassembler.add_fragment(frag3).unwrap();
698
699        assert!(!reassembler.has_message(1));
700        assert!(reassembler.has_message(2));
701        assert!(reassembler.has_message(3));
702    }
703
704    #[test]
705    fn test_reassembler_prune_stale() {
706        let config = ReassemblerConfig {
707            max_buffer_bytes: 10_000,
708            max_concurrent_messages: 100,
709            stale_timeout: Duration::from_millis(50),
710        };
711        let mut reassembler = Reassembler::new(config);
712
713        let frag = Fragment::new(999, 10, 0, vec![1, 2, 3]).unwrap();
714        reassembler.add_fragment(frag).unwrap();
715        assert_eq!(reassembler.pending_count(), 1);
716
717        std::thread::sleep(Duration::from_millis(100));
718        let pruned = reassembler.prune_stale(Duration::from_millis(50));
719        assert_eq!(pruned, 1);
720        assert_eq!(reassembler.pending_count(), 0);
721    }
722
723    #[test]
724    fn test_duplicate_sequence_same_data_accepted() {
725        let mut reassembler = Reassembler::new(ReassemblerConfig::default());
726
727        let frag = Fragment::new(1, 3, 0, vec![1, 2, 3]).unwrap();
728        assert!(reassembler.add_fragment(frag.clone()).unwrap().is_none());
729        assert!(reassembler.add_fragment(frag).unwrap().is_none());
730        assert_eq!(reassembler.message_progress(1), Some((1, 3)));
731    }
732
733    #[test]
734    fn test_duplicate_sequence_different_data_rejected() {
735        let mut reassembler = Reassembler::new(ReassemblerConfig::default());
736
737        let frag1 = Fragment::new(1, 3, 0, vec![1, 2, 3]).unwrap();
738        reassembler.add_fragment(frag1).unwrap();
739
740        let frag2 = Fragment {
741            message_id: 1,
742            total_fragments: 3,
743            sequence: 0,
744            data: vec![4, 5, 6],
745            fec: None,
746        };
747        assert!(matches!(
748            reassembler.add_fragment(frag2),
749            Err(FragmentationError::DuplicateDataMismatch {
750                message_id: 1,
751                sequence: 0,
752            })
753        ));
754    }
755
756    #[test]
757    fn test_duplicate_does_not_evict() {
758        let config = ReassemblerConfig {
759            max_buffer_bytes: 1000,
760            max_concurrent_messages: 2,
761            stale_timeout: Duration::from_secs(60),
762        };
763        let mut reassembler = Reassembler::new(config);
764
765        let frag1 = Fragment::new(1, 5, 0, vec![0u8; 200]).unwrap();
766        reassembler.add_fragment(frag1).unwrap();
767
768        let frag2 = Fragment::new(2, 5, 0, vec![0u8; 200]).unwrap();
769        reassembler.add_fragment(frag2).unwrap();
770
771        let dup = Fragment::new(1, 5, 0, vec![0u8; 200]).unwrap();
772        reassembler.add_fragment(dup).unwrap();
773
774        assert!(
775            reassembler.has_message(1),
776            "Message 1 should still be present"
777        );
778        assert!(
779            reassembler.has_message(2),
780            "Message 2 should still be present"
781        );
782    }
783
784    use super::super::fec::{encode_parity_shards, pad_to_uniform, FecInfo};
785
786    fn make_fec_fragments(
787        message_id: u64,
788        data: &[u8],
789        shard_size: usize,
790        parity_count: usize,
791    ) -> Vec<Fragment> {
792        let chunks: Vec<Vec<u8>> = data.chunks(shard_size).map(|c| c.to_vec()).collect();
793        let (padded, _) = pad_to_uniform(&chunks).unwrap();
794        let d = padded.len();
795
796        let parity = encode_parity_shards(&padded, parity_count).unwrap();
797        let total = (d + parity_count) as u32;
798
799        let fec_info = FecInfo {
800            data_shard_count: d as u32,
801            original_data_len: data.len() as u64,
802        };
803
804        let mut fragments = Vec::new();
805        for (seq, shard) in padded.into_iter().enumerate() {
806            fragments.push(Fragment {
807                message_id,
808                total_fragments: total,
809                sequence: seq as u32,
810                data: shard,
811                fec: Some(fec_info.clone()),
812            });
813        }
814        for (i, shard) in parity.into_iter().enumerate() {
815            fragments.push(Fragment {
816                message_id,
817                total_fragments: total,
818                sequence: (d + i) as u32,
819                data: shard,
820                fec: Some(fec_info.clone()),
821            });
822        }
823        fragments
824    }
825
826    #[test]
827    fn test_fec_all_data_present_fast_path() {
828        let mut reassembler = Reassembler::new(ReassemblerConfig::default());
829        let original: Vec<u8> = (0..300).map(|i| (i % 256) as u8).collect();
830        let fragments = make_fec_fragments(1, &original, 100, 2); // D=3, P=2
831
832        let mut result = None;
833        for frag in fragments {
834            if let Some(data) = reassembler.add_fragment(frag).unwrap() {
835                result = Some(data);
836            }
837        }
838
839        assert_eq!(result.unwrap(), original);
840    }
841
842    #[test]
843    fn test_fec_drop_one_data_shard_rs_recovery() {
844        let mut reassembler = Reassembler::new(ReassemblerConfig::default());
845        let original: Vec<u8> = (0..300).map(|i| (i % 256) as u8).collect();
846        let mut fragments = make_fec_fragments(1, &original, 100, 2); // D=3, P=2
847
848        fragments.remove(1);
849
850        let mut result = None;
851        for frag in fragments {
852            if let Some(data) = reassembler.add_fragment(frag).unwrap() {
853                result = Some(data);
854            }
855        }
856
857        assert_eq!(result.unwrap(), original);
858    }
859
860    #[test]
861    fn test_fec_drop_all_parity_fast_path() {
862        let mut reassembler = Reassembler::new(ReassemblerConfig::default());
863        let original: Vec<u8> = (0..300).map(|i| (i % 256) as u8).collect();
864        let fragments = make_fec_fragments(1, &original, 100, 2); // D=3, P=2
865
866        let mut result = None;
867        for frag in fragments.into_iter().take(3) {
868            if let Some(data) = reassembler.add_fragment(frag).unwrap() {
869                result = Some(data);
870            }
871        }
872
873        assert_eq!(result.unwrap(), original);
874    }
875
876    #[test]
877    fn test_fec_drop_p_shards_rs_recovers() {
878        let mut reassembler = Reassembler::new(ReassemblerConfig::default());
879        let original: Vec<u8> = (0..500).map(|i| (i % 256) as u8).collect();
880        let mut fragments = make_fec_fragments(1, &original, 100, 3); // D=5, P=3
881
882        fragments.remove(6);
883        fragments.remove(3);
884        fragments.remove(0);
885
886        let mut result = None;
887        for frag in fragments {
888            if let Some(data) = reassembler.add_fragment(frag).unwrap() {
889                result = Some(data);
890            }
891        }
892
893        assert_eq!(result.unwrap(), original);
894    }
895
896    #[test]
897    fn test_fec_drop_too_many_incomplete() {
898        let mut reassembler = Reassembler::new(ReassemblerConfig::default());
899        let original: Vec<u8> = (0..300).map(|i| (i % 256) as u8).collect();
900        let mut fragments = make_fec_fragments(1, &original, 100, 2); // D=3, P=2
901
902        fragments.remove(4);
903        fragments.remove(2);
904        fragments.remove(0);
905
906        let mut result = None;
907        for frag in fragments {
908            if let Some(data) = reassembler.add_fragment(frag).unwrap() {
909                result = Some(data);
910            }
911        }
912
913        assert!(result.is_none());
914        assert_eq!(reassembler.pending_count(), 1);
915    }
916
917    #[test]
918    fn test_fec_single_fragment_d1_p1() {
919        let mut reassembler = Reassembler::new(ReassemblerConfig::default());
920        let original = b"Small message".to_vec();
921        let fragments = make_fec_fragments(1, &original, original.len(), 1); // D=1, P=1
922
923        assert_eq!(fragments.len(), 2);
924
925        let parity = fragments[1].clone();
926        let result = reassembler.add_fragment(parity).unwrap();
927        assert!(result.is_some());
928        assert_eq!(result.unwrap(), original);
929    }
930
931    #[test]
932    fn test_fec_backward_compat_no_fec() {
933        let mut reassembler = Reassembler::new(ReassemblerConfig::default());
934        let original: Vec<u8> = (0..1000).map(|i| (i % 256) as u8).collect();
935
936        let fragmenter = Fragmenter::new();
937        let fragments = fragmenter.fragment(42, &original, 500).unwrap();
938
939        let mut result = None;
940        for frag in fragments {
941            assert!(frag.fec.is_none());
942            if let Some(data) = reassembler.add_fragment(frag).unwrap() {
943                result = Some(data);
944            }
945        }
946
947        assert_eq!(result.unwrap(), original);
948    }
949
950    #[test]
951    fn test_fec_consistency_validation() {
952        let mut reassembler = Reassembler::new(ReassemblerConfig::default());
953
954        let fec1 = FecInfo {
955            data_shard_count: 3,
956            original_data_len: 300,
957        };
958        let fec2 = FecInfo {
959            data_shard_count: 5,
960            original_data_len: 500,
961        };
962
963        let frag1 = Fragment {
964            message_id: 1,
965            total_fragments: 5,
966            sequence: 0,
967            data: vec![0u8; 100],
968            fec: Some(fec1),
969        };
970        reassembler.add_fragment(frag1).unwrap();
971
972        let frag2 = Fragment {
973            message_id: 1,
974            total_fragments: 5,
975            sequence: 1,
976            data: vec![0u8; 100],
977            fec: Some(fec2),
978        };
979
980        assert!(matches!(
981            reassembler.add_fragment(frag2),
982            Err(FragmentationError::InvalidFec { .. })
983        ));
984    }
985
986    #[test]
987    fn test_fec_mixed_data_parity_drops() {
988        let mut reassembler = Reassembler::new(ReassemblerConfig::default());
989        let original: Vec<u8> = (0..1000).map(|i| (i % 256) as u8).collect();
990        let mut fragments = make_fec_fragments(1, &original, 200, 3); // D=5, P=3
991
992        fragments.remove(6);
993        fragments.remove(2);
994        fragments.remove(0);
995
996        let mut result = None;
997        for frag in fragments {
998            if let Some(data) = reassembler.add_fragment(frag).unwrap() {
999                result = Some(data);
1000            }
1001        }
1002
1003        assert_eq!(result.unwrap(), original);
1004    }
1005
1006    #[test]
1007    fn test_fec_info_inconsistency_rejected() {
1008        let mut reassembler = Reassembler::new(ReassemblerConfig::default());
1009
1010        let fec_info_a = FecInfo {
1011            data_shard_count: 3,
1012            original_data_len: 600,
1013        };
1014        let fec_info_b = FecInfo {
1015            data_shard_count: 5,
1016            original_data_len: 600,
1017        };
1018
1019        let frag1 = Fragment {
1020            message_id: 42,
1021            total_fragments: 4,
1022            sequence: 0,
1023            data: vec![1u8; 200],
1024            fec: Some(fec_info_a),
1025        };
1026        reassembler.add_fragment(frag1).unwrap();
1027
1028        let frag2 = Fragment {
1029            message_id: 42,
1030            total_fragments: 4,
1031            sequence: 1,
1032            data: vec![2u8; 200],
1033            fec: Some(fec_info_b),
1034        };
1035        let result = reassembler.add_fragment(frag2);
1036        assert!(
1037            result.is_err(),
1038            "expected error on inconsistent FEC metadata, got Ok"
1039        );
1040    }
1041}