Skip to main content

m2ts_packet/
ts_packet.rs

1use crate::*;
2use tokio_util::codec::Decoder;
3
4pub struct TsPacket {
5    pub header: TransportStreamHeader,
6    pub adaptation_field: Option<AdaptationField>,
7    pub payload: Bytes,
8}
9
10impl std::fmt::Debug for TsPacket {
11    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
12        f.debug_struct("TsPacket")
13            .field("header", &self.header)
14            .field("adaptation_field", &self.adaptation_field)
15            .field("payload_len", &self.payload.len())
16            .finish()
17    }
18}
19
20impl TsPacket {
21    pub const PACKET_SIZE: usize = 188;
22    pub fn from_bytes(data: Bytes) -> Option<Self> {
23        if data.len() < Self::PACKET_SIZE {
24            return None;
25        }
26        let header =
27            TransportStreamHeader::from_bits(u32::from_be_bytes(data.get(0..4)?.try_into().ok()?));
28        if header.sync_byte() != 0x47 {
29            return None;
30        }
31        let mut adaption_field = None;
32        let mut index = 4;
33        if header.adaptation_field() {
34            let adaption_field_length = *data.get(index)? as usize;
35            index += 1;
36            if index + adaption_field_length > data.len() {
37                return None;
38            }
39            if adaption_field_length > 0 {
40                let field_data = data.slice(index..index + adaption_field_length);
41                index += adaption_field_length;
42                adaption_field = Some(AdaptationField::from_bytes(field_data)?);
43            }
44        }
45        Some(Self {
46            header,
47            adaptation_field: adaption_field,
48            payload: data.slice(index..Self::PACKET_SIZE),
49        })
50    }
51}
52
53/// Decoder that reads 188-byte MPEG-TS packets from a byte stream.
54pub struct TsPacketDecoder {
55    pub stream_position: u64,
56}
57
58impl TsPacketDecoder {
59    pub fn new(stream_position: u64) -> Self {
60        Self { stream_position }
61    }
62}
63
64impl Decoder for TsPacketDecoder {
65    type Item = (u64, TsPacket);
66    type Error = TsPacketError;
67    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
68        if src.len() < TsPacket::PACKET_SIZE {
69            return Ok(None);
70        }
71
72        // Scan for a valid sync byte: 0x47 at current position AND 0x47 at +PACKET_SIZE
73        // (or current position is at end-of-buffer, in which case we accept a single sync).
74        loop {
75            // Skip non-0x47 bytes
76            while !src.is_empty() && src[0] != 0x47 {
77                self.stream_position += 1;
78                src.advance(1);
79            }
80
81            if src.len() < TsPacket::PACKET_SIZE {
82                return Ok(None);
83            }
84
85            // Verify: the byte at +PACKET_SIZE should also be 0x47 (if data available),
86            // otherwise this 0x47 is likely a false positive.
87            if src.len() > TsPacket::PACKET_SIZE && src[TsPacket::PACKET_SIZE] != 0x47 {
88                // False sync — skip this byte and continue scanning
89                self.stream_position += 1;
90                src.advance(1);
91                continue;
92            }
93
94            break;
95        }
96
97        let position = self.stream_position;
98        self.stream_position += TsPacket::PACKET_SIZE as u64;
99
100        let packet = TsPacket::from_bytes(src.split_to(TsPacket::PACKET_SIZE).freeze())
101            .ok_or(TsPacketError::InvalidPacket)?;
102        Ok(Some((position, packet)))
103    }
104}
105
106#[cfg(test)]
107mod tests {
108    use super::*;
109
110    /// Build a minimal valid 188-byte TS packet with payload only (no adaptation field).
111    /// PID = pid, continuity_counter = cc, payload filled with `fill`.
112    fn make_packet(pid: u16, cc: u8, fill: u8) -> [u8; TsPacket::PACKET_SIZE] {
113        let mut buf = [fill; TsPacket::PACKET_SIZE];
114        // Header: sync=0x47, TEI=0, PUSI=0, priority=0, PID=pid,
115        //         scrambling=00, adaptation_field=0, payload=1, cc
116        buf[0] = 0x47;
117        buf[1] = (pid >> 8) as u8 & 0x1F; // top 5 bits of PID
118        buf[2] = pid as u8; // low 8 bits of PID
119        // byte 3: scrambling(2)=00, AF(1)=0, payload(1)=1, cc(4)
120        buf[3] = 0x10 | (cc & 0x0F);
121        buf
122    }
123
124    /// Build a 188-byte TS packet with an adaptation field (flags only, rest stuffing).
125    fn make_packet_with_af(pid: u16, af_len: u8, af_flags: u8) -> [u8; TsPacket::PACKET_SIZE] {
126        let mut buf = [0xFF; TsPacket::PACKET_SIZE];
127        buf[0] = 0x47;
128        buf[1] = (pid >> 8) as u8 & 0x1F;
129        buf[2] = pid as u8;
130        // AF=1, payload=1
131        buf[3] = 0x30;
132        buf[4] = af_len;
133        if af_len > 0 {
134            buf[5] = af_flags;
135        }
136        buf
137    }
138
139    // ------- TsPacket::from_bytes tests -------
140
141    #[test]
142    fn test_from_bytes_valid_payload_only() {
143        let pkt = make_packet(0x100, 3, 0xAB);
144        let ts = TsPacket::from_bytes(Bytes::copy_from_slice(&pkt)).unwrap();
145        assert_eq!(ts.header.pid(), 0x100);
146        assert_eq!(ts.header.continuity_counter(), 3);
147        assert!(ts.header.payload());
148        assert!(!ts.header.adaptation_field());
149        assert!(ts.adaptation_field.is_none());
150        assert_eq!(ts.payload.len(), 184); // 188 - 4
151    }
152
153    #[test]
154    fn test_from_bytes_with_adaptation_field() {
155        let pkt = make_packet_with_af(0x01, 7, 0x10); // af_flags=PCR set
156        // We need valid PCR bytes; the make helper fills with 0xFF which is fine for PCR data
157        let ts = TsPacket::from_bytes(Bytes::copy_from_slice(&pkt)).unwrap();
158        assert!(ts.header.adaptation_field());
159        assert!(ts.adaptation_field.is_some());
160        let af = ts.adaptation_field.unwrap();
161        assert!(af.flags.pcr_flag());
162    }
163
164    #[test]
165    fn test_from_bytes_bad_sync_returns_none() {
166        let mut pkt = make_packet(0x00, 0, 0);
167        pkt[0] = 0x00; // corrupt sync byte
168        assert!(TsPacket::from_bytes(Bytes::copy_from_slice(&pkt)).is_none());
169    }
170
171    #[test]
172    fn test_from_bytes_too_short_returns_none() {
173        assert!(TsPacket::from_bytes(Bytes::from_static(&[0x47, 0x00, 0x00, 0x10])).is_none());
174    }
175
176    // ------- TsPacketDecoder tests -------
177
178    #[test]
179    fn test_decoder_not_enough_data() {
180        let mut decoder = TsPacketDecoder::new(0);
181        let mut buf = BytesMut::from(&[0x47u8; 100][..]);
182        let result = decoder.decode(&mut buf).unwrap();
183        assert!(result.is_none());
184        // Buffer should not be consumed
185        assert_eq!(buf.len(), 100);
186    }
187
188    #[test]
189    fn test_decoder_exact_packet() {
190        let mut decoder = TsPacketDecoder::new(0);
191        let pkt = make_packet(0x20, 5, 0x00);
192        let mut buf = BytesMut::from(&pkt[..]);
193        let result = decoder.decode(&mut buf).unwrap();
194        assert!(result.is_some());
195        let (pos, ts) = result.unwrap();
196        assert_eq!(pos, 0);
197        assert_eq!(ts.header.pid(), 0x20);
198        assert_eq!(ts.header.continuity_counter(), 5);
199        assert_eq!(buf.len(), 0);
200    }
201
202    #[test]
203    fn test_decoder_skips_garbage_before_sync() {
204        let mut decoder = TsPacketDecoder::new(0);
205        let pkt1 = make_packet(0x30, 7, 0xCC);
206        let pkt2 = make_packet(0x31, 0, 0x00);
207        let mut buf = BytesMut::new();
208        buf.extend_from_slice(&[0x00, 0xFF, 0xAA]); // 3 garbage bytes
209        buf.extend_from_slice(&pkt1);
210        buf.extend_from_slice(&pkt2); // needed so sync at pkt1 can be verified
211        let (pos, ts) = decoder.decode(&mut buf).unwrap().unwrap();
212        assert_eq!(pos, 3); // skipped 3 garbage bytes
213        assert_eq!(ts.header.pid(), 0x30);
214        assert_eq!(buf.len(), 188); // pkt2 remains
215    }
216
217    #[test]
218    fn test_decoder_two_packets_sequential() {
219        let mut decoder = TsPacketDecoder::new(0);
220        let pkt1 = make_packet(0x100, 0, 0x11);
221        let pkt2 = make_packet(0x200, 1, 0x22);
222        let mut buf = BytesMut::new();
223        buf.extend_from_slice(&pkt1);
224        buf.extend_from_slice(&pkt2);
225
226        let (pos1, ts1) = decoder.decode(&mut buf).unwrap().unwrap();
227        assert_eq!(pos1, 0);
228        assert_eq!(ts1.header.pid(), 0x100);
229        assert_eq!(buf.len(), 188);
230
231        let (pos2, ts2) = decoder.decode(&mut buf).unwrap().unwrap();
232        assert_eq!(pos2, 188);
233        assert_eq!(ts2.header.pid(), 0x200);
234        assert_eq!(buf.len(), 0);
235    }
236
237    #[test]
238    fn test_decoder_partial_then_complete() {
239        let mut decoder = TsPacketDecoder::new(0);
240        let pkt = make_packet(0x42, 2, 0xDD);
241        let mut buf = BytesMut::new();
242
243        // Feed first half
244        buf.extend_from_slice(&pkt[..100]);
245        assert!(decoder.decode(&mut buf).unwrap().is_none());
246
247        // Feed remaining
248        buf.extend_from_slice(&pkt[100..]);
249        let (pos, ts) = decoder.decode(&mut buf).unwrap().unwrap();
250        assert_eq!(pos, 0);
251        assert_eq!(ts.header.pid(), 0x42);
252        assert_eq!(buf.len(), 0);
253    }
254
255    #[test]
256    fn test_decoder_empty_buffer() {
257        let mut decoder = TsPacketDecoder::new(0);
258        let mut buf = BytesMut::new();
259        assert!(decoder.decode(&mut buf).unwrap().is_none());
260    }
261
262    #[test]
263    fn test_decoder_with_initial_stream_position() {
264        let mut decoder = TsPacketDecoder::new(1000);
265        let pkt1 = make_packet(0x50, 0, 0x00);
266        let pkt2 = make_packet(0x51, 1, 0x00);
267        let mut buf = BytesMut::new();
268        buf.extend_from_slice(&pkt1);
269        buf.extend_from_slice(&pkt2);
270
271        let (pos, ts) = decoder.decode(&mut buf).unwrap().unwrap();
272        assert_eq!(pos, 1000);
273        assert_eq!(ts.header.pid(), 0x50);
274        assert_eq!(decoder.stream_position, 1188);
275    }
276
277    #[test]
278    fn test_decoder_false_sync_byte_skipped() {
279        // A 0x47 byte that does NOT have another 0x47 at +188 should be skipped.
280        let mut decoder = TsPacketDecoder::new(0);
281        let pkt = make_packet(0x60, 0, 0x00);
282        let mut buf = BytesMut::new();
283        // Put a false 0x47 followed by garbage, then a real packet pair
284        buf.extend_from_slice(&[0x47]); // false sync at offset 0
285        buf.extend_from_slice(&[0x00; 187]); // padding (total 188 so far)
286        buf.extend_from_slice(&[0x00]); // byte at 188 is NOT 0x47 → false sync
287        buf.extend_from_slice(&[0x00; 187]); // more padding
288        // Now at offset 376 place two real packets
289        let pkt2 = make_packet(0x61, 1, 0x00);
290        buf.extend_from_slice(&pkt);
291        buf.extend_from_slice(&pkt2);
292
293        let (pos, ts) = decoder.decode(&mut buf).unwrap().unwrap();
294        // Should have skipped past the false 0x47 and found the real packet
295        assert_eq!(pos, 376);
296        assert_eq!(ts.header.pid(), 0x60);
297    }
298}