1use tokio::net::UdpSocket;
9use tokio::select;
10use tokio_util::sync::CancellationToken;
11use tracing::{debug, info, warn};
12
/// Decoded fields of the fixed 12-byte RTP header (RFC 3550 §5.1).
///
/// Values are produced by [`RtpHeader::parse`]; the CSRC entries, any header
/// extension and the payload itself are not stored here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RtpHeader {
    /// Version number from the top two bits of byte 0. `parse` only ever yields `2`.
    pub version: u8,
    /// P flag (bit 5 of byte 0).
    pub padding: bool,
    /// X flag (bit 4 of byte 0).
    pub extension: bool,
    /// CC field (low four bits of byte 0): number of 32-bit CSRC entries after the fixed header.
    pub csrc_count: u8,
    /// M flag (top bit of byte 1).
    pub marker: bool,
    /// Payload type (low seven bits of byte 1).
    pub payload_type: u8,
    /// Sequence number (bytes 2..4, big-endian).
    pub sequence: u16,
    /// Timestamp (bytes 4..8, big-endian).
    pub timestamp: u32,
    /// Synchronisation source identifier (bytes 8..12, big-endian).
    pub ssrc: u32,
}

impl RtpHeader {
    /// Size in bytes of the fixed part of the header, before any CSRC entries.
    const FIXED_HEADER_LEN: usize = 12;

    /// Parses the RTP header found at the start of `buf`.
    ///
    /// Returns `None` when:
    /// - `buf` holds fewer than 12 bytes,
    /// - the version field is not `2`, or
    /// - `buf` is too short to contain the CSRC list announced by the CC field.
    ///
    /// The last check guarantees that [`RtpHeader::header_len`] never exceeds
    /// `buf.len()` for a header returned from this function.
    pub fn parse(buf: &[u8]) -> Option<Self> {
        if buf.len() < Self::FIXED_HEADER_LEN {
            return None;
        }

        // A `u8` shifted right by six leaves only the two version bits.
        let version = buf[0] >> 6;
        if version != 2 {
            return None;
        }

        let csrc_count = buf[0] & 0x0F;
        // CC comes straight off the wire. Without this check a truncated or
        // forged packet would produce a header whose `header_len()` points
        // past the end of the datagram.
        if buf.len() < Self::FIXED_HEADER_LEN + 4 * usize::from(csrc_count) {
            return None;
        }

        Some(Self {
            version,
            padding: buf[0] & 0x20 != 0,
            extension: buf[0] & 0x10 != 0,
            csrc_count,
            marker: buf[1] & 0x80 != 0,
            payload_type: buf[1] & 0x7F,
            sequence: u16::from_be_bytes([buf[2], buf[3]]),
            timestamp: u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]),
            ssrc: u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]),
        })
    }

    /// Returns the header length in bytes: the 12 fixed bytes plus four bytes per CSRC entry.
    ///
    /// The length of a header extension is not included, even when `extension` is set.
    pub fn header_len(&self) -> usize {
        Self::FIXED_HEADER_LEN + 4 * usize::from(self.csrc_count)
    }
}
67
/// Maps an RTP payload type number to a short display name.
///
/// Only `0` ("PCMU") and `8` ("PCMA") are recognised; every other value
/// yields "unknown".
fn payload_type_name(pt: u8) -> &'static str {
    // Known payload types, looked up linearly; the table is tiny.
    const KNOWN: [(u8, &str); 2] = [(0, "PCMU"), (8, "PCMA")];

    KNOWN
        .iter()
        .find(|(code, _)| *code == pt)
        .map_or("unknown", |&(_, name)| name)
}
76
77pub async fn receive_rtp(socket: UdpSocket, cancel: CancellationToken) {
81 let mut buf = [0u8; 2048];
82 let mut count = 0u64;
83
84 let local = socket
85 .local_addr()
86 .map(|a| a.to_string())
87 .unwrap_or_else(|_| "<unknown>".into());
88 info!("RTP receiver started on {local}");
89
90 loop {
91 select! {
92 result = socket.recv_from(&mut buf) => {
93 match result {
94 Ok((len, from)) => {
95 if let Some(header) = RtpHeader::parse(&buf[..len]) {
96 count += 1;
97 let payload_len = len.saturating_sub(header.header_len());
98 debug!(
99 "RTP #{} | PT={} ({}) | TS={} | SSRC=0x{:08X} | {} bytes from {}",
100 header.sequence,
101 header.payload_type,
102 payload_type_name(header.payload_type),
103 header.timestamp,
104 header.ssrc,
105 payload_len,
106 from,
107 );
108
109 if count.is_multiple_of(100) {
110 info!("Received {count} RTP packets so far");
111 }
112 } else {
113 warn!("Non-RTP packet ({len} bytes) from {from}");
114 }
115 }
116 Err(e) => {
117 warn!("RTP recv error: {e}");
118 break;
119 }
120 }
121 }
122 _ = cancel.cancelled() => break,
123 }
124 }
125
126 info!("RTP receiver stopped. Total packets: {count}");
127}
128
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a 12-byte packet with the given version, payload type, sequence
    /// number, timestamp and SSRC; all flag bits and the CC field are zero.
    fn make_packet(version: u8, pt: u8, seq: u16, ts: u32, ssrc: u32) -> Vec<u8> {
        let mut packet = Vec::with_capacity(12);
        packet.push((version << 6) & 0xC0);
        packet.push(pt & 0x7F);
        packet.extend_from_slice(&seq.to_be_bytes());
        packet.extend_from_slice(&ts.to_be_bytes());
        packet.extend_from_slice(&ssrc.to_be_bytes());
        packet
    }

    #[test]
    fn parse_minimum_header() {
        let packet = make_packet(2, 0, 1234, 5678, 0xDEADBEEF);
        let header = RtpHeader::parse(&packet).unwrap();

        assert_eq!(header.version, 2);
        assert_eq!(header.payload_type, 0);
        assert_eq!(header.sequence, 1234);
        assert_eq!(header.timestamp, 5678);
        assert_eq!(header.ssrc, 0xDEADBEEF);
        assert_eq!(header.csrc_count, 0);
        assert_eq!(header.header_len(), 12);
    }

    #[test]
    fn parse_rejects_short_buffer() {
        // One byte short of the fixed header.
        let too_short = [0u8; 11];
        assert!(RtpHeader::parse(&too_short).is_none());
    }

    #[test]
    fn parse_rejects_wrong_version() {
        let packet = make_packet(1, 0, 0, 0, 0);
        assert!(RtpHeader::parse(&packet).is_none());
    }

    #[test]
    fn parse_extracts_marker_bit() {
        let mut packet = make_packet(2, 8, 0, 0, 0);
        // Set the marker bit without disturbing the payload type.
        packet[1] |= 0x80;

        let header = RtpHeader::parse(&packet).unwrap();
        assert!(header.marker);
        assert_eq!(header.payload_type, 8);
    }

    #[test]
    fn header_len_accounts_for_csrcs() {
        let mut packet = make_packet(2, 0, 0, 0, 0);
        // Announce three CSRC entries and append their 3 * 4 bytes.
        packet[0] |= 0x03;
        packet.extend_from_slice(&[0u8; 12]);

        let header = RtpHeader::parse(&packet).unwrap();
        assert_eq!(header.csrc_count, 3);
        assert_eq!(header.header_len(), 24);
    }

    #[test]
    fn payload_type_names() {
        let expected = [(0u8, "PCMU"), (8, "PCMA"), (127, "unknown")];
        for (pt, name) in expected {
            assert_eq!(payload_type_name(pt), name);
        }
    }
}