1use super::*;
4pub struct PacketizedElementaryStream<S> {
10 inner: S,
11 buffers: HashMap<u16, PidBuffer>,
12 pending: VecDeque<PesPacket>,
13 done: bool,
14}
15
16impl<S> PacketizedElementaryStream<S>
17where
18 S: Stream<Item = std::result::Result<(u64, TsPacket), TsPacketError>>,
19 S: Unpin,
20{
21 pub fn from_ts_stream(inner: S) -> Self {
22 Self {
23 inner,
24 buffers: HashMap::new(),
25 pending: VecDeque::new(),
26 done: false,
27 }
28 }
29
30 fn flush_buffer(&mut self, pid: u16) {
33 let Some(buf) = self.buffers.remove(&pid) else {
34 return;
35 };
36 if buf.data.is_empty() {
37 return;
38 }
39 let data = buf.data.freeze();
40 let item = if buf.is_pes {
41 if data.len() >= 4 {
42 parse_pes_packet(pid, buf.random_access_indicator, data)
43 } else {
44 PesPacket::Private(data)
45 }
46 } else if !data.is_empty() {
47 parse_section(data)
48 } else {
49 return;
50 };
51 self.pending.push_back(item);
52 }
53
54 fn flush_all(&mut self) {
56 let pids: Vec<u16> = self.buffers.keys().copied().collect();
57 for pid in pids {
58 self.flush_buffer(pid);
59 }
60 }
61
62 fn process_packet(&mut self, packet: TsPacket) {
63 let pid = packet.header.pid();
64
65 if pid == NULL_PID {
67 self.pending.push_back(PesPacket::Null);
68 return;
69 }
70
71 if !packet.header.payload() || packet.payload.is_empty() {
73 return;
74 }
75
76 let pusi = packet.header.payload_unit_start_indicator();
77 let payload = &packet.payload;
78
79 if pusi {
80 let is_pes = payload.len() >= 3
82 && payload[0] == 0x00
83 && payload[1] == 0x00
84 && payload[2] == 0x01;
85
86 if is_pes {
87 self.flush_buffer(pid);
89 let random_access_indicator = packet
90 .adaptation_field
91 .as_ref()
92 .map(|af| af.flags.random_access_indicator());
93 self.buffers.insert(
94 pid,
95 PidBuffer {
96 data: BytesMut::from(payload.as_ref()),
97 is_pes: true,
98 random_access_indicator,
99 },
100 );
101 } else {
102 let pointer_field = payload[0] as usize;
104
105 if let Some(buf) = self.buffers.get_mut(&pid) {
107 let end = (1 + pointer_field).min(payload.len());
108 buf.data.extend_from_slice(&payload[1..end]);
109 }
110 self.flush_buffer(pid);
111
112 let start = 1 + pointer_field;
114 if start < payload.len() {
115 self.buffers.insert(
116 pid,
117 PidBuffer {
118 data: BytesMut::from(&payload[start..]),
119 is_pes: false,
120 random_access_indicator: None,
121 },
122 );
123 }
124 }
125 } else {
126 if let Some(buf) = self.buffers.get_mut(&pid) {
128 buf.data.extend_from_slice(payload);
129 }
130 }
131 }
132
133 pub fn into_inner(self) -> S {
135 self.inner
136 }
137}
138
139impl<S> Stream for PacketizedElementaryStream<S>
140where
141 S: Stream<Item = std::result::Result<(u64, TsPacket), TsPacketError>>,
142 S: Unpin,
143{
144 type Item = std::result::Result<PesPacket, TsPacketError>;
145
146 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
147 let this = self.get_mut();
148 loop {
149 if let Some(item) = this.pending.pop_front() {
151 return Poll::Ready(Some(Ok(item)));
152 }
153
154 if this.done {
155 return Poll::Ready(None);
156 }
157
158 match pin!(&mut this.inner).poll_next(cx) {
159 Poll::Ready(Some(Ok((_pos, packet)))) => {
160 this.process_packet(packet);
161 }
163 Poll::Ready(Some(Err(e))) => {
164 return Poll::Ready(Some(Err(e)));
165 }
166 Poll::Ready(None) => {
167 this.done = true;
169 this.flush_all();
170 }
172 Poll::Pending => {
173 return Poll::Pending;
174 }
175 }
176 }
177 }
178}
179
180pub trait TsPacketStreamAssemble: Sized {
181 fn assemble(self) -> PacketizedElementaryStream<Self>;
182}
183impl<S> TsPacketStreamAssemble for S
184where
185 S: Stream<Item = std::result::Result<(u64, TsPacket), TsPacketError>>,
186 S: Unpin,
187{
188 fn assemble(self) -> PacketizedElementaryStream<Self> {
189 PacketizedElementaryStream::from_ts_stream(self)
190 }
191}
192
#[cfg(test)]
mod tests {
    use super::*;
    use tokio_stream::StreamExt;

