Skip to main content

dvb_si/carousel/
reassembler.rs

1//! Data-carousel module reassembly — collects [`DownloadDataBlock`]s into
2//! complete modules per the DII's `moduleSize`/`blockSize` announcement
3//! (`docs/iso_13818_6_carousel.md`, "Module reassembly").
4
5use std::collections::HashMap;
6
7use super::messages::{Dii, DownloadDataBlock};
8
9/// Identifies one module instance on the carousel.
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
11#[cfg_attr(feature = "serde", derive(serde::Serialize))]
12pub struct ModuleKey {
13    /// downloadId from the DII / DDB headers.
14    pub download_id: u32,
15    /// moduleId from the DII module entry.
16    pub module_id: u16,
17    /// moduleVersion — a version bump restarts collection.
18    pub module_version: u8,
19}
20
21/// A fully reassembled module.
22#[derive(Debug, Clone, PartialEq, Eq)]
23#[cfg_attr(feature = "serde", derive(serde::Serialize))]
24pub struct Module {
25    /// Identity of the completed module.
26    pub key: ModuleKey,
27    /// The `moduleSize` bytes, in order.
28    pub data: Vec<u8>,
29}
30
31/// Per-module collection state. `received` is a bitset (one bit per block) so
32/// a hostile `blockSize = 1` DII costs ~1/8 of the module size in tracking
33/// overhead instead of one `bool` per byte.
34struct Slot {
35    block_size: usize,
36    data: Vec<u8>,
37    received: Vec<u64>,
38    n_blocks: usize,
39    remaining: usize,
40}
41
42impl Slot {
43    fn is_received(&self, n: usize) -> bool {
44        (self.received[n >> 6] >> (n & 63)) & 1 != 0
45    }
46    fn mark_received(&mut self, n: usize) {
47        self.received[n >> 6] |= 1 << (n & 63);
48    }
49}
50
51/// Default cap on a single module's announced `moduleSize`.
52pub const DEFAULT_MAX_MODULE_SIZE: u32 = 64 * 1024 * 1024;
53/// Default cap on the TOTAL bytes held across all in-progress modules — a
54/// hostile carousel rotating downloadId/moduleId/moduleVersion can otherwise
55/// multiply the per-module cap without bound.
56pub const DEFAULT_MAX_TOTAL_BYTES: usize = 256 * 1024 * 1024;
57
58/// Collects DDB blocks into complete modules.
59///
60/// Usage: call [`note_dii`](Self::note_dii) for every DII (repeats are
61/// idempotent; a changed `moduleVersion` restarts that module), then feed
62/// every DDB through [`feed_ddb`](Self::feed_ddb). DDBs for modules not yet
63/// announced by a DII are ignored — carousels repeat, so the block comes
64/// round again after the DII has been seen.
65///
66/// Memory bounds: each announced module is capped at `max_module_size`, and
67/// the aggregate of all in-progress module buffers at `max_total_bytes` —
68/// announcements that would exceed the budget are skipped until completed
69/// modules free space. Block tracking is a bitset (~`moduleSize/blockSize/8`
70/// bytes), so the worst-case overhead for a `blockSize = 1` announcement is
71/// ~12.5% on top of the data buffer.
72pub struct ModuleReassembler {
73    slots: HashMap<ModuleKey, Slot>,
74    max_module_size: u32,
75    max_total_bytes: usize,
76    total_bytes: usize,
77}
78
79impl Default for ModuleReassembler {
80    fn default() -> Self {
81        Self::new()
82    }
83}
84
85impl ModuleReassembler {
86    /// New reassembler with [`DEFAULT_MAX_MODULE_SIZE`] and
87    /// [`DEFAULT_MAX_TOTAL_BYTES`].
88    #[must_use]
89    pub fn new() -> Self {
90        Self::with_limits(DEFAULT_MAX_MODULE_SIZE, DEFAULT_MAX_TOTAL_BYTES)
91    }
92
93    /// New reassembler with a custom per-module size cap (aggregate budget
94    /// stays at [`DEFAULT_MAX_TOTAL_BYTES`]).
95    #[must_use]
96    pub fn with_max_module_size(max_module_size: u32) -> Self {
97        Self::with_limits(max_module_size, DEFAULT_MAX_TOTAL_BYTES)
98    }
99
100    /// New reassembler with explicit per-module and aggregate byte caps.
101    #[must_use]
102    pub fn with_limits(max_module_size: u32, max_total_bytes: usize) -> Self {
103        Self {
104            slots: HashMap::new(),
105            max_module_size,
106            max_total_bytes,
107            total_bytes: 0,
108        }
109    }
110
111    /// Register the modules announced by a DII. Skipped: modules over the
112    /// per-module cap, `blockSize == 0`, and modules that would push the
113    /// aggregate budget over `max_total_bytes`. Re-announcement of an
114    /// in-progress (same-version) module is a no-op; a new version replaces
115    /// the old slot (freeing its budget first).
116    pub fn note_dii(&mut self, dii: &Dii<'_>) {
117        for m in &dii.modules {
118            if m.module_size > self.max_module_size || dii.block_size == 0 {
119                continue;
120            }
121            let key = ModuleKey {
122                download_id: dii.download_id,
123                module_id: m.module_id,
124                module_version: m.module_version,
125            };
126            // Drop any older version of the same module, releasing its budget.
127            let stale: Vec<ModuleKey> = self
128                .slots
129                .keys()
130                .filter(|k| {
131                    k.download_id == key.download_id
132                        && k.module_id == key.module_id
133                        && k.module_version != key.module_version
134                })
135                .copied()
136                .collect();
137            for k in stale {
138                if let Some(s) = self.slots.remove(&k) {
139                    self.total_bytes -= s.data.len();
140                }
141            }
142            if self.slots.contains_key(&key) {
143                continue; // carousel repeat — keep accumulated blocks
144            }
145            let size = m.module_size as usize;
146            if self.total_bytes + size > self.max_total_bytes {
147                continue; // aggregate budget exhausted — skip until space frees
148            }
149            let block_size = dii.block_size as usize;
150            let n_blocks = size.div_ceil(block_size).max(1);
151            self.total_bytes += size;
152            self.slots.insert(
153                key,
154                Slot {
155                    block_size,
156                    data: vec![0u8; size],
157                    received: vec![0u64; n_blocks.div_ceil(64)],
158                    n_blocks,
159                    remaining: n_blocks,
160                },
161            );
162        }
163    }
164
165    /// Feed one DDB. Returns the completed [`Module`] when this block was the
166    /// last missing piece. Blocks for unknown (downloadId, moduleId, version)
167    /// triples, out-of-range block numbers, repeats, and blocks whose length
168    /// disagrees with the DII geometry are ignored.
169    pub fn feed_ddb(&mut self, ddb: &DownloadDataBlock<'_>) -> Option<Module> {
170        let key = ModuleKey {
171            download_id: ddb.download_id,
172            module_id: ddb.module_id,
173            module_version: ddb.module_version,
174        };
175        let slot = self.slots.get_mut(&key)?;
176        let n = ddb.block_number as usize;
177        if n >= slot.n_blocks || slot.is_received(n) {
178            return None;
179        }
180        let offset = n * slot.block_size;
181        let expected = (slot.data.len() - offset).min(slot.block_size);
182        if ddb.block_data.len() != expected {
183            return None; // disagrees with the announced geometry — corrupt
184        }
185        slot.data[offset..offset + expected].copy_from_slice(ddb.block_data);
186        slot.mark_received(n);
187        slot.remaining -= 1;
188        if slot.remaining > 0 {
189            return None;
190        }
191        let slot = self.slots.remove(&key).expect("slot exists");
192        self.total_bytes -= slot.data.len();
193        Some(Module {
194            key,
195            data: slot.data,
196        })
197    }
198
199    /// Number of modules currently being collected.
200    #[must_use]
201    pub fn pending(&self) -> usize {
202        self.slots.len()
203    }
204
205    /// Total bytes currently held by in-progress module buffers.
206    #[must_use]
207    pub fn pending_bytes(&self) -> usize {
208        self.total_bytes
209    }
210}
211
212#[cfg(test)]
213mod tests {
214    use super::super::messages::DiiModule;
215    use super::*;
216
217    fn dii(download_id: u32, block_size: u16, modules: Vec<DiiModule<'static>>) -> Dii<'static> {
218        Dii {
219            transaction_id: 0x8000_0002,
220            adaptation: &[],
221            download_id,
222            block_size,
223            window_size: 0,
224            ack_period: 0,
225            t_c_download_window: 0,
226            t_c_download_scenario: 0,
227            compatibility_descriptor: &[],
228            modules,
229            private_data: &[],
230        }
231    }
232
233    fn module(module_id: u16, module_size: u32, module_version: u8) -> DiiModule<'static> {
234        DiiModule {
235            module_id,
236            module_size,
237            module_version,
238            module_info: &[],
239        }
240    }
241
242    fn ddb(
243        download_id: u32,
244        module_id: u16,
245        module_version: u8,
246        block_number: u16,
247        block_data: &[u8],
248    ) -> DownloadDataBlock<'_> {
249        DownloadDataBlock {
250            download_id,
251            adaptation: &[],
252            module_id,
253            module_version,
254            block_number,
255            block_data,
256        }
257    }
258
259    #[test]
260    fn two_block_module_completes() {
261        let mut r = ModuleReassembler::new();
262        r.note_dii(&dii(1, 4, vec![module(7, 6, 0)]));
263        assert!(r.feed_ddb(&ddb(1, 7, 0, 0, &[1, 2, 3, 4])).is_none());
264        let m = r.feed_ddb(&ddb(1, 7, 0, 1, &[5, 6])).expect("complete");
265        assert_eq!(m.key.module_id, 7);
266        assert_eq!(m.data, vec![1, 2, 3, 4, 5, 6]);
267        assert_eq!(r.pending(), 0);
268    }
269
270    #[test]
271    fn out_of_order_blocks_complete() {
272        let mut r = ModuleReassembler::new();
273        r.note_dii(&dii(1, 2, vec![module(1, 4, 0)]));
274        assert!(r.feed_ddb(&ddb(1, 1, 0, 1, &[3, 4])).is_none());
275        let m = r.feed_ddb(&ddb(1, 1, 0, 0, &[1, 2])).expect("complete");
276        assert_eq!(m.data, vec![1, 2, 3, 4]);
277    }
278
279    #[test]
280    fn ddb_before_dii_is_ignored() {
281        let mut r = ModuleReassembler::new();
282        assert!(r.feed_ddb(&ddb(1, 1, 0, 0, &[1, 2])).is_none());
283        // After the DII arrives, the carousel repeat completes it.
284        r.note_dii(&dii(1, 2, vec![module(1, 2, 0)]));
285        assert!(r.feed_ddb(&ddb(1, 1, 0, 0, &[1, 2])).is_some());
286    }
287
288    #[test]
289    fn version_mismatch_ignored_and_new_version_restarts() {
290        let mut r = ModuleReassembler::new();
291        r.note_dii(&dii(1, 2, vec![module(1, 4, 0)]));
292        assert!(r.feed_ddb(&ddb(1, 1, 0, 0, &[1, 2])).is_none());
293        // DDB with a different version is not accepted into the v0 slot.
294        assert!(r.feed_ddb(&ddb(1, 1, 3, 1, &[9, 9])).is_none());
295        // A DII announcing v3 replaces the v0 slot entirely.
296        r.note_dii(&dii(1, 2, vec![module(1, 4, 3)]));
297        assert_eq!(r.pending(), 1);
298        assert!(r.feed_ddb(&ddb(1, 1, 3, 0, &[5, 6])).is_none());
299        let m = r.feed_ddb(&ddb(1, 1, 3, 1, &[7, 8])).expect("complete");
300        assert_eq!(m.key.module_version, 3);
301        assert_eq!(m.data, vec![5, 6, 7, 8]);
302    }
303
304    #[test]
305    fn repeated_dii_keeps_progress() {
306        let mut r = ModuleReassembler::new();
307        let d = dii(1, 2, vec![module(1, 4, 0)]);
308        r.note_dii(&d);
309        assert!(r.feed_ddb(&ddb(1, 1, 0, 0, &[1, 2])).is_none());
310        r.note_dii(&d); // carousel repeat
311        let m = r.feed_ddb(&ddb(1, 1, 0, 1, &[3, 4])).expect("complete");
312        assert_eq!(m.data, vec![1, 2, 3, 4]);
313    }
314
315    #[test]
316    fn duplicate_and_out_of_range_blocks_ignored() {
317        let mut r = ModuleReassembler::new();
318        r.note_dii(&dii(1, 2, vec![module(1, 4, 0)]));
319        assert!(r.feed_ddb(&ddb(1, 1, 0, 0, &[1, 2])).is_none());
320        assert!(r.feed_ddb(&ddb(1, 1, 0, 0, &[1, 2])).is_none()); // dup
321        assert!(r.feed_ddb(&ddb(1, 1, 0, 9, &[9, 9])).is_none()); // range
322        assert_eq!(r.pending(), 1);
323    }
324
325    #[test]
326    fn wrong_block_length_ignored() {
327        let mut r = ModuleReassembler::new();
328        r.note_dii(&dii(1, 4, vec![module(1, 6, 0)]));
329        // Block 0 must be exactly blockSize (4); block 1 exactly 2.
330        assert!(r.feed_ddb(&ddb(1, 1, 0, 0, &[1, 2, 3])).is_none());
331        assert!(r.feed_ddb(&ddb(1, 1, 0, 1, &[5, 6, 7])).is_none());
332        assert_eq!(r.pending(), 1);
333        assert!(r.feed_ddb(&ddb(1, 1, 0, 0, &[1, 2, 3, 4])).is_none());
334        assert!(r.feed_ddb(&ddb(1, 1, 0, 1, &[5, 6])).is_some());
335    }
336
337    #[test]
338    fn oversize_module_skipped() {
339        let mut r = ModuleReassembler::with_max_module_size(8);
340        r.note_dii(&dii(1, 4, vec![module(1, 9, 0), module(2, 8, 0)]));
341        assert_eq!(r.pending(), 1); // only module 2 within the cap
342    }
343
344    #[test]
345    fn zero_block_size_skipped() {
346        let mut r = ModuleReassembler::new();
347        r.note_dii(&dii(1, 0, vec![module(1, 4, 0)]));
348        assert_eq!(r.pending(), 0);
349    }
350
351    /// The aggregate budget bounds rotating-key amplification: announcements
352    /// past `max_total_bytes` are skipped, and completing a module frees its
353    /// budget for later announcements.
354    #[test]
355    fn aggregate_budget_bounds_total_memory() {
356        let mut r = ModuleReassembler::with_limits(8, 10);
357        r.note_dii(&dii(1, 4, vec![module(1, 8, 0)]));
358        assert_eq!(r.pending_bytes(), 8);
359        // A second module would exceed the 10-byte aggregate budget — skipped,
360        // even though it is within the per-module cap.
361        r.note_dii(&dii(2, 4, vec![module(1, 8, 0)]));
362        assert_eq!(r.pending(), 1);
363        assert_eq!(r.pending_bytes(), 8);
364        // Completing the first frees its budget...
365        assert!(r.feed_ddb(&ddb(1, 1, 0, 0, &[0; 4])).is_none());
366        assert!(r.feed_ddb(&ddb(1, 1, 0, 1, &[0; 4])).is_some());
367        assert_eq!(r.pending_bytes(), 0);
368        // ...so the repeat announcement now fits.
369        r.note_dii(&dii(2, 4, vec![module(1, 8, 0)]));
370        assert_eq!(r.pending(), 1);
371        assert_eq!(r.pending_bytes(), 8);
372    }
373
374    /// A version bump releases the old slot's budget before charging the new.
375    #[test]
376    fn version_replacement_releases_budget() {
377        let mut r = ModuleReassembler::with_limits(8, 8);
378        r.note_dii(&dii(1, 4, vec![module(1, 8, 0)]));
379        assert_eq!(r.pending_bytes(), 8);
380        r.note_dii(&dii(1, 4, vec![module(1, 8, 1)]));
381        assert_eq!(r.pending(), 1);
382        assert_eq!(r.pending_bytes(), 8); // replaced, not doubled
383    }
384
385    /// blockSize=1 tracking is a bitset — the structure stays usable and the
386    /// dup-guard still holds at single-byte granularity.
387    #[test]
388    fn block_size_one_uses_bitset() {
389        let mut r = ModuleReassembler::new();
390        r.note_dii(&dii(1, 1, vec![module(1, 130, 0)]));
391        for i in 0..129u16 {
392            assert!(r.feed_ddb(&ddb(1, 1, 0, i, &[i as u8])).is_none());
393            // duplicate of the same block is ignored
394            assert!(r.feed_ddb(&ddb(1, 1, 0, i, &[i as u8])).is_none());
395        }
396        let m = r.feed_ddb(&ddb(1, 1, 0, 129, &[0x81])).expect("complete");
397        assert_eq!(m.data.len(), 130);
398        assert_eq!(m.data[129], 0x81);
399    }
400}