#![allow(dead_code)]
#![allow(clippy::too_many_arguments)]
use crate::error::{NetError, NetResult};
use bytes::{Buf, BufMut, Bytes, BytesMut};
/// A single header-extension element carried in [`Packet::extensions`].
///
/// [`Packet::encode`] serializes each element as the identifier byte, a
/// one-byte length, and then the element bytes.
#[derive(Debug, Clone)]
pub struct Extension {
/// Identifier byte written first for this element.
pub id: u8,
/// Element body; its length is written as a single byte ahead of it.
pub data: Bytes,
}
/// An RTP packet: header fields, optional header extension, and payload.
///
/// Build one with [`Packet::new`] and the `with_*` helpers, serialize it with
/// [`Packet::encode`], and decode wire bytes with [`Packet::parse`].
#[derive(Debug, Clone)]
pub struct Packet {
/// Protocol version, stored in the top two bits of the first header byte.
/// [`Packet::new`] sets this to 2 and [`Packet::parse`] rejects other values.
pub version: u8,
/// Padding flag (bit `0x20` of the first header byte). When set on a parsed
/// packet, the final byte of the packet was read as the number of trailing
/// bytes to strip from the payload.
pub padding: bool,
/// Extension flag (bit `0x10` of the first header byte).
pub extension: bool,
/// CSRC count, stored in the low four bits of the first header byte.
pub csrc_count: u8,
/// Marker flag (top bit of the second header byte).
pub marker: bool,
/// Payload type (low seven bits of the second header byte).
pub payload_type: u8,
/// 16-bit sequence number.
pub sequence_number: u16,
/// 32-bit RTP timestamp.
pub timestamp: u32,
/// 32-bit synchronization source identifier.
pub ssrc: u32,
/// Contributing source identifiers, serialized directly after the SSRC.
pub csrc: Vec<u32>,
/// 16-bit profile value written at the start of the header extension.
pub extension_profile: u16,
/// Header-extension elements serialized by [`Packet::encode`].
pub extensions: Vec<Extension>,
/// Payload bytes that follow the header (and extension, if any).
pub payload: Bytes,
}
impl Packet {
#[must_use]
pub fn new(payload_type: u8, sequence_number: u16, timestamp: u32, ssrc: u32) -> Self {
Self {
version: 2,
padding: false,
extension: false,
csrc_count: 0,
marker: false,
payload_type,
sequence_number,
timestamp,
ssrc,
csrc: Vec::new(),
extension_profile: 0,
extensions: Vec::new(),
payload: Bytes::new(),
}
}
#[must_use]
pub const fn with_marker(mut self) -> Self {
self.marker = true;
self
}
#[must_use]
pub fn with_payload(mut self, payload: impl Into<Bytes>) -> Self {
self.payload = payload.into();
self
}
#[must_use]
pub fn with_csrc(mut self, csrc: u32) -> Self {
self.csrc.push(csrc);
self.csrc_count = self.csrc.len() as u8;
self
}
#[must_use]
pub fn encode(&self) -> Bytes {
let mut buf = BytesMut::new();
let byte0 = (self.version << 6)
| (u8::from(self.padding) << 5)
| (u8::from(self.extension) << 4)
| (self.csrc_count & 0x0F);
buf.put_u8(byte0);
let byte1 = (u8::from(self.marker) << 7) | (self.payload_type & 0x7F);
buf.put_u8(byte1);
buf.put_u16(self.sequence_number);
buf.put_u32(self.timestamp);
buf.put_u32(self.ssrc);
for csrc in &self.csrc {
buf.put_u32(*csrc);
}
if self.extension && !self.extensions.is_empty() {
buf.put_u16(self.extension_profile);
let mut ext_buf = BytesMut::new();
for ext in &self.extensions {
ext_buf.put_u8(ext.id);
ext_buf.put_u8(ext.data.len() as u8);
ext_buf.put(ext.data.clone());
}
let padding = (4 - (ext_buf.len() % 4)) % 4;
for _ in 0..padding {
ext_buf.put_u8(0);
}
buf.put_u16((ext_buf.len() / 4) as u16);
buf.put(ext_buf);
}
buf.put(self.payload.clone());
buf.freeze()
}
pub fn parse(data: &[u8]) -> NetResult<Self> {
if data.len() < 12 {
return Err(NetError::parse(0, "RTP packet too short"));
}
let mut cursor = Bytes::copy_from_slice(data);
let byte0 = cursor.get_u8();
let version = (byte0 >> 6) & 0x03;
let padding = (byte0 & 0x20) != 0;
let extension = (byte0 & 0x10) != 0;
let csrc_count = byte0 & 0x0F;
if version != 2 {
return Err(NetError::parse(0, "Invalid RTP version"));
}
let byte1 = cursor.get_u8();
let marker = (byte1 & 0x80) != 0;
let payload_type = byte1 & 0x7F;
let sequence_number = cursor.get_u16();
let timestamp = cursor.get_u32();
let ssrc = cursor.get_u32();
let mut csrc = Vec::new();
for _ in 0..csrc_count {
if cursor.remaining() < 4 {
return Err(NetError::parse(0, "Incomplete CSRC list"));
}
csrc.push(cursor.get_u32());
}
let mut extension_profile = 0;
let extensions = Vec::new();
if extension {
if cursor.remaining() < 4 {
return Err(NetError::parse(0, "Incomplete extension header"));
}
extension_profile = cursor.get_u16();
let ext_length = cursor.get_u16() as usize * 4;
if cursor.remaining() < ext_length {
return Err(NetError::parse(0, "Incomplete extension data"));
}
let _ext_data = cursor.copy_to_bytes(ext_length);
}
let mut payload = cursor.copy_to_bytes(cursor.remaining());
if padding {
if payload.is_empty() {
return Err(NetError::parse(0, "Padding flag set but no payload"));
}
let padding_length = payload[payload.len() - 1] as usize;
if padding_length > payload.len() {
return Err(NetError::parse(0, "Invalid padding length"));
}
payload.truncate(payload.len() - padding_length);
}
Ok(Self {
version,
padding,
extension,
csrc_count,
marker,
payload_type,
sequence_number,
timestamp,
ssrc,
csrc,
extension_profile,
extensions,
payload,
})
}
#[must_use]
pub fn payload(&self) -> &Bytes {
&self.payload
}
#[must_use]
pub fn payload_size(&self) -> usize {
self.payload.len()
}
}
/// Counters and estimates maintained by a [`Session`].
#[derive(Debug, Clone, Default)]
pub struct Statistics {
/// Number of packets produced by [`Session::create_packet`].
pub packets_sent: u64,
/// Number of packets passed to [`Session::process_packet`].
pub packets_received: u64,
/// Total payload bytes of packets produced by [`Session::create_packet`].
pub bytes_sent: u64,
/// Total payload bytes of packets passed to [`Session::process_packet`].
pub bytes_received: u64,
/// Packets presumed lost, accumulated from forward gaps in sequence numbers.
pub packets_lost: u64,
/// Smoothed interarrival jitter estimate (each sample contributes 1/16).
pub jitter: f64,
}
/// Per-stream state: assigns outgoing sequence numbers and tracks
/// send/receive statistics.
pub struct Session {
// SSRC stamped on every packet built by `create_packet`.
ssrc: u32,
// Sequence number the next created packet will carry (wraps at u16::MAX).
next_sequence: u16,
// Running counters exposed through `stats()`.
stats: Statistics,
// Sequence-number reference used by `process_packet` for gap detection;
// `None` until the first packet has been processed.
last_seq: Option<u16>,
// RTP timestamp of the previously processed packet.
last_rtp_timestamp: Option<u32>,
// Arrival time of the previously processed packet, in microseconds as
// returned by `now_micros`.
last_arrival_time: Option<u64>,
// Running jitter estimate, copied into `stats.jitter` after each update.
// Held as an f64 (not a fixed-point value, despite the name).
jitter_q4: f64,
}
impl Session {
    /// Creates a session that stamps `ssrc` on outgoing packets, starting at
    /// sequence number 0 with all statistics zeroed.
    #[must_use]
    pub fn new(ssrc: u32) -> Self {
        Self {
            ssrc,
            next_sequence: 0,
            stats: Statistics::default(),
            last_seq: None,
            last_rtp_timestamp: None,
            last_arrival_time: None,
            jitter_q4: 0.0,
        }
    }

