use parsely_rs::*;
use super::{rtcp_fb_header::RtcpFbHeader, rtcp_header::RtcpHeader};
// Named `u1`/`u2` values. Having them as constants lets the conversion impls
// further down use them directly as `match` patterns.
const U1_ZERO: u1 = u1::new(0);
const U1_ONE: u1 = u1::new(1);
const U2_ZERO: u2 = u2::new(0);
const U2_ONE: u2 = u2::new(1);
const U2_TWO: u2 = u2::new(2);
/// An RTCP feedback packet holding per-packet receive reports.
///
/// NOTE(review): "TCC" presumably stands for transport-wide congestion control;
/// confirm against the crate-level documentation.
#[derive(Debug)]
pub struct RtcpFbTccPacket {
/// RTCP header; supplied to `read` through its context and written first by `write`.
pub header: RtcpHeader,
/// RTCP feedback header; supplied to `read` through its context and written after `header`.
pub fb_header: RtcpFbHeader,
/// One report per packet status symbol decoded from the packet.
pub packet_reports: Vec<PacketReport>,
/// 24-bit reference time field, stored exactly as read from the wire.
pub reference_time: u24,
/// 8-bit feedback packet count field, stored exactly as read from the wire.
pub feedback_packet_count: u8,
}
impl RtcpFbTccPacket {
/// Feedback message type (FMT) value handed to the header's `sync` for this packet type.
pub const FMT: u5 = u5::new(15);
/// Length of this packet's payload in bytes.
///
/// # Panics
///
/// Not implemented yet: the body is `todo!()`, so every call panics. The `StateSync`
/// implementation for this type calls this method and therefore panics as well.
pub fn payload_length_bytes(&self) -> usize {
todo!()
}
}
impl<B: BitBuf> ParselyRead<B> for RtcpFbTccPacket {
    /// The already-parsed RTCP header and feedback header; both are stored in the resulting
    /// packet unchanged.
    type Ctx = (RtcpHeader, RtcpFbHeader);

