mpeg2ts/pes/
reader.rs

1use crate::pes::PesPacket;
2use crate::ts::payload::{Bytes, Pes};
3use crate::ts::{Pid, ReadTsPacket, TsPayload};
4use crate::{Error, Result};
5use std::collections::HashMap;
6
7/// The `ReadPesPacket` trait allows for reading PES packets from a source.
8pub trait ReadPesPacket {
9    /// Reads a PES packet.
10    ///
11    /// If the end of the stream is reached, it will return `Ok(None)`.
12    fn read_pes_packet(&mut self) -> Result<Option<PesPacket<Vec<u8>>>>;
13}
14
15/// PES packet reader.
16#[derive(Debug)]
17pub struct PesPacketReader<R> {
18    ts_packet_reader: R,
19    pes_packets: HashMap<Pid, PartialPesPacket>,
20    eos: bool,
21}
22impl<R: ReadTsPacket> PesPacketReader<R> {
23    /// Makes a new `PesPacketReader` instance.
24    pub fn new(ts_packet_reader: R) -> Self {
25        PesPacketReader {
26            ts_packet_reader,
27            pes_packets: HashMap::new(),
28            eos: false,
29        }
30    }
31
32    /// Returns a reference to the underlaying TS packet reader.
33    pub fn ts_packet_reader(&self) -> &R {
34        &self.ts_packet_reader
35    }
36
37    /// Converts `PesPacketReader` into the underlaying TS packet reader.
38    pub fn into_ts_packet_reader(self) -> R {
39        self.ts_packet_reader
40    }
41
42    fn handle_eos(&mut self) -> Result<Option<PesPacket<Vec<u8>>>> {
43        if let Some(key) = self.pes_packets.keys().next().cloned() {
44            let partial = self.pes_packets.remove(&key).expect("Never fails");
45            if partial.data_len.is_some() && partial.data_len != Some(partial.packet.data.len()) {
46                return Err(Error::invalid_input("Unexpected EOS"));
47            }
48            Ok(Some(partial.packet))
49        } else {
50            Ok(None)
51        }
52    }
53
54    fn handle_pes_payload(&mut self, pid: Pid, pes: Pes) -> Result<Option<PesPacket<Vec<u8>>>> {
55        let data_len = if pes.pes_packet_len == 0 {
56            None
57        } else {
58            let optional_header_len = pes.header.optional_header_len();
59            if pes.pes_packet_len < optional_header_len {
60                return Err(Error::invalid_input(format!(
61                    "pes.pes_packet_len={}, optional_header_len={}",
62                    pes.pes_packet_len, optional_header_len
63                )));
64            }
65            Some((pes.pes_packet_len - optional_header_len) as usize)
66        };
67
68        let mut data = Vec::with_capacity(data_len.unwrap_or(pes.data.len()));
69        data.extend_from_slice(&pes.data);
70
71        let packet = PesPacket {
72            header: pes.header,
73            data,
74        };
75        let partial = PartialPesPacket { packet, data_len };
76        if let Some(pred) = self.pes_packets.insert(pid, partial) {
77            if pred.data_len.is_some() && pred.data_len != Some(pred.packet.data.len()) {
78                return Err(Error::invalid_input(format!(
79                    "Mismatched PES packet data length: actual={}, expected={}",
80                    pred.packet.data.len(),
81                    pred.data_len.expect("Never fails")
82                )));
83            }
84            Ok(Some(pred.packet))
85        } else {
86            Ok(None)
87        }
88    }
89
90    fn handle_raw_payload(&mut self, pid: Pid, data: &Bytes) -> Result<Option<PesPacket<Vec<u8>>>> {
91        let mut partial = self
92            .pes_packets
93            .remove(&pid)
94            .ok_or_else(|| Error::invalid_input("PES packet not found for PID"))?;
95        partial.packet.data.extend_from_slice(data);
96        if Some(partial.packet.data.len()) == partial.data_len {
97            Ok(Some(partial.packet))
98        } else {
99            if let Some(expected) = partial.data_len
100                && partial.packet.data.len() > expected
101            {
102                return Err(Error::invalid_input(format!(
103                    "Too large PES packet data: actual={}, expected={}",
104                    partial.packet.data.len(),
105                    expected
106                )));
107            }
108            self.pes_packets.insert(pid, partial);
109            Ok(None)
110        }
111    }
112}
113impl<R: ReadTsPacket> ReadPesPacket for PesPacketReader<R> {
114    fn read_pes_packet(&mut self) -> Result<Option<PesPacket<Vec<u8>>>> {
115        if self.eos {
116            return self.handle_eos();
117        }
118
119        while let Some(ts_packet) = self.ts_packet_reader.read_ts_packet()? {
120            let pid = ts_packet.header.pid;
121            let result = match ts_packet.payload {
122                Some(TsPayload::Pes(payload)) => self.handle_pes_payload(pid, payload)?,
123                Some(TsPayload::Raw(payload)) => self.handle_raw_payload(pid, &payload)?,
124                _ => None,
125            };
126            if result.is_some() {
127                return Ok(result);
128            }
129        }
130
131        self.eos = true;
132        self.handle_eos()
133    }
134}
135
136#[derive(Debug)]
137struct PartialPesPacket {
138    packet: PesPacket<Vec<u8>>,
139    data_len: Option<usize>,
140}