Skip to main content

zerodds_rtps/
fragment_assembler.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! Fragment-Reassembly fuer DDSI-RTPS 2.5 §8.4.14 auf Reader-Seite.
4//!
5//! Fuehrt pro in-flight Sample-SN einen `FragmentBuffer`, in den DATA_FRAG-
6//! Submessages eingespielt werden. Sobald alle Fragmente da sind, faellt
7//! ein vollstaendiger Sample heraus, den der `ReliableReader` wie ein
8//! regulaeres DATA behandelt.
9//!
10//! # DoS-Haltung
11//!
12//! Der Assembler muss Input von ungetrusteten Writers robust verarbeiten.
13//! Drei Caps schuetzen gegen pathologische Inputs:
14//!
15//! - `max_pending_sns`: Hoechstzahl gleichzeitig in Arbeit befindlicher
16//!   SNs. Ueberlauf verwirft die aelteste (kleinste) unvollstaendige SN.
17//! - `max_sample_bytes`: Obergrenze fuer `sample_size`. DATA_FRAGs mit
18//!   `sample_size > cap` werden verworfen **ohne** Allokation —
19//!   Schutz gegen "ich behaupte 4 GB sample und hoffe, dass du allokierst".
20//! - `max_fragment_size`: Obergrenze fuer `fragment_size`-Angaben vom
21//!   Writer. Uebliche MTU ist < 1500; wir akzeptieren bis 65535.
22//!
23//! Verworfene Fragmente werden in `drop_count` gezaehlt (Diagnose).
24
25extern crate alloc;
26use alloc::collections::{BTreeMap, BTreeSet};
27use alloc::vec;
28use alloc::vec::Vec;
29
30use crate::submessages::{DataFragSubmessage, FragmentNumberSet};
31use crate::wire_types::{FragmentNumber, SequenceNumber};
32
/// Default cap on the number of sequence numbers reassembled at the same time.
pub const DEFAULT_MAX_PENDING_SNS: usize = 64;
/// Default cap on the total sample size (1 MiB). Larger samples are not a
/// Phase 1 use case; DDS-Security/fragmentation of large images is deferred
/// to Phase 2+.
pub const DEFAULT_MAX_SAMPLE_BYTES: usize = 1024 * 1024;
/// Default cap on `fragment_size`: the largest value a `u16` can hold.
pub const DEFAULT_MAX_FRAGMENT_SIZE: u16 = u16::MAX;
41
42/// Ein vollstaendig reassemblierter Sample.
43#[derive(Debug, Clone, PartialEq, Eq)]
44pub struct CompletedSample {
45    /// Writer-Sequence-Number.
46    pub sequence_number: SequenceNumber,
47    /// Reassemblierter Payload (Gesamt-Sample-Bytes in Originalreihenfolge).
48    pub payload: Vec<u8>,
49}
50
/// Resource limits enforced by the assembler.
#[derive(Debug, Clone, Copy)]
pub struct AssemblerCaps {
    /// Maximum number of sequence numbers that may be in flight at once.
    pub max_pending_sns: usize,
    /// Maximum accepted `sample_size`, in bytes.
    pub max_sample_bytes: usize,
    /// Maximum accepted `fragment_size`, in bytes.
    pub max_fragment_size: u16,
}
61
62impl Default for AssemblerCaps {
63    /// Konservative Defaults fuer typische DDS-Workloads (1 MiB Samples,
64    /// 64 gleichzeitige in-flight SNs, u16-max Fragment-Size).
65    fn default() -> Self {
66        Self {
67            max_pending_sns: DEFAULT_MAX_PENDING_SNS,
68            max_sample_bytes: DEFAULT_MAX_SAMPLE_BYTES,
69            max_fragment_size: DEFAULT_MAX_FRAGMENT_SIZE,
70        }
71    }
72}
73
74/// Pro-SN-Ringpuffer fuer reinkommende Fragmente.
75#[derive(Debug, Clone)]
76struct FragmentBuffer {
77    sample_size: u32,
78    fragment_size: u16,
79    total_fragments: u32,
80    received: BTreeSet<FragmentNumber>,
81    data: Vec<u8>,
82}
83
84impl FragmentBuffer {
85    fn new(sample_size: u32, fragment_size: u16) -> Self {
86        let total = if fragment_size == 0 {
87            0
88        } else {
89            sample_size.div_ceil(u32::from(fragment_size))
90        };
91        Self {
92            sample_size,
93            fragment_size,
94            total_fragments: total,
95            received: BTreeSet::new(),
96            data: vec![0u8; sample_size as usize],
97        }
98    }
99
100    fn is_complete(&self) -> bool {
101        self.total_fragments > 0 && self.received.len() as u32 == self.total_fragments
102    }
103
104    fn missing(&self) -> FragmentNumberSet {
105        if self.total_fragments == 0 {
106            return FragmentNumberSet::from_missing(FragmentNumber(1), &[]);
107        }
108        let mut missing_nums = Vec::new();
109        for f in 1..=self.total_fragments {
110            let fnum = FragmentNumber(f);
111            if !self.received.contains(&fnum) {
112                missing_nums.push(fnum);
113            }
114        }
115        let base = missing_nums
116            .first()
117            .copied()
118            .unwrap_or(FragmentNumber(self.total_fragments.saturating_add(1)));
119        FragmentNumberSet::from_missing(base, &missing_nums)
120    }
121}
122
/// Category of a rejected fragment — for diagnostics only.
///
/// **When adding a variant**, also extend [`DropReason::as_str`]: its match
/// is exhaustive, so the build fails until the new variant is handled. That
/// is intentional — it keeps new failure modes from silently disappearing
/// from logging/metrics paths.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum DropReason {
    /// `sample_size` exceeds the cap.
    SampleTooLarge,
    /// `fragment_size` exceeds the cap or is 0.
    FragmentSizeInvalid,
    /// `fragment_starting_num == 0` (fragment numbers are 1-based).
    FragmentIndexZero,
    /// Fragment index lies beyond `total_fragments`.
    FragmentIndexOutOfRange,
    /// Payload length does not match the fragment's position.
    PayloadSizeMismatch,
    /// A later DATA_FRAG contradicts one that is already buffered
    /// (different `sample_size` or `fragment_size`).
    InconsistentWithBuffered,
    /// `fragments_in_submessage == 0` or otherwise inconsistent.
    FragmentsInSubmessageInvalid,
    /// The number of concurrently tracked SNs would exceed
    /// `max_pending_sns` — the oldest incomplete SN was evicted.
    PendingSnsCapExceeded,
    /// `max_pending_sns == 0` — the assembler accepts no entries.
    AssemblerDisabled,
}