    /// Builds a TS packet with the payload flag set, no adaptation field, and
    /// the given PID, unit-start indicator and payload bytes.
    fn make_ts_packet(pid: u16, pusi: bool, payload: &[u8]) -> TsPacket {
        let header = TransportStreamHeader::new()
            .with_payload_unit_start_indicator(pusi)
            .with_pid(pid)
            .with_payload(true);
        TsPacket {
            header,
            adaptation_field: None,
            payload: Bytes::copy_from_slice(payload),
        }
    }

    /// Wraps packets in an always-`Ok` stream, numbering positions in steps of 188.
    fn make_stream(
        packets: Vec<TsPacket>,
    ) -> impl Stream<Item = std::result::Result<(u64, TsPacket), TsPacketError>> {
        tokio_stream::iter(
            packets
                .into_iter()
                .enumerate()
                .map(|(i, p)| Ok((i as u64 * 188, p))),
        )
    }

    /// A packet on the null PID is surfaced as `PesPacket::Null`.
    #[tokio::test]
    async fn test_null_packet() {
        let stream = make_stream(vec![make_ts_packet(NULL_PID, false, &[])]);
        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
        let item = decoder.next().await.unwrap().unwrap();
        assert!(matches!(item, PesPacket::Null));
        assert!(decoder.next().await.is_none());
    }

    /// An empty input yields an empty output.
    #[tokio::test]
    async fn test_empty_stream() {
        let stream = make_stream(vec![]);
        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
        assert!(decoder.next().await.is_none());
    }

    /// Continuation packets seen before any unit start are dropped.
    #[tokio::test]
    async fn test_discard_initial_non_pusi() {
        let stream = make_stream(vec![
            make_ts_packet(0x100, false, &[0xAA, 0xBB]),
            make_ts_packet(0x100, false, &[0xCC]),
        ]);
        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
        assert!(decoder.next().await.is_none());
    }

    /// Leading continuation packets are dropped, the first unit start is kept.
    #[tokio::test]
    async fn test_discard_then_accept_after_pusi() {
        let stream = make_stream(vec![
            make_ts_packet(0x100, false, &[0xAA]),
            make_ts_packet(0x100, false, &[0xBB]),
            make_ts_packet(0x100, true, &[0x00, 0x00, 0x01, 0xE0, 0xCC]),
        ]);
        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
        let item = decoder.next().await.unwrap().unwrap();
        match item {
            PesPacket::PES { stream_id, data } => {
                assert_eq!(stream_id, 0xE0);
                assert_eq!(&data[..], &[0x00, 0x00, 0x01, 0xE0, 0xCC]);
            }
            other => panic!("Expected PES, got {other:?}"),
        }
    }

    /// A PES unit contained in a single TS packet is emitted at end of stream.
    #[tokio::test]
    async fn test_pes_single_packet() {
        let payload: &[u8] = &[0x00, 0x00, 0x01, 0xE0, 0x11, 0x22];
        let stream = make_stream(vec![make_ts_packet(0x100, true, payload)]);
        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
        let item = decoder.next().await.unwrap().unwrap();
        match item {
            PesPacket::PES { stream_id, data } => {
                assert_eq!(stream_id, 0xE0);
                assert_eq!(&data[..], payload);
            }
            other => panic!("Expected PES, got {other:?}"),
        }
    }

    /// Continuation payloads are concatenated onto the unit-start payload.
    #[tokio::test]
    async fn test_pes_multi_packet() {
        let stream = make_stream(vec![
            make_ts_packet(0x100, true, &[0x00, 0x00, 0x01, 0xC0, 0xAA]),
            make_ts_packet(0x100, false, &[0xBB, 0xCC]),
            make_ts_packet(0x100, false, &[0xDD]),
        ]);
        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
        let item = decoder.next().await.unwrap().unwrap();
        match item {
            PesPacket::PES { stream_id, data } => {
                assert_eq!(stream_id, 0xC0);
                assert_eq!(&data[..], &[0x00, 0x00, 0x01, 0xC0, 0xAA, 0xBB, 0xCC, 0xDD]);
            }
            other => panic!("Expected PES, got {other:?}"),
        }
    }

