1use crate::*;
2use tokio_util::codec::Decoder;
3
4pub struct TsPacket {
5 pub header: TransportStreamHeader,
6 pub adaptation_field: Option<AdaptationField>,
7 pub payload: Bytes,
8}
9
10impl std::fmt::Debug for TsPacket {
11 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
12 f.debug_struct("TsPacket")
13 .field("header", &self.header)
14 .field("adaptation_field", &self.adaptation_field)
15 .field("payload_len", &self.payload.len())
16 .finish()
17 }
18}
19
20impl TsPacket {
21 pub const PACKET_SIZE: usize = 188;
22 pub fn from_bytes(data: Bytes) -> Option<Self> {
23 if data.len() < Self::PACKET_SIZE {
24 return None;
25 }
26 let header =
27 TransportStreamHeader::from_bits(u32::from_be_bytes(data.get(0..4)?.try_into().ok()?));
28 if header.sync_byte() != 0x47 {
29 return None;
30 }
31 let mut adaption_field = None;
32 let mut index = 4;
33 if header.adaptation_field() {
34 let adaption_field_length = *data.get(index)? as usize;
35 index += 1;
36 if index + adaption_field_length > data.len() {
37 return None;
38 }
39 if adaption_field_length > 0 {
40 let field_data = data.slice(index..index + adaption_field_length);
41 index += adaption_field_length;
42 adaption_field = Some(AdaptationField::from_bytes(field_data)?);
43 }
44 }
45 Some(Self {
46 header,
47 adaptation_field: adaption_field,
48 payload: data.slice(index..Self::PACKET_SIZE),
49 })
50 }
51}
52
/// Decoder state that tracks a byte offset alongside the packets it yields.
pub struct TsPacketDecoder {
    /// Stream offset of the next byte the decoder will consume from its
    /// input buffer.
    pub stream_position: u64,
}