    /// Parses the part of the packet that follows the two headers.
    ///
    /// Fields are read in this order: `base_seq_num` (u16), `packet_status_count` (u16),
    /// `reference_time` (u24), `feedback_packet_count` (u8), then packet status chunks until
    /// `packet_status_count` symbols are covered, then one delta per received packet, and
    /// finally padding bytes until the number of bytes consumed here is a multiple of 4.
    ///
    /// A chunk may describe more symbols than are still outstanding (for instance a run-length
    /// chunk with an over-long run). The surplus symbols are ignored, the same way a status
    /// vector chunk is truncated to the outstanding count when it is read.
    ///
    /// # Errors
    ///
    /// Returns an error if the buffer runs out of data, if a chunk cannot be decoded, or if a
    /// status symbol reports an unsupported delta size.
    fn read<T: ByteOrder>(buf: &mut B, (header, fb_header): Self::Ctx) -> ParselyResult<Self> {
        // Remembered so the amount consumed can be computed for the trailing padding.
        let bytes_remaining_start = buf.remaining_bytes();
        let base_seq_num = buf.get_u16::<T>().context("Reading field 'base_seq_num'")?;
        let packet_status_count = buf
            .get_u16::<T>()
            .context("Reading field 'packet_status_count'")?;
        let reference_time = buf
            .get_u24::<T>()
            .context("Reading field 'reference_time'")?;
        let feedback_packet_count = buf
            .get_u8()
            .context("Reading field 'feedback_packet_count'")?;

        // Phase 1: read chunks until every announced status symbol is accounted for.
        let mut num_status_remaining = packet_status_count;
        let mut chunks: Vec<SomePacketStatusChunk> = vec![];
        while num_status_remaining > 0 {
            let chunk = SomePacketStatusChunk::read::<T>(buf, (num_status_remaining as usize,))
                .context("packet status chunk")?;
            // The chunk contents come from the wire and may claim more symbols than remain;
            // a plain subtraction would underflow (panic in debug, wrap around in release).
            num_status_remaining = num_status_remaining.saturating_sub(chunk.num_symbols());
            chunks.push(chunk);
        }

        // Phase 2: the deltas follow all chunks, one per received packet, in symbol order.
        let mut packet_reports: Vec<PacketReport> =
            Vec::with_capacity(packet_status_count as usize);
        let mut curr_seq_num = base_seq_num;
        let status_symbols = chunks
            .iter()
            .flat_map(|chunk| chunk.iter())
            // Never produce more reports than the packet announced.
            .take(usize::from(packet_status_count));
        for status_symbol in status_symbols {
            match status_symbol.delta_size_bytes() {
                0 => packet_reports.push(PacketReport::UnreceivedPacket {
                    seq_num: curr_seq_num,
                }),
                1 => {
                    let delta_ticks = buf
                        .get_u8()
                        .with_context(|| format!("delta ticks for packet {curr_seq_num}"))?;
                    packet_reports.push(PacketReport::ReceivedPacketSmallDelta {
                        seq_num: curr_seq_num,
                        delta_ticks,
                    })
                }
                2 => {
                    // The two bytes are reinterpreted as a signed value.
                    let delta_ticks = buf
                        .get_u16::<T>()
                        .with_context(|| format!("delta ticks for packet {curr_seq_num}"))?
                        as i16;
                    packet_reports.push(PacketReport::ReceivedPacketLargeOrNegativeDelta {
                        seq_num: curr_seq_num,
                        delta_ticks,
                    })
                }
                delta_size_bytes => bail!("Invalid delta size: {delta_size_bytes} bytes"),
            }
            // Sequence numbers are 16 bits wide and wrap around.
            curr_seq_num = curr_seq_num.wrapping_add(1);
        }

        // Consume padding so that the bytes read by this function end on a 4-byte boundary.
        while (bytes_remaining_start - buf.remaining_bytes()) % 4 != 0 {
            let _ = buf.get_u8().context("padding")?;
        }

        Ok(RtcpFbTccPacket {
            header,
            fb_header,
            packet_reports,
            reference_time,
            feedback_packet_count,
        })
    }
}
impl<B: BitBufMut> ParselyWrite<B> for RtcpFbTccPacket {
type Ctx = ();
/// Serializes the packet: both headers, then the fixed fields that precede the chunks.
///
/// NOTE(review): incomplete. With a non-empty `packet_reports` the function ends in
/// `todo!()` and panics after the fixed fields have been written; the packet status
/// chunks, deltas and padding are never emitted.
fn write<T: ByteOrder>(&self, buf: &mut B, _ctx: Self::Ctx) -> ParselyResult<()> {
self.header.write::<T>(buf, ()).context("header")?;
self.fb_header.write::<T>(buf, ()).context("fb header")?;
// With no reports only the two headers are written and the call succeeds.
// NOTE(review): confirm that a header-only packet is the intended output here.
if self.packet_reports.is_empty() {
return Ok(());
}
// The base sequence number is taken from the first report in the vector.
let base_seq_num = self.packet_reports[0].seq_num();
// NOTE(review): `as u16` silently truncates if there are more than 65535 reports.
let packet_status_count = self.packet_reports.len() as u16;
buf.put_u16::<T>(base_seq_num).context("base_seq_num")?;
buf.put_u16::<T>(packet_status_count)
.context("packet_status_count")?;
buf.put_u24::<T>(self.reference_time)
.context("reference_time")?;
buf.put_u8(self.feedback_packet_count)
.context("feedback_packet_count")?;
todo!()
}
}
impl StateSync for RtcpFbTccPacket {
type SyncCtx = ();
fn sync(&mut self, _sync_ctx: Self::SyncCtx) -> ParselyResult<()> {
self.header
.sync((self.payload_length_bytes() as u16, Self::FMT))?;
self.packet_reports.sort_by_key(|pr| pr.seq_num());
Ok(())
}
}
/// The receive status reported for a single packet, identified by its sequence number.
#[derive(Debug, PartialEq)]
pub enum PacketReport {
    /// The packet was reported as not received; no delta is carried.
    UnreceivedPacket { seq_num: u16 },
    /// The packet was received and its delta was carried in one byte.
    ReceivedPacketSmallDelta { seq_num: u16, delta_ticks: u8 },
    /// The packet was received and its delta was carried in two bytes as a signed value.
    ReceivedPacketLargeOrNegativeDelta { seq_num: u16, delta_ticks: i16 },
}