    /// A second unit start on the same PID flushes the first unit.
    #[tokio::test]
    async fn test_pes_flush_on_new_pusi() {
        let p1: &[u8] = &[0x00, 0x00, 0x01, 0xE0, 0x11];
        let p2: &[u8] = &[0x00, 0x00, 0x01, 0xE0, 0x22];
        let stream = make_stream(vec![
            make_ts_packet(0x100, true, p1),
            make_ts_packet(0x100, true, p2),
        ]);
        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);

        // Both items are matched exhaustively so that an unexpected variant
        // fails the test instead of skipping the payload assertion.
        let item = decoder.next().await.unwrap().unwrap();
        match item {
            PesPacket::PES { stream_id, data } => {
                assert_eq!(stream_id, 0xE0);
                assert_eq!(&data[..], p1);
            }
            other => panic!("Expected PES, got {other:?}"),
        }

        let item = decoder.next().await.unwrap().unwrap();
        match item {
            PesPacket::PES { stream_id, data } => {
                assert_eq!(stream_id, 0xE0);
                assert_eq!(&data[..], p2);
            }
            other => panic!("Expected PES, got {other:?}"),
        }

        assert!(decoder.next().await.is_none());
    }

    /// A section behind a zero pointer field is emitted without the pointer byte.
    #[tokio::test]
    async fn test_section_single_packet() {
        let payload: &[u8] = &[0x00, 0x42, 0xF0, 0x05, 0xAA, 0xBB];
        let stream = make_stream(vec![make_ts_packet(0x00, true, payload)]);
        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
        let item = decoder.next().await.unwrap().unwrap();
        match item {
            PesPacket::Section { table_id, data } => {
                assert_eq!(table_id, 0x42);
                assert_eq!(&data[..], &[0x42, 0xF0, 0x05, 0xAA, 0xBB]);
            }
            other => panic!("Expected Section, got {other:?}"),
        }
    }

    /// Section data spanning two TS packets is concatenated.
    #[tokio::test]
    async fn test_section_multi_packet() {
        let stream = make_stream(vec![
            make_ts_packet(0x00, true, &[0x00, 0x02, 0xB0, 0x0D]),
            make_ts_packet(0x00, false, &[0xAA, 0xBB]),
        ]);
        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
        let item = decoder.next().await.unwrap().unwrap();
        match item {
            PesPacket::Section { table_id, data } => {
                assert_eq!(table_id, 0x02);
                assert_eq!(&data[..], &[0x02, 0xB0, 0x0D, 0xAA, 0xBB]);
            }
            other => panic!("Expected Section, got {other:?}"),
        }
    }

    /// With a pointer field of 2, the two bytes after it finish the previous
    /// section and the remaining bytes start the next one.
    #[tokio::test]
    async fn test_section_with_pointer_field() {
        let stream = make_stream(vec![
            make_ts_packet(0x00, true, &[0x00, 0x42, 0xAA]),
            make_ts_packet(0x00, true, &[0x02, 0xBB, 0xCC, 0x43, 0xDD]),
        ]);
        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);

        let item = decoder.next().await.unwrap().unwrap();
        match item {
            PesPacket::Section { table_id, data } => {
                assert_eq!(table_id, 0x42);
                assert_eq!(&data[..], &[0x42, 0xAA, 0xBB, 0xCC]);
            }
            other => panic!("Expected Section, got {other:?}"),
        }

        let item = decoder.next().await.unwrap().unwrap();
        match item {
            PesPacket::Section { table_id, data } => {
                assert_eq!(table_id, 0x43);
                assert_eq!(&data[..], &[0x43, 0xDD]);
            }
            other => panic!("Expected Section, got {other:?}"),
        }
    }

    /// Packets of different PIDs may interleave without mixing their payloads.
    #[tokio::test]
    async fn test_multiple_pids_interleaved() {
        let stream = make_stream(vec![
            make_ts_packet(0x100, true, &[0x00, 0x00, 0x01, 0xE0, 0x11]),
            make_ts_packet(0x00, true, &[0x00, 0x00, 0xB0, 0x0D]),
            make_ts_packet(0x100, false, &[0x22, 0x33]),
            make_ts_packet(0x00, false, &[0xAA]),
        ]);
        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);

        // Unwrap every result so that a stream error fails the test instead of
        // silently ending the collection loop.
        let mut items = vec![];
        while let Some(result) = decoder.next().await {
            items.push(result.unwrap());
        }
        assert_eq!(items.len(), 2);

        // Both units are flushed at end of stream; look them up by variant so
        // the test does not depend on the order in which PIDs are flushed.
        let pes_item = items
            .iter()
            .find(|i| matches!(i, PesPacket::PES { .. }))
            .expect("expected a PES item");
        let section_item = items
            .iter()
            .find(|i| matches!(i, PesPacket::Section { .. }))
            .expect("expected a Section item");

        match pes_item {
            PesPacket::PES { stream_id, data } => {
                assert_eq!(*stream_id, 0xE0);
                assert_eq!(&data[..], &[0x00, 0x00, 0x01, 0xE0, 0x11, 0x22, 0x33]);
            }
            other => panic!("Expected PES, got {other:?}"),
        }

        match section_item {
            PesPacket::Section { table_id, data } => {
                assert_eq!(*table_id, 0x00);
                assert_eq!(&data[..], &[0x00, 0xB0, 0x0D, 0xAA]);
            }
            other => panic!("Expected Section, got {other:?}"),
        }
    }

    /// Five timestamp bytes encoding the value 90000.
    #[test]
    fn test_parse_timestamp_known_value() {
        let ts_bytes = [0x21, 0x00, 0x05, 0xBF, 0x21];
        let ts = parse_timestamp(&ts_bytes).unwrap();
        assert_eq!(ts, 90000);
    }

    /// Five timestamp bytes with only the marker bits set decode to zero.
    #[test]
    fn test_parse_timestamp_zero() {
        let ts_bytes = [0x21, 0x00, 0x01, 0x00, 0x01];
        let ts = parse_timestamp(&ts_bytes).unwrap();
        assert_eq!(ts, 0);
    }

    /// Fewer bytes than a full timestamp yield `None`.
    #[test]
    fn test_parse_timestamp_too_short() {
        assert!(parse_timestamp(&[0x21, 0x00, 0x01]).is_none());
    }

    /// Stream id 0xE0 with a PTS-only header becomes a `Video` item.
    #[tokio::test]
    async fn test_video_pes_with_pts() {
        let pes: Vec<u8> = vec![
            // start code prefix + stream id
            0x00, 0x00, 0x01, 0xE0,
            // packet length, two flag bytes, header data length (5)
            0x00, 0x10, 0x80, 0x80, 0x05,
            // PTS = 90000
            0x21, 0x00, 0x05, 0xBF, 0x21,
            // elementary stream payload
            0xDE, 0xAD, 0xBE, 0xEF,
        ];
        let stream = make_stream(vec![make_ts_packet(0x100, true, &pes)]);
        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
        let item = decoder.next().await.unwrap().unwrap();
        match item {
            PesPacket::Video {
                pid,
                pts,
                dts,
                payload,
                ..
            } => {
                assert_eq!(pid, 0x100);
                assert_eq!(pts, Some(90000));
                assert!(dts.is_none());
                assert_eq!(&payload[..], &[0xDE, 0xAD, 0xBE, 0xEF]);
            }
            other => panic!("Expected Video, got {other:?}"),
        }
    }

    /// Stream id 0xE1 with both PTS and DTS present in the header.
    #[tokio::test]
    async fn test_video_pes_with_pts_and_dts() {
        let pes: Vec<u8> = vec![
            // start code prefix + stream id
            0x00, 0x00, 0x01, 0xE1,
            // packet length, two flag bytes, header data length (10)
            0x00, 0x15, 0x80, 0xC0, 0x0A,
            // PTS = 90000
            0x21, 0x00, 0x05, 0xBF, 0x21,
            // DTS = 45000
            0x11, 0x00, 0x03, 0x5F, 0x91,
            // elementary stream payload
            0xCA, 0xFE,
        ];
        let stream = make_stream(vec![make_ts_packet(0x100, true, &pes)]);
        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
        let item = decoder.next().await.unwrap().unwrap();
        match item {
            PesPacket::Video {
                pid,
                pts,
                dts,
                payload,
                ..
            } => {
                assert_eq!(pid, 0x100);
                assert_eq!(pts, Some(90000));
                assert_eq!(dts, Some(45000));
                assert_eq!(&payload[..], &[0xCA, 0xFE]);
            }
            other => panic!("Expected Video, got {other:?}"),
        }
    }

    /// Stream id 0xC0 with a PTS-only header becomes an `Audio` item.
    #[tokio::test]
    async fn test_audio_pes_with_pts() {
        let pes: Vec<u8> = vec![
            // start code prefix + stream id
            0x00, 0x00, 0x01, 0xC0,
            // packet length, two flag bytes, header data length (5)
            0x00, 0x0E, 0x80, 0x80, 0x05,
            // PTS = 90000
            0x21, 0x00, 0x05, 0xBF, 0x21,
            // elementary stream payload
            0x01, 0x02, 0x03,
        ];
        let stream = make_stream(vec![make_ts_packet(0x200, true, &pes)]);
        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
        let item = decoder.next().await.unwrap().unwrap();
        match item {
            PesPacket::Audio {
                pid, pts, payload, ..
            } => {
                assert_eq!(pid, 0x200);
                assert_eq!(pts, Some(90000));
                assert_eq!(&payload[..], &[0x01, 0x02, 0x03]);
            }
            other => panic!("Expected Audio, got {other:?}"),
        }
    }

    /// Stream id 0xDF with no timestamps in the header.
    #[tokio::test]
    async fn test_audio_pes_no_pts() {
        let pes: Vec<u8> = vec![
            // start code prefix + stream id
            0x00, 0x00, 0x01, 0xDF,
            // packet length, two flag bytes, header data length (0)
            0x00, 0x06, 0x80, 0x00, 0x00,
            // elementary stream payload
            0xAA, 0xBB, 0xCC,
        ];
        let stream = make_stream(vec![make_ts_packet(0x200, true, &pes)]);
        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
        let item = decoder.next().await.unwrap().unwrap();
        match item {
            PesPacket::Audio {
                pid, pts, payload, ..
            } => {
                assert_eq!(pid, 0x200);
                assert!(pts.is_none());
                assert_eq!(&payload[..], &[0xAA, 0xBB, 0xCC]);
            }
            other => panic!("Expected Audio, got {other:?}"),
        }
    }

    /// A section with table id 0x00 is decoded into a `PAT` item.
    #[tokio::test]
    async fn test_section_dispatches_to_pat() {
        let pat_section: Vec<u8> = vec![
            // table id, section length (13)
            0x00, 0xB0, 0x0D,
            // transport stream id 1, version/current byte, section numbers
            0x00, 0x01, 0xC1, 0x00, 0x00,
            // program 1 -> PID 0x100
            0x00, 0x01, 0xE1, 0x00,
            // trailing four bytes (CRC position), left as zeros
            0x00, 0x00, 0x00, 0x00,
        ];
        // Leading 0x00 is the pointer field.
        let mut payload = vec![0x00];
        payload.extend_from_slice(&pat_section);

        let stream = make_stream(vec![make_ts_packet(0x00, true, &payload)]);
        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
        let item = decoder.next().await.unwrap().unwrap();
        match item {
            PesPacket::PAT(pat) => {
                assert_eq!(pat.transport_stream_id, 1);
                assert_eq!(pat.entries.len(), 1);
                assert_eq!(pat.entries[0].program_number, 1);
                assert_eq!(pat.entries[0].pid, 0x100);
            }
            other => panic!("Expected PAT, got {other:?}"),
        }
    }

    /// A section with table id 0x02 is decoded into a `PMT` item.
    #[tokio::test]
    async fn test_section_dispatches_to_pmt() {
        let pmt_section: Vec<u8> = vec![
            // table id, section length (18)
            0x02, 0xB0, 0x12,
            // program number 1, version/current byte, section numbers
            0x00, 0x01, 0xC1, 0x00, 0x00,
            // PCR PID 0x100, program info length 0
            0xE1, 0x00, 0xF0, 0x00,
            // stream type 0x1B on elementary PID 0x101, ES info length 0
            0x1B, 0xE1, 0x01, 0xF0, 0x00,
            // trailing four bytes (CRC position), left as zeros
            0x00, 0x00, 0x00, 0x00,
        ];
        // Leading 0x00 is the pointer field.
        let mut payload = vec![0x00];
        payload.extend_from_slice(&pmt_section);

        let stream = make_stream(vec![make_ts_packet(0x100, true, &payload)]);
        let mut decoder = PacketizedElementaryStream::from_ts_stream(stream);
        let item = decoder.next().await.unwrap().unwrap();
        match item {
            PesPacket::PMT(pmt) => {
                assert_eq!(pmt.program_number, 1);
                assert_eq!(pmt.pcr_pid, 0x100);
                assert_eq!(pmt.entries.len(), 1);
                assert_eq!(pmt.entries[0].stream_type, StreamType::H264);
                assert_eq!(pmt.entries[0].elementary_pid, 0x101);
            }
            other => panic!("Expected PMT, got {other:?}"),
        }
    }
}