Skip to main content

dvb_si/carousel/
reassembler.rs

1//! Data-carousel module reassembly — collects [`DownloadDataBlock`]s into
2//! complete modules per the DII's `moduleSize`/`blockSize` announcement
3//! (`docs/iso_13818_6_carousel.md`, "Module reassembly").
4
5use std::collections::HashMap;
6
7use super::messages::{Dii, DownloadDataBlock};
8
/// Identity of one module instance on the carousel: the download it belongs
/// to, the module within that download, and the version of that module.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct ModuleKey {
    /// `downloadId`, carried in both the DII and the DDB headers.
    pub download_id: u32,
    /// `moduleId`, taken from the DII's module entry.
    pub module_id: u16,
    /// `moduleVersion`; announcing a different version restarts collection
    /// of the module from scratch.
    pub module_version: u8,
}
20
21/// A fully reassembled module.
22#[derive(Debug, Clone, PartialEq, Eq)]
23#[cfg_attr(feature = "serde", derive(serde::Serialize))]
24pub struct Module {
25    /// Identity of the completed module.
26    pub key: ModuleKey,
27    /// The `moduleSize` bytes, in order.
28    pub data: Vec<u8>,
29}
30
/// Internal slot-map key: `(download_id, module_id)`.
///
/// The module version deliberately lives in the [`Slot`] rather than in the
/// key, so `note_dii` can find a stale-version slot with one keyed lookup
/// instead of scanning every in-progress module.
type SlotKey = (u32, u16);
35
/// Collection state for one in-progress module.
///
/// Block arrival is recorded in `received`, a packed bitset holding one bit
/// per block (64 blocks per `u64` word). A hostile `blockSize = 1` DII
/// therefore costs only about 1/8 of the module size in tracking overhead,
/// rather than one `bool` per byte.
struct Slot {
    /// Version of the module this slot is collecting.
    module_version: u8,
    /// Announced block size in bytes; every block except the last is this long.
    block_size: usize,
    /// Module buffer (`moduleSize` bytes), filled in block by block.
    data: Vec<u8>,
    /// Bitset of blocks already stored.
    received: Vec<u64>,
    /// Number of blocks the module is divided into.
    n_blocks: usize,
    /// Number of blocks still missing.
    remaining: usize,
}

impl Slot {
    /// Word index and single-bit mask locating block `n` in the bitset.
    fn bit_position(n: usize) -> (usize, u64) {
        (n / 64, 1u64 << (n % 64))
    }

    /// `true` once block `n` has been stored.
    fn is_received(&self, n: usize) -> bool {
        let (word, mask) = Self::bit_position(n);
        (self.received[word] & mask) != 0
    }