    /// Builds the next outgoing packet for this session.
    ///
    /// The packet carries the session SSRC and the next sequence number
    /// (wrapping at `u16::MAX`). `packets_sent` and `bytes_sent` (payload
    /// bytes only) are updated at creation time.
    #[must_use]
    pub fn create_packet(
        &mut self,
        payload_type: u8,
        timestamp: u32,
        payload: impl Into<Bytes>,
    ) -> Packet {
        let seq = self.next_sequence;
        self.next_sequence = self.next_sequence.wrapping_add(1);
        let packet = Packet::new(payload_type, seq, timestamp, self.ssrc).with_payload(payload);
        self.stats.packets_sent += 1;
        self.stats.bytes_sent += packet.payload.len() as u64;
        packet
    }

    /// Returns microseconds elapsed on a monotonic clock since the first call
    /// in this process.
    ///
    /// Only differences between two readings are meaningful. A monotonic
    /// source is used because interarrival measurements must not jump when
    /// the wall clock is adjusted.
    fn now_micros() -> u64 {
        use std::sync::OnceLock;
        use std::time::Instant;

        static EPOCH: OnceLock<Instant> = OnceLock::new();
        // u64 microseconds cover several hundred thousand years of uptime.
        EPOCH.get_or_init(Instant::now).elapsed().as_micros() as u64
    }