impl TsPacketDecoder {
    /// Creates a decoder whose position counter starts at `stream_position`.
    pub fn new(stream_position: u64) -> Self {
        TsPacketDecoder { stream_position }
    }
}
63
64impl Decoder for TsPacketDecoder {
65 type Item = (u64, TsPacket);
66 type Error = TsPacketError;
67 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
68 if src.len() < TsPacket::PACKET_SIZE {
69 return Ok(None);
70 }
71
72 loop {
75 while !src.is_empty() && src[0] != 0x47 {
77 self.stream_position += 1;
78 src.advance(1);
79 }
80
81 if src.len() < TsPacket::PACKET_SIZE {
82 return Ok(None);
83 }
84
85 if src.len() > TsPacket::PACKET_SIZE && src[TsPacket::PACKET_SIZE] != 0x47 {
88 self.stream_position += 1;
90 src.advance(1);
91 continue;
92 }
93
94 break;
95 }
96
97 let position = self.stream_position;
98 self.stream_position += TsPacket::PACKET_SIZE as u64;
99
100 let packet = TsPacket::from_bytes(src.split_to(TsPacket::PACKET_SIZE).freeze())
101 .ok_or(TsPacketError::InvalidPacket)?;
102 Ok(Some((position, packet)))
103 }
104}
105
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a packet with header byte 3 set to `0x10 | cc` (payload only,
    /// no adaptation field) and every byte after the header set to `fill`.
    fn make_packet(pid: u16, cc: u8, fill: u8) -> [u8; TsPacket::PACKET_SIZE] {
        let mut packet = [fill; TsPacket::PACKET_SIZE];
        packet[..4].copy_from_slice(&[
            0x47,
            (pid >> 8) as u8 & 0x1F,
            pid as u8,
            0x10 | (cc & 0x0F),
        ]);
        packet
    }

    /// Builds a packet with header byte 3 set to `0x30` (adaptation field and
    /// payload), the given adaptation field length, and, when that length is
    /// non-zero, the given flags byte. All other bytes are `0xFF`.
    fn make_packet_with_af(pid: u16, af_len: u8, af_flags: u8) -> [u8; TsPacket::PACKET_SIZE] {
        let mut packet = [0xFF; TsPacket::PACKET_SIZE];
        packet[..5].copy_from_slice(&[0x47, (pid >> 8) as u8 & 0x1F, pid as u8, 0x30, af_len]);
        if af_len != 0 {
            packet[5] = af_flags;
        }
        packet
    }

    /// Concatenates `chunks` into a fresh decode buffer.
    fn buffer_from(chunks: &[&[u8]]) -> BytesMut {
        let mut buf = BytesMut::new();
        for chunk in chunks {
            buf.extend_from_slice(chunk);
        }
        buf
    }

    /// Runs `TsPacket::from_bytes` on a copy of `raw`.
    fn parse(raw: &[u8]) -> Option<TsPacket> {
        TsPacket::from_bytes(Bytes::copy_from_slice(raw))
    }

    #[test]
    fn test_from_bytes_valid_payload_only() {
        let raw = make_packet(0x100, 3, 0xAB);
        let parsed = parse(&raw).unwrap();

        assert_eq!(parsed.header.pid(), 0x100);
        assert_eq!(parsed.header.continuity_counter(), 3);
        assert!(parsed.header.payload());
        assert!(!parsed.header.adaptation_field());
        assert!(parsed.adaptation_field.is_none());
        // 188 bytes minus the 4-byte header.
        assert_eq!(parsed.payload.len(), 184);
    }

    #[test]
    fn test_from_bytes_with_adaptation_field() {
        let raw = make_packet_with_af(0x01, 7, 0x10);
        let parsed = parse(&raw).unwrap();

        assert!(parsed.header.adaptation_field());
        assert!(parsed.adaptation_field.is_some());
        let field = parsed.adaptation_field.unwrap();
        assert!(field.flags.pcr_flag());
    }

    #[test]
    fn test_from_bytes_bad_sync_returns_none() {
        let mut raw = make_packet(0x00, 0, 0);
        raw[0] = 0x00;
        assert!(parse(&raw).is_none());
    }

    #[test]
    fn test_from_bytes_too_short_returns_none() {
        let header_only = Bytes::from_static(&[0x47, 0x00, 0x00, 0x10]);
        assert!(TsPacket::from_bytes(header_only).is_none());
    }

    #[test]
    fn test_decoder_not_enough_data() {
        let mut decoder = TsPacketDecoder::new(0);
        let mut buf = buffer_from(&[&[0x47u8; 100][..]]);

        let outcome = decoder.decode(&mut buf).unwrap();

        assert!(outcome.is_none());
        // Nothing may be consumed while waiting for a full packet.
        assert_eq!(buf.len(), 100);
    }

    #[test]
    fn test_decoder_exact_packet() {
        let mut decoder = TsPacketDecoder::new(0);
        let raw = make_packet(0x20, 5, 0x00);
        let mut buf = buffer_from(&[&raw[..]]);

        let outcome = decoder.decode(&mut buf).unwrap();
        assert!(outcome.is_some());

        let (offset, parsed) = outcome.unwrap();
        assert_eq!(offset, 0);
        assert_eq!(parsed.header.pid(), 0x20);
        assert_eq!(parsed.header.continuity_counter(), 5);
        assert_eq!(buf.len(), 0);
    }

    #[test]
    fn test_decoder_skips_garbage_before_sync() {
        let mut decoder = TsPacketDecoder::new(0);
        let first = make_packet(0x30, 7, 0xCC);
        let second = make_packet(0x31, 0, 0x00);
        let mut buf = buffer_from(&[&[0x00u8, 0xFF, 0xAA][..], &first[..], &second[..]]);

        let (offset, parsed) = decoder.decode(&mut buf).unwrap().unwrap();

        // The three garbage bytes are counted in the reported position.
        assert_eq!(offset, 3);
        assert_eq!(parsed.header.pid(), 0x30);
        assert_eq!(buf.len(), 188);
    }

    #[test]
    fn test_decoder_two_packets_sequential() {
        let mut decoder = TsPacketDecoder::new(0);
        let first = make_packet(0x100, 0, 0x11);
        let second = make_packet(0x200, 1, 0x22);
        let mut buf = buffer_from(&[&first[..], &second[..]]);

        let (offset_a, parsed_a) = decoder.decode(&mut buf).unwrap().unwrap();
        assert_eq!(offset_a, 0);
        assert_eq!(parsed_a.header.pid(), 0x100);
        assert_eq!(buf.len(), 188);

        let (offset_b, parsed_b) = decoder.decode(&mut buf).unwrap().unwrap();
        assert_eq!(offset_b, 188);
        assert_eq!(parsed_b.header.pid(), 0x200);
        assert_eq!(buf.len(), 0);
    }

    #[test]
    fn test_decoder_partial_then_complete() {
        let mut decoder = TsPacketDecoder::new(0);
        let raw = make_packet(0x42, 2, 0xDD);
        let (head, tail) = raw.split_at(100);
        let mut buf = BytesMut::new();

        buf.extend_from_slice(head);
        assert!(decoder.decode(&mut buf).unwrap().is_none());

        buf.extend_from_slice(tail);
        let (offset, parsed) = decoder.decode(&mut buf).unwrap().unwrap();
        assert_eq!(offset, 0);
        assert_eq!(parsed.header.pid(), 0x42);
        assert_eq!(buf.len(), 0);
    }

    #[test]
    fn test_decoder_empty_buffer() {
        let mut decoder = TsPacketDecoder::new(0);
        let mut buf = BytesMut::new();
        assert!(decoder.decode(&mut buf).unwrap().is_none());
    }

    #[test]
    fn test_decoder_with_initial_stream_position() {
        let mut decoder = TsPacketDecoder::new(1000);
        let first = make_packet(0x50, 0, 0x00);
        let second = make_packet(0x51, 1, 0x00);
        let mut buf = buffer_from(&[&first[..], &second[..]]);

        let (offset, parsed) = decoder.decode(&mut buf).unwrap().unwrap();

        assert_eq!(offset, 1000);
        assert_eq!(parsed.header.pid(), 0x50);
        assert_eq!(decoder.stream_position, 1188);
    }

    #[test]
    fn test_decoder_false_sync_byte_skipped() {
        let mut decoder = TsPacketDecoder::new(0);
        let real = make_packet(0x60, 0, 0x00);
        let follower = make_packet(0x61, 1, 0x00);

        // A lone 0x47 followed by zeros: the byte one packet later is not a
        // sync byte, so this must be treated as a false sync.
        let mut false_sync = [0x00u8; TsPacket::PACKET_SIZE];
        false_sync[0] = 0x47;
        let zeros = [0x00u8; TsPacket::PACKET_SIZE];

        let mut buf = buffer_from(&[&false_sync[..], &zeros[..], &real[..], &follower[..]]);

        let (offset, parsed) = decoder.decode(&mut buf).unwrap().unwrap();

        // Two packets' worth of non-packet bytes precede the real packet.
        assert_eq!(offset, 376);
        assert_eq!(parsed.header.pid(), 0x60);
    }
}