1use std::net::SocketAddr;
10use std::sync::Arc;
11
12use tokio::net::UdpSocket;
13use tokio::select;
14use tokio::sync::mpsc;
15use tokio_util::sync::CancellationToken;
16use tracing::{debug, info, warn};
17
/// The fixed part of an RTP packet header, decoded from the first 12 bytes
/// of a datagram.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RtpHeader {
    /// Protocol version (top two bits of byte 0). [`RtpHeader::parse`] only
    /// ever returns headers where this is `2`.
    pub version: u8,
    /// Padding flag (bit 5 of byte 0).
    pub padding: bool,
    /// Extension flag (bit 4 of byte 0).
    pub extension: bool,
    /// Number of 4-byte CSRC entries that follow the fixed header
    /// (low nibble of byte 0, so 0..=15 when produced by `parse`).
    pub csrc_count: u8,
    /// Marker flag (top bit of byte 1).
    pub marker: bool,
    /// Payload type (low seven bits of byte 1).
    pub payload_type: u8,
    /// Sequence number, big-endian in bytes 2..4.
    pub sequence: u16,
    /// Timestamp, big-endian in bytes 4..8.
    pub timestamp: u32,
    /// Synchronization source identifier, big-endian in bytes 8..12.
    pub ssrc: u32,
}

impl RtpHeader {
    /// Size in bytes of the fixed header that every packet carries.
    const FIXED_LEN: usize = 12;
    /// Size in bytes of one CSRC entry.
    const CSRC_ENTRY_LEN: usize = 4;

    /// Decodes the fixed RTP header at the start of `buf`.
    ///
    /// Returns `None` when:
    /// * `buf` holds fewer than 12 bytes,
    /// * the version field is not `2`, or
    /// * `buf` is too short to contain the CSRC list announced by the CSRC
    ///   count (a truncated packet). This guarantees that, for every header
    ///   returned from here, `header_len()` never exceeds `buf.len()`.
    ///
    /// Bytes after the CSRC list (extension header, payload, padding) are not
    /// inspected.
    pub fn parse(buf: &[u8]) -> Option<Self> {
        // A slice pattern proves the 12 fixed bytes exist without any
        // panicking index expressions.
        let &[flags, marker_pt, seq_hi, seq_lo, ts0, ts1, ts2, ts3, ssrc0, ssrc1, ssrc2, ssrc3, ..] =
            buf
        else {
            return None;
        };

        let version = flags >> 6;
        if version != 2 {
            return None;
        }

        let csrc_count = flags & 0x0F;
        // The CSRC count is taken from the wire, so make sure the announced
        // list actually fits in what was received.
        let announced_len = Self::FIXED_LEN + Self::CSRC_ENTRY_LEN * usize::from(csrc_count);
        if buf.len() < announced_len {
            return None;
        }

        Some(Self {
            version,
            padding: (flags & 0x20) != 0,
            extension: (flags & 0x10) != 0,
            csrc_count,
            marker: (marker_pt & 0x80) != 0,
            payload_type: marker_pt & 0x7F,
            sequence: u16::from_be_bytes([seq_hi, seq_lo]),
            timestamp: u32::from_be_bytes([ts0, ts1, ts2, ts3]),
            ssrc: u32::from_be_bytes([ssrc0, ssrc1, ssrc2, ssrc3]),
        })
    }