    /// Updates receive statistics for an incoming packet.
    ///
    /// * `packets_received` / `bytes_received` always advance.
    /// * A forward jump in sequence numbers adds the number of skipped
    ///   packets to `packets_lost`. Late or duplicate packets (sequence number
    ///   at or behind the highest seen, modulo 2^16) neither count as loss nor
    ///   move the reference backwards, so the next in-order packet is not
    ///   counted as lost a second time. A late arrival does not retract a loss
    ///   that was already counted.
    /// * From the second packet on, the jitter estimate is updated as
    ///   `J += (|D| - J) / 16`, where `D` is the change in arrival spacing
    ///   versus RTP timestamp spacing.
    pub fn process_packet(&mut self, packet: &Packet) {
        self.stats.packets_received += 1;
        self.stats.bytes_received += packet.payload.len() as u64;

        match self.last_seq {
            None => self.last_seq = Some(packet.sequence_number),
            Some(last_seq) => {
                let expected = last_seq.wrapping_add(1);
                let gap = packet.sequence_number.wrapping_sub(expected);
                // gap >= 0x8000 means the packet is behind `expected`
                // (late or duplicate); keep the highest sequence seen.
                if gap < 0x8000 {
                    self.stats.packets_lost += u64::from(gap);
                    self.last_seq = Some(packet.sequence_number);
                }
            }
        }

        let now = Self::now_micros();
        if let (Some(last_ts), Some(last_arrival)) =
            (self.last_rtp_timestamp, self.last_arrival_time)
        {
            // Reinterpret the wrapped difference as signed so a reordered
            // packet yields a small negative delta, not one near 2^32.
            let rtp_diff = i64::from(packet.timestamp.wrapping_sub(last_ts) as i32);
            let arrival_diff = now.wrapping_sub(last_arrival) as i64;
            // NOTE(review): `arrival_diff` is in microseconds while `rtp_diff`
            // is in RTP timestamp ticks; the two agree only for a 1 MHz media
            // clock. Converting needs the payload clock rate, which this type
            // does not know — confirm the intended unit with callers.
            let d = (arrival_diff - rtp_diff).abs() as f64;
            self.jitter_q4 += (d - self.jitter_q4) / 16.0;
            self.stats.jitter = self.jitter_q4;
        }
        self.last_rtp_timestamp = Some(packet.timestamp);
        self.last_arrival_time = Some(now);
    }

    /// Returns the statistics collected so far.
    #[must_use]
    pub const fn stats(&self) -> &Statistics {
        &self.stats
    }

    /// Returns the SSRC this session stamps on outgoing packets.
    #[must_use]
    pub const fn ssrc(&self) -> u32 {
        self.ssrc
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `Packet::new` stores the supplied header values and defaults to version 2.
    #[test]
    fn test_packet_new() {
        let fresh = Packet::new(96, 1000, 48000, 12345);

        assert_eq!(fresh.version, 2);
        assert_eq!(fresh.payload_type, 96);
        assert_eq!(fresh.sequence_number, 1000);
        assert_eq!(fresh.timestamp, 48000);
        assert_eq!(fresh.ssrc, 12345);
    }

    /// A packet survives an encode → parse round trip unchanged.
    #[test]
    fn test_packet_encode_decode() {
        let original = Packet::new(96, 1000, 48000, 12345).with_payload(vec![1, 2, 3, 4]);
        let wire = original.encode();

        let round_tripped = Packet::parse(&wire).expect("should succeed in test");

        assert_eq!(round_tripped.version, original.version);
        assert_eq!(round_tripped.payload_type, original.payload_type);
        assert_eq!(round_tripped.sequence_number, original.sequence_number);
        assert_eq!(round_tripped.timestamp, original.timestamp);
        assert_eq!(round_tripped.ssrc, original.ssrc);
        assert_eq!(round_tripped.payload, original.payload);
    }

    /// `with_marker` sets the marker flag.
    #[test]
    fn test_packet_with_marker() {
        let marked = Packet::new(96, 1000, 48000, 12345).with_marker();

        assert!(marked.marker);
    }

    /// A session stamps its SSRC, hands out consecutive sequence numbers
    /// starting at zero, and counts every packet it creates.
    #[test]
    fn test_session() {
        let mut session = Session::new(12345);

        let first = session.create_packet(96, 48000, vec![1, 2, 3, 4]);
        assert_eq!(first.ssrc, 12345);
        assert_eq!(first.sequence_number, 0);
        assert_eq!(session.stats().packets_sent, 1);

        let second = session.create_packet(96, 48100, vec![5, 6, 7, 8]);
        assert_eq!(second.sequence_number, 1);
        assert_eq!(session.stats().packets_sent, 2);
    }

    /// A header whose version bits are 0 is rejected.
    #[test]
    fn test_parse_invalid_version() {
        // Twelve header bytes: version 0 in the first byte, payload type 0x60.
        let mut header = vec![0u8; 12];
        header[1] = 0x60;

        let outcome = Packet::parse(&header);

        assert!(outcome.is_err());
    }
}