impl PacketReport {
    /// Sequence number of the packet this report describes.
    pub fn seq_num(&self) -> u16 {
        match *self {
            Self::UnreceivedPacket { seq_num }
            | Self::ReceivedPacketSmallDelta { seq_num, .. }
            | Self::ReceivedPacketLargeOrNegativeDelta { seq_num, .. } => seq_num,
        }
    }

    /// The packet status symbol that corresponds to this kind of report.
    pub fn symbol(&self) -> PacketStatusSymbol {
        match self {
            Self::UnreceivedPacket { .. } => PacketStatusSymbol::NotReceived,
            Self::ReceivedPacketSmallDelta { .. } => PacketStatusSymbol::ReceivedSmallDelta,
            Self::ReceivedPacketLargeOrNegativeDelta { .. } => {
                PacketStatusSymbol::ReceivedLargeOrNegativeDelta
            }
        }
    }
}
/// Per-packet status symbol as stored in packet status chunks.
///
/// The discriminants are the numeric values used for the symbols on the wire.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum PacketStatusSymbol {
    NotReceived = 0,
    ReceivedSmallDelta = 1,
    ReceivedLargeOrNegativeDelta = 2,
}

impl PacketStatusSymbol {
    /// Number of bytes the delta for a packet with this status occupies: none for a packet
    /// that was not received, one for a small delta, two for a large or negative delta.
    pub(crate) fn delta_size_bytes(&self) -> u8 {
        match *self {
            Self::NotReceived => 0,
            Self::ReceivedSmallDelta => 1,
            Self::ReceivedLargeOrNegativeDelta => 2,
        }
    }
}
impl From<u1> for PacketStatusSymbol {
fn from(value: u1) -> Self {
match value {
U1_ZERO => PacketStatusSymbol::NotReceived,
U1_ONE => PacketStatusSymbol::ReceivedSmallDelta,
_ => unreachable!(),
}
}
}
impl TryInto<u1> for &PacketStatusSymbol {
type Error = anyhow::Error;
fn try_into(self) -> std::prelude::v1::Result<u1, Self::Error> {
match self {
PacketStatusSymbol::NotReceived => Ok(U1_ZERO),
PacketStatusSymbol::ReceivedSmallDelta => Ok(U1_ONE),
PacketStatusSymbol::ReceivedLargeOrNegativeDelta => Err(anyhow!(
"PacketStatusSymbol::ReceivedLargeOrNegativeDelta can't be encoded into a u1"
)),
}
}
}
impl TryInto<u1> for PacketStatusSymbol {
type Error = anyhow::Error;
fn try_into(self) -> std::prelude::v1::Result<u1, Self::Error> {
(&self).try_into()
}
}
impl TryFrom<u2> for PacketStatusSymbol {
    type Error = anyhow::Error;

    /// Decodes a two-bit symbol. Values 0, 1 and 2 map to the three variants; any other
    /// value is rejected with an error.
    fn try_from(value: u2) -> std::prelude::v1::Result<Self, Self::Error> {
        let symbol = match value {
            U2_ZERO => Self::NotReceived,
            U2_ONE => Self::ReceivedSmallDelta,
            U2_TWO => Self::ReceivedLargeOrNegativeDelta,
            pss => return Err(anyhow!("Invalid 2 bit packet status symbol: {pss}")),
        };
        Ok(symbol)
    }
}
impl From<&PacketStatusSymbol> for u2 {
fn from(val: &PacketStatusSymbol) -> Self {
match val {
PacketStatusSymbol::NotReceived => u2::new(0),
PacketStatusSymbol::ReceivedSmallDelta => u2::new(1),
PacketStatusSymbol::ReceivedLargeOrNegativeDelta => u2::new(2),
}
}
}
impl From<PacketStatusSymbol> for u2 {
fn from(val: PacketStatusSymbol) -> Self {
(&val).into()
}
}
/// A packet status chunk that lists one status symbol per packet.
#[derive(Debug, Clone, PartialEq)]
pub struct StatusVectorChunk(pub(crate) Vec<PacketStatusSymbol>);

