dvb_gse/
gsepacket.rs

1//! DVB GSE (Generic Stream Encapsulation) Packet.
2//!
3//! The GSE Packet is a variable-length packet formed by a header and a data
4//! field. It contains a full PDU or a fragment of a PDU. One or more GSE Packets are carried in the data field of a BBFRAME. See Section 4.2 in
5//! [TS 102
6//! 606-1](https://www.etsi.org/deliver/etsi_ts/102600_102699/10260601/01.02.01_60/ts_10260601v010201p.pdf).
7
8use super::bbframe::BBFrame;
9use super::bbheader::BBHeader;
10use super::gseheader::{GSEHeader, Label};
11use bytes::{Bytes, BytesMut};
12use crc::Digest;
13use std::collections::HashMap;
14use thiserror::Error;
15
16/// GSE Packet.
17///
18/// This struct contains a GSE Packet.
19#[derive(Debug, Clone, Eq, PartialEq, Hash)]
20pub struct GSEPacket {
21    header: GSEHeader,
22    data: Bytes,
23}
24
25/// GSE protocol error.
26#[derive(Error, Debug, Copy, Clone, Eq, PartialEq, Hash)]
27pub enum GSEError {
28    /// The BBFRAME is shorter than the BBHEADER length.
29    #[error("the BBFRAME is shorter than the BBHEADER length")]
30    BBFrameShort,
31    /// The SYNCD field of the GSE-HEM BBFRAME is not a multiple of 8 bits.
32    #[error("the SYNCD field of the GSE-HEM BBFRAME is not a multiple of 8 bits")]
33    SyncdNotMultiple,
34    /// The SYNCD field of the GSE-HEM BBFRAME points beyond the end of the BBFRAME.
35    #[error("The SYNCD field of the GSE-HEM BBFRAME points beyond the end of the BBFRAME")]
36    SyncdTooLarge,
37}
38
39lazy_static::lazy_static! {
40    static ref CRC32: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_MPEG_2);
41}
42
43impl GSEPacket {
44    /// Creates a new GSE Packet by parsing the data at the beginning of a
45    /// [`Bytes`].
46    ///
47    /// The GSE Header at the beginning of `bytes` is parsed and used to
48    /// determine the length of the GSE Packet. On success,the GSE Packet is
49    /// returned.
50    ///
51    /// This function returns `None` if the GSE Header cannot be parsed or if
52    /// the `bytes` does not fully contain the GSE Packet.
53    pub fn from_bytes(bytes: &Bytes, re_used_label: Option<&Label>) -> Option<GSEPacket> {
54        Self::try_from_bytes(bytes, re_used_label, true)
55    }
56
57    fn try_from_bytes(
58        bytes: &Bytes,
59        re_used_label: Option<&Label>,
60        not_contained_is_error: bool,
61    ) -> Option<GSEPacket> {
62        let header = GSEHeader::from_slice(bytes, re_used_label)?;
63        let header_len = header.len();
64        let total_len = 2 + usize::from(header.gse_length());
65        if total_len > bytes.len() {
66            if not_contained_is_error {
67                log::error!("GSE Packet not fully contained inside bytes");
68            }
69            return None;
70        }
71        if total_len < header_len {
72            log::error!("GSE Packet total length is smaller than header length");
73            return None;
74        }
75        let data = bytes.slice(header_len..total_len);
76        Some(GSEPacket { header, data })
77    }
78
79    /// Splits a [`Bytes`] into GSE Packets.
80    ///
81    /// This function returns an iterator that returns the GSE Packets contained
82    /// in the `Bytes`. The first GSE Packet should start at the beginning of
83    /// the `Bytes`. The iterator stops when the end of the `Bytes` is reached,
84    /// or when GSE padding or a malformed GSE Packet is found.
85    pub fn split_bytes(bytes: &Bytes) -> impl Iterator<Item = GSEPacket> {
86        Self::try_split_bytes(bytes, true)
87    }
88
89    fn try_split_bytes(
90        bytes: &Bytes,
91        not_contained_is_error: bool,
92    ) -> impl Iterator<Item = GSEPacket> {
93        let mut remain = bytes.slice(..);
94        let mut label = None;
95        std::iter::from_fn(move || {
96            if let Some(packet) =
97                GSEPacket::try_from_bytes(&remain, label.as_ref(), not_contained_is_error)
98            {
99                log::debug!("extracted GSE Packet with header {}", packet.header());
100                log::trace!(
101                    "GSE Packet data field {}",
102                    faster_hex::hex_string(packet.data())
103                );
104                remain = remain.slice(packet.len()..);
105                if let Some(l) = packet.header.label() {
106                    label = Some(l.clone());
107                }
108                Some(packet)
109            } else {
110                log::debug!("no more GSE Packets in BBFRAME");
111                None
112            }
113        })
114    }
115
116    /// Splits a non-HEM BBFRAME into GSE Packets.
117    ///
118    /// This function returns an iterator that returns the GSE Packets contained
119    /// in the BBFRAME. The iterator stops when the end of the BBFRAME is
120    /// reached, or when GSE padding or a malformed GSE Packet is found.
121    ///
122    /// The function returns an error if the BBFRAME is malformed. For instance,
123    /// if the BBFRAME length is shorter than the BBHEADER length.
124    pub fn split_bbframe(bbframe: &BBFrame) -> Result<impl Iterator<Item = GSEPacket>, GSEError> {
125        if bbframe.len() < BBHeader::LEN {
126            return Err(GSEError::BBFrameShort);
127        }
128        Ok(GSEPacket::split_bytes(&bbframe.slice(BBHeader::LEN..)))
129    }
130
131    /// Gives the length of the GSE Packet in bytes.
132    pub fn len(&self) -> usize {
133        self.header.len() + self.data.len()
134    }
135
136    /// Returns `true` if the GSE Packet has a length of zero bytes.
137    ///
138    /// This always returns `false`, since a GSE Header never has a length of
139    /// zero bytes. This function exists because objects that implement a `len`
140    /// method should also implement an `is_empty` method.
141    pub fn is_empty(&self) -> bool {
142        false
143    }
144
145    /// Gives access to the header of the GSE Packet.
146    pub fn header(&self) -> &GSEHeader {
147        &self.header
148    }
149
150    /// Gives access to the data field of the GSE Packet.
151    pub fn data(&self) -> &Bytes {
152        &self.data
153    }
154}
155
156/// GSE Packet defragmenter.
157///
158/// This structure performs defragmentation of GSE Packets in order to obtain
159/// full [`PDU`]s.
160#[derive(Debug)]
161pub struct GSEPacketDefrag {
162    defragger: Defragger,
163    // used to store the leftover partial packet at the end of a GSE-HEM BBFRAME
164    hem_leftover: BytesMut,
165    // used for label re-use of the leftover partial packet in GSE-HEM
166    hem_last_label: Option<Label>,
167}
168
169// This intermediate struct is introduce only to avoid borrowing the whole
170// GSEPacketDefrag when calling defrag().
171#[derive(Debug)]
172struct Defragger {
173    defrags: HashMap<u8, Defrag>,
174    skip_total_length_check: bool,
175}
176
177struct Defrag {
178    total_length: usize,
179    protocol_type: u16,
180    label: Label,
181    current_length: usize,
182    fragments: Vec<Bytes>,
183    digest: Digest<'static, u32>,
184    // Used because some modulators do not set the Total Length field
185    // correctly. See https://github.com/daniestevez/dvb-gse/issues/11
186    skip_total_length_check: bool,
187}
188
189/// PDU.
190///
191/// This structure represents a PDU. It carriers the PDU data, and its
192/// corresponding metadata (the protocol type and the label).
193#[derive(Debug, Clone, Eq, PartialEq, Hash)]
194pub struct PDU {
195    data: Bytes,
196    protocol_type: u16,
197    label: Label,
198}
199
200impl PDU {
201    fn from_single_fragment(packet: &GSEPacket) -> Option<PDU> {
202        Some(PDU {
203            protocol_type: packet.header().protocol_type()?,
204            label: packet.header().label()?.clone(),
205            data: packet.data().clone(),
206        })
207    }
208
209    /// Gives access to the data of the PDU.
210    pub fn data(&self) -> &Bytes {
211        &self.data
212    }
213
214    /// Returns the protocol type of the PDU.
215    pub fn protocol_type(&self) -> u16 {
216        self.protocol_type
217    }
218
219    /// Gives access to the label of the PDU.
220    pub fn label(&self) -> &Label {
221        &self.label
222    }
223}
224
225// This is needed because GSEPacketDefrag::defrag can return an iterator of
226// either one of two types, depending on whether the BBFRAME is GSE-HEM or not.
227enum EitherIter<AIterType, BIterType> {
228    A(AIterType),
229    B(BIterType),
230}
231
232impl<AIterType, BIterType> Iterator for EitherIter<AIterType, BIterType>
233where
234    AIterType: Iterator,
235    BIterType: Iterator<Item = AIterType::Item>,
236{
237    type Item = AIterType::Item;
238    fn next(&mut self) -> Option<<Self as Iterator>::Item> {
239        match self {
240            EitherIter::A(it) => it.next(),
241            EitherIter::B(it) => it.next(),
242        }
243    }
244}
245
246impl GSEPacketDefrag {
247    /// Creates a new GSE Packet defragmenter.
248    pub fn new() -> GSEPacketDefrag {
249        GSEPacketDefrag {
250            defragger: Defragger {
251                defrags: HashMap::new(),
252                skip_total_length_check: false,
253            },
254            hem_leftover: BytesMut::new(),
255            hem_last_label: None,
256        }
257    }
258
259    /// Enables or disables the check of the total length field.
260    ///
261    /// By default, `GSEPacketDefrag` checks that the length of the defragmented
262    /// data matches the value specified in the total length check
263    /// field. However,
264    /// [some modulators set this value incorrectly](https://github.com/daniestevez/dvb-gse/issues/11).
265    /// This function can be used to skip the check of the total length
266    /// field.
267    pub fn set_skip_total_length_check(&mut self, value: bool) {
268        self.defragger.skip_total_length_check = value;
269    }
270
271    /// Defragment a BBFRAME.
272    ///
273    /// This function returns an iterator that produces all the PDUs that can be
274    /// completed with the GSE Packets found in the BBFRAME.
275    ///
276    /// The function returns an error if the BBFRAME is malformed. For instance,
277    /// if the BBFRAME length is shorter than the BBHEADER length.
278    pub fn defragment<'a>(
279        &'a mut self,
280        bbframe: &'a BBFrame,
281    ) -> Result<impl Iterator<Item = PDU> + 'a, GSEError> {
282        if bbframe.len() < BBHeader::LEN {
283            return Err(GSEError::BBFrameShort);
284        }
285        let bbheader = bbframe[..BBHeader::LEN].try_into().unwrap();
286        let bbheader = BBHeader::new(&bbheader);
287        if bbheader.is_gse_hem() {
288            let syncd_bits = bbheader.syncd();
289            if syncd_bits % 8 != 0 {
290                return Err(GSEError::SyncdNotMultiple);
291            }
292            let syncd_bytes = usize::from(syncd_bits / 8);
293            let remaining_start = BBHeader::LEN + syncd_bytes;
294            if remaining_start >= bbframe.len() {
295                return Err(GSEError::SyncdTooLarge);
296            }
297            let first_packet = match (self.hem_leftover.is_empty(), syncd_bytes == 0) {
298                (true, false) => {
299                    log::warn!(
300                        "GSE-HEM SYNCD is not zero but we have no leftovers from previous BBFRAME"
301                    );
302                    None
303                }
304                (false, true) => {
305                    log::warn!(
306                        "GSE-HEM SYNCD is zero but we have leftovers from previous BBFRAME; \
307                                 dropping leftovers"
308                    );
309                    self.hem_leftover.truncate(0);
310                    None
311                }
312                (true, true) => None,
313                (false, false) => {
314                    self.hem_leftover
315                        .extend_from_slice(&bbframe[BBHeader::LEN..remaining_start]);
316                    let concat = self.hem_leftover.split_off(0).freeze();
317                    let hem_last_label = self.hem_last_label.clone();
318                    GSEPacket::from_bytes(&concat, hem_last_label.as_ref()).and_then(|packet| {
319                        if packet.len() == concat.len() {
320                            Some(packet)
321                        } else {
322                            log::warn!("GSE packet recovered from GSE-HEM leftovers does not match leftovers length; \
323                                        dropping packet");
324                            None
325                        }
326                    })
327                }
328            };
329            let remaining = bbframe.slice(remaining_start..);
330            let remaining_packets = GSEPacket::try_split_bytes(&remaining, false);
331            // use iterator tricks to hook up closures that count the length of
332            // GSE packets and update self.hem_leftover accordingly
333            let remaining_packets = remaining_packets
334                .scan(remaining_start, |end, packet| {
335                    *end += packet.len();
336                    if let Some(l) = packet.header.label() {
337                        self.hem_last_label = Some(l.clone());
338                    }
339                    Some(Some((*end, packet)))
340                })
341                .chain(std::iter::once(None))
342                .scan(remaining_start, |prev_end, packet| {
343                    if let Some((end, packet)) = packet {
344                        *prev_end = end;
345                        Some(packet)
346                    } else {
347                        assert!(self.hem_leftover.is_empty());
348                        self.hem_leftover.extend_from_slice(&bbframe[*prev_end..]);
349                        None
350                    }
351                });
352            Ok(EitherIter::A(
353                first_packet
354                    .into_iter()
355                    .chain(remaining_packets)
356                    .flat_map(|packet| self.defragger.defrag_packet(&packet)),
357            ))
358        } else {
359            if !self.hem_leftover.is_empty() {
360                log::warn!(
361                    "defragmenting non-HEM BBFRAME, but have leftovers from previous HEM BBFRAME; \
362                            dropping leftovers"
363                );
364                self.hem_leftover.truncate(0);
365            }
366            Ok(EitherIter::B(
367                GSEPacket::split_bbframe(bbframe)?
368                    .flat_map(|packet| self.defragger.defrag_packet(&packet)),
369            ))
370        }
371    }
372}
373
374impl Defragger {
375    fn defrag_packet(&mut self, packet: &GSEPacket) -> Option<PDU> {
376        if packet.header().is_single_fragment() {
377            log::debug!("defragmented GSE Packet as a single fragment");
378            return Some(PDU::from_single_fragment(packet).unwrap());
379        }
380        let frag_id = packet.header().fragment_id().unwrap();
381        if packet.header().start() {
382            log::debug!("start of GSE fragment ID = {}", frag_id);
383            let mut defrag = Defrag::new(packet.header()).unwrap();
384            defrag.set_skip_total_length_check(self.skip_total_length_check);
385            defrag.push(packet);
386            self.defrags.insert(frag_id, defrag);
387        } else if let Some(defrag) = self.defrags.get_mut(&frag_id) {
388            log::debug!("pushing non-start GSE fragment ID = {}", frag_id);
389            defrag.push(packet);
390        }
391        if packet.header.end() {
392            if let Some(defrag) = self.defrags.remove(&frag_id) {
393                log::debug!("end of GSE fragment ID = {}", frag_id);
394                return defrag.reconstruct(frag_id);
395            }
396        }
397        None
398    }
399}
400
401impl Defrag {
402    fn new(header: &GSEHeader) -> Option<Defrag> {
403        Some(Defrag {
404            total_length: usize::from(header.total_length()?),
405            protocol_type: header.protocol_type()?,
406            label: header.label()?.clone(),
407            current_length: 0,
408            fragments: Vec::new(),
409            digest: CRC32.digest(),
410            skip_total_length_check: false,
411        })
412    }
413
414    fn set_skip_total_length_check(&mut self, value: bool) {
415        self.skip_total_length_check = value;
416    }
417
418    fn push(&mut self, packet: &GSEPacket) {
419        self.fragments.push(packet.data().clone());
420        if let Some(total_length) = packet.header().total_length() {
421            self.digest.update(&total_length.to_be_bytes());
422        }
423        if let Some(protocol_type) = packet.header().protocol_type() {
424            self.digest.update(&protocol_type.to_be_bytes());
425            self.current_length += std::mem::size_of::<u16>();
426        }
427        if let Some(label) = packet.header().label() {
428            self.digest.update(label.as_slice());
429            self.current_length += label.len();
430        }
431        if packet.header.end() {
432            let data = packet.data();
433            let crc_size = std::mem::size_of::<u32>();
434            if data.len() >= crc_size {
435                self.digest
436                    .update(&packet.data()[..packet.data().len() - crc_size]);
437                self.current_length += packet.data().len() - crc_size;
438            } else {
439                log::error!(
440                    "data size of last GSE fragment is {} bytes, \
441			     which is less than the CRC-32 length",
442                    data.len()
443                );
444            }
445        } else {
446            self.digest.update(packet.data());
447            self.current_length += packet.data().len();
448        }
449    }
450
451    fn reconstruct(self, frag_id: u8) -> Option<PDU> {
452        if !self.skip_total_length_check && self.total_length != self.current_length {
453            log::debug!(
454                "defragmented length {} does not match total length {}",
455                self.current_length,
456                self.total_length
457            );
458            return None;
459        }
460        let data = self.fragments.iter().flatten().copied().collect::<Bytes>();
461        let crc_size = std::mem::size_of::<u32>();
462        if data.len() < crc_size {
463            log::error!("defragmented data is shorter than CRC-32 size");
464            return None;
465        }
466        let crc_calc = self.digest.finalize();
467        let crc_data = u32::from_be_bytes(data[data.len() - crc_size..].try_into().unwrap());
468        if crc_calc != crc_data {
469            log::debug!("invalid CRC-32 for fragment ID = {}", frag_id);
470            return None;
471        }
472        log::debug!("valid CRC-32 for fragment ID = {}", frag_id);
473        Some(PDU {
474            data: data.slice(..data.len() - crc_size),
475            protocol_type: self.protocol_type,
476            label: self.label,
477        })
478    }
479}
480
481impl std::fmt::Debug for Defrag {
482    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
483        f.debug_struct("Defrag")
484            .field("total_length", &self.total_length)
485            .field("fragments", &self.fragments)
486            .finish()
487    }
488}
489
490impl Default for GSEPacketDefrag {
491    fn default() -> GSEPacketDefrag {
492        GSEPacketDefrag::new()
493    }
494}
495
496#[cfg(test)]
497mod test {
498    use super::*;
499    use hex_literal::hex;
500
501    const SINGLE_PACKET: [u8; 104] = hex!(
502        "72 00 00 00 02 f0 00 00 00 15 c0 5c 08 00 02 00
503         48 55 4c 4b 45 00 00 54 6f aa 40 00 40 01 72 fc
504         2c 00 00 01 2c 00 00 02 08 00 4e 94 00 3b 00 04
505         19 7d 6b 63 00 00 00 00 5d 79 08 00 00 00 00 00
506         10 11 12 13 14 15 16 17 18 19 1a 1b 1c 1d 1e 1f
507         20 21 22 23 24 25 26 27 28 29 2a 2b 2c 2d 2e 2f
508         30 31 32 33 34 35 36 37"
509    );
510
511    #[test]
512    fn defrag_single_packet() {
513        let bbframe = Bytes::copy_from_slice(&SINGLE_PACKET);
514        let mut defrag = GSEPacketDefrag::new();
515        let pdus: Vec<_> = defrag.defragment(&bbframe).unwrap().collect();
516        assert_eq!(pdus.len(), 1);
517        let pdu = &pdus[0];
518        assert_eq!(&pdu.data()[..], &SINGLE_PACKET[20..]);
519        assert_eq!(pdu.protocol_type(), 0x0800);
520        assert_eq!(pdu.label().as_slice(), hex!("02 00 48 55 4c 4b"));
521    }
522
523    #[test]
524    fn test_hem_defrag_multiple() {
525        // Create some BBFRAMEs containing GSE packets of the same size
526        let dfl_bytes = 400;
527        let packet_size_bytes = 75;
528        let num_packets = 100;
529        // To be filled with SYNCD and CRC-8 ^ MODE
530        let bbheader_template = hex!("ba 00 00 00 0c 80 00 00 00 00");
531        let bbheader = BBHeader::new(&bbheader_template);
532        assert_eq!(usize::from(bbheader.dfl()), dfl_bytes * 8);
533        let packets = (0..num_packets)
534            .map(|n| {
535                // 2 bytes for protocol type (broadcast label for simplicity)
536                let gse_length = packet_size_bytes + 2;
537                let mut packet = Vec::with_capacity(gse_length + 2);
538                packet.push(0xe0);
539                packet.push(u8::try_from(gse_length).unwrap());
540                // dummy protocol type
541                packet.push(0x12);
542                packet.push(0x34);
543                for j in 0..packet_size_bytes {
544                    packet.push((j + n) as u8);
545                }
546                packet
547            })
548            .collect::<Vec<Vec<u8>>>();
549        let mut bbframes = Vec::new();
550        let mut bbframe = BytesMut::new();
551        let mut remain = BytesMut::new();
552        let mut packets_total = 0;
553        let mut packets_in_bbframe = 0;
554        for packet in &packets {
555            if bbframe.is_empty() {
556                let syncd = remain.len() * 8;
557                let mut bbheader = bbheader_template;
558                bbheader[7] = ((syncd >> 8) & 0xff) as u8;
559                bbheader[8] = (syncd & 0xff) as u8;
560                let crc = BBHeader::new(&bbheader).compute_crc8();
561                bbheader[9] = crc;
562                assert!(BBHeader::new(&bbheader).crc_is_valid());
563                bbframe.extend_from_slice(&bbheader);
564                bbframe.extend_from_slice(&remain);
565                packets_in_bbframe = if remain.is_empty() { 0 } else { 1 };
566                remain.truncate(0);
567            }
568            let to_take = (dfl_bytes - (bbframe.len() - BBHeader::LEN)).min(packet.len());
569            bbframe.extend_from_slice(&packet[..to_take]);
570            if to_take < packet.len() {
571                bbframes.push(bbframe.split_off(0).freeze());
572                packets_total += packets_in_bbframe;
573                assert!(remain.is_empty());
574                remain.extend_from_slice(&packet[to_take..]);
575            } else {
576                packets_in_bbframe += 1;
577            }
578        }
579        // Sanity check that the above has generated a reasonable amount of data
580        assert!(packets_total > 75);
581        assert!(bbframes.len() > 10);
582
583        // Defragment the BBFRAMEs
584        let mut defrag = GSEPacketDefrag::new();
585        let mut pdus = Vec::with_capacity(packets_total);
586        for bbframe in &bbframes {
587            for packet in defrag.defragment(bbframe).unwrap() {
588                pdus.push(packet);
589            }
590        }
591        assert_eq!(pdus.len(), packets_total);
592        for (n, pdu) in pdus.iter().enumerate() {
593            let expected = (0..packet_size_bytes)
594                .map(|j| (j + n) as u8)
595                .collect::<Vec<u8>>();
596            assert_eq!(pdu.data(), &expected);
597            assert_eq!(pdu.protocol_type(), 0x1234);
598        }
599    }
600}
601
602#[cfg(test)]
603mod proptests {
604    use super::*;
605    use proptest::prelude::*;
606
607    prop_compose! {
608        fn garbage()
609            (g in proptest::collection::vec(
610                proptest::collection::vec(any::<u8>(), 0..10000), 0..100))
611             -> Vec<BBFrame> {
612                g.into_iter().map(|v| Bytes::copy_from_slice(&v)).collect::<Vec<BBFrame>>()
613            }
614    }
615
616    proptest! {
617        #[test]
618        fn defrag_garbage(garbage_bbframes in garbage()) {
619            let mut defrag = GSEPacketDefrag::new();
620            for bbframe in &garbage_bbframes {
621                if let Ok(pdus) = defrag.defragment(bbframe) {
622                    for pdu in pdus {
623                        pdu.data();
624                        pdu.protocol_type();
625                        pdu.label();
626                    }
627                }
628            }
629        }
630    }
631}