use crate::error::NetError;
/// Minimum length in bytes of an RTP packet: the fixed header that precedes
/// any CSRC entries, header extension and payload.
pub const RTP_HEADER_MIN: usize = 12;
/// Borrowed view of one parsed RTP packet.
///
/// Holds the fixed-header fields decoded by [`RtpPacket::parse`] plus a slice
/// of the input buffer covering the payload. CSRC entries and header-extension
/// contents are skipped during parsing and are not kept here.
#[derive(Debug, Clone, Copy)]
pub struct RtpPacket<'a> {
/// Version number from the top two bits of the first byte; `parse` only
/// accepts the value 2.
pub version: u8,
/// Padding flag (bit `0x20` of the first byte). When set, `parse` has already
/// removed the trailing padding from `payload`.
pub padding: bool,
/// Header-extension flag (bit `0x10` of the first byte). The extension itself
/// is skipped by `parse`.
pub extension: bool,
/// Marker flag (top bit of the second byte).
pub marker: bool,
/// Payload type (low seven bits of the second byte).
pub payload_type: u8,
/// Sequence number, read big-endian from bytes 2..4.
pub sequence: u16,
/// Timestamp, read big-endian from bytes 4..8.
pub timestamp: u32,
/// Synchronization source identifier, read big-endian from bytes 8..12.
pub ssrc: u32,
/// Payload bytes borrowed from the parsed buffer: everything after the fixed
/// header, CSRC list and optional extension, excluding trailing padding.
pub payload: &'a [u8],
}
impl<'a> RtpPacket<'a> {
    /// Parses a single RTP packet out of `buf` without copying the payload.
    ///
    /// The fixed 12-byte header is decoded first, then the CSRC list and the
    /// optional header extension are skipped, and finally any trailing padding
    /// is trimmed off. Only the fixed-header fields are kept; CSRC entries and
    /// extension contents are discarded.
    ///
    /// # Errors
    ///
    /// Returns [`NetError::Protocol`] when:
    /// - `buf` is shorter than [`RTP_HEADER_MIN`];
    /// - the version field is not 2;
    /// - the CSRC list, the extension header or the extension body would run
    ///   past the end of `buf`;
    /// - the padding flag is set but no byte follows the header, or the
    ///   padding count is zero or larger than the bytes that remain.
    pub fn parse(buf: &'a [u8]) -> Result<Self, NetError> {
        let total = buf.len();
        if total < RTP_HEADER_MIN {
            return Err(NetError::Protocol(format!(
                "RTP packet too small: {} < {}",
                total, RTP_HEADER_MIN
            )));
        }

        // First byte: version (2 bits), padding, extension, CSRC count (4 bits).
        let flags = buf[0];
        let version = flags >> 6;
        if version != 2 {
            return Err(NetError::Protocol(format!(
                "unsupported RTP version: {version}"
            )));
        }
        let has_padding = flags & 0x20 != 0;
        let has_extension = flags & 0x10 != 0;

        // Every CSRC entry is four bytes and sits right after the fixed header.
        let mut offset = RTP_HEADER_MIN + 4 * usize::from(flags & 0x0F);
        if offset > total {
            return Err(NetError::Protocol(format!(
                "RTP CSRC list truncated (need {offset}, have {})",
                total
            )));
        }

        if has_extension {
            // The first two bytes of the 4-byte extension header are skipped;
            // the last two give the body length in 32-bit words.
            let body_words = match buf.get(offset + 2..offset + 4) {
                Some(field) => usize::from(u16::from_be_bytes([field[0], field[1]])),
                None => {
                    return Err(NetError::Protocol("RTP extension header truncated".into()));
                }
            };
            offset += 4 + body_words * 4;
            if offset > total {
                return Err(NetError::Protocol("RTP extension body truncated".into()));
            }
        }

        let end = if has_padding {
            if total <= offset {
                return Err(NetError::Protocol(
                    "RTP padding with no payload room".into(),
                ));
            }
            // The final byte holds the number of padding bytes, itself included.
            let pad_len = usize::from(buf[total - 1]);
            if pad_len == 0 || offset + pad_len > total {
                return Err(NetError::Protocol(format!(
                    "RTP padding length {pad_len} invalid"
                )));
            }
            total - pad_len
        } else {
            total
        };

        // Second byte: marker bit followed by the 7-bit payload type.
        let marker_and_type = buf[1];
        Ok(Self {
            version,
            padding: has_padding,
            extension: has_extension,
            marker: marker_and_type & 0x80 != 0,
            payload_type: marker_and_type & 0x7F,
            sequence: u16::from_be_bytes([buf[2], buf[3]]),
            timestamp: u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]),
            ssrc: u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]),
            payload: &buf[offset..end],
        })
    }
}
/// Stateful serializer for RTP packets that carry no CSRC list, extension or
/// padding.
///
/// Each build call advances `sequence` by one (wrapping) and writes the other
/// fields into the 12-byte header as they currently stand.
#[derive(Debug, Clone)]
pub struct RtpPacketBuilder {
/// Synchronization source identifier written into every packet.
pub ssrc: u32,
/// Payload type; only the low seven bits are written into the header.
pub payload_type: u8,
/// Sequence number of the most recently built packet. `new` starts it at
/// `u16::MAX`, so the first packet built carries sequence 0.
pub sequence: u16,
/// Timestamp written into every packet; the build methods do not advance it.
pub timestamp: u32,
/// Marker flag used by `build`; `build_with_marker` ignores it in favour of
/// its own argument.
pub marker: bool,
}
impl RtpPacketBuilder {
    /// Creates a builder for the given stream and payload type.
    ///
    /// The timestamp starts at 0, the marker flag is clear, and the stored
    /// sequence number starts at `u16::MAX` so that the first packet built is
    /// numbered 0.
    #[must_use]
    pub fn new(ssrc: u32, payload_type: u8) -> Self {
        Self {
            ssrc,
            payload_type,
            timestamp: 0,
            marker: false,
            sequence: u16::MAX,
        }
    }

