Skip to main content

rustrtc/
rtp.rs

1use crate::errors::{RtpError, RtpResult};
2use serde::{Deserialize, Serialize};
3use std::time::SystemTime;
4use tracing::debug;
5
6const RTP_VERSION: u8 = 2;
7pub const RTCP_SR: u8 = 200;
8pub const RTCP_RR: u8 = 201;
9pub const RTCP_SDES: u8 = 202;
10pub const RTCP_BYE: u8 = 203;
11pub const RTCP_RTPFB: u8 = 205;
12pub const RTCP_PSFB: u8 = 206;
13
14pub const RTCP_RTPFB_NACK: u8 = 1;
15pub const RTCP_RTPFB_TWCC: u8 = 15;
16
17pub const RTCP_PSFB_PLI: u8 = 1;
18pub const RTCP_PSFB_FIR: u8 = 4;
19pub const RTCP_PSFB_APP: u8 = 15; // REMB lives under APP-format payload feedback
20
21#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
22pub struct RtpHeaderExtension {
23    pub profile: u16,
24    pub data: Vec<u8>,
25}
26
27impl RtpHeaderExtension {
28    pub fn new(profile: u16, data: Vec<u8>) -> Self {
29        Self { profile, data }
30    }
31}
32
33#[derive(Debug, Clone, PartialEq, Eq)]
34pub struct RtpHeader {
35    pub marker: bool,
36    pub payload_type: u8,
37    pub sequence_number: u16,
38    pub timestamp: u32,
39    pub ssrc: u32,
40    pub csrcs: Vec<u32>,
41    pub extension: Option<RtpHeaderExtension>,
42}
43
44impl RtpHeader {
45    pub fn new(payload_type: u8, sequence_number: u16, timestamp: u32, ssrc: u32) -> Self {
46        Self {
47            marker: false,
48            payload_type,
49            sequence_number,
50            timestamp,
51            ssrc,
52            csrcs: Vec::new(),
53            extension: None,
54        }
55    }
56
57    pub fn get_extension(&self, id: u8) -> Option<Vec<u8>> {
58        if let Some(ext) = &self.extension {
59            if ext.profile == 0xBEDE {
60                let mut offset = 0;
61                while offset < ext.data.len() {
62                    let b = ext.data[offset];
63                    if b == 0 {
64                        offset += 1;
65                        continue;
66                    }
67                    let ext_id = b >> 4;
68                    let len = (b & 0x0F) as usize + 1;
69                    offset += 1;
70
71                    if ext_id == 15 {
72                        break;
73                    }
74
75                    if ext_id == id {
76                        if offset + len <= ext.data.len() {
77                            return Some(ext.data[offset..offset + len].to_vec());
78                        } else {
79                            return None;
80                        }
81                    }
82                    offset += len;
83                }
84            } else if ext.profile == 0x1000 {
85                let mut offset = 0;
86                while offset < ext.data.len() {
87                    let ext_id = ext.data[offset];
88                    if ext_id == 0 {
89                        offset += 1;
90                        continue;
91                    }
92                    offset += 1;
93
94                    if offset >= ext.data.len() {
95                        break;
96                    }
97                    let len = ext.data[offset] as usize;
98                    offset += 1;
99
100                    if ext_id == id {
101                        if offset + len <= ext.data.len() {
102                            return Some(ext.data[offset..offset + len].to_vec());
103                        } else {
104                            return None;
105                        }
106                    }
107                    offset += len;
108                }
109            } else {
110                // Unsupported extension profile
111            }
112        }
113        None
114    }
115
116    pub fn set_extension(&mut self, id: u8, data: &[u8]) -> RtpResult<()> {
117        if id == 0 || id >= 15 {
118            return Err(RtpError::InvalidHeader(
119                "invalid extension id for one-byte header",
120            ));
121        }
122        if data.len() > 16 || data.is_empty() {
123            return Err(RtpError::InvalidHeader("invalid extension data length"));
124        }
125
126        let mut ext = self
127            .extension
128            .clone()
129            .unwrap_or_else(|| RtpHeaderExtension::new(0xBEDE, Vec::new()));
130
131        if ext.profile != 0xBEDE {
132            // For now, we only support modifying 0xBEDE
133            return Err(RtpError::InvalidHeader(
134                "unsupported extension profile for modification",
135            ));
136        }
137
138        let mut new_data = Vec::new();
139        let mut found = false;
140        let mut offset = 0;
141
142        while offset < ext.data.len() {
143            let b = ext.data[offset];
144            if b == 0 {
145                offset += 1;
146                continue;
147            }
148            let ext_id = b >> 4;
149            let len = (b & 0x0F) as usize + 1;
150            offset += 1;
151
152            if ext_id == 15 {
153                break;
154            }
155
156            if ext_id == id {
157                found = true;
158                new_data.push((id << 4) | ((data.len() - 1) as u8));
159                new_data.extend_from_slice(data);
160            } else {
161                new_data.push(b);
162                new_data.extend_from_slice(&ext.data[offset..offset + len]);
163            }
164            offset += len;
165        }
166
167        if !found {
168            new_data.push((id << 4) | ((data.len() - 1) as u8));
169            new_data.extend_from_slice(data);
170        }
171
172        // Align to 32-bit boundary
173        while new_data.len() % 4 != 0 {
174            new_data.push(0);
175        }
176
177        ext.data = new_data;
178        self.extension = Some(ext);
179        Ok(())
180    }
181
182    fn validate(&self) -> RtpResult<()> {
183        if self.csrcs.len() > 15 {
184            return Err(RtpError::InvalidHeader("too many CSRC entries"));
185        }
186        if let Some(ext) = &self.extension
187            && ext.data.len() % 4 != 0
188        {
189            return Err(RtpError::InvalidHeader(
190                "header extension payload must be 32-bit aligned",
191            ));
192        }
193        Ok(())
194    }
195}
196
197#[derive(Debug, Clone, PartialEq, Eq)]
198pub struct RtpPacket {
199    pub header: RtpHeader,
200    pub payload: Vec<u8>,
201    pub padding_len: u8,
202}
203
204impl RtpPacket {
205    pub fn new(header: RtpHeader, payload: Vec<u8>) -> Self {
206        Self {
207            header,
208            payload,
209            padding_len: 0,
210        }
211    }
212
213    pub fn parse(raw: &[u8]) -> RtpResult<Self> {
214        if raw.len() < 12 {
215            return Err(RtpError::PacketTooShort);
216        }
217        let b0 = raw[0];
218        let b1 = raw[1];
219        let version = b0 >> 6;
220        if version != RTP_VERSION {
221            return Err(RtpError::UnsupportedVersion(version));
222        }
223        let padding = (b0 & 0x20) != 0;
224        let extension = (b0 & 0x10) != 0;
225        let csrc_count = (b0 & 0x0F) as usize;
226        let marker = (b1 & 0x80) != 0;
227        let payload_type = b1 & 0x7F;
228
229        let mut offset = 12usize;
230        if raw.len() < offset + csrc_count * 4 {
231            return Err(RtpError::PacketTooShort);
232        }
233        let sequence_number = u16::from_be_bytes([raw[2], raw[3]]);
234        let timestamp = u32::from_be_bytes([raw[4], raw[5], raw[6], raw[7]]);
235        let ssrc = u32::from_be_bytes([raw[8], raw[9], raw[10], raw[11]]);
236
237        let mut csrcs = Vec::with_capacity(csrc_count);
238        for _ in 0..csrc_count {
239            let value = u32::from_be_bytes([
240                raw[offset],
241                raw[offset + 1],
242                raw[offset + 2],
243                raw[offset + 3],
244            ]);
245            csrcs.push(value);
246            offset += 4;
247        }
248
249        let mut extension_header = None;
250        if extension {
251            if raw.len() < offset + 4 {
252                return Err(RtpError::PacketTooShort);
253            }
254            let profile = u16::from_be_bytes([raw[offset], raw[offset + 1]]);
255            let length_words = u16::from_be_bytes([raw[offset + 2], raw[offset + 3]]) as usize;
256            offset += 4;
257            let extension_len = length_words * 4;
258            if raw.len() < offset + extension_len {
259                return Err(RtpError::PacketTooShort);
260            }
261            extension_header = Some(RtpHeaderExtension::new(
262                profile,
263                raw[offset..offset + extension_len].to_vec(),
264            ));
265            offset += extension_len;
266        }
267
268        let mut payload_end = raw.len();
269        let mut padding_len = 0u8;
270        if padding {
271            padding_len = *raw.last().ok_or(RtpError::PacketTooShort)?;
272            if padding_len as usize > raw.len().saturating_sub(offset) {
273                return Err(RtpError::InvalidHeader("padding larger than payload"));
274            }
275            payload_end -= padding_len as usize;
276        }
277        let payload = raw[offset..payload_end].to_vec();
278
279        let header = RtpHeader {
280            marker,
281            payload_type,
282            sequence_number,
283            timestamp,
284            ssrc,
285            csrcs,
286            extension: extension_header,
287        };
288
289        Ok(Self {
290            header,
291            payload,
292            padding_len,
293        })
294    }
295
296    pub fn marshal(&self) -> RtpResult<Vec<u8>> {
297        self.header.validate()?;
298        let mut buffer = Vec::with_capacity(12 + self.header.csrcs.len() * 4 + self.payload.len());
299        let mut b0 = RTP_VERSION << 6;
300        if self.padding_len > 0 {
301            b0 |= 0x20;
302        }
303        if self.header.extension.is_some() {
304            b0 |= 0x10;
305        }
306        b0 |= (self.header.csrcs.len() & 0x0F) as u8;
307        let mut b1 = self.header.payload_type & 0x7F;
308        if self.header.marker {
309            b1 |= 0x80;
310        }
311        buffer.push(b0);
312        buffer.push(b1);
313        buffer.extend_from_slice(&self.header.sequence_number.to_be_bytes());
314        buffer.extend_from_slice(&self.header.timestamp.to_be_bytes());
315        buffer.extend_from_slice(&self.header.ssrc.to_be_bytes());
316        for csrc in &self.header.csrcs {
317            buffer.extend_from_slice(&csrc.to_be_bytes());
318        }
319        if let Some(extension) = &self.header.extension {
320            let length_words = (extension.data.len() / 4) as u16;
321            buffer.extend_from_slice(&extension.profile.to_be_bytes());
322            buffer.extend_from_slice(&length_words.to_be_bytes());
323            buffer.extend_from_slice(&extension.data);
324        }
325        buffer.extend_from_slice(&self.payload);
326        if self.padding_len > 0 {
327            buffer.extend(std::iter::repeat_n(
328                self.padding_len,
329                self.padding_len as usize,
330            ));
331        }
332        Ok(buffer)
333    }
334}
335
336pub fn calculate_abs_send_time(time: SystemTime) -> u32 {
337    let duration = time
338        .duration_since(std::time::UNIX_EPOCH)
339        .unwrap_or_default();
340    // NTP epoch is 1900-01-01, Unix epoch is 1970-01-01. Difference is 70 years.
341    // 70 years in seconds: (70 * 365 + 17) * 24 * 3600 = 2208988800
342    let ntp_seconds = duration.as_secs() + 2208988800;
343    let ntp_fraction = (duration.subsec_nanos() as u64 * (1u64 << 32) / 1_000_000_000) as u32;
344
345    let ntp_timestamp = ((ntp_seconds as u64) << 32) | (ntp_fraction as u64);
346    ((ntp_timestamp >> 14) & 0x00ffffff) as u32
347}
348
349#[derive(Debug, Clone, PartialEq, Eq)]
350pub struct ReportBlock {
351    pub ssrc: u32,
352    pub fraction_lost: u8,
353    pub packets_lost: i32,
354    pub highest_sequence: u32,
355    pub jitter: u32,
356    pub last_sender_report: u32,
357    pub delay_since_last_sender_report: u32,
358}
359
360#[derive(Debug, Clone, PartialEq, Eq)]
361pub struct SenderReport {
362    pub sender_ssrc: u32,
363    pub ntp_most: u32,
364    pub ntp_least: u32,
365    pub rtp_timestamp: u32,
366    pub packet_count: u32,
367    pub octet_count: u32,
368    pub report_blocks: Vec<ReportBlock>,
369}
370
371#[derive(Debug, Clone, PartialEq, Eq)]
372pub struct ReceiverReport {
373    pub sender_ssrc: u32,
374    pub report_blocks: Vec<ReportBlock>,
375}
376
377#[derive(Debug, Clone, PartialEq, Eq)]
378pub struct PictureLossIndication {
379    pub sender_ssrc: u32,
380    pub media_ssrc: u32,
381}
382
383#[derive(Debug, Clone, PartialEq, Eq)]
384pub struct FirRequest {
385    pub ssrc: u32,
386    pub sequence_number: u8,
387}
388
389#[derive(Debug, Clone, PartialEq, Eq)]
390pub struct FullIntraRequest {
391    pub sender_ssrc: u32,
392    pub requests: Vec<FirRequest>,
393}
394
395#[derive(Debug, Clone, PartialEq, Eq)]
396pub struct GenericNack {
397    pub sender_ssrc: u32,
398    pub media_ssrc: u32,
399    pub lost_packets: Vec<u16>,
400}
401
402#[derive(Debug, Clone, PartialEq, Eq)]
403pub struct RemoteBitrateEstimate {
404    pub sender_ssrc: u32,
405    pub bitrate_bps: u64,
406    pub ssrcs: Vec<u32>,
407}
408
409#[derive(Debug, Clone, PartialEq, Eq)]
410pub struct TransportWideCc {
411    pub sender_ssrc: u32,
412    pub media_ssrc: u32,
413    pub base_sequence: u16,
414    pub packet_status_count: u16,
415    pub reference_time_64ms: u32,
416    pub feedback_packet_count: u8,
417    pub payload: Vec<u8>,
418}
419
420#[derive(Debug, Clone, PartialEq, Eq)]
421pub struct SdesItem {
422    pub ty: u8,
423    pub text: String,
424}
425
426#[derive(Debug, Clone, PartialEq, Eq)]
427pub struct SdesChunk {
428    pub ssrc: u32,
429    pub items: Vec<SdesItem>,
430}
431
432#[derive(Debug, Clone, PartialEq, Eq)]
433pub struct SourceDescription {
434    pub chunks: Vec<SdesChunk>,
435}
436
437#[derive(Debug, Clone, PartialEq, Eq)]
438pub struct Goodbye {
439    pub sources: Vec<u32>,
440    pub reason: Option<String>,
441}
442
443#[derive(Debug, Clone, PartialEq, Eq)]
444pub enum RtcpPacket {
445    SenderReport(SenderReport),
446    ReceiverReport(ReceiverReport),
447    SourceDescription(SourceDescription),
448    Goodbye(Goodbye),
449    PictureLossIndication(PictureLossIndication),
450    FullIntraRequest(FullIntraRequest),
451    GenericNack(GenericNack),
452    RemoteBitrateEstimate(RemoteBitrateEstimate),
453    TransportWideCc(TransportWideCc),
454}
455
456pub fn parse_rtcp_packets(raw: &[u8]) -> RtpResult<Vec<RtcpPacket>> {
457    let mut packets = Vec::new();
458    let mut offset = 0usize;
459    while offset + 4 <= raw.len() {
460        let vrc = raw[offset];
461        let version = vrc >> 6;
462        if version != RTP_VERSION {
463            return Err(RtpError::InvalidRtcp("invalid RTCP version"));
464        }
465        let padding = (vrc & 0x20) != 0;
466        let fmt = vrc & 0x1F;
467        let packet_type = raw[offset + 1];
468        let length_words = u16::from_be_bytes([raw[offset + 2], raw[offset + 3]]) as usize;
469        let packet_len = (length_words + 1) * 4;
470        if raw.len() < offset + packet_len {
471            return Err(RtpError::LengthMismatch);
472        }
473        let body_len = packet_len.saturating_sub(4);
474        let mut body_end = offset + packet_len;
475        if padding {
476            let pad = raw[body_end - 1] as usize;
477            if pad == 0 || pad > body_len {
478                return Err(RtpError::InvalidRtcp("invalid padding in RTCP packet"));
479            }
480            body_end -= pad;
481        }
482        let body = &raw[offset + 4..body_end];
483        match packet_type {
484            RTCP_SR => packets.push(RtcpPacket::SenderReport(parse_sender_report(fmt, body)?)),
485            RTCP_RR => packets.push(RtcpPacket::ReceiverReport(parse_receiver_report(
486                fmt, body,
487            )?)),
488            RTCP_SDES => packets.push(RtcpPacket::SourceDescription(parse_sdes(fmt, body)?)),
489            RTCP_BYE => packets.push(RtcpPacket::Goodbye(parse_goodbye(fmt, body)?)),
490            RTCP_RTPFB => packets.push(parse_rtcp_rtpfb(fmt, body)?),
491            RTCP_PSFB => packets.push(parse_rtcp_psfb(fmt, body)?),
492            _ => {
493                debug!("unsupported RTCP packet type: {}", packet_type);
494            }
495        }
496        offset += packet_len;
497    }
498    Ok(packets)
499}
500
501pub fn marshal_rtcp_packets(packets: &[RtcpPacket]) -> RtpResult<Vec<u8>> {
502    let mut out = Vec::new();
503    for packet in packets {
504        match packet {
505            RtcpPacket::SenderReport(sr) => write_rtcp_packet(
506                &mut out,
507                sr.report_blocks.len() as u8,
508                RTCP_SR,
509                build_sender_report_body(sr)?,
510            ),
511            RtcpPacket::ReceiverReport(rr) => write_rtcp_packet(
512                &mut out,
513                rr.report_blocks.len() as u8,
514                RTCP_RR,
515                build_receiver_report_body(rr)?,
516            ),
517            RtcpPacket::SourceDescription(sdes) => write_rtcp_packet(
518                &mut out,
519                sdes.chunks.len() as u8,
520                RTCP_SDES,
521                build_sdes_body(sdes),
522            ),
523            RtcpPacket::Goodbye(bye) => write_rtcp_packet(
524                &mut out,
525                bye.sources.len() as u8,
526                RTCP_BYE,
527                build_goodbye_body(bye),
528            ),
529            RtcpPacket::PictureLossIndication(pli) => write_rtcp_packet(
530                &mut out,
531                RTCP_PSFB_PLI,
532                RTCP_PSFB,
533                build_psfb_common(pli.sender_ssrc, pli.media_ssrc),
534            ),
535            RtcpPacket::FullIntraRequest(fir) => {
536                write_rtcp_packet(&mut out, RTCP_PSFB_FIR, RTCP_PSFB, build_fir_body(fir))
537            }
538            RtcpPacket::GenericNack(nack) => write_rtcp_packet(
539                &mut out,
540                RTCP_RTPFB_NACK,
541                RTCP_RTPFB,
542                build_nack_body(nack)?,
543            ),
544            RtcpPacket::RemoteBitrateEstimate(remb) => {
545                write_rtcp_packet(&mut out, RTCP_PSFB_APP, RTCP_PSFB, build_remb_body(remb)?)
546            }
547            RtcpPacket::TransportWideCc(twcc) => {
548                write_rtcp_packet(&mut out, RTCP_RTPFB_TWCC, RTCP_RTPFB, build_twcc_body(twcc))
549            }
550        }
551    }
552    Ok(out)
553}
554
555fn write_rtcp_packet(out: &mut Vec<u8>, fmt: u8, packet_type: u8, mut body: Vec<u8>) {
556    while !body.len().is_multiple_of(4) {
557        body.push(0);
558    }
559    let length = ((body.len() + 4) / 4).saturating_sub(1) as u16;
560    out.push((RTP_VERSION << 6) | (fmt & 0x1F));
561    out.push(packet_type);
562    out.extend_from_slice(&length.to_be_bytes());
563    out.extend_from_slice(&body);
564}
565
566fn parse_sender_report(fmt: u8, body: &[u8]) -> RtpResult<SenderReport> {
567    if body.len() < 24 {
568        return Err(RtpError::InvalidRtcp("sender report too short"));
569    }
570    let sender_ssrc = u32::from_be_bytes([body[0], body[1], body[2], body[3]]);
571    let ntp_most = u32::from_be_bytes([body[4], body[5], body[6], body[7]]);
572    let ntp_least = u32::from_be_bytes([body[8], body[9], body[10], body[11]]);
573    let rtp_timestamp = u32::from_be_bytes([body[12], body[13], body[14], body[15]]);
574    let packet_count = u32::from_be_bytes([body[16], body[17], body[18], body[19]]);
575    let octet_count = u32::from_be_bytes([body[20], body[21], body[22], body[23]]);
576    let mut offset = 24;
577    let mut report_blocks = Vec::with_capacity(fmt as usize);
578    for _ in 0..fmt {
579        if body.len() < offset + 24 {
580            return Err(RtpError::LengthMismatch);
581        }
582        report_blocks.push(parse_report_block(&body[offset..offset + 24]));
583        offset += 24;
584    }
585    Ok(SenderReport {
586        sender_ssrc,
587        ntp_most,
588        ntp_least,
589        rtp_timestamp,
590        packet_count,
591        octet_count,
592        report_blocks,
593    })
594}
595
596fn parse_receiver_report(fmt: u8, body: &[u8]) -> RtpResult<ReceiverReport> {
597    if body.len() < 4 {
598        return Err(RtpError::InvalidRtcp("receiver report too short"));
599    }
600    let sender_ssrc = u32::from_be_bytes([body[0], body[1], body[2], body[3]]);
601    let mut offset = 4;
602    let mut report_blocks = Vec::with_capacity(fmt as usize);
603    for _ in 0..fmt {
604        if body.len() < offset + 24 {
605            return Err(RtpError::LengthMismatch);
606        }
607        report_blocks.push(parse_report_block(&body[offset..offset + 24]));
608        offset += 24;
609    }
610    Ok(ReceiverReport {
611        sender_ssrc,
612        report_blocks,
613    })
614}
615
616fn parse_sdes(count: u8, body: &[u8]) -> RtpResult<SourceDescription> {
617    let mut chunks = Vec::with_capacity(count as usize);
618    let mut offset = 0;
619    for _ in 0..count {
620        if body.len() < offset + 4 {
621            return Err(RtpError::PacketTooShort);
622        }
623        let ssrc = u32::from_be_bytes([
624            body[offset],
625            body[offset + 1],
626            body[offset + 2],
627            body[offset + 3],
628        ]);
629        offset += 4;
630        let mut items = Vec::new();
631        loop {
632            if offset >= body.len() {
633                break;
634            }
635            let ty = body[offset];
636            offset += 1;
637            if ty == 0 {
638                // End of list, skip padding until 32-bit boundary
639                while offset % 4 != 0 {
640                    if offset >= body.len() {
641                        break;
642                    }
643                    offset += 1;
644                }
645                break;
646            }
647            if offset >= body.len() {
648                return Err(RtpError::PacketTooShort);
649            }
650            let len = body[offset] as usize;
651            offset += 1;
652            if body.len() < offset + len {
653                return Err(RtpError::PacketTooShort);
654            }
655            let text = String::from_utf8_lossy(&body[offset..offset + len]).to_string();
656            items.push(SdesItem { ty, text });
657            offset += len;
658        }
659        chunks.push(SdesChunk { ssrc, items });
660    }
661    Ok(SourceDescription { chunks })
662}
663
664fn parse_goodbye(count: u8, body: &[u8]) -> RtpResult<Goodbye> {
665    let mut sources = Vec::with_capacity(count as usize);
666    let mut offset = 0;
667    for _ in 0..count {
668        if body.len() < offset + 4 {
669            return Err(RtpError::PacketTooShort);
670        }
671        let ssrc = u32::from_be_bytes([
672            body[offset],
673            body[offset + 1],
674            body[offset + 2],
675            body[offset + 3],
676        ]);
677        sources.push(ssrc);
678        offset += 4;
679    }
680
681    let mut reason = None;
682    if offset < body.len() {
683        let len = body[offset] as usize;
684        offset += 1;
685        if body.len() < offset + len {
686            return Err(RtpError::PacketTooShort);
687        }
688        reason = Some(String::from_utf8_lossy(&body[offset..offset + len]).to_string());
689    }
690
691    Ok(Goodbye { sources, reason })
692}
693
694fn parse_report_block(bytes: &[u8]) -> ReportBlock {
695    let ssrc = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
696    let fraction_lost = bytes[4];
697    let packets_lost =
698        (((bytes[5] as i32) << 16) | ((bytes[6] as i32) << 8) | bytes[7] as i32) << 8 >> 8;
699    let highest_sequence = u32::from_be_bytes([bytes[8], bytes[9], bytes[10], bytes[11]]);
700    let jitter = u32::from_be_bytes([bytes[12], bytes[13], bytes[14], bytes[15]]);
701    let last_sender_report = u32::from_be_bytes([bytes[16], bytes[17], bytes[18], bytes[19]]);
702    let delay_since_last_sender_report =
703        u32::from_be_bytes([bytes[20], bytes[21], bytes[22], bytes[23]]);
704    ReportBlock {
705        ssrc,
706        fraction_lost,
707        packets_lost,
708        highest_sequence,
709        jitter,
710        last_sender_report,
711        delay_since_last_sender_report,
712    }
713}
714
715fn parse_rtcp_rtpfb(fmt: u8, body: &[u8]) -> RtpResult<RtcpPacket> {
716    match fmt {
717        RTCP_RTPFB_NACK => Ok(RtcpPacket::GenericNack(parse_nack_body(body)?)),
718        RTCP_RTPFB_TWCC => Ok(RtcpPacket::TransportWideCc(parse_twcc_body(body)?)),
719        _ => Err(RtpError::InvalidRtcp("unsupported RTP feedback format")),
720    }
721}
722
723fn parse_rtcp_psfb(fmt: u8, body: &[u8]) -> RtpResult<RtcpPacket> {
724    match fmt {
725        RTCP_PSFB_PLI => Ok(RtcpPacket::PictureLossIndication(parse_psfb_common(body)?)),
726        RTCP_PSFB_FIR => Ok(RtcpPacket::FullIntraRequest(parse_fir_body(body)?)),
727        RTCP_PSFB_APP => Ok(RtcpPacket::RemoteBitrateEstimate(parse_remb_body(body)?)),
728        _ => Err(RtpError::InvalidRtcp("unsupported payload feedback format")),
729    }
730}
731
732fn parse_psfb_common(body: &[u8]) -> RtpResult<PictureLossIndication> {
733    if body.len() < 8 {
734        return Err(RtpError::InvalidRtcp("payload feedback body too short"));
735    }
736    Ok(PictureLossIndication {
737        sender_ssrc: u32::from_be_bytes([body[0], body[1], body[2], body[3]]),
738        media_ssrc: u32::from_be_bytes([body[4], body[5], body[6], body[7]]),
739    })
740}
741
742fn parse_fir_body(body: &[u8]) -> RtpResult<FullIntraRequest> {
743    if body.len() < 8 {
744        return Err(RtpError::InvalidRtcp("FIR body too short"));
745    }
746    let sender_ssrc = u32::from_be_bytes([body[0], body[1], body[2], body[3]]);
747    let mut offset = 8; // bytes 4..7 are reserved
748    let mut requests = Vec::new();
749    while offset + 8 <= body.len() {
750        requests.push(FirRequest {
751            ssrc: u32::from_be_bytes([
752                body[offset],
753                body[offset + 1],
754                body[offset + 2],
755                body[offset + 3],
756            ]),
757            sequence_number: body[offset + 4],
758        });
759        offset += 8;
760    }
761    Ok(FullIntraRequest {
762        sender_ssrc,
763        requests,
764    })
765}
766
767fn parse_nack_body(body: &[u8]) -> RtpResult<GenericNack> {
768    if body.len() < 8 {
769        return Err(RtpError::InvalidRtcp("NACK body too short"));
770    }
771    let sender_ssrc = u32::from_be_bytes([body[0], body[1], body[2], body[3]]);
772    let media_ssrc = u32::from_be_bytes([body[4], body[5], body[6], body[7]]);
773    let mut lost_packets = Vec::new();
774    let mut offset = 8;
775    while offset + 4 <= body.len() {
776        let pid = u16::from_be_bytes([body[offset], body[offset + 1]]);
777        let blp = u16::from_be_bytes([body[offset + 2], body[offset + 3]]);
778        lost_packets.push(pid);
779        for bit in 0..16 {
780            if (blp >> bit) & 1 == 1 {
781                lost_packets.push(pid.wrapping_add(bit + 1));
782            }
783        }
784        offset += 4;
785    }
786    Ok(GenericNack {
787        sender_ssrc,
788        media_ssrc,
789        lost_packets,
790    })
791}
792
793fn parse_remb_body(body: &[u8]) -> RtpResult<RemoteBitrateEstimate> {
794    if body.len() < 16 || &body[8..12] != b"REMB" {
795        return Err(RtpError::InvalidRtcp("invalid REMB payload"));
796    }
797    let sender_ssrc = u32::from_be_bytes([body[0], body[1], body[2], body[3]]);
798    let num_ssrc = body[12] as usize;
799    let exponent = (body[13] & 0xFC) >> 2;
800    let mantissa = ((u32::from(body[13] & 0x03) << 16)
801        | (u32::from(body[14]) << 8)
802        | u32::from(body[15])) as u64;
803    let bitrate_bps = mantissa << exponent;
804    let mut ssrcs = Vec::with_capacity(num_ssrc);
805    let mut offset = 16;
806    for _ in 0..num_ssrc {
807        if body.len() < offset + 4 {
808            return Err(RtpError::LengthMismatch);
809        }
810        ssrcs.push(u32::from_be_bytes([
811            body[offset],
812            body[offset + 1],
813            body[offset + 2],
814            body[offset + 3],
815        ]));
816        offset += 4;
817    }
818    Ok(RemoteBitrateEstimate {
819        sender_ssrc,
820        bitrate_bps,
821        ssrcs,
822    })
823}
824
825fn parse_twcc_body(body: &[u8]) -> RtpResult<TransportWideCc> {
826    if body.len() < 16 {
827        return Err(RtpError::InvalidRtcp("TWCC body too short"));
828    }
829    let sender_ssrc = u32::from_be_bytes([body[0], body[1], body[2], body[3]]);
830    let media_ssrc = u32::from_be_bytes([body[4], body[5], body[6], body[7]]);
831    let base_sequence = u16::from_be_bytes([body[8], body[9]]);
832    let packet_status_count = u16::from_be_bytes([body[10], body[11]]);
833    let reference_time_64ms = u32::from_be_bytes([0, body[12], body[13], body[14]]);
834    let feedback_packet_count = body[15];
835    let payload = body[16..].to_vec();
836    Ok(TransportWideCc {
837        sender_ssrc,
838        media_ssrc,
839        base_sequence,
840        packet_status_count,
841        reference_time_64ms,
842        feedback_packet_count,
843        payload,
844    })
845}
846
847fn build_sender_report_body(sr: &SenderReport) -> RtpResult<Vec<u8>> {
848    let mut body = Vec::with_capacity(24 + sr.report_blocks.len() * 24);
849    body.extend_from_slice(&sr.sender_ssrc.to_be_bytes());
850    body.extend_from_slice(&sr.ntp_most.to_be_bytes());
851    body.extend_from_slice(&sr.ntp_least.to_be_bytes());
852    body.extend_from_slice(&sr.rtp_timestamp.to_be_bytes());
853    body.extend_from_slice(&sr.packet_count.to_be_bytes());
854    body.extend_from_slice(&sr.octet_count.to_be_bytes());
855    for block in &sr.report_blocks {
856        body.extend_from_slice(&build_report_block(block));
857    }
858    Ok(body)
859}
860
861fn build_receiver_report_body(rr: &ReceiverReport) -> RtpResult<Vec<u8>> {
862    let mut body = Vec::with_capacity(4 + rr.report_blocks.len() * 24);
863    body.extend_from_slice(&rr.sender_ssrc.to_be_bytes());
864    for block in &rr.report_blocks {
865        body.extend_from_slice(&build_report_block(block));
866    }
867    Ok(body)
868}
869
870fn build_sdes_body(sdes: &SourceDescription) -> Vec<u8> {
871    let mut body = Vec::new();
872    for chunk in &sdes.chunks {
873        body.extend_from_slice(&chunk.ssrc.to_be_bytes());
874        for item in &chunk.items {
875            body.push(item.ty);
876            body.push(item.text.len() as u8);
877            body.extend_from_slice(item.text.as_bytes());
878        }
879        body.push(0); // End of list
880        while body.len() % 4 != 0 {
881            body.push(0);
882        }
883    }
884    body
885}
886
887fn build_goodbye_body(bye: &Goodbye) -> Vec<u8> {
888    let mut body = Vec::new();
889    for ssrc in &bye.sources {
890        body.extend_from_slice(&ssrc.to_be_bytes());
891    }
892    if let Some(reason) = &bye.reason {
893        let bytes = reason.as_bytes();
894        let len = bytes.len().min(255) as u8;
895        body.push(len);
896        body.extend_from_slice(&bytes[..len as usize]);
897        // Padding to 32-bit boundary is handled by write_rtcp_packet
898    }
899    body
900}
901
902fn build_report_block(block: &ReportBlock) -> [u8; 24] {
903    let mut buf = [0u8; 24];
904    buf[0..4].copy_from_slice(&block.ssrc.to_be_bytes());
905    buf[4] = block.fraction_lost;
906    let clamped = block.packets_lost.clamp(-(1 << 23), (1 << 23) - 1);
907    let lost_bytes = (clamped as u32 & 0x00FF_FFFF).to_be_bytes();
908    buf[5..8].copy_from_slice(&lost_bytes[1..]);
909    buf[8..12].copy_from_slice(&block.highest_sequence.to_be_bytes());
910    buf[12..16].copy_from_slice(&block.jitter.to_be_bytes());
911    buf[16..20].copy_from_slice(&block.last_sender_report.to_be_bytes());
912    buf[20..24].copy_from_slice(&block.delay_since_last_sender_report.to_be_bytes());
913    buf
914}
915
916fn build_psfb_common(sender_ssrc: u32, media_ssrc: u32) -> Vec<u8> {
917    let mut body = Vec::with_capacity(8);
918    body.extend_from_slice(&sender_ssrc.to_be_bytes());
919    body.extend_from_slice(&media_ssrc.to_be_bytes());
920    body
921}
922
923fn build_fir_body(fir: &FullIntraRequest) -> Vec<u8> {
924    let mut body = Vec::with_capacity(8 + fir.requests.len() * 8);
925    body.extend_from_slice(&fir.sender_ssrc.to_be_bytes());
926    body.extend_from_slice(&0u32.to_be_bytes());
927    for entry in &fir.requests {
928        body.extend_from_slice(&entry.ssrc.to_be_bytes());
929        body.push(entry.sequence_number);
930        body.extend_from_slice(&[0u8; 3]);
931    }
932    body
933}
934
935fn build_nack_body(nack: &GenericNack) -> RtpResult<Vec<u8>> {
936    if nack.lost_packets.is_empty() {
937        return Err(RtpError::InvalidRtcp("NACK requires at least one packet"));
938    }
939    let pairs = pack_nack_pairs(&nack.lost_packets);
940    let mut body = Vec::with_capacity(8 + pairs.len() * 4);
941    body.extend_from_slice(&nack.sender_ssrc.to_be_bytes());
942    body.extend_from_slice(&nack.media_ssrc.to_be_bytes());
943    for (pid, blp) in pairs {
944        body.extend_from_slice(&pid.to_be_bytes());
945        body.extend_from_slice(&blp.to_be_bytes());
946    }
947    Ok(body)
948}
949
950fn build_remb_body(remb: &RemoteBitrateEstimate) -> RtpResult<Vec<u8>> {
951    if remb.ssrcs.len() > 0xFF {
952        return Err(RtpError::InvalidRtcp("too many REMB SSRC entries"));
953    }
954    let mut body = Vec::with_capacity(16 + remb.ssrcs.len() * 4);
955    body.extend_from_slice(&remb.sender_ssrc.to_be_bytes());
956    body.extend_from_slice(&0u32.to_be_bytes());
957    body.extend_from_slice(b"REMB");
958    body.push(remb.ssrcs.len() as u8);
959    let mut exponent = 0u8;
960    let mut mantissa = remb.bitrate_bps;
961    while mantissa > 0x3FFFF {
962        mantissa >>= 1;
963        exponent += 1;
964    }
965    let mantissa_u32 = mantissa as u32;
966    body.push(((exponent & 0x3F) << 2) | ((mantissa_u32 >> 16) as u8 & 0x03));
967    body.push(((mantissa_u32 >> 8) & 0xFF) as u8);
968    body.push((mantissa_u32 & 0xFF) as u8);
969    for ssrc in &remb.ssrcs {
970        body.extend_from_slice(&ssrc.to_be_bytes());
971    }
972    Ok(body)
973}
974
975fn build_twcc_body(twcc: &TransportWideCc) -> Vec<u8> {
976    let mut body = Vec::with_capacity(16 + twcc.payload.len());
977    body.extend_from_slice(&twcc.sender_ssrc.to_be_bytes());
978    body.extend_from_slice(&twcc.media_ssrc.to_be_bytes());
979    body.extend_from_slice(&twcc.base_sequence.to_be_bytes());
980    body.extend_from_slice(&twcc.packet_status_count.to_be_bytes());
981    let ref_time = twcc.reference_time_64ms & 0x00FF_FFFF;
982    body.extend_from_slice(&ref_time.to_be_bytes()[1..]);
983    body.push(twcc.feedback_packet_count);
984    body.extend_from_slice(&twcc.payload);
985    body
986}
987
988fn pack_nack_pairs(packets: &[u16]) -> Vec<(u16, u16)> {
989    let mut seqs = packets.to_vec();
990    seqs.sort_unstable();
991    seqs.dedup();
992    let mut pairs = Vec::new();
993    let mut idx = 0;
994    while idx < seqs.len() {
995        let pid = seqs[idx];
996        idx += 1;
997        let mut blp = 0u16;
998        while idx < seqs.len() {
999            let diff = seqs[idx].wrapping_sub(pid);
1000            if diff == 0 {
1001                idx += 1;
1002                continue;
1003            }
1004            if diff > 16 {
1005                break;
1006            }
1007            blp |= 1 << (diff - 1);
1008            idx += 1;
1009        }
1010        pairs.push((pid, blp));
1011    }
1012    pairs
1013}
1014
1015pub fn is_rtcp(packet: &[u8]) -> bool {
1016    packet.len() >= 2 && (192..=208).contains(&packet[1])
1017}
1018
1019#[cfg(test)]
1020mod tests {
1021    use super::*;
1022
1023    #[test]
1024    fn rtp_roundtrip() {
1025        let mut header = RtpHeader::new(96, 1000, 42, 0x1234_5678);
1026        header.marker = true;
1027        header.csrcs = vec![0x0102_0304];
1028        header.extension = Some(RtpHeaderExtension::new(0xBEDE, vec![0, 1, 2, 3]));
1029        let packet = RtpPacket {
1030            header,
1031            payload: vec![9, 8, 7, 6],
1032            padding_len: 0,
1033        };
1034        let serialized = packet.marshal().unwrap();
1035        let parsed = RtpPacket::parse(&serialized).unwrap();
1036        assert_eq!(parsed.header.payload_type, 96);
1037        assert_eq!(parsed.header.sequence_number, 1000);
1038        assert!(parsed.header.marker);
1039        assert_eq!(parsed.payload, vec![9, 8, 7, 6]);
1040    }
1041
1042    #[test]
1043    fn remb_roundtrip() {
1044        let remb = RemoteBitrateEstimate {
1045            sender_ssrc: 1,
1046            bitrate_bps: 750_000,
1047            ssrcs: vec![2, 3],
1048        };
1049        let bytes =
1050            marshal_rtcp_packets(&[RtcpPacket::RemoteBitrateEstimate(remb.clone())]).unwrap();
1051        let parsed = parse_rtcp_packets(&bytes).unwrap();
1052        match &parsed[0] {
1053            RtcpPacket::RemoteBitrateEstimate(decoded) => {
1054                assert_eq!(decoded.sender_ssrc, remb.sender_ssrc);
1055                assert_eq!(decoded.ssrcs, remb.ssrcs);
1056            }
1057            other => panic!("unexpected packet: {other:?}"),
1058        }
1059    }
1060
1061    #[test]
1062    fn nack_pair_encoding() {
1063        let pairs = pack_nack_pairs(&[10, 11, 12, 30]);
1064        assert_eq!(pairs, vec![(10, 0b0000_0000_0000_0011), (30, 0)]);
1065    }
1066
1067    #[test]
1068    fn pli_roundtrip() {
1069        let pli = RtcpPacket::PictureLossIndication(PictureLossIndication {
1070            sender_ssrc: 1,
1071            media_ssrc: 2,
1072        });
1073        let bytes = marshal_rtcp_packets(&[pli.clone()]).unwrap();
1074        let parsed = parse_rtcp_packets(&bytes).unwrap();
1075        assert!(matches!(parsed[0], RtcpPacket::PictureLossIndication(_)));
1076    }
1077
1078    #[test]
1079    fn generic_nack_roundtrip() {
1080        let nack = RtcpPacket::GenericNack(GenericNack {
1081            sender_ssrc: 5,
1082            media_ssrc: 6,
1083            lost_packets: vec![100, 102],
1084        });
1085        let bytes = marshal_rtcp_packets(&[nack.clone()]).unwrap();
1086        let parsed = parse_rtcp_packets(&bytes).unwrap();
1087        match &parsed[0] {
1088            RtcpPacket::GenericNack(out) => {
1089                assert_eq!(out.sender_ssrc, 5);
1090                assert_eq!(out.media_ssrc, 6);
1091                assert_eq!(out.lost_packets.len(), 2);
1092            }
1093            other => panic!("unexpected packet: {other:?}"),
1094        }
1095    }
1096
1097    #[test]
1098    fn rtcp_detection() {
1099        let pli =
1100            marshal_rtcp_packets(&[RtcpPacket::PictureLossIndication(PictureLossIndication {
1101                sender_ssrc: 1,
1102                media_ssrc: 2,
1103            })])
1104            .unwrap();
1105        assert!(is_rtcp(&pli));
1106    }
1107
1108    #[test]
1109    fn sdes_roundtrip() {
1110        let sdes = SourceDescription {
1111            chunks: vec![SdesChunk {
1112                ssrc: 0x12345678,
1113                items: vec![
1114                    SdesItem {
1115                        ty: 1, // CNAME
1116                        text: "user@host".to_string(),
1117                    },
1118                    SdesItem {
1119                        ty: 2, // NAME
1120                        text: "My Name".to_string(),
1121                    },
1122                ],
1123            }],
1124        };
1125        let packet = RtcpPacket::SourceDescription(sdes.clone());
1126        let bytes = marshal_rtcp_packets(&[packet]).unwrap();
1127        let parsed = parse_rtcp_packets(&bytes).unwrap();
1128
1129        match &parsed[0] {
1130            RtcpPacket::SourceDescription(decoded) => {
1131                assert_eq!(decoded.chunks.len(), 1);
1132                let chunk = &decoded.chunks[0];
1133                assert_eq!(chunk.ssrc, 0x12345678);
1134                assert_eq!(chunk.items.len(), 2);
1135                assert_eq!(chunk.items[0].ty, 1);
1136                assert_eq!(chunk.items[0].text, "user@host");
1137                assert_eq!(chunk.items[1].ty, 2);
1138                assert_eq!(chunk.items[1].text, "My Name");
1139            }
1140            other => panic!("unexpected packet: {other:?}"),
1141        }
1142    }
1143
1144    #[test]
1145    fn test_set_extension() {
1146        let mut header = RtpHeader::new(96, 1000, 42, 0x1234_5678);
1147        header.set_extension(1, &[0xAA, 0xBB, 0xCC]).unwrap();
1148
1149        let ext = header.extension.as_ref().unwrap();
1150        assert_eq!(ext.profile, 0xBEDE);
1151        // 1 byte header: (ID << 4) | (len - 1) = (1 << 4) | 2 = 0x12
1152        // Data: AA BB CC
1153        assert_eq!(ext.data[0..4], [0x12, 0xAA, 0xBB, 0xCC]);
1154
1155        // Update existing
1156        header.set_extension(1, &[0x11, 0x22]).unwrap();
1157        let ext_updated = header.extension.as_ref().unwrap();
1158        // (1 << 4) | 1 = 0x11
1159        // Data: 11 22
1160        // Padding: 00 00
1161        assert_eq!(ext_updated.data[0..4], [0x11, 0x11, 0x22, 0x00]);
1162
1163        // Add another
1164        header.set_extension(2, &[0xFF]).unwrap();
1165        // Ext 1: 11 11 22
1166        // Ext 2: (2 << 4) | 0 = 0x20, Data: FF
1167        // Total: 11 11 22 20 FF -> pad to 8 bytes: 11 11 22 20 FF 00 00 00
1168        assert!(header.get_extension(1).is_some());
1169        assert!(header.get_extension(2).is_some());
1170        assert_eq!(header.get_extension(1).unwrap(), vec![0x11, 0x22]);
1171        assert_eq!(header.get_extension(2).unwrap(), vec![0xFF]);
1172    }
1173
1174    #[test]
1175    fn test_abs_send_time_calculation() {
1176        let t = SystemTime::UNIX_EPOCH;
1177        let abs = calculate_abs_send_time(t);
1178        assert_eq!(abs, 0); // 2208988800 is multiple of 64
1179
1180        let t2 = t + std::time::Duration::from_secs(1);
1181        let abs2 = calculate_abs_send_time(t2);
1182        assert_eq!(abs2, 0x40000); // 1 << 18
1183    }
1184}