Skip to main content

m2ts_packet/pes_packet/
stream.rs

1//! Stream adapter that reassembles TS packets into complete PES packets and PSI sections.
2
3use super::*;
4/// Stream adapter that reassembles TS packets into complete PES packets and PSI sections.
5///
6/// Buffers payload data per-PID. When a new payload unit start indicator (PUSI) arrives,
7/// the previously buffered data for that PID is flushed as a complete [`PesPacket`] item.
8/// Continuation packets without a prior PUSI for their PID are discarded.
9pub struct PacketizedElementaryStream<S> {
10    inner: S,
11    buffers: HashMap<u16, PidBuffer>,
12    pending: VecDeque<PesPacket>,
13    done: bool,
14}
15
16impl<S> PacketizedElementaryStream<S>
17where
18    S: Stream<Item = std::result::Result<(u64, TsPacket), TsPacketError>>,
19    S: Unpin,
20{
21    pub fn from_ts_stream(inner: S) -> Self {
22        Self {
23            inner,
24            buffers: HashMap::new(),
25            pending: VecDeque::new(),
26            done: false,
27        }
28    }
29
30    /// Remove the buffer for `pid` and push its contents as a [`PesPacket`] item to the
31    /// pending queue. Does nothing if the buffer is empty or missing.
32    fn flush_buffer(&mut self, pid: u16) {
33        let Some(buf) = self.buffers.remove(&pid) else {
34            return;
35        };
36        if buf.data.is_empty() {
37            return;
38        }
39        let data = buf.data.freeze();
40        let item = if buf.is_pes {
41            if data.len() >= 4 {
42                parse_pes_packet(pid, buf.random_access_indicator, data)
43            } else {
44                PesPacket::Private(data)
45            }
46        } else if !data.is_empty() {
47            parse_section(data)
48        } else {
49            return;
50        };
51        self.pending.push_back(item);
52    }
53
54    /// Flush all remaining PID buffers (called when the inner stream ends).
55    fn flush_all(&mut self) {
56        let pids: Vec<u16> = self.buffers.keys().copied().collect();
57        for pid in pids {
58            self.flush_buffer(pid);
59        }
60    }
61
62    fn process_packet(&mut self, packet: TsPacket) {
63        let pid = packet.header.pid();
64
65        // Null packets
66        if pid == NULL_PID {
67            self.pending.push_back(PesPacket::Null);
68            return;
69        }
70
71        // Skip packets without payload
72        if !packet.header.payload() || packet.payload.is_empty() {
73            return;
74        }
75
76        let pusi = packet.header.payload_unit_start_indicator();
77        let payload = &packet.payload;
78
79        if pusi {
80            // Detect PES: payload starts with start-code prefix 0x00 0x00 0x01
81            let is_pes = payload.len() >= 3
82                && payload[0] == 0x00
83                && payload[1] == 0x00
84                && payload[2] == 0x01;
85
86            if is_pes {
87                // Flush any previously accumulated data for this PID
88                self.flush_buffer(pid);
89                let random_access_indicator = packet
90                    .adaptation_field
91                    .as_ref()
92                    .map(|af| af.flags.random_access_indicator());
93                self.buffers.insert(
94                    pid,
95                    PidBuffer {
96                        data: BytesMut::from(payload.as_ref()),
97                        is_pes: true,
98                        random_access_indicator,
99                    },
100                );
101            } else {
102                // PSI section — first byte is the pointer field
103                let pointer_field = payload[0] as usize;
104
105                // Append trailing bytes of the previous section and flush it
106                if let Some(buf) = self.buffers.get_mut(&pid) {
107                    let end = (1 + pointer_field).min(payload.len());
108                    buf.data.extend_from_slice(&payload[1..end]);
109                }
110                self.flush_buffer(pid);
111
112                // Start new section buffer after the pointer field
113                let start = 1 + pointer_field;
114                if start < payload.len() {
115                    self.buffers.insert(
116                        pid,
117                        PidBuffer {
118                            data: BytesMut::from(&payload[start..]),
119                            is_pes: false,
120                            random_access_indicator: None,
121                        },
122                    );
123                }
124            }
125        } else {
126            // Continuation packet — append to existing buffer, or discard if no PUSI seen yet
127            if let Some(buf) = self.buffers.get_mut(&pid) {
128                buf.data.extend_from_slice(payload);
129            }
130        }
131    }
132
133    /// Consume the decoder and return the inner stream. Any pending buffered data is lost.
134    pub fn into_inner(self) -> S {
135        self.inner
136    }
137}
138
139impl<S> Stream for PacketizedElementaryStream<S>
140where
141    S: Stream<Item = std::result::Result<(u64, TsPacket), TsPacketError>>,
142    S: Unpin,
143{
144    type Item = std::result::Result<PesPacket, TsPacketError>;
145
146    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
147        let this = self.get_mut();
148        loop {
149            // Drain pending items first
150            if let Some(item) = this.pending.pop_front() {
151                return Poll::Ready(Some(Ok(item)));
152            }
153
154            if this.done {
155                return Poll::Ready(None);
156            }
157
158            match pin!(&mut this.inner).poll_next(cx) {
159                Poll::Ready(Some(Ok((_pos, packet)))) => {
160                    this.process_packet(packet);
161                    // loop back to check pending
162                }
163                Poll::Ready(Some(Err(e))) => {
164                    return Poll::Ready(Some(Err(e)));
165                }
166                Poll::Ready(None) => {
167                    // Inner stream finished — flush all remaining buffers
168                    this.done = true;
169                    this.flush_all();
170                    // loop back to drain pending
171                }
172                Poll::Pending => {
173                    return Poll::Pending;
174                }
175            }
176        }
177    }
178}
179
180pub trait TsPacketStreamAssemble: Sized {
181    fn assemble(self) -> PacketizedElementaryStream<Self>;
182}
183impl<S> TsPacketStreamAssemble for S
184where
185    S: Stream<Item = std::result::Result<(u64, TsPacket), TsPacketError>>,
186    S: Unpin,
187{
188    fn assemble(self) -> PacketizedElementaryStream<Self> {
189        PacketizedElementaryStream::from_ts_stream(self)
190    }
191}
192
193#[cfg(test)]
194mod tests {
195    use super::*;
196    use tokio_stream::StreamExt;
197
198    /// Build a TsPacket with the given PID, PUSI flag, and raw payload.
199    fn make_ts_packet(pid: u16, pusi: bool, payload: &[u8]) -> TsPacket {
200        let header = TransportStreamHeader::new()
201            .with_payload_unit_start_indicator(pusi)
202            .with_pid(pid)
203            .with_payload(true);
204        TsPacket {
205            header,
206            adaptation_field: None,
207            payload: Bytes::copy_from_slice(payload),
208        }
209    }
210
211    fn make_stream(
212        packets: Vec<TsPacket>,
213    ) -> impl Stream<Item = std::result::Result<(u64, TsPacket), TsPacketError>> {
214        tokio_stream::iter(
215            packets
216                .into_iter()
217                .enumerate()
218                .map(|(i, p)| Ok((i as u64 * 188, p))),
219        )
220    }
221
222    // ---- basic tests ----
223
224    #[tokio::test]
225    async fn test_null_packet() {
226        let stream = make_stream(vec![make_ts_packet(NULL_PID, false, &[])]);
227        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
228        let item = decoder.next().await.unwrap().unwrap();
229        assert!(matches!(item, PesPacket::Null));
230        assert!(decoder.next().await.is_none());
231    }
232
233    #[tokio::test]
234    async fn test_empty_stream() {
235        let stream = make_stream(vec![]);
236        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
237        assert!(decoder.next().await.is_none());
238    }
239
240    // ---- discard tests ----
241
242    #[tokio::test]
243    async fn test_discard_initial_non_pusi() {
244        let stream = make_stream(vec![
245            make_ts_packet(0x100, false, &[0xAA, 0xBB]),
246            make_ts_packet(0x100, false, &[0xCC]),
247        ]);
248        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
249        // All packets discarded — no PUSI seen
250        assert!(decoder.next().await.is_none());
251    }
252
253    #[tokio::test]
254    async fn test_discard_then_accept_after_pusi() {
255        let stream = make_stream(vec![
256            make_ts_packet(0x100, false, &[0xAA]), // discarded
257            make_ts_packet(0x100, false, &[0xBB]), // discarded
258            make_ts_packet(0x100, true, &[0x00, 0x00, 0x01, 0xE0, 0xCC]),
259        ]);
260        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
261        let item = decoder.next().await.unwrap().unwrap();
262        match item {
263            PesPacket::PES { stream_id, data } => {
264                assert_eq!(stream_id, 0xE0);
265                assert_eq!(&data[..], &[0x00, 0x00, 0x01, 0xE0, 0xCC]);
266            }
267            other => panic!("Expected PES, got {other:?}"),
268        }
269    }
270
271    // ---- PES tests ----
272
273    #[tokio::test]
274    async fn test_pes_single_packet() {
275        let payload: &[u8] = &[0x00, 0x00, 0x01, 0xE0, 0x11, 0x22];
276        let stream = make_stream(vec![make_ts_packet(0x100, true, payload)]);
277        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
278        let item = decoder.next().await.unwrap().unwrap();
279        match item {
280            PesPacket::PES { stream_id, data } => {
281                assert_eq!(stream_id, 0xE0);
282                assert_eq!(&data[..], payload);
283            }
284            other => panic!("Expected PES, got {other:?}"),
285        }
286    }
287
288    #[tokio::test]
289    async fn test_pes_multi_packet() {
290        let stream = make_stream(vec![
291            make_ts_packet(0x100, true, &[0x00, 0x00, 0x01, 0xC0, 0xAA]),
292            make_ts_packet(0x100, false, &[0xBB, 0xCC]),
293            make_ts_packet(0x100, false, &[0xDD]),
294        ]);
295        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
296        let item = decoder.next().await.unwrap().unwrap();
297        match item {
298            PesPacket::PES { stream_id, data } => {
299                assert_eq!(stream_id, 0xC0);
300                assert_eq!(&data[..], &[0x00, 0x00, 0x01, 0xC0, 0xAA, 0xBB, 0xCC, 0xDD]);
301            }
302            other => panic!("Expected PES, got {other:?}"),
303        }
304    }
305
306    #[tokio::test]
307    async fn test_pes_flush_on_new_pusi() {
308        let p1: &[u8] = &[0x00, 0x00, 0x01, 0xE0, 0x11];
309        let p2: &[u8] = &[0x00, 0x00, 0x01, 0xE0, 0x22];
310        let stream = make_stream(vec![
311            make_ts_packet(0x100, true, p1),
312            make_ts_packet(0x100, true, p2), // new PUSI flushes p1
313        ]);
314        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
315
316        // First PES flushed when second PUSI arrives
317        let item = decoder.next().await.unwrap().unwrap();
318        assert!(matches!(
319            &item,
320            PesPacket::PES {
321                stream_id: 0xE0,
322                ..
323            }
324        ));
325        if let PesPacket::PES { data, .. } = &item {
326            assert_eq!(&data[..], p1);
327        }
328
329        // Second PES flushed on stream end
330        let item = decoder.next().await.unwrap().unwrap();
331        if let PesPacket::PES { data, .. } = &item {
332            assert_eq!(&data[..], p2);
333        }
334    }
335
336    // ---- PSI section tests ----
337
338    #[tokio::test]
339    async fn test_section_single_packet() {
340        // pointer_field=0, then section: table_id=0x42
341        let payload: &[u8] = &[0x00, 0x42, 0xF0, 0x05, 0xAA, 0xBB];
342        let stream = make_stream(vec![make_ts_packet(0x00, true, payload)]);
343        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
344        let item = decoder.next().await.unwrap().unwrap();
345        match item {
346            PesPacket::Section { table_id, data } => {
347                assert_eq!(table_id, 0x42);
348                assert_eq!(&data[..], &[0x42, 0xF0, 0x05, 0xAA, 0xBB]);
349            }
350            other => panic!("Expected Section, got {other:?}"),
351        }
352    }
353
354    #[tokio::test]
355    async fn test_section_multi_packet() {
356        // pointer_field=0, section starts immediately
357        let stream = make_stream(vec![
358            make_ts_packet(0x00, true, &[0x00, 0x02, 0xB0, 0x0D]),
359            make_ts_packet(0x00, false, &[0xAA, 0xBB]),
360        ]);
361        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
362        let item = decoder.next().await.unwrap().unwrap();
363        match item {
364            PesPacket::Section { table_id, data } => {
365                assert_eq!(table_id, 0x02);
366                assert_eq!(&data[..], &[0x02, 0xB0, 0x0D, 0xAA, 0xBB]);
367            }
368            other => panic!("Expected Section, got {other:?}"),
369        }
370    }
371
372    #[tokio::test]
373    async fn test_section_with_pointer_field() {
374        // First packet: section starts at pointer_field=0
375        // Second packet: pointer_field=2, 2 bytes finish old section, rest starts new
376        let stream = make_stream(vec![
377            make_ts_packet(0x00, true, &[0x00, 0x42, 0xAA]),
378            make_ts_packet(0x00, true, &[0x02, 0xBB, 0xCC, 0x43, 0xDD]),
379        ]);
380        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
381
382        // First section: table_id=0x42, data = [0x42, 0xAA, 0xBB, 0xCC]
383        let item = decoder.next().await.unwrap().unwrap();
384        match item {
385            PesPacket::Section { table_id, data } => {
386                assert_eq!(table_id, 0x42);
387                assert_eq!(&data[..], &[0x42, 0xAA, 0xBB, 0xCC]);
388            }
389            other => panic!("Expected Section, got {other:?}"),
390        }
391
392        // Second section: table_id=0x43, data = [0x43, 0xDD]
393        let item = decoder.next().await.unwrap().unwrap();
394        match item {
395            PesPacket::Section { table_id, data } => {
396                assert_eq!(table_id, 0x43);
397                assert_eq!(&data[..], &[0x43, 0xDD]);
398            }
399            other => panic!("Expected Section, got {other:?}"),
400        }
401    }
402
403    // ---- multi-PID test ----
404
405    #[tokio::test]
406    async fn test_multiple_pids_interleaved() {
407        let stream = make_stream(vec![
408            make_ts_packet(0x100, true, &[0x00, 0x00, 0x01, 0xE0, 0x11]), // PES PID 0x100
409            make_ts_packet(0x00, true, &[0x00, 0x00, 0xB0, 0x0D]),        // Section PID 0
410            make_ts_packet(0x100, false, &[0x22, 0x33]),                  // continue PES
411            make_ts_packet(0x00, false, &[0xAA]),                         // continue Section
412        ]);
413        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
414
415        let mut items = vec![];
416        while let Some(Ok(item)) = decoder.next().await {
417            items.push(item);
418        }
419        assert_eq!(items.len(), 2);
420
421        // Find PES and Section regardless of order
422        let pes_item = items
423            .iter()
424            .find(|i| matches!(i, PesPacket::PES { .. }))
425            .expect("expected a PES item");
426        let section_item = items
427            .iter()
428            .find(|i| matches!(i, PesPacket::Section { .. }))
429            .expect("expected a Section item");
430
431        // PES from PID 0x100
432        match pes_item {
433            PesPacket::PES { stream_id, data } => {
434                assert_eq!(*stream_id, 0xE0);
435                assert_eq!(&data[..], &[0x00, 0x00, 0x01, 0xE0, 0x11, 0x22, 0x33]);
436            }
437            other => panic!("Expected PES, got {other:?}"),
438        }
439
440        // Section from PID 0
441        match section_item {
442            PesPacket::Section { table_id, data } => {
443                assert_eq!(*table_id, 0x00);
444                assert_eq!(&data[..], &[0x00, 0xB0, 0x0D, 0xAA]);
445            }
446            other => panic!("Expected Section, got {other:?}"),
447        }
448    }
449
450    // ---- parse_timestamp unit tests ----
451
452    #[test]
453    fn test_parse_timestamp_known_value() {
454        // PTS = 90000 (= 1 second at 90 kHz)
455        // bits 32..30 = 0, 29..22 = 0, 21..15 = 2, 14..7 = 0xBF, 6..0 = 0x10
456        //   byte0: 0010_000_1 = 0x21
457        //   byte1: 0x00
458        //   byte2: 0000010_1 = 0x05
459        //   byte3: 0xBF
460        //   byte4: 0010000_1 = 0x21
461        let ts_bytes = [0x21, 0x00, 0x05, 0xBF, 0x21];
462        let ts = parse_timestamp(&ts_bytes).unwrap();
463        assert_eq!(ts, 90000);
464    }
465
466    #[test]
467    fn test_parse_timestamp_zero() {
468        let ts_bytes = [0x21, 0x00, 0x01, 0x00, 0x01];
469        let ts = parse_timestamp(&ts_bytes).unwrap();
470        assert_eq!(ts, 0);
471    }
472
473    #[test]
474    fn test_parse_timestamp_too_short() {
475        assert!(parse_timestamp(&[0x21, 0x00, 0x01]).is_none());
476    }
477
478    // ---- Video PES tests ----
479
480    #[tokio::test]
481    async fn test_video_pes_with_pts() {
482        // PES with stream_id=0xE0, PTS = 90000
483        let pes: Vec<u8> = vec![
484            0x00, 0x00, 0x01, 0xE0, // start code + stream_id
485            0x00, 0x10, // PES packet length
486            0x80, 0x80, // flags: PTS only
487            0x05, // PES header data length = 5
488            0x21, 0x00, 0x05, 0xBF, 0x21, // PTS = 90000
489            0xDE, 0xAD, 0xBE, 0xEF, // ES payload
490        ];
491        let stream = make_stream(vec![make_ts_packet(0x100, true, &pes)]);
492        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
493        let item = decoder.next().await.unwrap().unwrap();
494        match item {
495            PesPacket::Video {
496                pid,
497                pts,
498                dts,
499                payload,
500                ..
501            } => {
502                assert_eq!(pid, 0x100);
503                assert_eq!(pts, Some(90000));
504                assert!(dts.is_none());
505                assert_eq!(&payload[..], &[0xDE, 0xAD, 0xBE, 0xEF]);
506            }
507            other => panic!("Expected Video, got {other:?}"),
508        }
509    }
510
511    #[tokio::test]
512    async fn test_video_pes_with_pts_and_dts() {
513        // DTS = 45000 = 0xAFC8
514        // bits 32..30=0, 29..22=0, 21..15=1, 14..7=0x5F, 6..0=0x48
515        //   byte0: 0001_000_1 = 0x11
516        //   byte1: 0x00
517        //   byte2: 0000001_1 = 0x03
518        //   byte3: 0x5F
519        //   byte4: 1001000_1 = 0x91
520        let pes: Vec<u8> = vec![
521            0x00, 0x00, 0x01, 0xE1, // stream_id = 0xE1
522            0x00, 0x15, // PES packet length
523            0x80, 0xC0, // flags: PTS + DTS
524            0x0A, // header data length = 10
525            0x21, 0x00, 0x05, 0xBF, 0x21, // PTS = 90000
526            0x11, 0x00, 0x03, 0x5F, 0x91, // DTS = 45000
527            0xCA, 0xFE, // ES payload
528        ];
529        let stream = make_stream(vec![make_ts_packet(0x100, true, &pes)]);
530        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
531        let item = decoder.next().await.unwrap().unwrap();
532        match item {
533            PesPacket::Video {
534                pid,
535                pts,
536                dts,
537                payload,
538                ..
539            } => {
540                assert_eq!(pid, 0x100);
541                assert_eq!(pts, Some(90000));
542                assert_eq!(dts, Some(45000));
543                assert_eq!(&payload[..], &[0xCA, 0xFE]);
544            }
545            other => panic!("Expected Video, got {other:?}"),
546        }
547    }
548
549    // ---- Audio PES tests ----
550
551    #[tokio::test]
552    async fn test_audio_pes_with_pts() {
553        let pes: Vec<u8> = vec![
554            0x00, 0x00, 0x01, 0xC0, // stream_id = 0xC0
555            0x00, 0x0E, // PES packet length
556            0x80, 0x80, // flags: PTS only
557            0x05, // header data length
558            0x21, 0x00, 0x05, 0xBF, 0x21, // PTS = 90000
559            0x01, 0x02, 0x03, // audio payload
560        ];
561        let stream = make_stream(vec![make_ts_packet(0x200, true, &pes)]);
562        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
563        let item = decoder.next().await.unwrap().unwrap();
564        match item {
565            PesPacket::Audio {
566                pid, pts, payload, ..
567            } => {
568                assert_eq!(pid, 0x200);
569                assert_eq!(pts, Some(90000));
570                assert_eq!(&payload[..], &[0x01, 0x02, 0x03]);
571            }
572            other => panic!("Expected Audio, got {other:?}"),
573        }
574    }
575
576    #[tokio::test]
577    async fn test_audio_pes_no_pts() {
578        let pes: Vec<u8> = vec![
579            0x00, 0x00, 0x01, 0xDF, // stream_id = 0xDF
580            0x00, 0x06, // PES packet length
581            0x80, 0x00, // flags: no PTS, no DTS
582            0x00, // header data length = 0
583            0xAA, 0xBB, 0xCC, // audio payload
584        ];
585        let stream = make_stream(vec![make_ts_packet(0x200, true, &pes)]);
586        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
587        let item = decoder.next().await.unwrap().unwrap();
588        match item {
589            PesPacket::Audio {
590                pid, pts, payload, ..
591            } => {
592                assert_eq!(pid, 0x200);
593                assert!(pts.is_none());
594                assert_eq!(&payload[..], &[0xAA, 0xBB, 0xCC]);
595            }
596            other => panic!("Expected Audio, got {other:?}"),
597        }
598    }
599
600    // ---- PAT section dispatch test ----
601
602    #[tokio::test]
603    async fn test_section_dispatches_to_pat() {
604        let pat_section: Vec<u8> = vec![
605            0x00, // table_id = 0x00 (PAT)
606            0xB0, 0x0D, // section_syntax_indicator=1, section_length=13
607            0x00, 0x01, // transport_stream_id = 1
608            0xC1, // version=0, current_next=1
609            0x00, 0x00, // section_number, last_section_number
610            0x00, 0x01, // program_number = 1
611            0xE1, 0x00, // reserved + PID = 0x100
612            0x00, 0x00, 0x00, 0x00, // CRC32
613        ];
614        let mut payload = vec![0x00]; // pointer_field = 0
615        payload.extend_from_slice(&pat_section);
616
617        let stream = make_stream(vec![make_ts_packet(0x00, true, &payload)]);
618        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
619        let item = decoder.next().await.unwrap().unwrap();
620        match item {
621            PesPacket::PAT(pat) => {
622                assert_eq!(pat.transport_stream_id, 1);
623                assert_eq!(pat.entries.len(), 1);
624                assert_eq!(pat.entries[0].program_number, 1);
625                assert_eq!(pat.entries[0].pid, 0x100);
626            }
627            other => panic!("Expected PAT, got {other:?}"),
628        }
629    }
630
631    // ---- PMT section dispatch test ----
632
633    #[tokio::test]
634    async fn test_section_dispatches_to_pmt() {
635        let pmt_section: Vec<u8> = vec![
636            0x02, // table_id = 0x02 (PMT)
637            0xB0, 0x12, // section_syntax_indicator=1, section_length=18
638            0x00, 0x01, // program_number = 1
639            0xC1, // version=0, current_next=1
640            0x00, 0x00, // section_number, last_section_number
641            0xE1, 0x00, // reserved + PCR_PID = 0x100
642            0xF0, 0x00, // reserved + program_info_length = 0
643            // H.264 video on PID 0x101
644            0x1B, 0xE1, 0x01, 0xF0, 0x00, 0x00, 0x00, 0x00, 0x00, // CRC32
645        ];
646        let mut payload = vec![0x00]; // pointer_field
647        payload.extend_from_slice(&pmt_section);
648
649        let stream = make_stream(vec![make_ts_packet(0x100, true, &payload)]);
650        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
651        let item = decoder.next().await.unwrap().unwrap();
652        match item {
653            PesPacket::PMT(pmt) => {
654                assert_eq!(pmt.program_number, 1);
655                assert_eq!(pmt.pcr_pid, 0x100);
656                assert_eq!(pmt.entries.len(), 1);
657                assert_eq!(pmt.entries[0].stream_type, StreamType::H264);
658                assert_eq!(pmt.entries[0].elementary_pid, 0x101);
659            }
660            other => panic!("Expected PMT, got {other:?}"),
661        }
662    }
663}