use crate::*;
use std::collections::{HashMap, VecDeque};
use std::pin::{Pin, pin};
use std::task::{Context, Poll};
use tokio_stream::Stream;
use pat::*;
mod pat;
use pmt::*;
mod pmt;
pub use stream::*;
mod stream;
pub use assembler::*;
mod assembler;
/// PID value `0x1FFF`, which the MPEG transport stream specification reserves for null
/// (stuffing) packets.
///
/// NOTE(review): no use of this constant is visible in this part of the file; presumably
/// packets on this PID are skipped or surfaced as `PesPacket::Null` — confirm at the use site.
const NULL_PID: u16 = 0x1FFF;
/// One unit of demultiplexed output: a parsed PES packet, a parsed table, or raw bytes
/// that could not be classified further.
///
/// Constructors visible in this file: `parse_pes_packet` yields `Video`, `Audio` or `PES`;
/// `parse_section` yields `PAT`, `PMT`, `Section` or `Private`.
pub enum PesPacket {
/// PES packet whose stream id lies in `0xE0..=0xEF`.
Video {
/// PID value handed to `parse_pes_packet` by its caller.
pid: u16,
// `random_access` is the caller-supplied indicator, forwarded unchanged.
// `pts` is decoded from the optional PES header when the PTS/DTS flags are `0b10` or
// `0b11` and at least five optional-header bytes are present; otherwise `None`.
random_access: Option<bool>, pts: Option<u64>,
/// Decoded only when the PTS/DTS flags are `0b11` and at least ten optional-header
/// bytes are present; otherwise `None`.
dts: Option<u64>,
/// Bytes that follow the PES header (fixed nine bytes plus the declared optional length).
payload: Bytes,
},
/// PES packet whose stream id lies in `0xC0..=0xDF`. No DTS is carried for this variant.
Audio {
/// PID value handed to `parse_pes_packet` by its caller.
pid: u16,
// Same meaning as the fields of the same name on `Video`.
random_access: Option<bool>, pts: Option<u64>,
/// Bytes that follow the PES header.
payload: Bytes,
},
/// Section with table id `0x02` that `ProgramMapTable::from_bytes` accepted.
PMT(ProgramMapTable),
/// Section with table id `0x00` that `ProgramAssociationTable::from_bytes` accepted.
PAT(ProgramAssociationTable),
/// PES data that is neither video nor audio, or that is shorter than nine bytes.
PES {
/// Fourth byte of the buffer, or `0` when the buffer has fewer than four bytes.
stream_id: u8,
/// The complete, unmodified buffer (header included).
data: Bytes,
},
/// Section that was not turned into a `PAT` or `PMT`, including table ids `0x00`/`0x02`
/// whose table parser returned `None`.
Section {
/// First byte of the section.
table_id: u8,
/// The complete, unmodified section bytes.
data: Bytes,
},
/// NOTE(review): not constructed in the visible part of this file; presumably emitted for
/// packets on `NULL_PID` — confirm against the producer.
Null,
/// Opaque bytes. `parse_section` returns this for an empty buffer; other producers may
/// exist outside this view.
Private(Bytes),
}
impl std::fmt::Debug for PesPacket {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PesPacket::Video {
pid,
pts,
dts,
payload,
random_access,
} => f
.debug_struct("Video")
.field("pid", &format_args!("0x{pid:02X}"))
.field("pts", pts)
.field("dts", dts)
.field("random_access", random_access)
.field("payload_len", &payload.len())
.field("payload_preview", &&payload[..payload.len().min(16)])
.finish(),
PesPacket::Audio {
pid,
pts,
payload,
random_access,
} => f
.debug_struct("Audio")
.field("pid", &format_args!("0x{pid:02X}"))
.field("pts", pts)
.field("random_access", random_access)
.field("payload_len", &payload.len())
.field("payload_preview", &&payload[..payload.len().min(16)])
.finish(),
PesPacket::PMT(pmt) => write!(f, "PMT({pmt:?})"),
PesPacket::PAT(pat) => write!(f, "PAT({pat:?})"),
PesPacket::PES { stream_id, data } => f
.debug_struct("PES")
.field("stream_id", &format_args!("0x{stream_id:02X}"))
.field("data_prefix", &&data[..data.len().min(16)])
.field("data_len", &data.len())
.finish(),
PesPacket::Section { table_id, data } => f
.debug_struct("Section")
.field("table_id", &format_args!("0x{table_id:02X}"))
.field("data_prefix", &&data[..data.len().min(16)])
.field("data_len", &data.len())
.finish(),
PesPacket::Null => write!(f, "Null"),
PesPacket::Private(data) => f
.debug_struct("Private")
.field("data_prefix", &&data[..data.len().min(16)])
.field("data_len", &data.len())
.finish(),
}
}
}
/// Decodes a 33-bit PES timestamp (PTS or DTS) from its five-byte wire encoding.
///
/// The value is spread over the first five bytes of `data` as 3 + 15 + 15 bits; the lowest
/// bit of bytes 0, 2 and 4 and the upper nibble of byte 0 are not part of the value and are
/// ignored without validation. Bytes past the fifth are ignored.
///
/// Returns `None` when `data` holds fewer than five bytes.
fn parse_timestamp(data: &[u8]) -> Option<u64> {
    let &[b0, b1, b2, b3, b4, ..] = data else {
        return None;
    };

    // Bits 32..=30 sit in bits 3..=1 of the first byte.
    let top = u64::from((b0 >> 1) & 0x07);
    // Bits 29..=15: all of byte 1 followed by the upper seven bits of byte 2.
    let middle = (u64::from(b1) << 7) | u64::from(b2 >> 1);
    // Bits 14..=0: all of byte 3 followed by the upper seven bits of byte 4.
    let bottom = (u64::from(b3) << 7) | u64::from(b4 >> 1);

    Some((top << 30) | (middle << 15) | bottom)
}
fn parse_pes_packet(pid: u16, random_access_indicator: Option<bool>, data: Bytes) -> PesPacket {
if data.len() < 9 {
let stream_id = if data.len() >= 4 { data[3] } else { 0 };
return PesPacket::PES { stream_id, data };
}
let stream_id = data[3];
let pts_dts_flags = (data[7] >> 6) & 0x03;
let pes_header_data_length = data[8] as usize;
let header_end = 9 + pes_header_data_length;
let optional_header = &data[9..data.len().min(header_end)];
let pts = if pts_dts_flags >= 0b10 && optional_header.len() >= 5 {
parse_timestamp(&optional_header[0..5])
} else {
None
};
let dts = if pts_dts_flags == 0b11 && optional_header.len() >= 10 {
parse_timestamp(&optional_header[5..10])
} else {
None
};
let payload_start = header_end.min(data.len());
let payload = data.slice(payload_start..);
if (0xE0..=0xEF).contains(&stream_id) {
return PesPacket::Video {
pid,
random_access: random_access_indicator,
pts,
dts,
payload,
};
}
if (0xC0..=0xDF).contains(&stream_id) {
return PesPacket::Audio {
pid,
random_access: random_access_indicator,
pts,
payload,
};
}
PesPacket::PES { stream_id, data }
}
/// Turns a reassembled section into a [`PesPacket`].
///
/// * Empty input → `PesPacket::Private` holding the (empty) buffer.
/// * Table id `0x00` → `PesPacket::PAT` when `ProgramAssociationTable::from_bytes` succeeds.
/// * Table id `0x02` → `PesPacket::PMT` when `ProgramMapTable::from_bytes` succeeds.
/// * Anything else, including a table parser returning `None` → `PesPacket::Section` with
///   the table id and the untouched bytes.
fn parse_section(data: Bytes) -> PesPacket {
    // The table id is the very first byte; without it there is nothing to classify.
    let Some(table_id) = data.first().copied() else {
        return PesPacket::Private(data);
    };

    let known_table = match table_id {
        0x00 => ProgramAssociationTable::from_bytes(&data).map(PesPacket::PAT),
        0x02 => ProgramMapTable::from_bytes(&data).map(PesPacket::PMT),
        _ => None,
    };

    known_table.unwrap_or_else(|| PesPacket::Section { table_id, data })
}
/// Byte accumulator kept together with two flags describing its contents.
///
/// NOTE(review): nothing in the visible part of this file reads or writes this struct;
/// presumably one instance exists per PID while a PES packet or section is being
/// reassembled — confirm against the assembler code.
#[derive(Debug)]
struct PidBuffer {
// Bytes collected so far.
data: BytesMut,
// Presumably selects `parse_pes_packet` (true) over `parse_section` (false) once the
// buffer is complete — TODO confirm.
is_pes: bool,
// Presumably the value later passed as `random_access_indicator` to `parse_pes_packet`;
// `None` likely means the indicator was not available — TODO confirm.
random_access_indicator: Option<bool>,
}