    /// Length in bytes of the fixed header plus the CSRC list.
    ///
    /// An extension header (signalled by `extension`) is not included,
    /// because its length is stored in the packet body rather than in the
    /// fixed header.
    pub fn header_len(&self) -> usize {
        Self::FIXED_LEN + Self::CSRC_ENTRY_LEN * usize::from(self.csrc_count)
    }
}
72
/// Maps an RTP payload type number to a short codec label for log output.
///
/// Only payload types 0 (`"PCMU"`) and 8 (`"PCMA"`) are recognised; every
/// other value yields `"unknown"`.
fn payload_type_name(pt: u8) -> &'static str {
    // Lookup table of the payload types this module can name.
    const KNOWN: [(u8, &str); 2] = [(0, "PCMU"), (8, "PCMA")];

    KNOWN
        .iter()
        .find(|&&(code, _)| code == pt)
        .map_or("unknown", |&(_, label)| label)
}
81
82pub async fn receive_rtp(socket: UdpSocket, cancel: CancellationToken) {
86 let mut buf = [0u8; 2048];
87 let mut count = 0u64;
88
89 let local = socket
90 .local_addr()
91 .map(|a| a.to_string())
92 .unwrap_or_else(|_| "<unknown>".into());
93 info!("RTP receiver started on {local}");
94
95 loop {
96 select! {
97 result = socket.recv_from(&mut buf) => {
98 match result {
99 Ok((len, from)) => {
100 if let Some(header) = RtpHeader::parse(&buf[..len]) {
101 count += 1;
102 let payload_len = len.saturating_sub(header.header_len());
103 debug!(
104 "RTP #{} | PT={} ({}) | TS={} | SSRC=0x{:08X} | {} bytes from {}",
105 header.sequence,
106 header.payload_type,
107 payload_type_name(header.payload_type),
108 header.timestamp,
109 header.ssrc,
110 payload_len,
111 from,
112 );
113
114 if count.is_multiple_of(100) {
115 info!("Received {count} RTP packets so far");
116 }
117 } else {
118 warn!("Non-RTP packet ({len} bytes) from {from}");
119 }
120 }
121 Err(e) => {
122 warn!("RTP recv error: {e}");
123 break;
124 }
125 }
126 }
127 _ = cancel.cancelled() => break,
128 }
129 }
130
131 info!("RTP receiver stopped. Total packets: {count}");
132}
133
/// Per-stream constants and starting counters used by [`send_loop`] when it
/// builds RTP headers.
#[derive(Clone, Copy, Debug)]
pub struct RtpSendConfig {
    /// Payload type written into each packet; `send_loop` keeps only the low
    /// seven bits.
    pub payload_type: u8,
    /// SSRC written unchanged into every packet.
    pub ssrc: u32,
    /// Sequence number of the first packet; `send_loop` increments it by one
    /// (wrapping) after each successful send.
    pub initial_seq: u16,
    /// Timestamp of the first packet.
    pub initial_timestamp: u32,
    /// Amount added to the timestamp (wrapping) after each successful send.
    pub samples_per_frame: u32,
}
153
154pub async fn send_loop(
173 socket: Arc<UdpSocket>,
174 remote: SocketAddr,
175 config: RtpSendConfig,
176 mut payloads: mpsc::Receiver<Vec<u8>>,
177 cancel: CancellationToken,
178) {
179 let mut seq = config.initial_seq;
180 let mut ts = config.initial_timestamp;
181 let mut count: u64 = 0;
182 let mut packet = Vec::with_capacity(12 + 256);
183
184 let local = socket
185 .local_addr()
186 .map(|a| a.to_string())
187 .unwrap_or_else(|_| "<unknown>".into());
188 info!(
189 "RTP sender started {local} → {remote} (PT={}, SSRC=0x{:08X})",
190 config.payload_type, config.ssrc
191 );
192
193 loop {
194 select! {
195 _ = cancel.cancelled() => break,
196 maybe = payloads.recv() => {
197 let Some(payload) = maybe else { break };
198 packet.clear();
199 packet.push(0x80);
201 packet.push(config.payload_type & 0x7F);
205 packet.extend_from_slice(&seq.to_be_bytes());
206 packet.extend_from_slice(&ts.to_be_bytes());
207 packet.extend_from_slice(&config.ssrc.to_be_bytes());
208 packet.extend_from_slice(&payload);
209
210 if let Err(err) = socket.send_to(&packet, remote).await {
211 warn!("RTP send error: {err}");
212 break;
213 }
214 count += 1;
215 seq = seq.wrapping_add(1);
216 ts = ts.wrapping_add(config.samples_per_frame);
217
218 if count.is_multiple_of(100) {
219 debug!("sent {count} RTP packets");
220 }
221 }
222 }
223 }
224
225 info!("RTP sender stopped. Total packets: {count}");
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Longest time any single await in these tests is allowed to take.
    const WAIT: Duration = Duration::from_millis(500);

    /// Builds a bare 12-byte RTP header: the given version in the top two
    /// bits of byte 0, every flag clear, no CSRC entries and no payload.
    fn make_packet(version: u8, pt: u8, seq: u16, ts: u32, ssrc: u32) -> Vec<u8> {
        let mut bytes = Vec::with_capacity(12);
        bytes.push((version << 6) & 0xC0);
        bytes.push(pt & 0x7F);
        bytes.extend_from_slice(&seq.to_be_bytes());
        bytes.extend_from_slice(&ts.to_be_bytes());
        bytes.extend_from_slice(&ssrc.to_be_bytes());
        bytes
    }

    /// Binds two independent UDP sockets on loopback with OS-chosen ports.
    async fn loopback_pair() -> (UdpSocket, UdpSocket) {
        let first = UdpSocket::bind("127.0.0.1:0").await.unwrap();
        let second = UdpSocket::bind("127.0.0.1:0").await.unwrap();
        (first, second)
    }

    /// Runs `send_loop` on its own task and returns the task's handle.
    fn spawn_send_loop(
        socket: Arc<UdpSocket>,
        remote: SocketAddr,
        config: RtpSendConfig,
        payloads: mpsc::Receiver<Vec<u8>>,
        cancel: CancellationToken,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(send_loop(socket, remote, config, payloads, cancel))
    }

    #[test]
    fn parse_minimum_header() {
        let raw = make_packet(2, 0, 1234, 5678, 0xDEADBEEF);
        let parsed = RtpHeader::parse(&raw).unwrap();
        assert_eq!(parsed.version, 2);
        assert_eq!(parsed.payload_type, 0);
        assert_eq!(parsed.sequence, 1234);
        assert_eq!(parsed.timestamp, 5678);
        assert_eq!(parsed.ssrc, 0xDEADBEEF);
        assert_eq!(parsed.csrc_count, 0);
        assert_eq!(parsed.header_len(), 12);
    }

    #[test]
    fn parse_rejects_short_buffer() {
        let too_short = [0u8; 11];
        assert!(RtpHeader::parse(&too_short).is_none());
    }

    #[test]
    fn parse_rejects_wrong_version() {
        let raw = make_packet(1, 0, 0, 0, 0);
        assert!(RtpHeader::parse(&raw).is_none());
    }

    #[test]
    fn parse_extracts_marker_bit() {
        let mut raw = make_packet(2, 8, 0, 0, 0);
        // Set the marker bit without disturbing the payload type.
        raw[1] |= 0x80;
        let parsed = RtpHeader::parse(&raw).unwrap();
        assert!(parsed.marker);
        assert_eq!(parsed.payload_type, 8);
    }

    #[test]
    fn header_len_accounts_for_csrcs() {
        let mut raw = make_packet(2, 0, 0, 0, 0);
        // Announce three CSRC entries and append their 3 * 4 bytes.
        raw[0] |= 0x03;
        raw.extend_from_slice(&[0u8; 12]);
        let parsed = RtpHeader::parse(&raw).unwrap();
        assert_eq!(parsed.csrc_count, 3);
        assert_eq!(parsed.header_len(), 24);
    }

    #[test]
    fn payload_type_names() {
        assert_eq!(payload_type_name(0), "PCMU");
        assert_eq!(payload_type_name(8), "PCMA");
        assert_eq!(payload_type_name(127), "unknown");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn send_loop_packetizes_payloads_into_rtp() {
        let (sender, receiver) = loopback_pair().await;
        let remote = receiver.local_addr().unwrap();
        let sender = Arc::new(sender);

        let (tx, rx) = mpsc::channel::<Vec<u8>>(4);
        let cancel = CancellationToken::new();
        let config = RtpSendConfig {
            payload_type: 0,
            ssrc: 0xCAFEBABE,
            initial_seq: 1000,
            initial_timestamp: 5000,
            samples_per_frame: 160,
        };
        let task = spawn_send_loop(Arc::clone(&sender), remote, config, rx, cancel.clone());

        // 160 bytes with distinct values 0..=159.
        let payload: Vec<u8> = (0u8..160).collect();
        tx.send(payload.clone()).await.unwrap();

        let mut buf = [0u8; 2048];
        let (n, from) = tokio::time::timeout(WAIT, receiver.recv_from(&mut buf))
            .await
            .expect("receiver got packet in time")
            .expect("recv_from ok");

        assert_eq!(from, sender.local_addr().unwrap());
        assert_eq!(n, 12 + 160);

        let header = RtpHeader::parse(&buf[..n]).expect("parses as RTP");
        assert_eq!(header.version, 2);
        assert_eq!(header.payload_type, 0);
        assert_eq!(header.sequence, 1000);
        assert_eq!(header.timestamp, 5000);
        assert_eq!(header.ssrc, 0xCAFEBABE);
        assert_eq!(header.csrc_count, 0);
        assert!(!header.marker);
        assert!(!header.padding);
        assert!(!header.extension);

        assert_eq!(&buf[12..n], &payload[..]);

        cancel.cancel();
        let _ = task.await;
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn send_loop_advances_seq_and_timestamp_per_packet() {
        let (sender, receiver) = loopback_pair().await;
        let remote = receiver.local_addr().unwrap();

        let (tx, rx) = mpsc::channel::<Vec<u8>>(8);
        let cancel = CancellationToken::new();
        let config = RtpSendConfig {
            payload_type: 8,
            ssrc: 0xDEADBEEF,
            // Start at the maximum so the second packet exercises wrap-around.
            initial_seq: u16::MAX,
            initial_timestamp: 100,
            samples_per_frame: 160,
        };
        let task = spawn_send_loop(Arc::new(sender), remote, config, rx, cancel.clone());

        for fill in 0..3u8 {
            tx.send(vec![fill; 4]).await.unwrap();
        }

        let mut headers = Vec::with_capacity(3);
        let mut buf = [0u8; 2048];
        while headers.len() < 3 {
            let (n, _) = tokio::time::timeout(WAIT, receiver.recv_from(&mut buf))
                .await
                .unwrap()
                .unwrap();
            headers.push(RtpHeader::parse(&buf[..n]).unwrap());
        }

        let sequences: Vec<u16> = headers.iter().map(|h| h.sequence).collect();
        assert_eq!(sequences, [u16::MAX, 0, 1]);

        let timestamps: Vec<u32> = headers.iter().map(|h| h.timestamp).collect();
        assert_eq!(timestamps, [100, 260, 420]);

        for header in &headers {
            assert_eq!(header.payload_type, 8);
            assert_eq!(header.ssrc, 0xDEADBEEF);
        }

        cancel.cancel();
        let _ = task.await;
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn send_loop_exits_when_payload_channel_closes() {
        let (sender, _receiver) = loopback_pair().await;
        let remote = "127.0.0.1:9".parse().unwrap();

        let (tx, rx) = mpsc::channel::<Vec<u8>>(1);
        let config = RtpSendConfig {
            payload_type: 0,
            ssrc: 1,
            initial_seq: 0,
            initial_timestamp: 0,
            samples_per_frame: 160,
        };
        let task = spawn_send_loop(Arc::new(sender), remote, config, rx, CancellationToken::new());

        // Dropping the only sender half closes the channel.
        drop(tx);

        tokio::time::timeout(WAIT, task)
            .await
            .expect("send_loop exited within timeout")
            .expect("task did not panic");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn send_loop_exits_on_cancel() {
        let (sender, _receiver) = loopback_pair().await;
        let remote = "127.0.0.1:9".parse().unwrap();

        // `_tx` stays alive so that cancellation is the only way out.
        let (_tx, rx) = mpsc::channel::<Vec<u8>>(1);
        let cancel = CancellationToken::new();
        let config = RtpSendConfig {
            payload_type: 0,
            ssrc: 1,
            initial_seq: 0,
            initial_timestamp: 0,
            samples_per_frame: 160,
        };
        let task = spawn_send_loop(Arc::new(sender), remote, config, rx, cancel.clone());

        cancel.cancel();
        tokio::time::timeout(WAIT, task)
            .await
            .expect("send_loop exited within timeout")
            .expect("task did not panic");
    }
}