    /// Record block `n` as stored.
    fn mark_received(&mut self, n: usize) {
        let (word, mask) = Self::bit_position(n);
        self.received[word] |= mask;
    }
}
56
/// Default per-module cap on an announced `moduleSize`, in bytes (64 MiB).
pub const DEFAULT_MAX_MODULE_SIZE: u32 = 64 << 20;
/// Default cap on the combined bytes held by every in-progress module
/// (256 MiB). Without it, a hostile carousel that keeps rotating
/// downloadId / moduleId / moduleVersion could multiply the per-module cap
/// without limit.
pub const DEFAULT_MAX_TOTAL_BYTES: usize = 256 << 20;
/// Default cap on how many module slots may be in progress at once (16 384).
/// The byte budget does not account for per-slot map-entry and bitset
/// overhead, so a flood of distinct *small* module announcements needs its
/// own bound. The value sits far above any real carousel (a single DII's
/// `numberOfModules` tops out at 65 535, and real carousels announce tens)
/// while keeping worst-case map overhead to a few MiB.
pub const DEFAULT_MAX_SLOTS: usize = 1 << 14;
70
71/// Collects DDB blocks into complete modules.
72///
73/// Usage: call [`note_dii`](Self::note_dii) for every DII (repeats are
74/// idempotent; a changed `moduleVersion` restarts that module), then feed
75/// every DDB through [`feed_ddb`](Self::feed_ddb). DDBs for modules not yet
76/// announced by a DII are ignored — carousels repeat, so the block comes
77/// round again after the DII has been seen.
78///
79/// Memory bounds: each announced module is capped at `max_module_size`, the
80/// aggregate of all in-progress module buffers at `max_total_bytes`, and the
81/// number of in-progress slots at `max_slots` (the byte budget alone does not
82/// model per-slot map/bitset overhead, so many distinct small modules are
83/// bounded separately) — announcements that would exceed any cap are skipped
84/// until completed modules free space, and `moduleSize == 0` announcements
85/// are rejected outright. Block tracking is a bitset
86/// (~`moduleSize/blockSize/8` bytes), so the worst-case overhead for a
87/// `blockSize = 1` announcement is ~12.5% on top of the data buffer.
88///
89/// Liveness on lossy streams: the skip-until-space policy means that when the
90/// budget is held by large modules that never complete (sustained loss),
91/// later small announcements are starved until the large slots complete or
92/// are version-bumped. This is by design — drop and recreate the reassembler
93/// to recover from a wedged stream.
94pub struct ModuleReassembler {
95    slots: HashMap<SlotKey, Slot>,
96    max_module_size: u32,
97    max_total_bytes: usize,
98    max_slots: usize,
99    total_bytes: usize,
100}
101
102impl Default for ModuleReassembler {
103    fn default() -> Self {
104        Self::new()
105    }
106}
107
108impl ModuleReassembler {
109    /// New reassembler with [`DEFAULT_MAX_MODULE_SIZE`] and
110    /// [`DEFAULT_MAX_TOTAL_BYTES`].
111    #[must_use]
112    pub fn new() -> Self {
113        Self::with_limits(DEFAULT_MAX_MODULE_SIZE, DEFAULT_MAX_TOTAL_BYTES)
114    }
115
116    /// New reassembler with a custom per-module size cap (aggregate budget
117    /// stays at [`DEFAULT_MAX_TOTAL_BYTES`]).
118    #[must_use]
119    pub fn with_max_module_size(max_module_size: u32) -> Self {
120        Self::with_limits(max_module_size, DEFAULT_MAX_TOTAL_BYTES)
121    }
122
123    /// New reassembler with explicit per-module and aggregate byte caps
124    /// (slot-count cap stays at [`DEFAULT_MAX_SLOTS`]).
125    #[must_use]
126    pub fn with_limits(max_module_size: u32, max_total_bytes: usize) -> Self {
127        Self {
128            slots: HashMap::new(),
129            max_module_size,
130            max_total_bytes,
131            max_slots: DEFAULT_MAX_SLOTS,
132            total_bytes: 0,
133        }
134    }
135
136    /// Replace the in-progress slot-count cap (default
137    /// [`DEFAULT_MAX_SLOTS`]). Announcements past the cap are skipped until
138    /// completed modules free slots — the same skip-until-space policy as the
139    /// byte budget.
140    #[must_use]
141    pub fn with_max_slots(mut self, max_slots: usize) -> Self {
142        self.max_slots = max_slots;
143        self
144    }
145
146    /// Register the modules announced by a DII. Skipped: `moduleSize == 0`
147    /// (nothing to reassemble), modules over the per-module cap,
148    /// `blockSize == 0`, and modules that would push the aggregate budget
149    /// over `max_total_bytes` or the slot count to `max_slots`.
150    /// Re-announcement of an in-progress (same-version) module is a no-op; a
151    /// new version replaces the old slot (freeing its budget first).
152    pub fn note_dii(&mut self, dii: &Dii<'_>) {
153        for m in &dii.modules {
154            if m.module_size == 0 || m.module_size > self.max_module_size || dii.block_size == 0 {
155                continue;
156            }
157            let key: SlotKey = (dii.download_id, m.module_id);
158            if let Some(existing) = self.slots.get(&key) {
159                if existing.module_version == m.module_version {
160                    continue; // carousel repeat — keep accumulated blocks
161                }
162                // Older version — drop it, releasing its budget.
163                let s = self.slots.remove(&key).expect("just found");
164                self.total_bytes -= s.data.len();
165            }
166            let size = m.module_size as usize;
167            if self.total_bytes + size > self.max_total_bytes || self.slots.len() >= self.max_slots
168            {
169                continue; // budget or slot cap exhausted — skip until space frees
170            }
171            let block_size = dii.block_size as usize;
172            let n_blocks = size.div_ceil(block_size).max(1);
173            self.total_bytes += size;
174            self.slots.insert(
175                key,
176                Slot {
177                    module_version: m.module_version,
178                    block_size,
179                    data: vec![0u8; size],
180                    received: vec![0u64; n_blocks.div_ceil(64)],
181                    n_blocks,
182                    remaining: n_blocks,
183                },
184            );
185        }
186    }
187
188    /// Feed one DDB. Returns the completed [`Module`] when this block was the
189    /// last missing piece. Blocks for unknown (downloadId, moduleId, version)
190    /// triples, out-of-range block numbers, repeats, and blocks whose length
191    /// disagrees with the DII geometry are ignored.
192    pub fn feed_ddb(&mut self, ddb: &DownloadDataBlock<'_>) -> Option<Module> {
193        let key: SlotKey = (ddb.download_id, ddb.module_id);
194        let slot = self.slots.get_mut(&key)?;
195        let n = ddb.block_number as usize;
196        if slot.module_version != ddb.module_version || n >= slot.n_blocks || slot.is_received(n) {
197            return None;
198        }
199        let offset = n * slot.block_size;
200        let expected = (slot.data.len() - offset).min(slot.block_size);
201        if ddb.block_data.len() != expected {
202            return None; // disagrees with the announced geometry — corrupt
203        }
204        slot.data[offset..offset + expected].copy_from_slice(ddb.block_data);
205        slot.mark_received(n);
206        slot.remaining -= 1;
207        if slot.remaining > 0 {
208            return None;
209        }
210        let slot = self.slots.remove(&key).expect("slot exists");
211        self.total_bytes -= slot.data.len();
212        Some(Module {
213            key: ModuleKey {
214                download_id: ddb.download_id,
215                module_id: ddb.module_id,
216                module_version: slot.module_version,
217            },
218            data: slot.data,
219        })
220    }
221
222    /// Number of modules currently being collected.
223    #[must_use]
224    pub fn pending(&self) -> usize {
225        self.slots.len()
226    }
227
228    /// Total announced `moduleSize` bytes currently held by in-progress
229    /// module buffers — the quantity charged against `max_total_bytes`.
230    ///
231    /// This counts data-buffer bytes only, not the per-slot map entry and
232    /// block-bitset overhead, so it understates true retained memory (the
233    /// slot-count cap bounds that overhead instead). Do not use it as a
234    /// memory-pressure signal; use [`pending`](Self::pending) × expected
235    /// module size for a rough upper bound.
236    #[must_use]
237    pub fn pending_bytes(&self) -> usize {
238        self.total_bytes
239    }
240}
241
#[cfg(test)]
mod tests {
    use super::super::messages::DiiModule;
    use super::*;
    use crate::compatibility::CompatibilityDescriptor;