impl StatusVectorChunk {
    /// Whether any symbol in this chunk needs two bits, i.e. is
    /// `ReceivedLargeOrNegativeDelta`.
    fn has_two_bit_symbols(&self) -> bool {
        self.0
            .contains(&PacketStatusSymbol::ReceivedLargeOrNegativeDelta)
    }

    /// Iterates over the symbols by reference, in order.
    fn iter(&self) -> std::slice::Iter<'_, PacketStatusSymbol> {
        let Self(symbols) = self;
        symbols.iter()
    }
}
impl<B: BitBuf> ParselyRead<B> for StatusVectorChunk {
type Ctx = (usize,);
fn read<T: ByteOrder>(buf: &mut B, (max_symbol_count,): Self::Ctx) -> ParselyResult<Self> {
let symbol_size = buf.get_u1().context("symbol size")?;
let mut packet_status_symbols = match symbol_size {
s if s == 0 => {
let mut symbols = Vec::with_capacity(14);
for i in 0..14 {
let symbol: PacketStatusSymbol = buf
.get_u1()
.with_context(|| format!("packet status symbol {i}"))
.map(|v| v.into())?;
symbols.push(symbol);
}
symbols
}
s if s == 1 => {
let mut symbols = Vec::with_capacity(7);
for i in 0..7 {
let symbol: PacketStatusSymbol = buf
.get_u2()
.with_context(|| format!("packet status symbol {i}"))?
.try_into()
.context("converting u2 to packet status symbol")?;
symbols.push(symbol);
}
symbols
}
_ => unreachable!("u1 can only be 1 or 0"),
};
packet_status_symbols.truncate(max_symbol_count);
Ok(StatusVectorChunk(packet_status_symbols))
}
}
impl<B: BitBufMut> ParselyWrite<B> for StatusVectorChunk {
type Ctx = ();
fn write<T: ByteOrder>(&self, buf: &mut B, _ctx: Self::Ctx) -> ParselyResult<()> {
buf.put_u1(u1::new(1)).context("SV chunk type")?;
if self.has_two_bit_symbols() {
buf.put_u1(u1::new(1)).context("SV chunk symbol size")?;
for (i, symbol) in self.iter().enumerate() {
buf.put_u2(symbol.into())
.with_context(|| format!("2 bit sv chunk symbol {i}"))?;
}
} else {
buf.put_u1(u1::new(0)).context("SV chunk symbol size")?;
for (i, symbol) in self.iter().enumerate() {
buf.put_u1(
symbol
.try_into()
.context("Trying to convert status symbol to u1")?,
)
.with_context(|| format!("2 bit sv chunk symbol {i}"))?;
}
}
Ok(())
}
}
impl_stateless_sync!(StatusVectorChunk);
/// A packet status chunk that assigns one status symbol to a run of consecutive packets.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RunLengthEncodingChunk {
/// The status symbol shared by every packet in the run.
pub symbol: PacketStatusSymbol,
/// Number of packets in the run (13-bit value).
pub run_length: u13,
}
impl<B: BitBuf> ParselyRead<B> for RunLengthEncodingChunk {
type Ctx = ();
fn read<T: ByteOrder>(buf: &mut B, _ctx: Self::Ctx) -> ParselyResult<Self> {
let symbol = buf
.get_u2()
.context("Reading run length encoding symbol")?
.try_into()
.context("Converting u2 to packet status symbol")?;
let run_length = buf.get_u13::<T>().context("Reading run length")?;
Ok(RunLengthEncodingChunk { symbol, run_length })
}
}
impl<B: BitBufMut> ParselyWrite<B> for RunLengthEncodingChunk {
type Ctx = ();
fn write<T: ByteOrder>(&self, buf: &mut B, _ctx: Self::Ctx) -> ParselyResult<()> {
buf.put_u1(u1::new(0)).context("rle chunk type")?;
buf.put_u2(self.symbol.into()).context("rle chunk symbol")?;
buf.put_u13::<T>(self.run_length)
.context("rle chunk run length")?;
Ok(())
}
}
impl_stateless_sync!(RunLengthEncodingChunk);
/// Either kind of packet status chunk. When reading, the leading chunk-type bit selects
/// the variant: 0 for a run-length chunk, 1 for a status vector chunk.
#[derive(Debug, Clone)]
pub(crate) enum SomePacketStatusChunk {
/// A chunk listing one symbol per packet.
StatusVectorChunk(StatusVectorChunk),
/// A chunk repeating one symbol for a run of packets.
RunLengthEncodingChunk(RunLengthEncodingChunk),
}
/// Iterator over the status symbols described by a [`SomePacketStatusChunk`].
pub(crate) enum SomePacketStatusChunkIterator<'a> {
    /// Walks the symbols stored in a status vector chunk.
    StatusVector(std::slice::Iter<'a, PacketStatusSymbol>),
    /// Yields the same symbol repeatedly; the `usize` is how many are still to be yielded.
    RunLength(std::iter::Repeat<PacketStatusSymbol>, usize),
}

