1use crate::pes::PesPacket;
2use crate::ts::payload::{Bytes, Pes};
3use crate::ts::{Pid, ReadTsPacket, TsPayload};
4use crate::{Error, Result};
5use std::collections::HashMap;
6
7pub trait ReadPesPacket {
9 fn read_pes_packet(&mut self) -> Result<Option<PesPacket<Vec<u8>>>>;
13}
14
15#[derive(Debug)]
17pub struct PesPacketReader<R> {
18 ts_packet_reader: R,
19 pes_packets: HashMap<Pid, PartialPesPacket>,
20 eos: bool,
21}
22impl<R: ReadTsPacket> PesPacketReader<R> {
23 pub fn new(ts_packet_reader: R) -> Self {
25 PesPacketReader {
26 ts_packet_reader,
27 pes_packets: HashMap::new(),
28 eos: false,
29 }
30 }
31
32 pub fn ts_packet_reader(&self) -> &R {
34 &self.ts_packet_reader
35 }
36
37 pub fn into_ts_packet_reader(self) -> R {
39 self.ts_packet_reader
40 }
41
42 fn handle_eos(&mut self) -> Result<Option<PesPacket<Vec<u8>>>> {
43 if let Some(key) = self.pes_packets.keys().next().cloned() {
44 let partial = self.pes_packets.remove(&key).expect("Never fails");
45 if partial.data_len.is_some() && partial.data_len != Some(partial.packet.data.len()) {
46 return Err(Error::invalid_input("Unexpected EOS"));
47 }
48 Ok(Some(partial.packet))
49 } else {
50 Ok(None)
51 }
52 }
53
54 fn handle_pes_payload(&mut self, pid: Pid, pes: Pes) -> Result<Option<PesPacket<Vec<u8>>>> {
55 let data_len = if pes.pes_packet_len == 0 {
56 None
57 } else {
58 let optional_header_len = pes.header.optional_header_len();
59 if pes.pes_packet_len < optional_header_len {
60 return Err(Error::invalid_input(format!(
61 "pes.pes_packet_len={}, optional_header_len={}",
62 pes.pes_packet_len, optional_header_len
63 )));
64 }
65 Some((pes.pes_packet_len - optional_header_len) as usize)
66 };
67
68 let mut data = Vec::with_capacity(data_len.unwrap_or(pes.data.len()));
69 data.extend_from_slice(&pes.data);
70
71 let packet = PesPacket {
72 header: pes.header,
73 data,
74 };
75 let partial = PartialPesPacket { packet, data_len };
76 if let Some(pred) = self.pes_packets.insert(pid, partial) {
77 if pred.data_len.is_some() && pred.data_len != Some(pred.packet.data.len()) {
78 return Err(Error::invalid_input(format!(
79 "Mismatched PES packet data length: actual={}, expected={}",
80 pred.packet.data.len(),
81 pred.data_len.expect("Never fails")
82 )));
83 }
84 Ok(Some(pred.packet))
85 } else {
86 Ok(None)
87 }
88 }
89
90 fn handle_raw_payload(&mut self, pid: Pid, data: &Bytes) -> Result<Option<PesPacket<Vec<u8>>>> {
91 let mut partial = self
92 .pes_packets
93 .remove(&pid)
94 .ok_or_else(|| Error::invalid_input("PES packet not found for PID"))?;
95 partial.packet.data.extend_from_slice(data);
96 if Some(partial.packet.data.len()) == partial.data_len {
97 Ok(Some(partial.packet))
98 } else {
99 if let Some(expected) = partial.data_len
100 && partial.packet.data.len() > expected
101 {
102 return Err(Error::invalid_input(format!(
103 "Too large PES packet data: actual={}, expected={}",
104 partial.packet.data.len(),
105 expected
106 )));
107 }
108 self.pes_packets.insert(pid, partial);
109 Ok(None)
110 }
111 }
112}
113impl<R: ReadTsPacket> ReadPesPacket for PesPacketReader<R> {
114 fn read_pes_packet(&mut self) -> Result<Option<PesPacket<Vec<u8>>>> {
115 if self.eos {
116 return self.handle_eos();
117 }
118
119 while let Some(ts_packet) = self.ts_packet_reader.read_ts_packet()? {
120 let pid = ts_packet.header.pid;
121 let result = match ts_packet.payload {
122 Some(TsPayload::Pes(payload)) => self.handle_pes_payload(pid, payload)?,
123 Some(TsPayload::Raw(payload)) => self.handle_raw_payload(pid, &payload)?,
124 _ => None,
125 };
126 if result.is_some() {
127 return Ok(result);
128 }
129 }
130
131 self.eos = true;
132 self.handle_eos()
133 }
134}
135
136#[derive(Debug)]
137struct PartialPesPacket {
138 packet: PesPacket<Vec<u8>>,
139 data_len: Option<usize>,
140}