    /// Build a DII announcing `modules` under `download_id`. Every field the
    /// reassembler does not read is left at a neutral value.
    fn make_dii(
        download_id: u32,
        block_size: u16,
        modules: Vec<DiiModule<'static>>,
    ) -> Dii<'static> {
        Dii {
            transaction_id: 0x8000_0002,
            adaptation: &[],
            download_id,
            block_size,
            window_size: 0,
            ack_period: 0,
            t_c_download_window: 0,
            t_c_download_scenario: 0,
            compatibility_descriptor: CompatibilityDescriptor {
                descriptors: vec![],
            },
            modules,
            private_data: &[],
        }
    }

    /// One DII module entry with no `moduleInfo`.
    fn entry(module_id: u16, module_size: u32, module_version: u8) -> DiiModule<'static> {
        DiiModule {
            module_id,
            module_size,
            module_version,
            module_info: &[],
        }
    }

    /// One DDB carrying `block_data`, with no adaptation header.
    fn block(
        download_id: u32,
        module_id: u16,
        module_version: u8,
        block_number: u16,
        block_data: &[u8],
    ) -> DownloadDataBlock<'_> {
        DownloadDataBlock {
            download_id,
            adaptation: &[],
            module_id,
            module_version,
            block_number,
            block_data,
        }
    }

    #[test]
    fn two_block_module_completes() {
        let mut asm = ModuleReassembler::new();
        asm.note_dii(&make_dii(1, 4, vec![entry(7, 6, 0)]));
        let first = asm.feed_ddb(&block(1, 7, 0, 0, &[1, 2, 3, 4]));
        assert!(first.is_none());
        let done = asm.feed_ddb(&block(1, 7, 0, 1, &[5, 6])).expect("complete");
        assert_eq!(done.key.module_id, 7);
        assert_eq!(done.data, vec![1, 2, 3, 4, 5, 6]);
        assert_eq!(asm.pending(), 0);
    }

    #[test]
    fn out_of_order_blocks_complete() {
        let mut asm = ModuleReassembler::new();
        asm.note_dii(&make_dii(1, 2, vec![entry(1, 4, 0)]));
        // The tail block arrives first.
        assert!(asm.feed_ddb(&block(1, 1, 0, 1, &[3, 4])).is_none());
        let done = asm.feed_ddb(&block(1, 1, 0, 0, &[1, 2])).expect("complete");
        assert_eq!(done.data, vec![1, 2, 3, 4]);
    }

    #[test]
    fn ddb_before_dii_is_ignored() {
        let mut asm = ModuleReassembler::new();
        let payload = [1u8, 2];
        assert!(asm.feed_ddb(&block(1, 1, 0, 0, &payload)).is_none());
        // Once the DII has been seen, the carousel's next pass completes it.
        asm.note_dii(&make_dii(1, 2, vec![entry(1, 2, 0)]));
        assert!(asm.feed_ddb(&block(1, 1, 0, 0, &payload)).is_some());
    }

    #[test]
    fn version_mismatch_ignored_and_new_version_restarts() {
        let mut asm = ModuleReassembler::new();
        asm.note_dii(&make_dii(1, 2, vec![entry(1, 4, 0)]));
        assert!(asm.feed_ddb(&block(1, 1, 0, 0, &[1, 2])).is_none());
        // A v3 block must not land in the v0 slot.
        assert!(asm.feed_ddb(&block(1, 1, 3, 1, &[9, 9])).is_none());
        // Announcing v3 discards the v0 slot wholesale.
        asm.note_dii(&make_dii(1, 2, vec![entry(1, 4, 3)]));
        assert_eq!(asm.pending(), 1);
        assert!(asm.feed_ddb(&block(1, 1, 3, 0, &[5, 6])).is_none());
        let done = asm.feed_ddb(&block(1, 1, 3, 1, &[7, 8])).expect("complete");
        assert_eq!(done.key.module_version, 3);
        assert_eq!(done.data, vec![5, 6, 7, 8]);
    }

    #[test]
    fn repeated_dii_keeps_progress() {
        let mut asm = ModuleReassembler::new();
        let announcement = make_dii(1, 2, vec![entry(1, 4, 0)]);
        asm.note_dii(&announcement);
        assert!(asm.feed_ddb(&block(1, 1, 0, 0, &[1, 2])).is_none());
        asm.note_dii(&announcement); // carousel repeat must not reset progress
        let done = asm.feed_ddb(&block(1, 1, 0, 1, &[3, 4])).expect("complete");
        assert_eq!(done.data, vec![1, 2, 3, 4]);
    }

    #[test]
    fn duplicate_and_out_of_range_blocks_ignored() {
        let mut asm = ModuleReassembler::new();
        asm.note_dii(&make_dii(1, 2, vec![entry(1, 4, 0)]));
        // First copy of block 0, its duplicate, then a block past the end.
        for (number, bytes) in [(0u16, [1u8, 2]), (0, [1, 2]), (9, [9, 9])] {
            assert!(asm.feed_ddb(&block(1, 1, 0, number, &bytes)).is_none());
        }
        assert_eq!(asm.pending(), 1);
    }

    #[test]
    fn wrong_block_length_ignored() {
        let mut asm = ModuleReassembler::new();
        asm.note_dii(&make_dii(1, 4, vec![entry(1, 6, 0)]));
        // blockSize is 4: block 0 must carry exactly 4 bytes, tail block 1 exactly 2.
        assert!(asm.feed_ddb(&block(1, 1, 0, 0, &[1, 2, 3])).is_none());
        assert!(asm.feed_ddb(&block(1, 1, 0, 1, &[5, 6, 7])).is_none());
        assert_eq!(asm.pending(), 1);
        assert!(asm.feed_ddb(&block(1, 1, 0, 0, &[1, 2, 3, 4])).is_none());
        assert!(asm.feed_ddb(&block(1, 1, 0, 1, &[5, 6])).is_some());
    }

    #[test]
    fn oversize_module_skipped() {
        let mut asm = ModuleReassembler::with_max_module_size(8);
        asm.note_dii(&make_dii(1, 4, vec![entry(1, 9, 0), entry(2, 8, 0)]));
        // Only module 2 fits under the 8-byte per-module cap.
        assert_eq!(asm.pending(), 1);
    }

    #[test]
    fn zero_block_size_skipped() {
        let mut asm = ModuleReassembler::new();
        asm.note_dii(&make_dii(1, 0, vec![entry(1, 4, 0)]));
        assert_eq!(asm.pending(), 0);
    }

    /// An empty module has nothing to reassemble. Zero-size slots would also
    /// cost zero budget, so admitting them would let a carousel rotating ids
    /// grow the slot map without limit.
    #[test]
    fn zero_size_module_announcement_ignored() {
        let mut asm = ModuleReassembler::new();
        asm.note_dii(&make_dii(1, 4, vec![entry(1, 0, 0)]));
        assert_eq!(asm.pending(), 0);
        assert!(asm.feed_ddb(&block(1, 1, 0, 0, &[])).is_none());
    }

    /// The byte budget ignores per-slot map and bitset overhead, so the slot
    /// count is capped separately. Finishing a module hands its slot back.
    #[test]
    fn slot_count_capped() {
        let mut asm = ModuleReassembler::new().with_max_slots(3);
        let entries = (0..5).map(|id| entry(id, 1, 0)).collect();
        asm.note_dii(&make_dii(1, 4, entries));
        assert_eq!(asm.pending(), 3); // the first three fit, the rest are skipped
        // Finishing one releases its slot...
        assert!(asm.feed_ddb(&block(1, 0, 0, 0, &[0xAA])).is_some());
        assert_eq!(asm.pending(), 2);
        // ...so re-announcing a previously skipped module is now admitted.
        asm.note_dii(&make_dii(1, 4, vec![entry(4, 1, 0)]));
        assert_eq!(asm.pending(), 3);
    }

    /// Announcements beyond `max_total_bytes` are skipped, so rotating keys
    /// cannot amplify memory use. Finishing a module returns its bytes to
    /// the budget.
    #[test]
    fn aggregate_budget_bounds_total_memory() {
        let mut asm = ModuleReassembler::with_limits(8, 10);
        let first = make_dii(1, 4, vec![entry(1, 8, 0)]);
        let second = make_dii(2, 4, vec![entry(1, 8, 0)]);
        asm.note_dii(&first);
        assert_eq!(asm.pending_bytes(), 8);
        // Within the per-module cap, but 8 + 8 > 10 — skipped.
        asm.note_dii(&second);
        assert_eq!((asm.pending(), asm.pending_bytes()), (1, 8));
        // Completing download 1 returns its bytes to the budget...
        assert!(asm.feed_ddb(&block(1, 1, 0, 0, &[0; 4])).is_none());
        assert!(asm.feed_ddb(&block(1, 1, 0, 1, &[0; 4])).is_some());
        assert_eq!(asm.pending_bytes(), 0);
        // ...so the carousel's repeat of the second announcement now fits.
        asm.note_dii(&second);
        assert_eq!((asm.pending(), asm.pending_bytes()), (1, 8));
    }

    /// Bumping the version frees the old slot's bytes before the new slot is
    /// charged, so the total does not double.
    #[test]
    fn version_replacement_releases_budget() {
        let mut asm = ModuleReassembler::with_limits(8, 8);
        asm.note_dii(&make_dii(1, 4, vec![entry(1, 8, 0)]));
        assert_eq!(asm.pending_bytes(), 8);
        asm.note_dii(&make_dii(1, 4, vec![entry(1, 8, 1)]));
        assert_eq!(asm.pending(), 1);
        assert_eq!(asm.pending_bytes(), 8);
    }

    /// With blockSize = 1 the bitset tracks every byte individually, and
    /// duplicates are still rejected at that granularity.
    #[test]
    fn block_size_one_uses_bitset() {
        let mut asm = ModuleReassembler::new();
        asm.note_dii(&make_dii(1, 1, vec![entry(1, 130, 0)]));
        for number in 0..129u16 {
            let byte = [number as u8];
            assert!(asm.feed_ddb(&block(1, 1, 0, number, &byte)).is_none());
            // Re-sending the same block is a no-op.
            assert!(asm.feed_ddb(&block(1, 1, 0, number, &byte)).is_none());
        }
        let done = asm.feed_ddb(&block(1, 1, 0, 129, &[0x81])).expect("complete");
        assert_eq!(done.data.len(), 130);
        assert_eq!(done.data[129], 0x81);
    }
}