ts_analyzer/
reader.rs

1//! A module for reading the transport stream.
2use std::error::Error;
3use std::fs::File;
4use std::io::{BufReader, ErrorKind, Read, Seek, SeekFrom};
5use crate::errors::no_sync_byte_found::NoSyncByteFound;
6use crate::packet::{TSPacket, PACKET_SIZE};
7use crate::packet::header::SYNC_BYTE;
8use crate::packet::payload::TSPayload;
9use crate::helpers::tracked_payload::{self, TrackedPayload};
10
11#[cfg(feature = "log")]
12use log::{info,debug,trace};
13use memmem::{Searcher, TwoWaySearcher};
14
/// Reader state for pulling packets and complete payloads out of a transport stream file.
pub struct TSReader {
    /// Filename for the file being read. Only really used for logging.
    filename: String,
    /// Buffered reader for the transport stream file.
    buf_reader: BufReader<File>,
    /// Sync byte alignment. A SYNC byte should be found every `PACKET_SIZE` bytes.
    ///
    /// This holds the stream position that was recorded right after the first verified SYNC
    /// byte was read during construction.
    sync_alignment: u64,
    /// Counter of the number of packets read so far.
    packets_read: u64,
    /// PIDs that should be tracked when querying for packets or payloads.
    ///
    /// If empty, all PIDs are tracked. This will use more memory as there are more
    /// incomplete payloads to keep track of.
    tracked_pids: Vec<u16>,
    /// Payloads that are currently being tracked by the reader.
    tracked_payloads: Vec<TrackedPayload>,
}
33
34impl TSReader {
35
36    /// Create a new TSReader instance using the given file.
37    ///
38    /// This function also finds the first SYNC byte, so we can determine the alignment of the
39    /// transport packets.
40    /// # Parameters
41    /// - `buf_reader`: a buffered reader that contains transport stream data.
42    pub fn new(filename: &str, mut buf_reader: BufReader<File>) -> Result<Self, Box<dyn Error>> {
43        // Find the first sync byte, so we can search easier by doing simple `PACKET_SIZE` buffer
44        // reads.
45        let mut read_buf = [0];
46        let sync_alignment: u64;
47
48        loop {
49            let count = buf_reader.read(&mut read_buf)?;
50
51            // Return a `NoSyncByteFound` error if no SYNC byte could be found in the reader.
52            if count == 0 {
53                return Err(Box::new(NoSyncByteFound));
54            }
55
56            // Run through this loop until we find a sync byte.
57            if read_buf[0] != SYNC_BYTE {
58                continue
59            }
60
61            // Note the location of this SYNC byte for later
62            let sync_pos = buf_reader.stream_position().expect("Couldn't get stream position from BufReader");
63
64            #[cfg(feature = "log")]
65            trace!("SYNC found at position {} for file {}", sync_pos, filename);
66
67            // If we think this is the correct alignment because we have found a SYNC byte we need
68            // to verify that this is correct by seeking 1 `PACKET_SIZE` away and verifying a SYNC
69            // byte is there. If there isn't one there then this is simply the same data as a SYNC
70            // byte by coincidence, and we need to keep looking.
71            //
72            // There is always the possibility that we hit a `0x47` in the payload, seek 1
73            // `PACKET_SIZE` further, and find another `0x47` but I don't have a way of accounting
74            // for that, so we're going with blind hope that this case doesn't get seen.
75            buf_reader.seek_relative(PACKET_SIZE as i64 - 1)?;
76            let count = buf_reader.read(&mut read_buf)?;
77
78            // If we run out of data to read while trying to verify that the SYNC byte is actually a
79            // SYNC byte and isn't part of a payload then we'll simply assume that it really is a
80            // SYNC byte as we have nothing else to go off of.
81            if count == 0 {
82                #[cfg(feature = "log")]
83                debug!("Could not find SYNC byte in file {}", filename);
84                return Err(Box::new(NoSyncByteFound));
85            }
86
87            // Seek back to the original location for later reading.
88            buf_reader.seek(SeekFrom::Start(sync_pos - 1))?;
89
90            // If the byte 1 `PACKET_SIZE` away is also a SYNC byte we can be relatively sure that
91            // this alignment is correct.
92            if read_buf[0] == SYNC_BYTE {
93                sync_alignment = sync_pos;
94                break
95            }
96        }
97
98        Ok(TSReader {
99            filename: filename.to_string(),
100            buf_reader,
101            sync_alignment,
102            packets_read: 0,
103            tracked_pids: Vec::new(),
104            tracked_payloads: Vec::new(),
105        })
106    }
107
108    /// Read the next packet from the transport stream file.
109    ///
110    /// This function returns `None` for any `Err` in order to prevent the need for `.unwrap()`
111    /// calls in more concise code.
112    /// # Returns
113    /// `Some(TSPacket)` if the next transport stream packet could be parsed from the file.
114    /// `None` if the next transport stream packet could not be parsed from the file for any
115    /// reason. This includes if the entire file has been fully read.
116    pub fn next_packet_unchecked(&mut self) -> Option<TSPacket> {
117        self.next_packet().unwrap_or_else(|_| None)
118    }
119
120    /// Read the next packet from the transport stream file.
121    /// # Returns
122    /// `Ok(Some(TSPacket))` if the next transport stream packet could be parsed from the file.
123    /// `Ok(None)` if there was no issue reading the file and no more TS packets can be read.
124    pub fn next_packet(&mut self) -> Result<Option<TSPacket>, Box<dyn Error>> {
125        let mut packet_buf = [0; PACKET_SIZE];
126        loop {
127            match self.buf_reader.read_exact(&mut packet_buf) {
128                Ok(_) => {},
129                Err(e) => {
130                    if e.kind() == ErrorKind::UnexpectedEof {
131                        #[cfg(feature = "log")]
132                        {
133                            info!("Finished reading file {}", self.filename);
134                        }
135                        return Ok(None);
136                    }
137
138                    return Err(Box::new(e));
139                },
140            }
141
142            #[cfg(feature = "log")]
143            {
144                if let Ok(position) = self.buf_reader.stream_position() {
145                    trace!("Seek position in file {}: {}", self.filename, position)
146                }
147            }
148
149            self.packets_read += 1;
150            #[cfg(feature = "log")]
151            trace!("Packets read in file {}: {}", self.filename, self.packets_read);
152
153            let packet = match TSPacket::from_bytes(&mut packet_buf) {
154                Ok(packet) => packet,
155                Err(e) => {
156                    #[cfg(feature = "log")]
157                    debug!("Got error from {} when trying to parse next packet from bytes {:2X?}",
158                        self.filename, packet_buf);
159                    return Err(e)
160                },
161            };
162
163            // We should only return a packet if it is in the tracked PIDs (or there are no tracked
164            // PIDs)
165            if ! self.tracked_pids.is_empty() && ! self.tracked_pids.contains(&packet.header().pid()) {
166                continue
167            }
168
169            return Ok(Some(packet));
170        }
171    }
172
173    /// Read the next payload from the transport stream file.
174    ///
175    /// This function returns `None` for any `Err` in order to prevent the need for `.unwrap()`
176    /// calls in more concise code.
177    /// # Returns
178    /// `Some(TSPayload)` if the next transport stream packet could be parsed from the file.
179    /// `None` if the next transport stream payload could not be parsed from the file for any
180    /// reason. This includes if the entire file has been fully read.
181    pub fn next_payload_unchecked(&mut self) -> Option<Box<[u8]>> {
182        self.next_payload().unwrap_or_else(|_| None)
183    }
184
185    /// Read the next full payload from the file.
186    ///
187    /// This function parses through all transport stream packets, stores them in a buffer and
188    /// concatenates their payloads together once a payload has been complete.
189    pub fn next_payload(&mut self) -> Result<Option<Box<[u8]>>, Box<dyn Error>> {
190        loop {
191            let possible_packet = match self.next_packet() {
192                Ok(packet) => packet,
193                Err(e) => return Err(e),
194            };
195            
196            let Some(packet) = possible_packet else {
197                return Ok(None);
198            };
199
200            // Add this packet's payload to the tracked payload and retrieve the completed payload
201            // if it exists.
202            let payload = self.add_tracked_payload(&packet);
203            if payload.is_some() {
204                return Ok(payload)
205            }
206        }
207    }
208
    /// Return the alignment of the SYNC bytes in this reader.
    ///
    /// This is the stream position that was recorded right after the first verified SYNC byte
    /// was read while constructing the reader.
    pub fn sync_byte_alignment(&self) -> u64 {
        self.sync_alignment
    }
213
214    /// Add a PID to the tracking list.
215    ///
216    /// Only tracked PIDs are returned when running methods that gather packets or payloads. If no
217    /// PID is specified then all PIDs are returned.
218    pub fn add_tracked_pid(&mut self, pid: u16) {
219        self.tracked_pids.push(pid);
220    }
221
222    /// Remove this PID from being tracked.
223    ///
224    /// Only tracked PIDs are returned when running methods that gather packets or payloads. If no
225    /// PID is specified then all PIDs are returned.
226    pub fn remove_tracked_pid(&mut self, pid: u16) {
227        self.tracked_pids.retain(|vec_pid| *vec_pid != pid);
228    }
229
230    /// Add payload data from a packet to the tracked payloads list.
231    fn add_tracked_payload(&mut self, packet: &TSPacket) -> Option<Box<[u8]>> {
232        let payload = packet.payload()?;
233
234        // Check to see if we already have an TrackedPayload object for this item PID
235        let pid = packet.header().pid();
236        
237        if let Some(index) = self.tracked_payloads.iter().position(|tp| tp.pid() == pid) {
238            let tracked_payload = &mut self.tracked_payloads[index];
239            return tracked_payload.add_and_get_complete(&payload);
240        }
241
242        // We cannot possibly know that a payload is complete from the first packet. In order to
243        // know that a payload is fully contained in 1 packet we need to see the `PUSI` flag set in
244        // the next packet so there is no reason to check if the packet is complete when creating a
245        // new TrackedPayload.
246
247        match TrackedPayload::from_packet(packet) {
248            Ok(tp) => {
249                self.tracked_payloads.push(tp);
250            }
251            Err(_) => {}
252        };
253
254        return None;
255    }
256}