impl Iterator for SomePacketStatusChunkIterator<'_> {
    type Item = PacketStatusSymbol;

    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Self::StatusVector(symbols) => symbols.next().copied(),
            Self::RunLength(symbol, remaining) => {
                // Stops (and leaves the counter at zero) once the run is exhausted.
                *remaining = remaining.checked_sub(1)?;
                symbol.next()
            }
        }
    }
}
impl SomePacketStatusChunk {
pub(crate) fn num_symbols(&self) -> u16 {
match self {
SomePacketStatusChunk::StatusVectorChunk(svc) => svc.0.len() as u16,
SomePacketStatusChunk::RunLengthEncodingChunk(rlec) => rlec.run_length.into(),
}
}
pub fn iter(&self) -> SomePacketStatusChunkIterator<'_> {
match self {
SomePacketStatusChunk::StatusVectorChunk(StatusVectorChunk(vec)) => {
SomePacketStatusChunkIterator::StatusVector(vec.iter())
}
SomePacketStatusChunk::RunLengthEncodingChunk(chunk) => {
SomePacketStatusChunkIterator::RunLength(
std::iter::repeat(chunk.symbol),
chunk.run_length.into(),
)
}
}
}
}
impl<B: BitBuf> ParselyRead<B> for SomePacketStatusChunk {
    /// Maximum number of symbols to keep when the chunk is a status vector chunk.
    type Ctx = (usize,);

    /// Reads the chunk-type bit and then the matching chunk: 0 selects a run-length chunk,
    /// 1 a status vector chunk. `max_symbol_count` is only forwarded to the status vector
    /// reader.
    fn read<T: ByteOrder>(buf: &mut B, (max_symbol_count,): Self::Ctx) -> ParselyResult<Self> {
        let chunk_type = buf.get_u1().context("chunk type")?;
        if chunk_type == 0 {
            RunLengthEncodingChunk::read::<T>(buf, ())
                .map(Self::RunLengthEncodingChunk)
                .context("run length encoding chunk")
        } else if chunk_type == 1 {
            StatusVectorChunk::read::<T>(buf, (max_symbol_count,))
                .map(Self::StatusVectorChunk)
                .context("status vector chunk")
        } else {
            unreachable!()
        }
    }
}
impl<B: BitBufMut> ParselyWrite<B> for SomePacketStatusChunk {
    type Ctx = ();

