1use crate::errors::{RtpError, RtpResult};
2use serde::{Deserialize, Serialize};
3use std::time::SystemTime;
4use tracing::debug;
5
6const RTP_VERSION: u8 = 2;
7pub const RTCP_SR: u8 = 200;
8pub const RTCP_RR: u8 = 201;
9pub const RTCP_SDES: u8 = 202;
10pub const RTCP_BYE: u8 = 203;
11pub const RTCP_RTPFB: u8 = 205;
12pub const RTCP_PSFB: u8 = 206;
13
14pub const RTCP_RTPFB_NACK: u8 = 1;
15pub const RTCP_RTPFB_TWCC: u8 = 15;
16
17pub const RTCP_PSFB_PLI: u8 = 1;
18pub const RTCP_PSFB_FIR: u8 = 4;
19pub const RTCP_PSFB_APP: u8 = 15; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
22pub struct RtpHeaderExtension {
23 pub profile: u16,
24 pub data: Vec<u8>,
25}
26
27impl RtpHeaderExtension {
28 pub fn new(profile: u16, data: Vec<u8>) -> Self {
29 Self { profile, data }
30 }
31}
32
33#[derive(Debug, Clone, PartialEq, Eq)]
34pub struct RtpHeader {
35 pub marker: bool,
36 pub payload_type: u8,
37 pub sequence_number: u16,
38 pub timestamp: u32,
39 pub ssrc: u32,
40 pub csrcs: Vec<u32>,
41 pub extension: Option<RtpHeaderExtension>,
42}
43
44impl RtpHeader {
45 pub fn new(payload_type: u8, sequence_number: u16, timestamp: u32, ssrc: u32) -> Self {
46 Self {
47 marker: false,
48 payload_type,
49 sequence_number,
50 timestamp,
51 ssrc,
52 csrcs: Vec::new(),
53 extension: None,
54 }
55 }
56
57 pub fn get_extension(&self, id: u8) -> Option<Vec<u8>> {
58 if let Some(ext) = &self.extension {
59 if ext.profile == 0xBEDE {
60 let mut offset = 0;
61 while offset < ext.data.len() {
62 let b = ext.data[offset];
63 if b == 0 {
64 offset += 1;
65 continue;
66 }
67 let ext_id = b >> 4;
68 let len = (b & 0x0F) as usize + 1;
69 offset += 1;
70
71 if ext_id == 15 {
72 break;
73 }
74
75 if ext_id == id {
76 if offset + len <= ext.data.len() {
77 return Some(ext.data[offset..offset + len].to_vec());
78 } else {
79 return None;
80 }
81 }
82 offset += len;
83 }
84 } else if ext.profile == 0x1000 {
85 let mut offset = 0;
86 while offset < ext.data.len() {
87 let ext_id = ext.data[offset];
88 if ext_id == 0 {
89 offset += 1;
90 continue;
91 }
92 offset += 1;
93
94 if offset >= ext.data.len() {
95 break;
96 }
97 let len = ext.data[offset] as usize;
98 offset += 1;
99
100 if ext_id == id {
101 if offset + len <= ext.data.len() {
102 return Some(ext.data[offset..offset + len].to_vec());
103 } else {
104 return None;
105 }
106 }
107 offset += len;
108 }
109 } else {
110 }
112 }
113 None
114 }
115
116 pub fn set_extension(&mut self, id: u8, data: &[u8]) -> RtpResult<()> {
117 if id == 0 || id >= 15 {
118 return Err(RtpError::InvalidHeader(
119 "invalid extension id for one-byte header",
120 ));
121 }
122 if data.len() > 16 || data.is_empty() {
123 return Err(RtpError::InvalidHeader("invalid extension data length"));
124 }
125
126 let mut ext = self
127 .extension
128 .clone()
129 .unwrap_or_else(|| RtpHeaderExtension::new(0xBEDE, Vec::new()));
130
131 if ext.profile != 0xBEDE {
132 return Err(RtpError::InvalidHeader(
134 "unsupported extension profile for modification",
135 ));
136 }
137
138 let mut new_data = Vec::new();
139 let mut found = false;
140 let mut offset = 0;
141
142 while offset < ext.data.len() {
143 let b = ext.data[offset];
144 if b == 0 {
145 offset += 1;
146 continue;
147 }
148 let ext_id = b >> 4;
149 let len = (b & 0x0F) as usize + 1;
150 offset += 1;
151
152 if ext_id == 15 {
153 break;
154 }
155
156 if ext_id == id {
157 found = true;
158 new_data.push((id << 4) | ((data.len() - 1) as u8));
159 new_data.extend_from_slice(data);
160 } else {
161 new_data.push(b);
162 new_data.extend_from_slice(&ext.data[offset..offset + len]);
163 }
164 offset += len;
165 }
166
167 if !found {
168 new_data.push((id << 4) | ((data.len() - 1) as u8));
169 new_data.extend_from_slice(data);
170 }
171
172 while new_data.len() % 4 != 0 {
174 new_data.push(0);
175 }
176
177 ext.data = new_data;
178 self.extension = Some(ext);
179 Ok(())
180 }
181
182 fn validate(&self) -> RtpResult<()> {
183 if self.csrcs.len() > 15 {
184 return Err(RtpError::InvalidHeader("too many CSRC entries"));
185 }
186 if let Some(ext) = &self.extension
187 && ext.data.len() % 4 != 0
188 {
189 return Err(RtpError::InvalidHeader(
190 "header extension payload must be 32-bit aligned",
191 ));
192 }
193 Ok(())
194 }
195}
196
197#[derive(Debug, Clone, PartialEq, Eq)]
198pub struct RtpPacket {
199 pub header: RtpHeader,
200 pub payload: Vec<u8>,
201 pub padding_len: u8,
202}
203
204impl RtpPacket {
205 pub fn new(header: RtpHeader, payload: Vec<u8>) -> Self {
206 Self {
207 header,
208 payload,
209 padding_len: 0,
210 }
211 }
212
213 pub fn parse(raw: &[u8]) -> RtpResult<Self> {
214 if raw.len() < 12 {
215 return Err(RtpError::PacketTooShort);
216 }
217 let b0 = raw[0];
218 let b1 = raw[1];
219 let version = b0 >> 6;
220 if version != RTP_VERSION {
221 return Err(RtpError::UnsupportedVersion(version));
222 }
223 let padding = (b0 & 0x20) != 0;
224 let extension = (b0 & 0x10) != 0;
225 let csrc_count = (b0 & 0x0F) as usize;
226 let marker = (b1 & 0x80) != 0;
227 let payload_type = b1 & 0x7F;
228
229 let mut offset = 12usize;
230 if raw.len() < offset + csrc_count * 4 {
231 return Err(RtpError::PacketTooShort);
232 }
233 let sequence_number = u16::from_be_bytes([raw[2], raw[3]]);
234 let timestamp = u32::from_be_bytes([raw[4], raw[5], raw[6], raw[7]]);
235 let ssrc = u32::from_be_bytes([raw[8], raw[9], raw[10], raw[11]]);
236
237 let mut csrcs = Vec::with_capacity(csrc_count);
238 for _ in 0..csrc_count {
239 let value = u32::from_be_bytes([
240 raw[offset],
241 raw[offset + 1],
242 raw[offset + 2],
243 raw[offset + 3],
244 ]);
245 csrcs.push(value);
246 offset += 4;
247 }
248
249 let mut extension_header = None;
250 if extension {
251 if raw.len() < offset + 4 {
252 return Err(RtpError::PacketTooShort);
253 }
254 let profile = u16::from_be_bytes([raw[offset], raw[offset + 1]]);
255 let length_words = u16::from_be_bytes([raw[offset + 2], raw[offset + 3]]) as usize;
256 offset += 4;
257 let extension_len = length_words * 4;
258 if raw.len() < offset + extension_len {
259 return Err(RtpError::PacketTooShort);
260 }
261 extension_header = Some(RtpHeaderExtension::new(
262 profile,
263 raw[offset..offset + extension_len].to_vec(),
264 ));
265 offset += extension_len;
266 }
267
268 let mut payload_end = raw.len();
269 let mut padding_len = 0u8;
270 if padding {
271 padding_len = *raw.last().ok_or(RtpError::PacketTooShort)?;
272 if padding_len as usize > raw.len().saturating_sub(offset) {
273 return Err(RtpError::InvalidHeader("padding larger than payload"));
274 }
275 payload_end -= padding_len as usize;
276 }
277 let payload = raw[offset..payload_end].to_vec();
278
279 let header = RtpHeader {
280 marker,
281 payload_type,
282 sequence_number,
283 timestamp,
284 ssrc,
285 csrcs,
286 extension: extension_header,
287 };
288
289 Ok(Self {
290 header,
291 payload,
292 padding_len,
293 })
294 }
295
296 pub fn marshal(&self) -> RtpResult<Vec<u8>> {
297 self.header.validate()?;
298 let mut buffer = Vec::with_capacity(12 + self.header.csrcs.len() * 4 + self.payload.len());
299 let mut b0 = RTP_VERSION << 6;
300 if self.padding_len > 0 {
301 b0 |= 0x20;
302 }
303 if self.header.extension.is_some() {
304 b0 |= 0x10;
305 }
306 b0 |= (self.header.csrcs.len() & 0x0F) as u8;
307 let mut b1 = self.header.payload_type & 0x7F;
308 if self.header.marker {
309 b1 |= 0x80;
310 }
311 buffer.push(b0);
312 buffer.push(b1);
313 buffer.extend_from_slice(&self.header.sequence_number.to_be_bytes());
314 buffer.extend_from_slice(&self.header.timestamp.to_be_bytes());
315 buffer.extend_from_slice(&self.header.ssrc.to_be_bytes());
316 for csrc in &self.header.csrcs {
317 buffer.extend_from_slice(&csrc.to_be_bytes());
318 }
319 if let Some(extension) = &self.header.extension {
320 let length_words = (extension.data.len() / 4) as u16;
321 buffer.extend_from_slice(&extension.profile.to_be_bytes());
322 buffer.extend_from_slice(&length_words.to_be_bytes());
323 buffer.extend_from_slice(&extension.data);
324 }
325 buffer.extend_from_slice(&self.payload);
326 if self.padding_len > 0 {
327 buffer.extend(std::iter::repeat_n(
328 self.padding_len,
329 self.padding_len as usize,
330 ));
331 }
332 Ok(buffer)
333 }
334}
335
336pub fn calculate_abs_send_time(time: SystemTime) -> u32 {
337 let duration = time
338 .duration_since(std::time::UNIX_EPOCH)
339 .unwrap_or_default();
340 let ntp_seconds = duration.as_secs() + 2208988800;
343 let ntp_fraction = (duration.subsec_nanos() as u64 * (1u64 << 32) / 1_000_000_000) as u32;
344
345 let ntp_timestamp = ((ntp_seconds as u64) << 32) | (ntp_fraction as u64);
346 ((ntp_timestamp >> 14) & 0x00ffffff) as u32
347}
348
349#[derive(Debug, Clone, PartialEq, Eq)]
350pub struct ReportBlock {
351 pub ssrc: u32,
352 pub fraction_lost: u8,
353 pub packets_lost: i32,
354 pub highest_sequence: u32,
355 pub jitter: u32,
356 pub last_sender_report: u32,
357 pub delay_since_last_sender_report: u32,
358}
359
360#[derive(Debug, Clone, PartialEq, Eq)]
361pub struct SenderReport {
362 pub sender_ssrc: u32,
363 pub ntp_most: u32,
364 pub ntp_least: u32,
365 pub rtp_timestamp: u32,
366 pub packet_count: u32,
367 pub octet_count: u32,
368 pub report_blocks: Vec<ReportBlock>,
369}
370
371#[derive(Debug, Clone, PartialEq, Eq)]
372pub struct ReceiverReport {
373 pub sender_ssrc: u32,
374 pub report_blocks: Vec<ReportBlock>,
375}
376
377#[derive(Debug, Clone, PartialEq, Eq)]
378pub struct PictureLossIndication {
379 pub sender_ssrc: u32,
380 pub media_ssrc: u32,
381}
382
383#[derive(Debug, Clone, PartialEq, Eq)]
384pub struct FirRequest {
385 pub ssrc: u32,
386 pub sequence_number: u8,
387}
388
389#[derive(Debug, Clone, PartialEq, Eq)]
390pub struct FullIntraRequest {
391 pub sender_ssrc: u32,
392 pub requests: Vec<FirRequest>,
393}
394
395#[derive(Debug, Clone, PartialEq, Eq)]
396pub struct GenericNack {
397 pub sender_ssrc: u32,
398 pub media_ssrc: u32,
399 pub lost_packets: Vec<u16>,
400}
401
402#[derive(Debug, Clone, PartialEq, Eq)]
403pub struct RemoteBitrateEstimate {
404 pub sender_ssrc: u32,
405 pub bitrate_bps: u64,
406 pub ssrcs: Vec<u32>,
407}
408
409#[derive(Debug, Clone, PartialEq, Eq)]
410pub struct TransportWideCc {
411 pub sender_ssrc: u32,
412 pub media_ssrc: u32,
413 pub base_sequence: u16,
414 pub packet_status_count: u16,
415 pub reference_time_64ms: u32,
416 pub feedback_packet_count: u8,
417 pub payload: Vec<u8>,
418}
419
420#[derive(Debug, Clone, PartialEq, Eq)]
421pub struct SdesItem {
422 pub ty: u8,
423 pub text: String,
424}
425
426#[derive(Debug, Clone, PartialEq, Eq)]
427pub struct SdesChunk {
428 pub ssrc: u32,
429 pub items: Vec<SdesItem>,
430}
431
432#[derive(Debug, Clone, PartialEq, Eq)]
433pub struct SourceDescription {
434 pub chunks: Vec<SdesChunk>,
435}
436
437#[derive(Debug, Clone, PartialEq, Eq)]
438pub struct Goodbye {
439 pub sources: Vec<u32>,
440 pub reason: Option<String>,
441}
442
443#[derive(Debug, Clone, PartialEq, Eq)]
444pub enum RtcpPacket {
445 SenderReport(SenderReport),
446 ReceiverReport(ReceiverReport),
447 SourceDescription(SourceDescription),
448 Goodbye(Goodbye),
449 PictureLossIndication(PictureLossIndication),
450 FullIntraRequest(FullIntraRequest),
451 GenericNack(GenericNack),
452 RemoteBitrateEstimate(RemoteBitrateEstimate),
453 TransportWideCc(TransportWideCc),
454}
455
456pub fn parse_rtcp_packets(raw: &[u8]) -> RtpResult<Vec<RtcpPacket>> {
457 let mut packets = Vec::new();
458 let mut offset = 0usize;
459 while offset + 4 <= raw.len() {
460 let vrc = raw[offset];
461 let version = vrc >> 6;
462 if version != RTP_VERSION {
463 return Err(RtpError::InvalidRtcp("invalid RTCP version"));
464 }
465 let padding = (vrc & 0x20) != 0;
466 let fmt = vrc & 0x1F;
467 let packet_type = raw[offset + 1];
468 let length_words = u16::from_be_bytes([raw[offset + 2], raw[offset + 3]]) as usize;
469 let packet_len = (length_words + 1) * 4;
470 if raw.len() < offset + packet_len {
471 return Err(RtpError::LengthMismatch);
472 }
473 let body_len = packet_len.saturating_sub(4);
474 let mut body_end = offset + packet_len;
475 if padding {
476 let pad = raw[body_end - 1] as usize;
477 if pad == 0 || pad > body_len {
478 return Err(RtpError::InvalidRtcp("invalid padding in RTCP packet"));
479 }
480 body_end -= pad;
481 }
482 let body = &raw[offset + 4..body_end];
483 match packet_type {
484 RTCP_SR => packets.push(RtcpPacket::SenderReport(parse_sender_report(fmt, body)?)),
485 RTCP_RR => packets.push(RtcpPacket::ReceiverReport(parse_receiver_report(
486 fmt, body,
487 )?)),
488 RTCP_SDES => packets.push(RtcpPacket::SourceDescription(parse_sdes(fmt, body)?)),
489 RTCP_BYE => packets.push(RtcpPacket::Goodbye(parse_goodbye(fmt, body)?)),
490 RTCP_RTPFB => packets.push(parse_rtcp_rtpfb(fmt, body)?),
491 RTCP_PSFB => packets.push(parse_rtcp_psfb(fmt, body)?),
492 _ => {
493 debug!("unsupported RTCP packet type: {}", packet_type);
494 }
495 }
496 offset += packet_len;
497 }
498 Ok(packets)
499}
500
501pub fn marshal_rtcp_packets(packets: &[RtcpPacket]) -> RtpResult<Vec<u8>> {
502 let mut out = Vec::new();
503 for packet in packets {
504 match packet {
505 RtcpPacket::SenderReport(sr) => write_rtcp_packet(
506 &mut out,
507 sr.report_blocks.len() as u8,
508 RTCP_SR,
509 build_sender_report_body(sr)?,
510 ),
511 RtcpPacket::ReceiverReport(rr) => write_rtcp_packet(
512 &mut out,
513 rr.report_blocks.len() as u8,
514 RTCP_RR,
515 build_receiver_report_body(rr)?,
516 ),
517 RtcpPacket::SourceDescription(sdes) => write_rtcp_packet(
518 &mut out,
519 sdes.chunks.len() as u8,
520 RTCP_SDES,
521 build_sdes_body(sdes),
522 ),
523 RtcpPacket::Goodbye(bye) => write_rtcp_packet(
524 &mut out,
525 bye.sources.len() as u8,
526 RTCP_BYE,
527 build_goodbye_body(bye),
528 ),
529 RtcpPacket::PictureLossIndication(pli) => write_rtcp_packet(
530 &mut out,
531 RTCP_PSFB_PLI,
532 RTCP_PSFB,
533 build_psfb_common(pli.sender_ssrc, pli.media_ssrc),
534 ),
535 RtcpPacket::FullIntraRequest(fir) => {
536 write_rtcp_packet(&mut out, RTCP_PSFB_FIR, RTCP_PSFB, build_fir_body(fir))
537 }
538 RtcpPacket::GenericNack(nack) => write_rtcp_packet(
539 &mut out,
540 RTCP_RTPFB_NACK,
541 RTCP_RTPFB,
542 build_nack_body(nack)?,
543 ),
544 RtcpPacket::RemoteBitrateEstimate(remb) => {
545 write_rtcp_packet(&mut out, RTCP_PSFB_APP, RTCP_PSFB, build_remb_body(remb)?)
546 }
547 RtcpPacket::TransportWideCc(twcc) => {
548 write_rtcp_packet(&mut out, RTCP_RTPFB_TWCC, RTCP_RTPFB, build_twcc_body(twcc))
549 }
550 }
551 }
552 Ok(out)
553}
554
555fn write_rtcp_packet(out: &mut Vec<u8>, fmt: u8, packet_type: u8, mut body: Vec<u8>) {
556 while !body.len().is_multiple_of(4) {
557 body.push(0);
558 }
559 let length = ((body.len() + 4) / 4).saturating_sub(1) as u16;
560 out.push((RTP_VERSION << 6) | (fmt & 0x1F));
561 out.push(packet_type);
562 out.extend_from_slice(&length.to_be_bytes());
563 out.extend_from_slice(&body);
564}
565
566fn parse_sender_report(fmt: u8, body: &[u8]) -> RtpResult<SenderReport> {
567 if body.len() < 24 {
568 return Err(RtpError::InvalidRtcp("sender report too short"));
569 }
570 let sender_ssrc = u32::from_be_bytes([body[0], body[1], body[2], body[3]]);
571 let ntp_most = u32::from_be_bytes([body[4], body[5], body[6], body[7]]);
572 let ntp_least = u32::from_be_bytes([body[8], body[9], body[10], body[11]]);
573 let rtp_timestamp = u32::from_be_bytes([body[12], body[13], body[14], body[15]]);
574 let packet_count = u32::from_be_bytes([body[16], body[17], body[18], body[19]]);
575 let octet_count = u32::from_be_bytes([body[20], body[21], body[22], body[23]]);
576 let mut offset = 24;
577 let mut report_blocks = Vec::with_capacity(fmt as usize);
578 for _ in 0..fmt {
579 if body.len() < offset + 24 {
580 return Err(RtpError::LengthMismatch);
581 }
582 report_blocks.push(parse_report_block(&body[offset..offset + 24]));
583 offset += 24;
584 }
585 Ok(SenderReport {
586 sender_ssrc,
587 ntp_most,
588 ntp_least,
589 rtp_timestamp,
590 packet_count,
591 octet_count,
592 report_blocks,
593 })
594}
595
596fn parse_receiver_report(fmt: u8, body: &[u8]) -> RtpResult<ReceiverReport> {
597 if body.len() < 4 {
598 return Err(RtpError::InvalidRtcp("receiver report too short"));
599 }
600 let sender_ssrc = u32::from_be_bytes([body[0], body[1], body[2], body[3]]);
601 let mut offset = 4;
602 let mut report_blocks = Vec::with_capacity(fmt as usize);
603 for _ in 0..fmt {
604 if body.len() < offset + 24 {
605 return Err(RtpError::LengthMismatch);
606 }
607 report_blocks.push(parse_report_block(&body[offset..offset + 24]));
608 offset += 24;
609 }
610 Ok(ReceiverReport {
611 sender_ssrc,
612 report_blocks,
613 })
614}
615
616fn parse_sdes(count: u8, body: &[u8]) -> RtpResult<SourceDescription> {
617 let mut chunks = Vec::with_capacity(count as usize);
618 let mut offset = 0;
619 for _ in 0..count {
620 if body.len() < offset + 4 {
621 return Err(RtpError::PacketTooShort);
622 }
623 let ssrc = u32::from_be_bytes([
624 body[offset],
625 body[offset + 1],
626 body[offset + 2],
627 body[offset + 3],
628 ]);
629 offset += 4;
630 let mut items = Vec::new();
631 loop {
632 if offset >= body.len() {
633 break;
634 }
635 let ty = body[offset];
636 offset += 1;
637 if ty == 0 {
638 while offset % 4 != 0 {
640 if offset >= body.len() {
641 break;
642 }
643 offset += 1;
644 }
645 break;
646 }
647 if offset >= body.len() {
648 return Err(RtpError::PacketTooShort);
649 }
650 let len = body[offset] as usize;
651 offset += 1;
652 if body.len() < offset + len {
653 return Err(RtpError::PacketTooShort);
654 }
655 let text = String::from_utf8_lossy(&body[offset..offset + len]).to_string();
656 items.push(SdesItem { ty, text });
657 offset += len;
658 }
659 chunks.push(SdesChunk { ssrc, items });
660 }
661 Ok(SourceDescription { chunks })
662}
663
664fn parse_goodbye(count: u8, body: &[u8]) -> RtpResult<Goodbye> {
665 let mut sources = Vec::with_capacity(count as usize);
666 let mut offset = 0;
667 for _ in 0..count {
668 if body.len() < offset + 4 {
669 return Err(RtpError::PacketTooShort);
670 }
671 let ssrc = u32::from_be_bytes([
672 body[offset],
673 body[offset + 1],
674 body[offset + 2],
675 body[offset + 3],
676 ]);
677 sources.push(ssrc);
678 offset += 4;
679 }
680
681 let mut reason = None;
682 if offset < body.len() {
683 let len = body[offset] as usize;
684 offset += 1;
685 if body.len() < offset + len {
686 return Err(RtpError::PacketTooShort);
687 }
688 reason = Some(String::from_utf8_lossy(&body[offset..offset + len]).to_string());
689 }
690
691 Ok(Goodbye { sources, reason })
692}
693
694fn parse_report_block(bytes: &[u8]) -> ReportBlock {
695 let ssrc = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
696 let fraction_lost = bytes[4];
697 let packets_lost =
698 (((bytes[5] as i32) << 16) | ((bytes[6] as i32) << 8) | bytes[7] as i32) << 8 >> 8;
699 let highest_sequence = u32::from_be_bytes([bytes[8], bytes[9], bytes[10], bytes[11]]);
700 let jitter = u32::from_be_bytes([bytes[12], bytes[13], bytes[14], bytes[15]]);
701 let last_sender_report = u32::from_be_bytes([bytes[16], bytes[17], bytes[18], bytes[19]]);
702 let delay_since_last_sender_report =
703 u32::from_be_bytes([bytes[20], bytes[21], bytes[22], bytes[23]]);
704 ReportBlock {
705 ssrc,
706 fraction_lost,
707 packets_lost,
708 highest_sequence,
709 jitter,
710 last_sender_report,
711 delay_since_last_sender_report,
712 }
713}
714
715fn parse_rtcp_rtpfb(fmt: u8, body: &[u8]) -> RtpResult<RtcpPacket> {
716 match fmt {
717 RTCP_RTPFB_NACK => Ok(RtcpPacket::GenericNack(parse_nack_body(body)?)),
718 RTCP_RTPFB_TWCC => Ok(RtcpPacket::TransportWideCc(parse_twcc_body(body)?)),
719 _ => Err(RtpError::InvalidRtcp("unsupported RTP feedback format")),
720 }
721}
722
723fn parse_rtcp_psfb(fmt: u8, body: &[u8]) -> RtpResult<RtcpPacket> {
724 match fmt {
725 RTCP_PSFB_PLI => Ok(RtcpPacket::PictureLossIndication(parse_psfb_common(body)?)),
726 RTCP_PSFB_FIR => Ok(RtcpPacket::FullIntraRequest(parse_fir_body(body)?)),
727 RTCP_PSFB_APP => Ok(RtcpPacket::RemoteBitrateEstimate(parse_remb_body(body)?)),
728 _ => Err(RtpError::InvalidRtcp("unsupported payload feedback format")),
729 }
730}
731
732fn parse_psfb_common(body: &[u8]) -> RtpResult<PictureLossIndication> {
733 if body.len() < 8 {
734 return Err(RtpError::InvalidRtcp("payload feedback body too short"));
735 }
736 Ok(PictureLossIndication {
737 sender_ssrc: u32::from_be_bytes([body[0], body[1], body[2], body[3]]),
738 media_ssrc: u32::from_be_bytes([body[4], body[5], body[6], body[7]]),
739 })
740}
741
742fn parse_fir_body(body: &[u8]) -> RtpResult<FullIntraRequest> {
743 if body.len() < 8 {
744 return Err(RtpError::InvalidRtcp("FIR body too short"));
745 }
746 let sender_ssrc = u32::from_be_bytes([body[0], body[1], body[2], body[3]]);
747 let mut offset = 8; let mut requests = Vec::new();
749 while offset + 8 <= body.len() {
750 requests.push(FirRequest {
751 ssrc: u32::from_be_bytes([
752 body[offset],
753 body[offset + 1],
754 body[offset + 2],
755 body[offset + 3],
756 ]),
757 sequence_number: body[offset + 4],
758 });
759 offset += 8;
760 }
761 Ok(FullIntraRequest {
762 sender_ssrc,
763 requests,
764 })
765}
766
767fn parse_nack_body(body: &[u8]) -> RtpResult<GenericNack> {
768 if body.len() < 8 {
769 return Err(RtpError::InvalidRtcp("NACK body too short"));
770 }
771 let sender_ssrc = u32::from_be_bytes([body[0], body[1], body[2], body[3]]);
772 let media_ssrc = u32::from_be_bytes([body[4], body[5], body[6], body[7]]);
773 let mut lost_packets = Vec::new();
774 let mut offset = 8;
775 while offset + 4 <= body.len() {
776 let pid = u16::from_be_bytes([body[offset], body[offset + 1]]);
777 let blp = u16::from_be_bytes([body[offset + 2], body[offset + 3]]);
778 lost_packets.push(pid);
779 for bit in 0..16 {
780 if (blp >> bit) & 1 == 1 {
781 lost_packets.push(pid.wrapping_add(bit + 1));
782 }
783 }
784 offset += 4;
785 }
786 Ok(GenericNack {
787 sender_ssrc,
788 media_ssrc,
789 lost_packets,
790 })
791}
792
793fn parse_remb_body(body: &[u8]) -> RtpResult<RemoteBitrateEstimate> {
794 if body.len() < 16 || &body[8..12] != b"REMB" {
795 return Err(RtpError::InvalidRtcp("invalid REMB payload"));
796 }
797 let sender_ssrc = u32::from_be_bytes([body[0], body[1], body[2], body[3]]);
798 let num_ssrc = body[12] as usize;
799 let exponent = (body[13] & 0xFC) >> 2;
800 let mantissa = ((u32::from(body[13] & 0x03) << 16)
801 | (u32::from(body[14]) << 8)
802 | u32::from(body[15])) as u64;
803 let bitrate_bps = mantissa << exponent;
804 let mut ssrcs = Vec::with_capacity(num_ssrc);
805 let mut offset = 16;
806 for _ in 0..num_ssrc {
807 if body.len() < offset + 4 {
808 return Err(RtpError::LengthMismatch);
809 }
810 ssrcs.push(u32::from_be_bytes([
811 body[offset],
812 body[offset + 1],
813 body[offset + 2],
814 body[offset + 3],
815 ]));
816 offset += 4;
817 }
818 Ok(RemoteBitrateEstimate {
819 sender_ssrc,
820 bitrate_bps,
821 ssrcs,
822 })
823}
824
825fn parse_twcc_body(body: &[u8]) -> RtpResult<TransportWideCc> {
826 if body.len() < 16 {
827 return Err(RtpError::InvalidRtcp("TWCC body too short"));
828 }
829 let sender_ssrc = u32::from_be_bytes([body[0], body[1], body[2], body[3]]);
830 let media_ssrc = u32::from_be_bytes([body[4], body[5], body[6], body[7]]);
831 let base_sequence = u16::from_be_bytes([body[8], body[9]]);
832 let packet_status_count = u16::from_be_bytes([body[10], body[11]]);
833 let reference_time_64ms = u32::from_be_bytes([0, body[12], body[13], body[14]]);
834 let feedback_packet_count = body[15];
835 let payload = body[16..].to_vec();
836 Ok(TransportWideCc {
837 sender_ssrc,
838 media_ssrc,
839 base_sequence,
840 packet_status_count,
841 reference_time_64ms,
842 feedback_packet_count,
843 payload,
844 })
845}
846
847fn build_sender_report_body(sr: &SenderReport) -> RtpResult<Vec<u8>> {
848 let mut body = Vec::with_capacity(24 + sr.report_blocks.len() * 24);
849 body.extend_from_slice(&sr.sender_ssrc.to_be_bytes());
850 body.extend_from_slice(&sr.ntp_most.to_be_bytes());
851 body.extend_from_slice(&sr.ntp_least.to_be_bytes());
852 body.extend_from_slice(&sr.rtp_timestamp.to_be_bytes());
853 body.extend_from_slice(&sr.packet_count.to_be_bytes());
854 body.extend_from_slice(&sr.octet_count.to_be_bytes());
855 for block in &sr.report_blocks {
856 body.extend_from_slice(&build_report_block(block));
857 }
858 Ok(body)
859}
860
861fn build_receiver_report_body(rr: &ReceiverReport) -> RtpResult<Vec<u8>> {
862 let mut body = Vec::with_capacity(4 + rr.report_blocks.len() * 24);
863 body.extend_from_slice(&rr.sender_ssrc.to_be_bytes());
864 for block in &rr.report_blocks {
865 body.extend_from_slice(&build_report_block(block));
866 }
867 Ok(body)
868}
869
870fn build_sdes_body(sdes: &SourceDescription) -> Vec<u8> {
871 let mut body = Vec::new();
872 for chunk in &sdes.chunks {
873 body.extend_from_slice(&chunk.ssrc.to_be_bytes());
874 for item in &chunk.items {
875 body.push(item.ty);
876 body.push(item.text.len() as u8);
877 body.extend_from_slice(item.text.as_bytes());
878 }
879 body.push(0); while body.len() % 4 != 0 {
881 body.push(0);
882 }
883 }
884 body
885}
886
887fn build_goodbye_body(bye: &Goodbye) -> Vec<u8> {
888 let mut body = Vec::new();
889 for ssrc in &bye.sources {
890 body.extend_from_slice(&ssrc.to_be_bytes());
891 }
892 if let Some(reason) = &bye.reason {
893 let bytes = reason.as_bytes();
894 let len = bytes.len().min(255) as u8;
895 body.push(len);
896 body.extend_from_slice(&bytes[..len as usize]);
897 }
899 body
900}
901
902fn build_report_block(block: &ReportBlock) -> [u8; 24] {
903 let mut buf = [0u8; 24];
904 buf[0..4].copy_from_slice(&block.ssrc.to_be_bytes());
905 buf[4] = block.fraction_lost;
906 let clamped = block.packets_lost.clamp(-(1 << 23), (1 << 23) - 1);
907 let lost_bytes = (clamped as u32 & 0x00FF_FFFF).to_be_bytes();
908 buf[5..8].copy_from_slice(&lost_bytes[1..]);
909 buf[8..12].copy_from_slice(&block.highest_sequence.to_be_bytes());
910 buf[12..16].copy_from_slice(&block.jitter.to_be_bytes());
911 buf[16..20].copy_from_slice(&block.last_sender_report.to_be_bytes());
912 buf[20..24].copy_from_slice(&block.delay_since_last_sender_report.to_be_bytes());
913 buf
914}
915
916fn build_psfb_common(sender_ssrc: u32, media_ssrc: u32) -> Vec<u8> {
917 let mut body = Vec::with_capacity(8);
918 body.extend_from_slice(&sender_ssrc.to_be_bytes());
919 body.extend_from_slice(&media_ssrc.to_be_bytes());
920 body
921}
922
923fn build_fir_body(fir: &FullIntraRequest) -> Vec<u8> {
924 let mut body = Vec::with_capacity(8 + fir.requests.len() * 8);
925 body.extend_from_slice(&fir.sender_ssrc.to_be_bytes());
926 body.extend_from_slice(&0u32.to_be_bytes());
927 for entry in &fir.requests {
928 body.extend_from_slice(&entry.ssrc.to_be_bytes());
929 body.push(entry.sequence_number);
930 body.extend_from_slice(&[0u8; 3]);
931 }
932 body
933}
934
935fn build_nack_body(nack: &GenericNack) -> RtpResult<Vec<u8>> {
936 if nack.lost_packets.is_empty() {
937 return Err(RtpError::InvalidRtcp("NACK requires at least one packet"));
938 }
939 let pairs = pack_nack_pairs(&nack.lost_packets);
940 let mut body = Vec::with_capacity(8 + pairs.len() * 4);
941 body.extend_from_slice(&nack.sender_ssrc.to_be_bytes());
942 body.extend_from_slice(&nack.media_ssrc.to_be_bytes());
943 for (pid, blp) in pairs {
944 body.extend_from_slice(&pid.to_be_bytes());
945 body.extend_from_slice(&blp.to_be_bytes());
946 }
947 Ok(body)
948}
949
950fn build_remb_body(remb: &RemoteBitrateEstimate) -> RtpResult<Vec<u8>> {
951 if remb.ssrcs.len() > 0xFF {
952 return Err(RtpError::InvalidRtcp("too many REMB SSRC entries"));
953 }
954 let mut body = Vec::with_capacity(16 + remb.ssrcs.len() * 4);
955 body.extend_from_slice(&remb.sender_ssrc.to_be_bytes());
956 body.extend_from_slice(&0u32.to_be_bytes());
957 body.extend_from_slice(b"REMB");
958 body.push(remb.ssrcs.len() as u8);
959 let mut exponent = 0u8;
960 let mut mantissa = remb.bitrate_bps;
961 while mantissa > 0x3FFFF {
962 mantissa >>= 1;
963 exponent += 1;
964 }
965 let mantissa_u32 = mantissa as u32;
966 body.push(((exponent & 0x3F) << 2) | ((mantissa_u32 >> 16) as u8 & 0x03));
967 body.push(((mantissa_u32 >> 8) & 0xFF) as u8);
968 body.push((mantissa_u32 & 0xFF) as u8);
969 for ssrc in &remb.ssrcs {
970 body.extend_from_slice(&ssrc.to_be_bytes());
971 }
972 Ok(body)
973}
974
975fn build_twcc_body(twcc: &TransportWideCc) -> Vec<u8> {
976 let mut body = Vec::with_capacity(16 + twcc.payload.len());
977 body.extend_from_slice(&twcc.sender_ssrc.to_be_bytes());
978 body.extend_from_slice(&twcc.media_ssrc.to_be_bytes());
979 body.extend_from_slice(&twcc.base_sequence.to_be_bytes());
980 body.extend_from_slice(&twcc.packet_status_count.to_be_bytes());
981 let ref_time = twcc.reference_time_64ms & 0x00FF_FFFF;
982 body.extend_from_slice(&ref_time.to_be_bytes()[1..]);
983 body.push(twcc.feedback_packet_count);
984 body.extend_from_slice(&twcc.payload);
985 body
986}
987
988fn pack_nack_pairs(packets: &[u16]) -> Vec<(u16, u16)> {
989 let mut seqs = packets.to_vec();
990 seqs.sort_unstable();
991 seqs.dedup();
992 let mut pairs = Vec::new();
993 let mut idx = 0;
994 while idx < seqs.len() {
995 let pid = seqs[idx];
996 idx += 1;
997 let mut blp = 0u16;
998 while idx < seqs.len() {
999 let diff = seqs[idx].wrapping_sub(pid);
1000 if diff == 0 {
1001 idx += 1;
1002 continue;
1003 }
1004 if diff > 16 {
1005 break;
1006 }
1007 blp |= 1 << (diff - 1);
1008 idx += 1;
1009 }
1010 pairs.push((pid, blp));
1011 }
1012 pairs
1013}
1014
1015pub fn is_rtcp(packet: &[u8]) -> bool {
1016 packet.len() >= 2 && (192..=208).contains(&packet[1])
1017}
1018
1019#[cfg(test)]
1020mod tests {
1021 use super::*;
1022
1023 #[test]
1024 fn rtp_roundtrip() {
1025 let mut header = RtpHeader::new(96, 1000, 42, 0x1234_5678);
1026 header.marker = true;
1027 header.csrcs = vec![0x0102_0304];
1028 header.extension = Some(RtpHeaderExtension::new(0xBEDE, vec![0, 1, 2, 3]));
1029 let packet = RtpPacket {
1030 header,
1031 payload: vec![9, 8, 7, 6],
1032 padding_len: 0,
1033 };
1034 let serialized = packet.marshal().unwrap();
1035 let parsed = RtpPacket::parse(&serialized).unwrap();
1036 assert_eq!(parsed.header.payload_type, 96);
1037 assert_eq!(parsed.header.sequence_number, 1000);
1038 assert!(parsed.header.marker);
1039 assert_eq!(parsed.payload, vec![9, 8, 7, 6]);
1040 }
1041
1042 #[test]
1043 fn remb_roundtrip() {
1044 let remb = RemoteBitrateEstimate {
1045 sender_ssrc: 1,
1046 bitrate_bps: 750_000,
1047 ssrcs: vec![2, 3],
1048 };
1049 let bytes =
1050 marshal_rtcp_packets(&[RtcpPacket::RemoteBitrateEstimate(remb.clone())]).unwrap();
1051 let parsed = parse_rtcp_packets(&bytes).unwrap();
1052 match &parsed[0] {
1053 RtcpPacket::RemoteBitrateEstimate(decoded) => {
1054 assert_eq!(decoded.sender_ssrc, remb.sender_ssrc);
1055 assert_eq!(decoded.ssrcs, remb.ssrcs);
1056 }
1057 other => panic!("unexpected packet: {other:?}"),
1058 }
1059 }
1060
1061 #[test]
1062 fn nack_pair_encoding() {
1063 let pairs = pack_nack_pairs(&[10, 11, 12, 30]);
1064 assert_eq!(pairs, vec![(10, 0b0000_0000_0000_0011), (30, 0)]);
1065 }
1066
1067 #[test]
1068 fn pli_roundtrip() {
1069 let pli = RtcpPacket::PictureLossIndication(PictureLossIndication {
1070 sender_ssrc: 1,
1071 media_ssrc: 2,
1072 });
1073 let bytes = marshal_rtcp_packets(&[pli.clone()]).unwrap();
1074 let parsed = parse_rtcp_packets(&bytes).unwrap();
1075 assert!(matches!(parsed[0], RtcpPacket::PictureLossIndication(_)));
1076 }
1077
1078 #[test]
1079 fn generic_nack_roundtrip() {
1080 let nack = RtcpPacket::GenericNack(GenericNack {
1081 sender_ssrc: 5,
1082 media_ssrc: 6,
1083 lost_packets: vec![100, 102],
1084 });
1085 let bytes = marshal_rtcp_packets(&[nack.clone()]).unwrap();
1086 let parsed = parse_rtcp_packets(&bytes).unwrap();
1087 match &parsed[0] {
1088 RtcpPacket::GenericNack(out) => {
1089 assert_eq!(out.sender_ssrc, 5);
1090 assert_eq!(out.media_ssrc, 6);
1091 assert_eq!(out.lost_packets.len(), 2);
1092 }
1093 other => panic!("unexpected packet: {other:?}"),
1094 }
1095 }
1096
1097 #[test]
1098 fn rtcp_detection() {
1099 let pli =
1100 marshal_rtcp_packets(&[RtcpPacket::PictureLossIndication(PictureLossIndication {
1101 sender_ssrc: 1,
1102 media_ssrc: 2,
1103 })])
1104 .unwrap();
1105 assert!(is_rtcp(&pli));
1106 }
1107
1108 #[test]
1109 fn sdes_roundtrip() {
1110 let sdes = SourceDescription {
1111 chunks: vec![SdesChunk {
1112 ssrc: 0x12345678,
1113 items: vec![
1114 SdesItem {
1115 ty: 1, text: "user@host".to_string(),
1117 },
1118 SdesItem {
1119 ty: 2, text: "My Name".to_string(),
1121 },
1122 ],
1123 }],
1124 };
1125 let packet = RtcpPacket::SourceDescription(sdes.clone());
1126 let bytes = marshal_rtcp_packets(&[packet]).unwrap();
1127 let parsed = parse_rtcp_packets(&bytes).unwrap();
1128
1129 match &parsed[0] {
1130 RtcpPacket::SourceDescription(decoded) => {
1131 assert_eq!(decoded.chunks.len(), 1);
1132 let chunk = &decoded.chunks[0];
1133 assert_eq!(chunk.ssrc, 0x12345678);
1134 assert_eq!(chunk.items.len(), 2);
1135 assert_eq!(chunk.items[0].ty, 1);
1136 assert_eq!(chunk.items[0].text, "user@host");
1137 assert_eq!(chunk.items[1].ty, 2);
1138 assert_eq!(chunk.items[1].text, "My Name");
1139 }
1140 other => panic!("unexpected packet: {other:?}"),
1141 }
1142 }
1143
1144 #[test]
1145 fn test_set_extension() {
1146 let mut header = RtpHeader::new(96, 1000, 42, 0x1234_5678);
1147 header.set_extension(1, &[0xAA, 0xBB, 0xCC]).unwrap();
1148
1149 let ext = header.extension.as_ref().unwrap();
1150 assert_eq!(ext.profile, 0xBEDE);
1151 assert_eq!(ext.data[0..4], [0x12, 0xAA, 0xBB, 0xCC]);
1154
1155 header.set_extension(1, &[0x11, 0x22]).unwrap();
1157 let ext_updated = header.extension.as_ref().unwrap();
1158 assert_eq!(ext_updated.data[0..4], [0x11, 0x11, 0x22, 0x00]);
1162
1163 header.set_extension(2, &[0xFF]).unwrap();
1165 assert!(header.get_extension(1).is_some());
1169 assert!(header.get_extension(2).is_some());
1170 assert_eq!(header.get_extension(1).unwrap(), vec![0x11, 0x22]);
1171 assert_eq!(header.get_extension(2).unwrap(), vec![0xFF]);
1172 }
1173
1174 #[test]
1175 fn test_abs_send_time_calculation() {
1176 let t = SystemTime::UNIX_EPOCH;
1177 let abs = calculate_abs_send_time(t);
1178 assert_eq!(abs, 0); let t2 = t + std::time::Duration::from_secs(1);
1181 let abs2 = calculate_abs_send_time(t2);
1182 assert_eq!(abs2, 0x40000); }
1184}