impl DropReason {
    /// Stable string label for logging/metrics. The match is exhaustive on
    /// purpose: adding a variant without a label fails to compile.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            DropReason::SampleTooLarge => "sample_too_large",
            DropReason::FragmentSizeInvalid => "fragment_size_invalid",
            DropReason::FragmentIndexZero => "fragment_index_zero",
            DropReason::FragmentIndexOutOfRange => "fragment_index_out_of_range",
            DropReason::PayloadSizeMismatch => "payload_size_mismatch",
            DropReason::InconsistentWithBuffered => "inconsistent_with_buffered",
            DropReason::FragmentsInSubmessageInvalid => "fragments_in_submessage_invalid",
            DropReason::PendingSnsCapExceeded => "pending_sns_cap_exceeded",
            DropReason::AssemblerDisabled => "assembler_disabled",
        }
    }
}

impl core::fmt::Display for DropReason {
    /// Writes the same label that [`DropReason::as_str`] returns.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let label = self.as_str();
        f.write_str(label)
    }
}
178
179/// State eines Reassemblers.
180///
181/// `FragmentAssembler::default()` liefert einen Assembler mit
182/// [`AssemblerCaps::default`] — dem einzigen Defaults-Weg.
183#[derive(Debug, Clone, Default)]
184pub struct FragmentAssembler {
185    buffers: BTreeMap<SequenceNumber, FragmentBuffer>,
186    caps: AssemblerCaps,
187    drop_count: u64,
188    last_drop_reason: Option<DropReason>,
189}
190
191impl FragmentAssembler {
192    /// Erzeugt einen Assembler mit den gegebenen Caps.
193    #[must_use]
194    pub fn new(caps: AssemblerCaps) -> Self {
195        Self {
196            buffers: BTreeMap::new(),
197            caps,
198            drop_count: 0,
199            last_drop_reason: None,
200        }
201    }
202
203    /// Anzahl aktiver SNs.
204    #[must_use]
205    pub fn len(&self) -> usize {
206        self.buffers.len()
207    }
208
209    /// Ist der Assembler leer?
210    #[must_use]
211    pub fn is_empty(&self) -> bool {
212        self.buffers.is_empty()
213    }
214
215    /// Anzahl verworfener Fragmente seit Start (oder seit
216    /// [`reset_diagnostics`](Self::reset_diagnostics)).
217    #[must_use]
218    pub fn drop_count(&self) -> u64 {
219        self.drop_count
220    }
221
222    /// Der Grund des zuletzt verworfenen Fragments, falls ueberhaupt
223    /// eines verworfen wurde. Fuer Debugging/Metrics — nicht fuer
224    /// Control-Flow-Entscheidungen.
225    #[must_use]
226    pub fn last_drop_reason(&self) -> Option<DropReason> {
227        self.last_drop_reason
228    }
229
230    /// Setzt Diagnose-Zaehler auf 0 zurueck. `buffers` bleiben
231    /// unveraendert — das ist reine Metric-Hygiene (Long-Running-Reader
232    /// wollen ihre Delta-Snapshots).
233    pub fn reset_diagnostics(&mut self) {
234        self.drop_count = 0;
235        self.last_drop_reason = None;
236    }
237
238    /// True, wenn fuer mind. eine SN Fragmente fehlen.
239    #[must_use]
240    pub fn has_gaps(&self) -> bool {
241        self.buffers.values().any(|b| !b.is_complete())
242    }
243
244    /// Iteriert SNs, fuer die aktuell Fragment-Luecken existieren.
245    pub fn incomplete_sns(&self) -> impl Iterator<Item = SequenceNumber> + '_ {
246        self.buffers
247            .iter()
248            .filter(|(_, b)| !b.is_complete())
249            .map(|(sn, _)| *sn)
250    }
251
252    /// Fehlende Fragmente fuer eine SN. Liefert leeren Set, wenn SN
253    /// unbekannt oder bereits komplett.
254    #[must_use]
255    pub fn missing_fragments(&self, sn: SequenceNumber) -> FragmentNumberSet {
256        match self.buffers.get(&sn) {
257            Some(b) => b.missing(),
258            None => FragmentNumberSet::from_missing(FragmentNumber(1), &[]),
259        }
260    }
261
262    /// Entfernt den Buffer fuer diese SN (z.B. bei GAP-Signal oder nach
263    /// Completion). Gibt den Buffer zurueck, falls vorhanden.
264    pub fn discard(&mut self, sn: SequenceNumber) -> bool {
265        self.buffers.remove(&sn).is_some()
266    }
267
268    /// Spielt ein DATA_FRAG ein. Liefert bei Vervollstaendigung den
269    /// reassemblierten Sample.
270    ///
271    /// Inkonsistente oder pathologische Fragmente werden verworfen und
272    /// in `drop_count` gezaehlt — sie koennen nicht die interne Map
273    /// ueber die Caps hinaus wachsen lassen.
274    pub fn insert(&mut self, df: &DataFragSubmessage) -> Option<CompletedSample> {
275        // --- Eingangsvalidierung (no-alloc gate) ----------------------
276        if df.fragment_size == 0 || df.fragment_size > self.caps.max_fragment_size {
277            self.record_drop(DropReason::FragmentSizeInvalid);
278            return None;
279        }
280        if df.fragments_in_submessage == 0 {
281            self.record_drop(DropReason::FragmentsInSubmessageInvalid);
282            return None;
283        }
284        if df.sample_size as usize > self.caps.max_sample_bytes {
285            self.record_drop(DropReason::SampleTooLarge);
286            return None;
287        }
288        if df.fragment_starting_num.0 == 0 {
289            self.record_drop(DropReason::FragmentIndexZero);
290            return None;
291        }
292
293        // Vorab-Berechnungen
294        let total_fragments = df.sample_size.div_ceil(u32::from(df.fragment_size));
295        let last_frag = df
296            .fragment_starting_num
297            .0
298            .checked_add(u32::from(df.fragments_in_submessage) - 1)
299            .unwrap_or(u32::MAX);
300        if last_frag > total_fragments {
301            self.record_drop(DropReason::FragmentIndexOutOfRange);
302            return None;
303        }
304
305        // Cap: Anzahl gleichzeitig in-flight SNs.
306        if !self.buffers.contains_key(&df.writer_sn)
307            && self.buffers.len() >= self.caps.max_pending_sns
308        {
309            // Aelteste SN verwerfen — DoS-Schutz. Der betroffene Sample
310            // ist weg; der Reader muss das wie einen GAP-Zustand behandeln.
311            let Some(&oldest) = self.buffers.keys().next() else {
312                // Cap == 0: niemand darf rein.
313                self.record_drop(DropReason::AssemblerDisabled);
314                return None;
315            };
316            self.buffers.remove(&oldest);
317            self.record_drop(DropReason::PendingSnsCapExceeded);
318        }
319
320        // Buffer anlegen oder konsistent erweitern.
321        let buffer = match self.buffers.get_mut(&df.writer_sn) {
322            Some(existing) => {
323                if existing.sample_size != df.sample_size
324                    || existing.fragment_size != df.fragment_size
325                {
326                    self.record_drop(DropReason::InconsistentWithBuffered);
327                    return None;
328                }
329                existing
330            }
331            None => {
332                self.buffers.insert(
333                    df.writer_sn,
334                    FragmentBuffer::new(df.sample_size, df.fragment_size),
335                );
336                self.buffers.get_mut(&df.writer_sn)?
337            }
338        };
339
340        // Fragment-Bytes an die richtige Position schreiben.
341        let frag_size_usize = buffer.fragment_size as usize;
342        let frag_count = df.fragments_in_submessage as usize;
343        let first_idx = (df.fragment_starting_num.0 - 1) as usize;
344        let byte_start = first_idx * frag_size_usize;
345        let expected_last_frag = core::cmp::min(last_frag, buffer.total_fragments);
346        // Erwartete Payload-Laenge: frag_count-1 volle Fragmente + ggf.
347        // verkuerztes letztes Fragment (wenn last_frag == total_fragments).
348        let full_portion = (frag_count - 1) * frag_size_usize;
349        let tail_size = if expected_last_frag == buffer.total_fragments {
350            // Letztes Fragment des Samples darf kuerzer sein.
351            buffer.sample_size as usize - ((buffer.total_fragments - 1) as usize) * frag_size_usize
352        } else {
353            frag_size_usize
354        };
355        let expected_len = full_portion + tail_size;
356        if df.serialized_payload.len() != expected_len {
357            self.record_drop(DropReason::PayloadSizeMismatch);
358            return None;
359        }
360
361        // Schreiben
362        let data_end = byte_start + df.serialized_payload.len();
363        if data_end > buffer.data.len() {
364            self.record_drop(DropReason::PayloadSizeMismatch);
365            return None;
366        }
367        buffer.data[byte_start..data_end].copy_from_slice(&df.serialized_payload);
368        for f in 0..df.fragments_in_submessage as u32 {
369            buffer
370                .received
371                .insert(FragmentNumber(df.fragment_starting_num.0 + f));
372        }
373
374        if buffer.is_complete() {
375            // Buffer entnehmen und CompletedSample zurueckgeben.
376            let buf = self.buffers.remove(&df.writer_sn)?;
377            return Some(CompletedSample {
378                sequence_number: df.writer_sn,
379                payload: buf.data,
380            });
381        }
382        None
383    }
384
385    fn record_drop(&mut self, reason: DropReason) {
386        self.drop_count = self.drop_count.saturating_add(1);
387        self.last_drop_reason = Some(reason);
388    }
389}
390
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::wire_types::EntityId;

    /// Writer entity id used by every test submessage.
    fn wid() -> EntityId {
        EntityId::user_writer_with_key([0x10, 0x20, 0x30])
    }

    /// Reader entity id used by every test submessage.
    fn rid() -> EntityId {
        EntityId::user_reader_with_key([0x40, 0x50, 0x60])
    }

    /// Builds a DATA_FRAG submessage with all flags cleared.
    fn df(
        sn: i64,
        starting: u32,
        count: u16,
        frag_size: u16,
        sample_size: u32,
        payload: Vec<u8>,
    ) -> DataFragSubmessage {
        DataFragSubmessage {
            extra_flags: 0,
            reader_id: rid(),
            writer_id: wid(),
            writer_sn: SequenceNumber(sn),
            fragment_starting_num: FragmentNumber(starting),
            fragments_in_submessage: count,
            fragment_size: frag_size,
            sample_size,
            serialized_payload: alloc::sync::Arc::from(payload),
            inline_qos_flag: false,
            hash_key_flag: false,
            key_flag: false,
            non_standard_flag: false,
        }
    }

    #[test]
    fn single_fragment_sample_completes_immediately() {
        let mut asm = FragmentAssembler::default();
        // sample_size=4, frag_size=4 → exactly one fragment.
        let sample = asm
            .insert(&df(1, 1, 1, 4, 4, vec![1, 2, 3, 4]))
            .expect("a single-fragment sample completes on first insert");
        assert_eq!(sample.sequence_number, SequenceNumber(1));
        assert_eq!(sample.payload, vec![1, 2, 3, 4]);
        assert_eq!(asm.len(), 0);
    }

    #[test]
    fn two_fragments_complete_in_order() {
        let mut asm = FragmentAssembler::default();
        let first = asm.insert(&df(1, 1, 1, 4, 8, vec![1, 2, 3, 4]));
        assert!(first.is_none());
        let sample = asm.insert(&df(1, 2, 1, 4, 8, vec![5, 6, 7, 8])).unwrap();
        assert_eq!(sample.payload, vec![1, 2, 3, 4, 5, 6, 7, 8]);
    }

    #[test]
    fn fragments_complete_out_of_order() {
        let mut asm = FragmentAssembler::default();
        // Fragment 2 first, then 1, then 3.
        assert!(asm.insert(&df(1, 2, 1, 4, 10, vec![5, 6, 7, 8])).is_none());
        assert!(asm.insert(&df(1, 1, 1, 4, 10, vec![1, 2, 3, 4])).is_none());
        let sample = asm.insert(&df(1, 3, 1, 4, 10, vec![9, 10])).unwrap();
        assert_eq!(sample.payload, vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    }

    #[test]
    fn last_fragment_shorter_than_fragment_size() {
        let mut asm = FragmentAssembler::default();
        assert!(asm.insert(&df(1, 1, 1, 4, 10, vec![1, 2, 3, 4])).is_none());
        assert!(asm.insert(&df(1, 2, 1, 4, 10, vec![5, 6, 7, 8])).is_none());
        let sample = asm.insert(&df(1, 3, 1, 4, 10, vec![9, 10])).unwrap();
        assert_eq!(sample.payload.len(), 10);
    }

    #[test]
    fn duplicate_fragment_is_idempotent() {
        let mut asm = FragmentAssembler::default();
        for _ in 0..2 {
            assert!(asm.insert(&df(1, 1, 1, 4, 8, vec![1, 2, 3, 4])).is_none());
        }
        assert_eq!(asm.missing_fragments(SequenceNumber(1)).num_bits, 1);
    }

    #[test]
    fn missing_fragments_enumerates_gaps() {
        let mut asm = FragmentAssembler::default();
        // Fragment 2 is never delivered.
        assert!(asm.insert(&df(1, 1, 1, 4, 10, vec![1, 2, 3, 4])).is_none());
        assert!(asm.insert(&df(1, 3, 1, 4, 10, vec![9, 10])).is_none());
        let gaps: Vec<_> = asm
            .missing_fragments(SequenceNumber(1))
            .iter_set()
            .collect();
        assert_eq!(gaps, vec![FragmentNumber(2)]);
    }

    #[test]
    fn inconsistent_sample_size_drops_fragment() {
        let mut asm = FragmentAssembler::default();
        assert!(asm.insert(&df(1, 1, 1, 4, 8, vec![1, 2, 3, 4])).is_none());
        // The second fragment claims sample_size=12 instead of 8 → dropped.
        let outcome = asm.insert(&df(1, 2, 1, 4, 12, vec![5, 6, 7, 8]));
        assert!(outcome.is_none());
        assert_eq!(asm.drop_count(), 1);
        // SN 1 stays in flight with the original sample_size=8.
        assert_eq!(asm.missing_fragments(SequenceNumber(1)).num_bits, 1);
    }

    #[test]
    fn sample_too_large_drops_without_alloc() {
        let mut asm = FragmentAssembler::new(AssemblerCaps {
            max_sample_bytes: 16,
            ..AssemblerCaps::default()
        });
        // sample_size=100 > cap=16 → dropped.
        assert!(asm.insert(&df(1, 1, 1, 4, 100, vec![1, 2, 3, 4])).is_none());
        assert!(asm.is_empty());
        assert_eq!(asm.drop_count(), 1);
    }

    #[test]
    fn fragment_size_zero_dropped() {
        let mut asm = FragmentAssembler::default();
        // frag_size=0 must be rejected before it can cause a division by zero.
        assert!(asm.insert(&df(1, 1, 1, 0, 4, vec![1, 2, 3, 4])).is_none());
        assert_eq!(asm.drop_count(), 1);
    }

    #[test]
    fn fragment_index_zero_dropped() {
        let mut asm = FragmentAssembler::default();
        assert!(asm.insert(&df(1, 0, 1, 4, 4, vec![1, 2, 3, 4])).is_none());
        assert_eq!(asm.drop_count(), 1);
    }

    #[test]
    fn fragment_index_out_of_range_dropped() {
        let mut asm = FragmentAssembler::default();
        // sample_size=4, frag_size=4 → total=1, but index 2 is requested.
        assert!(asm.insert(&df(1, 2, 1, 4, 4, vec![0])).is_none());
        assert_eq!(asm.drop_count(), 1);
    }

    #[test]
    fn payload_size_mismatch_dropped() {
        let mut asm = FragmentAssembler::default();
        // frag_size=4 but the payload is only 2 bytes long → mismatch.
        assert!(asm.insert(&df(1, 1, 1, 4, 8, vec![1, 2])).is_none());
        assert_eq!(asm.drop_count(), 1);
    }

    #[test]
    fn max_pending_sns_evicts_oldest() {
        let mut asm = FragmentAssembler::new(AssemblerCaps {
            max_pending_sns: 2,
            ..AssemblerCaps::default()
        });
        // SN 1 and 2 stay open (only fragment 1 of 2 received for each).
        asm.insert(&df(1, 1, 1, 4, 8, vec![1, 2, 3, 4]));
        asm.insert(&df(2, 1, 1, 4, 8, vec![1, 2, 3, 4]));
        assert_eq!(asm.len(), 2);
        // SN 3 pushes SN 1 out.
        asm.insert(&df(3, 1, 1, 4, 8, vec![1, 2, 3, 4]));
        assert_eq!(asm.len(), 2);
        for survivor in [2, 3] {
            assert!(asm.buffers.contains_key(&SequenceNumber(survivor)));
        }
        assert_eq!(asm.drop_count(), 1);
    }

    #[test]
    fn has_gaps_flips_to_false_after_completion() {
        let mut asm = FragmentAssembler::default();
        asm.insert(&df(1, 1, 1, 4, 8, vec![1, 2, 3, 4]));
        assert!(asm.has_gaps());
        asm.insert(&df(1, 2, 1, 4, 8, vec![5, 6, 7, 8]));
        assert!(!asm.has_gaps());
    }

    #[test]
    fn incomplete_sns_enumerates_in_order() {
        let mut asm = FragmentAssembler::default();
        asm.insert(&df(5, 1, 1, 4, 8, vec![1, 2, 3, 4]));
        asm.insert(&df(2, 1, 1, 4, 8, vec![1, 2, 3, 4]));
        let open: Vec<_> = asm.incomplete_sns().collect();
        assert_eq!(open, vec![SequenceNumber(2), SequenceNumber(5)]);
    }

    #[test]
    fn discard_removes_buffer() {
        let mut asm = FragmentAssembler::default();
        asm.insert(&df(1, 1, 1, 4, 8, vec![1, 2, 3, 4]));
        assert!(asm.discard(SequenceNumber(1)));
        assert!(asm.is_empty());
        assert!(!asm.discard(SequenceNumber(1)));
    }

    #[test]
    fn missing_for_unknown_sn_is_empty() {
        let asm = FragmentAssembler::default();
        assert_eq!(asm.missing_fragments(SequenceNumber(42)).num_bits, 0);
    }

    // ---- fragments_in_submessage > 1 (bundle decode) ----

    #[test]
    fn bundled_fragments_all_full() {
        // Three fragments in one submessage, all full-sized (no tail).
        // sample_size=18, frag_size=4, total=5. Fragments 1-3 are bundled.
        let mut asm = FragmentAssembler::default();
        let payload = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
        let outcome = asm.insert(&df(1, 1, 3, 4, 18, payload.clone()));
        assert!(outcome.is_none(), "not yet complete");
        // Fragments 4 and 5 are still missing.
        let gaps: Vec<_> = asm
            .missing_fragments(SequenceNumber(1))
            .iter_set()
            .collect();
        assert_eq!(gaps, vec![FragmentNumber(4), FragmentNumber(5)]);
    }

    #[test]
    fn bundled_fragments_including_last_with_tail() {
        // Two fragments in one submessage, including the shortened last one.
        // sample_size=10, frag_size=4, total=3. Bundle: fragments 2-3.
        let mut asm = FragmentAssembler::default();
        // Deliver fragment 1 first.
        let first = asm.insert(&df(1, 1, 1, 4, 10, vec![0xA, 0xB, 0xC, 0xD]));
        assert!(first.is_none());
        // Now the bundle 2+3 (4 + 2 bytes = 6).
        let bundle = vec![5, 6, 7, 8, 9, 10];
        let sample = asm
            .insert(&df(1, 2, 2, 4, 10, bundle))
            .expect("the bundle supplies the remaining fragments");
        assert_eq!(sample.payload, vec![0xA, 0xB, 0xC, 0xD, 5, 6, 7, 8, 9, 10]);
    }

    #[test]
    fn bundled_fragments_payload_size_mismatch_rejected() {
        // The bundle claims 3 fragments of 4 bytes (= 12 bytes) but only
        // 10 bytes are delivered.
        let mut asm = FragmentAssembler::default();
        let payload = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        assert!(asm.insert(&df(1, 1, 3, 4, 20, payload)).is_none());
        assert_eq!(asm.drop_count(), 1);
        assert_eq!(
            asm.last_drop_reason(),
            Some(DropReason::PayloadSizeMismatch)
        );
    }

    // ---- last_drop_reason diagnostics ----

    #[test]
    fn last_drop_reason_tracks_most_recent() {
        let mut asm = FragmentAssembler::default();
        assert_eq!(asm.last_drop_reason(), None);
        asm.insert(&df(1, 0, 1, 4, 4, vec![1, 2, 3, 4]));
        assert_eq!(asm.last_drop_reason(), Some(DropReason::FragmentIndexZero));
        asm.insert(&df(1, 1, 1, 0, 4, vec![1, 2, 3, 4]));
        assert_eq!(
            asm.last_drop_reason(),
            Some(DropReason::FragmentSizeInvalid)
        );
    }

    #[test]
    fn pending_sns_cap_exceeded_uses_dedicated_reason() {
        let mut asm = FragmentAssembler::new(AssemblerCaps {
            max_pending_sns: 1,
            ..AssemblerCaps::default()
        });
        asm.insert(&df(1, 1, 1, 4, 8, vec![1, 2, 3, 4]));
        asm.insert(&df(2, 1, 1, 4, 8, vec![1, 2, 3, 4]));
        assert_eq!(
            asm.last_drop_reason(),
            Some(DropReason::PendingSnsCapExceeded)
        );
    }

    #[test]
    fn default_assembler_uses_default_caps() {
        // B2 regression: the `Default` impl must yield a working assembler
        // (correct caps, not merely zeroed state).
        let mut asm = FragmentAssembler::default();
        assert!(asm.is_empty());
        // Typical case: a one-fragment sample completes.
        let outcome = asm.insert(&df(1, 1, 1, 4, 4, vec![1, 2, 3, 4]));
        assert!(outcome.is_some());
    }

    #[test]
    fn reset_diagnostics_clears_counters_but_keeps_buffers() {
        // B8 regression: reset_diagnostics must only zero the metric state;
        // in-flight buffers stay in place.
        let mut asm = FragmentAssembler::default();
        asm.insert(&df(1, 0, 1, 4, 4, vec![1, 2, 3, 4])); // FragmentIndexZero → drop
        asm.insert(&df(2, 1, 1, 4, 8, vec![1, 2, 3, 4])); // partial buffer
        assert_eq!(asm.drop_count(), 1);
        assert_eq!(asm.len(), 1);
        asm.reset_diagnostics();
        assert_eq!(asm.drop_count(), 0);
        assert!(asm.last_drop_reason().is_none());
        assert_eq!(asm.len(), 1, "buffers must stay intact");
    }

    #[test]
    fn max_pending_sns_zero_rejects_with_assembler_disabled() {
        let mut asm = FragmentAssembler::new(AssemblerCaps {
            max_pending_sns: 0,
            ..AssemblerCaps::default()
        });
        asm.insert(&df(1, 1, 1, 4, 8, vec![1, 2, 3, 4]));
        assert_eq!(asm.last_drop_reason(), Some(DropReason::AssemblerDisabled));
        assert!(asm.is_empty());
    }
}