    /// Returns the builder with its timestamp replaced by `ts`.
    #[must_use]
    pub fn with_timestamp(mut self, ts: u32) -> Self {
        self.timestamp = ts;
        self
    }

    /// Advances the stored sequence number by one, wrapping from `u16::MAX`
    /// back to 0, and returns the new value.
    pub fn next_sequence(&mut self) -> u16 {
        let advanced = self.sequence.wrapping_add(1);
        self.sequence = advanced;
        advanced
    }

    /// Serializes `payload` into a packet using the builder's own `marker`
    /// flag. Consumes one sequence number.
    #[must_use]
    pub fn build(&mut self, payload: &[u8]) -> Vec<u8> {
        let marker = self.marker;
        self.build_with_marker(payload, marker)
    }

    /// Serializes `payload` into a packet with an explicit marker flag.
    ///
    /// Consumes one sequence number and returns the 12-byte header followed
    /// by `payload`. The builder's stored `marker` field is neither read nor
    /// changed.
    #[must_use]
    pub fn build_with_marker(&mut self, payload: &[u8], marker: bool) -> Vec<u8> {
        let seq = self.next_sequence();
        let marker_bit = if marker { 0x80 } else { 0x00 };

        let mut packet = Vec::with_capacity(RTP_HEADER_MIN + payload.len());
        // 0x80 is version 2 with the padding, extension and CSRC-count bits
        // all zero; the second byte is the marker bit plus the 7-bit type.
        packet.extend_from_slice(&[0x80, marker_bit | (self.payload_type & 0x7F)]);
        packet.extend_from_slice(&seq.to_be_bytes());
        packet.extend_from_slice(&self.timestamp.to_be_bytes());
        packet.extend_from_slice(&self.ssrc.to_be_bytes());
        packet.extend_from_slice(payload);
        packet
    }
}
/// Running statistics over a stream of 16-bit sequence numbers.
///
/// Fed one number at a time through `observe`, it classifies each arrival
/// relative to the last forward-moving number it accepted.
#[derive(Debug, Default)]
pub struct SequenceTracker {
// Reference point for the next comparison: the first number observed, then
// updated only when an arrival moves forward.
last_seq: Option<u16>,
/// Total number of calls to `observe`.
pub received: u64,
/// Arrivals whose sequence number was behind the reference point.
pub reordered: u64,
/// Arrivals whose sequence number equalled the reference point.
pub duplicates: u64,
/// Sum of the numbers skipped over by forward jumps. It is never reduced
/// when a skipped number arrives late.
pub lost: u64,
}
impl SequenceTracker {
    /// Creates a tracker with all counters at zero and no sequence seen yet.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Records the arrival of sequence number `seq` and updates the counters.
    ///
    /// Returns `None` for the very first observation. Afterwards returns the
    /// signed distance from the current reference number to `seq`:
    /// zero counts as a duplicate, a negative value as a reordered arrival,
    /// and a positive value moves the reference forward, adding any skipped
    /// numbers to `lost`.
    pub fn observe(&mut self, seq: u16) -> Option<i32> {
        self.received += 1;

        let reference = match self.last_seq {
            Some(previous) => previous,
            None => {
                // First packet: nothing to compare against yet.
                self.last_seq = Some(seq);
                return None;
            }
        };

        let delta = signed_seq_delta(reference, seq);
        match delta {
            0 => self.duplicates += 1,
            behind if behind < 0 => self.reordered += 1,
            ahead => {
                // A step of exactly one skips nothing, so this adds zero.
                self.lost += (ahead as u64) - 1;
                self.last_seq = Some(seq);
            }
        }
        Some(delta)
    }
}
/// Signed distance from `prev` to `new` on the wrapping 16-bit sequence ring.
///
/// The result lies in `-32_767..=32_768`: a forward step of up to half the
/// ring is positive, anything further is reported as a step backwards. A
/// difference of exactly 32 768 is treated as forward.
fn signed_seq_delta(prev: u16, new: u16) -> i32 {
    let forward = i32::from(new.wrapping_sub(prev));
    if forward <= 32_768 {
        forward
    } else {
        forward - 65_536
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes a version-2 packet with no CSRCs, extension or padding and
    /// a fixed SSRC of `0xDEAD_BEEF`.
    fn minimal_packet(seq: u16, ts: u32, pt: u8, marker: bool, payload: &[u8]) -> Vec<u8> {
        let marker_bit: u8 = if marker { 0x80 } else { 0x00 };
        let mut packet = Vec::with_capacity(12 + payload.len());
        packet.extend_from_slice(&[0x80, marker_bit | (pt & 0x7F)]);
        packet.extend_from_slice(&seq.to_be_bytes());
        packet.extend_from_slice(&ts.to_be_bytes());
        packet.extend_from_slice(&0xDEAD_BEEFu32.to_be_bytes());
        packet.extend_from_slice(payload);
        packet
    }

    /// Fixed header with the given first byte, payload type 96, sequence 1
    /// and zeroed timestamp and SSRC.
    fn header_with_flags(first_byte: u8) -> Vec<u8> {
        vec![first_byte, 0x60, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]
    }

    #[test]
    fn parses_minimal_packet() {
        let bytes = minimal_packet(1234, 90_000, 96, true, b"abcd");
        let parsed = RtpPacket::parse(&bytes).unwrap();
        assert_eq!(parsed.version, 2);
        assert!(parsed.marker);
        assert_eq!(parsed.payload_type, 96);
        assert_eq!(parsed.sequence, 1234);
        assert_eq!(parsed.timestamp, 90_000);
        assert_eq!(parsed.ssrc, 0xDEAD_BEEF);
        assert_eq!(parsed.payload, b"abcd");
    }

    #[test]
    fn rejects_short_buffer() {
        let too_short = [0u8; 4];
        assert!(RtpPacket::parse(&too_short).is_err());
    }

    #[test]
    fn rejects_wrong_version() {
        let mut bytes = minimal_packet(1, 0, 0, false, b"");
        // Top two bits 01 encode version 1.
        bytes[0] = 0x40;
        assert!(RtpPacket::parse(&bytes).is_err());
    }

    #[test]
    fn parses_with_csrcs() {
        // 0x82: version 2 with a CSRC count of two.
        let mut bytes = header_with_flags(0x82);
        bytes.extend_from_slice(&[0, 0, 0, 1, 0, 0, 0, 2]);
        bytes.extend_from_slice(b"data");
        let parsed = RtpPacket::parse(&bytes).unwrap();
        assert_eq!(parsed.payload, b"data");
    }

    #[test]
    fn parses_with_extension() {
        // 0x90: version 2 with the extension flag set.
        let mut bytes = header_with_flags(0x90);
        // Extension header declaring a one-word body, then that body.
        bytes.extend_from_slice(&[0xBE, 0xDE, 0x00, 0x01]);
        bytes.extend_from_slice(&[0xAA, 0xBB, 0xCC, 0xDD]);
        bytes.extend_from_slice(b"payload");
        let parsed = RtpPacket::parse(&bytes).unwrap();
        assert!(parsed.extension);
        assert_eq!(parsed.payload, b"payload");
    }

    #[test]
    fn parses_with_padding() {
        // 0xA0: version 2 with the padding flag set.
        let mut bytes = header_with_flags(0xA0);
        bytes.extend_from_slice(b"DATA");
        // Four padding bytes; the last one carries the padding count.
        bytes.extend_from_slice(&[0, 0, 0, 4]);
        let parsed = RtpPacket::parse(&bytes).unwrap();
        assert!(parsed.padding);
        assert_eq!(parsed.payload, b"DATA");
    }

    #[test]
    fn sequence_tracker_detects_gap() {
        let mut tracker = SequenceTracker::new();
        for (seq, expected) in [(100, None), (101, Some(1)), (105, Some(4))] {
            assert_eq!(tracker.observe(seq), expected);
        }
        assert_eq!(tracker.lost, 3);
    }

    #[test]
    fn sequence_tracker_detects_reorder() {
        let mut tracker = SequenceTracker::new();
        for seq in [100, 102] {
            tracker.observe(seq);
        }
        assert_eq!(tracker.observe(101), Some(-1));
        assert_eq!(tracker.reordered, 1);
    }

    #[test]
    fn sequence_tracker_detects_duplicate() {
        let mut tracker = SequenceTracker::new();
        tracker.observe(100);
        assert_eq!(tracker.observe(100), Some(0));
        assert_eq!(tracker.duplicates, 1);
    }

    #[test]
    fn sequence_tracker_handles_wrap() {
        let mut tracker = SequenceTracker::new();
        tracker.observe(65_534);
        // Stepping across the u16 boundary must still read as +1 each time.
        for seq in [65_535, 0, 1] {
            assert_eq!(tracker.observe(seq), Some(1));
        }
        assert_eq!(tracker.lost, 0);
    }
}