ts_analyzer/reader.rs
1//! A module for reading the transport stream.
2use std::error::Error;
3use std::fs::File;
4use std::io::{BufReader, ErrorKind, Read, Seek, SeekFrom};
5use crate::errors::no_sync_byte_found::NoSyncByteFound;
6use crate::packet::{TSPacket, PACKET_SIZE};
7use crate::packet::header::SYNC_BYTE;
8use crate::packet::payload::TSPayload;
9use crate::helpers::tracked_payload::{self, TrackedPayload};
10
11#[cfg(feature = "log")]
12use log::{info,debug,trace};
13use memmem::{Searcher, TwoWaySearcher};
14
/// Reader that pulls transport stream packets, and the payloads reassembled from them, out of a
/// buffered file.
///
/// An instance is built with `TSReader::new`, which scans the reader for the first verified SYNC
/// byte before any packet is returned.
pub struct TSReader {
    /// Name of the file being read. It is only stored so log messages can identify the file.
    filename: String,
    /// Buffered reader over the transport stream file; every packet is read from here.
    buf_reader: BufReader<File>,
    /// Stream position recorded by `TSReader::new` when the first verified SYNC byte was located.
    /// SYNC bytes are expected to repeat every `PACKET_SIZE` bytes from there on. Exposed through
    /// `sync_byte_alignment`.
    sync_alignment: u64,
    /// Number of `PACKET_SIZE` chunks read from the file so far. This is incremented before the
    /// chunk is parsed and before PID filtering is applied.
    packets_read: u64,
    /// PIDs that should be tracked when querying for packets or payloads.
    ///
    /// If empty, all PIDs are tracked. This will use more memory as there are more
    /// incomplete payloads to keep track of.
    tracked_pids: Vec<u16>,
    /// Partially assembled payloads, looked up by PID. A new entry is only added when no entry
    /// exists yet for the packet's PID.
    tracked_payloads: Vec<TrackedPayload>,
}
33
34impl TSReader {
35
36 /// Create a new TSReader instance using the given file.
37 ///
38 /// This function also finds the first SYNC byte, so we can determine the alignment of the
39 /// transport packets.
40 /// # Parameters
41 /// - `buf_reader`: a buffered reader that contains transport stream data.
42 pub fn new(filename: &str, mut buf_reader: BufReader<File>) -> Result<Self, Box<dyn Error>> {
43 // Find the first sync byte, so we can search easier by doing simple `PACKET_SIZE` buffer
44 // reads.
45 let mut read_buf = [0];
46 let sync_alignment: u64;
47
48 loop {
49 let count = buf_reader.read(&mut read_buf)?;
50
51 // Return a `NoSyncByteFound` error if no SYNC byte could be found in the reader.
52 if count == 0 {
53 return Err(Box::new(NoSyncByteFound));
54 }
55
56 // Run through this loop until we find a sync byte.
57 if read_buf[0] != SYNC_BYTE {
58 continue
59 }
60
61 // Note the location of this SYNC byte for later
62 let sync_pos = buf_reader.stream_position().expect("Couldn't get stream position from BufReader");
63
64 #[cfg(feature = "log")]
65 trace!("SYNC found at position {} for file {}", sync_pos, filename);
66
67 // If we think this is the correct alignment because we have found a SYNC byte we need
68 // to verify that this is correct by seeking 1 `PACKET_SIZE` away and verifying a SYNC
69 // byte is there. If there isn't one there then this is simply the same data as a SYNC
70 // byte by coincidence, and we need to keep looking.
71 //
72 // There is always the possibility that we hit a `0x47` in the payload, seek 1
73 // `PACKET_SIZE` further, and find another `0x47` but I don't have a way of accounting
74 // for that, so we're going with blind hope that this case doesn't get seen.
75 buf_reader.seek_relative(PACKET_SIZE as i64 - 1)?;
76 let count = buf_reader.read(&mut read_buf)?;
77
78 // If we run out of data to read while trying to verify that the SYNC byte is actually a
79 // SYNC byte and isn't part of a payload then we'll simply assume that it really is a
80 // SYNC byte as we have nothing else to go off of.
81 if count == 0 {
82 #[cfg(feature = "log")]
83 debug!("Could not find SYNC byte in file {}", filename);
84 return Err(Box::new(NoSyncByteFound));
85 }
86
87 // Seek back to the original location for later reading.
88 buf_reader.seek(SeekFrom::Start(sync_pos - 1))?;
89
90 // If the byte 1 `PACKET_SIZE` away is also a SYNC byte we can be relatively sure that
91 // this alignment is correct.
92 if read_buf[0] == SYNC_BYTE {
93 sync_alignment = sync_pos;
94 break
95 }
96 }
97
98 Ok(TSReader {
99 filename: filename.to_string(),
100 buf_reader,
101 sync_alignment,
102 packets_read: 0,
103 tracked_pids: Vec::new(),
104 tracked_payloads: Vec::new(),
105 })
106 }
107
108 /// Read the next packet from the transport stream file.
109 ///
110 /// This function returns `None` for any `Err` in order to prevent the need for `.unwrap()`
111 /// calls in more concise code.
112 /// # Returns
113 /// `Some(TSPacket)` if the next transport stream packet could be parsed from the file.
114 /// `None` if the next transport stream packet could not be parsed from the file for any
115 /// reason. This includes if the entire file has been fully read.
116 pub fn next_packet_unchecked(&mut self) -> Option<TSPacket> {
117 self.next_packet().unwrap_or_else(|_| None)
118 }
119
120 /// Read the next packet from the transport stream file.
121 /// # Returns
122 /// `Ok(Some(TSPacket))` if the next transport stream packet could be parsed from the file.
123 /// `Ok(None)` if there was no issue reading the file and no more TS packets can be read.
124 pub fn next_packet(&mut self) -> Result<Option<TSPacket>, Box<dyn Error>> {
125 let mut packet_buf = [0; PACKET_SIZE];
126 loop {
127 match self.buf_reader.read_exact(&mut packet_buf) {
128 Ok(_) => {},
129 Err(e) => {
130 if e.kind() == ErrorKind::UnexpectedEof {
131 #[cfg(feature = "log")]
132 {
133 info!("Finished reading file {}", self.filename);
134 }
135 return Ok(None);
136 }
137
138 return Err(Box::new(e));
139 },
140 }
141
142 #[cfg(feature = "log")]
143 {
144 if let Ok(position) = self.buf_reader.stream_position() {
145 trace!("Seek position in file {}: {}", self.filename, position)
146 }
147 }
148
149 self.packets_read += 1;
150 #[cfg(feature = "log")]
151 trace!("Packets read in file {}: {}", self.filename, self.packets_read);
152
153 let packet = match TSPacket::from_bytes(&mut packet_buf) {
154 Ok(packet) => packet,
155 Err(e) => {
156 #[cfg(feature = "log")]
157 debug!("Got error from {} when trying to parse next packet from bytes {:2X?}",
158 self.filename, packet_buf);
159 return Err(e)
160 },
161 };
162
163 // We should only return a packet if it is in the tracked PIDs (or there are no tracked
164 // PIDs)
165 if ! self.tracked_pids.is_empty() && ! self.tracked_pids.contains(&packet.header().pid()) {
166 continue
167 }
168
169 return Ok(Some(packet));
170 }
171 }
172
173 /// Read the next payload from the transport stream file.
174 ///
175 /// This function returns `None` for any `Err` in order to prevent the need for `.unwrap()`
176 /// calls in more concise code.
177 /// # Returns
178 /// `Some(TSPayload)` if the next transport stream packet could be parsed from the file.
179 /// `None` if the next transport stream payload could not be parsed from the file for any
180 /// reason. This includes if the entire file has been fully read.
181 pub fn next_payload_unchecked(&mut self) -> Option<Box<[u8]>> {
182 self.next_payload().unwrap_or_else(|_| None)
183 }
184
185 /// Read the next full payload from the file.
186 ///
187 /// This function parses through all transport stream packets, stores them in a buffer and
188 /// concatenates their payloads together once a payload has been complete.
189 pub fn next_payload(&mut self) -> Result<Option<Box<[u8]>>, Box<dyn Error>> {
190 loop {
191 let possible_packet = match self.next_packet() {
192 Ok(packet) => packet,
193 Err(e) => return Err(e),
194 };
195
196 let Some(packet) = possible_packet else {
197 return Ok(None);
198 };
199
200 // Add this packet's payload to the tracked payload and retrieve the completed payload
201 // if it exists.
202 let payload = self.add_tracked_payload(&packet);
203 if payload.is_some() {
204 return Ok(payload)
205 }
206 }
207 }
208
209 /// Return the alignment of the SYNC bytes in this reader.
210 pub fn sync_byte_alignment(&self) -> u64 {
211 self.sync_alignment
212 }
213
214 /// Add a PID to the tracking list.
215 ///
216 /// Only tracked PIDs are returned when running methods that gather packets or payloads. If no
217 /// PID is specified then all PIDs are returned.
218 pub fn add_tracked_pid(&mut self, pid: u16) {
219 self.tracked_pids.push(pid);
220 }
221
222 /// Remove this PID from being tracked.
223 ///
224 /// Only tracked PIDs are returned when running methods that gather packets or payloads. If no
225 /// PID is specified then all PIDs are returned.
226 pub fn remove_tracked_pid(&mut self, pid: u16) {
227 self.tracked_pids.retain(|vec_pid| *vec_pid != pid);
228 }
229
230 /// Add payload data from a packet to the tracked payloads list.
231 fn add_tracked_payload(&mut self, packet: &TSPacket) -> Option<Box<[u8]>> {
232 let payload = packet.payload()?;
233
234 // Check to see if we already have an TrackedPayload object for this item PID
235 let pid = packet.header().pid();
236
237 if let Some(index) = self.tracked_payloads.iter().position(|tp| tp.pid() == pid) {
238 let tracked_payload = &mut self.tracked_payloads[index];
239 return tracked_payload.add_and_get_complete(&payload);
240 }
241
242 // We cannot possibly know that a payload is complete from the first packet. In order to
243 // know that a payload is fully contained in 1 packet we need to see the `PUSI` flag set in
244 // the next packet so there is no reason to check if the packet is complete when creating a
245 // new TrackedPayload.
246
247 match TrackedPayload::from_packet(packet) {
248 Ok(tp) => {
249 self.tracked_payloads.push(tp);
250 }
251 Err(_) => {}
252 };
253
254 return None;
255 }
256}