    /// Writes the wrapped chunk; each chunk type emits its own chunk-type bit.
    fn write<T: ByteOrder>(&self, buf: &mut B, _ctx: Self::Ctx) -> ParselyResult<()> {
        match self {
            Self::RunLengthEncodingChunk(rle_chunk) => rle_chunk.write::<T>(buf, ()),
            Self::StatusVectorChunk(sv_chunk) => sv_chunk.write::<T>(buf, ()),
        }
    }
}
impl_stateless_sync!(SomePacketStatusChunk);
// Unit tests for chunk parsing/serialization and for parsing a complete packet.
#[cfg(test)]
mod test {
use super::*;
use bits_io::bits;
// Reads a 14-symbol one-bit status vector chunk and writes it back unchanged.
#[test]
fn test_sv_chunk_1_bit_symbols() {
// Bit 0: chunk type (1), bit 1: symbol size (0), then 14 one-bit symbols.
let chunk_data = bits!(1, 0, 1, 1, 0, 0, 1, 1, 0, 0, 1, 1, 0, 0, 1, 1);
let mut bits = Bits::copy_from_bit_slice(chunk_data);
// Skip the chunk-type bit: `StatusVectorChunk::read` starts at the symbol-size bit.
bits.advance_bits(1);
let sv_chunk = StatusVectorChunk::read::<NetworkOrder>(&mut bits, (14,)).unwrap();
assert_eq!(sv_chunk.0.len(), 14);
assert!(bits.is_empty());
assert_eq!(
sv_chunk.0,
vec![
PacketStatusSymbol::ReceivedSmallDelta,
PacketStatusSymbol::ReceivedSmallDelta,
PacketStatusSymbol::NotReceived,
PacketStatusSymbol::NotReceived,
PacketStatusSymbol::ReceivedSmallDelta,
PacketStatusSymbol::ReceivedSmallDelta,
PacketStatusSymbol::NotReceived,
PacketStatusSymbol::NotReceived,
PacketStatusSymbol::ReceivedSmallDelta,
PacketStatusSymbol::ReceivedSmallDelta,
PacketStatusSymbol::NotReceived,
PacketStatusSymbol::NotReceived,
PacketStatusSymbol::ReceivedSmallDelta,
PacketStatusSymbol::ReceivedSmallDelta,
]
);
// Round trip: writing the chunk must reproduce the original 16 bits.
let mut bits_mut = BitsMut::new();
sv_chunk.write::<NetworkOrder>(&mut bits_mut, ()).unwrap();
assert_eq!(chunk_data, bits_mut.as_ref());
}
// A symbol limit below the chunk capacity still consumes the whole chunk but keeps
// only the first `limit` symbols.
#[test]
fn test_sv_chunk_1_bit_symbols_with_limit() {
// No chunk-type bit here: the data starts at the symbol-size bit (0).
let mut chunk_data = bits!(0, 1, 1, 0, 0, 1, 1, 0, 0, 1, 1, 0, 0, 1, 1);
let sv_chunk = StatusVectorChunk::read::<NetworkOrder>(&mut chunk_data, (3,)).unwrap();
assert_eq!(sv_chunk.0.len(), 3);
assert!(chunk_data.is_empty());
assert_eq!(
sv_chunk.0,
vec![
PacketStatusSymbol::ReceivedSmallDelta,
PacketStatusSymbol::ReceivedSmallDelta,
PacketStatusSymbol::NotReceived,
]
);
}
// Reads a 7-symbol two-bit status vector chunk and writes it back unchanged.
#[test]
fn test_sv_chunk_2_bit_symbols() {
// Bit 0: chunk type (1), bit 1: symbol size (1), then 7 two-bit symbols.
let chunk_data = bits!(1, 1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0);
let mut bits = Bits::copy_from_bit_slice(chunk_data);
// Skip the chunk-type bit.
bits.advance_bits(1);
// A limit above the capacity (15 > 7) leaves all 7 symbols in place.
let sv_chunk = StatusVectorChunk::read::<NetworkOrder>(&mut bits, (15,)).unwrap();
assert_eq!(sv_chunk.0.len(), 7);
assert!(bits.is_empty());
assert_eq!(
sv_chunk.0,
vec![
PacketStatusSymbol::NotReceived,
PacketStatusSymbol::ReceivedSmallDelta,
PacketStatusSymbol::ReceivedLargeOrNegativeDelta,
PacketStatusSymbol::NotReceived,
PacketStatusSymbol::ReceivedSmallDelta,
PacketStatusSymbol::ReceivedLargeOrNegativeDelta,
PacketStatusSymbol::NotReceived,
]
);
// Round trip: writing the chunk must reproduce the original 16 bits.
let mut bits_mut = BitsMut::new();
sv_chunk.write::<NetworkOrder>(&mut bits_mut, ()).unwrap();
assert_eq!(chunk_data, bits_mut.as_ref());
}
// Reads a run-length chunk and writes it back unchanged.
#[test]
fn test_rle_chunk() {
// Bit 0: chunk type (0), bits 1-2: symbol (01), remaining 13 bits: run length (21).
let chunk_data = bits!(0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1);
let mut bits = Bits::copy_from_bit_slice(chunk_data);
// Skip the chunk-type bit: `RunLengthEncodingChunk::read` starts at the symbol.
bits.advance_bits(1);
let rle_chunk = RunLengthEncodingChunk::read::<NetworkOrder>(&mut bits, ()).unwrap();
assert!(bits.is_empty());
assert_eq!(rle_chunk.symbol, PacketStatusSymbol::ReceivedSmallDelta);
assert_eq!(rle_chunk.run_length, 0b0000000010101);
// Round trip: writing the chunk must reproduce the original 16 bits.
let mut bits_mut = BitsMut::new();
rle_chunk.write::<NetworkOrder>(&mut bits_mut, ()).unwrap();
assert_eq!(chunk_data, bits_mut.as_ref());
}
// Parses a complete packet body (the data after the RTCP and feedback headers).
#[test]
fn test_rtcp_fb_tcc_packet() {
// Row 1: base sequence number 385 (0x0181), packet status count 8.
// Row 2: reference time 0x19aee8 (1683176), feedback packet count 0x45 (69).
// Row 3: status vector chunk 0xd955 (seven two-bit symbols), then run-length
//        chunk 0x2001 (small delta, run length 1).
// Rows 4-6: deltas 0xa8, 0xfffc, 0x04, 0x00, 0x50, 0x04, 0x00, 0x00, followed by
//           three padding bytes.
#[rustfmt::skip]
let data_buf = [
0x01, 0x81, 0x00, 0x08,
0x19, 0xae, 0xe8, 0x45,
0xd9, 0x55, 0x20, 0x01,
0xa8, 0xff, 0xfc, 0x04,
0x00, 0x50, 0x04, 0x00,
0x00, 0x00, 0x00, 0x00
];
let mut bits = Bits::copy_from_bytes(&data_buf[..]);
// The headers are not part of `data_buf`; default values are passed as context.
let tcc_packet = RtcpFbTccPacket::read::<NetworkOrder>(
&mut bits,
(RtcpHeader::default(), RtcpFbHeader::default()),
)
.unwrap();
assert_eq!(tcc_packet.reference_time, u24::new(1683176));
assert_eq!(tcc_packet.feedback_packet_count, 69);
assert_eq!(
tcc_packet.packet_reports,
[
PacketReport::ReceivedPacketSmallDelta {
seq_num: 385,
delta_ticks: 168,
},
PacketReport::ReceivedPacketLargeOrNegativeDelta {
seq_num: 386,
delta_ticks: -4,
},
PacketReport::ReceivedPacketSmallDelta {
seq_num: 387,
delta_ticks: 4,
},
PacketReport::ReceivedPacketSmallDelta {
seq_num: 388,
delta_ticks: 0,
},
PacketReport::ReceivedPacketSmallDelta {
seq_num: 389,
delta_ticks: 80,
},
PacketReport::ReceivedPacketSmallDelta {
seq_num: 390,
delta_ticks: 4,
},
PacketReport::ReceivedPacketSmallDelta {
seq_num: 391,
delta_ticks: 0,
},
PacketReport::ReceivedPacketSmallDelta {
seq_num: 392,
delta_ticks: 0,
},
]
);
}
}