1pub mod dtmf;
12
13use std::net::SocketAddr;
14use std::sync::Arc;
15
16use tokio::net::UdpSocket;
17use tokio::select;
18use tokio::sync::mpsc;
19use tokio_util::sync::CancellationToken;
20use tracing::{debug, info, warn};
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq)]
24pub struct RtpHeader {
25 pub version: u8,
26 pub padding: bool,
27 pub extension: bool,
28 pub csrc_count: u8,
29 pub marker: bool,
30 pub payload_type: u8,
31 pub sequence: u16,
32 pub timestamp: u32,
33 pub ssrc: u32,
34}
35
36impl RtpHeader {
37 pub fn parse(buf: &[u8]) -> Option<Self> {
40 if buf.len() < 12 {
41 return None;
42 }
43
44 let version = (buf[0] >> 6) & 0x03;
45 if version != 2 {
46 return None;
47 }
48
49 let padding = (buf[0] >> 5) & 0x01 != 0;
50 let extension = (buf[0] >> 4) & 0x01 != 0;
51 let csrc_count = buf[0] & 0x0F;
52 let marker = (buf[1] >> 7) & 0x01 != 0;
53 let payload_type = buf[1] & 0x7F;
54 let sequence = u16::from_be_bytes([buf[2], buf[3]]);
55 let timestamp = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
56 let ssrc = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]);
57
58 Some(Self {
59 version,
60 padding,
61 extension,
62 csrc_count,
63 marker,
64 payload_type,
65 sequence,
66 timestamp,
67 ssrc,
68 })
69 }
70
71 pub fn header_len(&self) -> usize {
73 12 + 4 * self.csrc_count as usize
74 }
75}
76
77fn payload_type_name(pt: u8) -> &'static str {
79 match pt {
80 0 => "PCMU",
81 8 => "PCMA",
82 _ => "unknown",
83 }
84}
85
86pub async fn receive_rtp(socket: UdpSocket, cancel: CancellationToken) {
90 let mut buf = [0u8; 2048];
91 let mut count = 0u64;
92
93 let local = socket
94 .local_addr()
95 .map(|a| a.to_string())
96 .unwrap_or_else(|_| "<unknown>".into());
97 info!("RTP receiver started on {local}");
98
99 loop {
100 select! {
101 result = socket.recv_from(&mut buf) => {
102 match result {
103 Ok((len, from)) => {
104 if let Some(header) = RtpHeader::parse(&buf[..len]) {
105 count += 1;
106 let payload_len = len.saturating_sub(header.header_len());
107 debug!(
108 "RTP #{} | PT={} ({}) | TS={} | SSRC=0x{:08X} | {} bytes from {}",
109 header.sequence,
110 header.payload_type,
111 payload_type_name(header.payload_type),
112 header.timestamp,
113 header.ssrc,
114 payload_len,
115 from,
116 );
117
118 if count.is_multiple_of(100) {
119 info!("Received {count} RTP packets so far");
120 }
121 } else {
122 warn!("Non-RTP packet ({len} bytes) from {from}");
123 }
124 }
125 Err(e) => {
126 warn!("RTP recv error: {e}");
127 break;
128 }
129 }
130 }
131 _ = cancel.cancelled() => break,
132 }
133 }
134
135 info!("RTP receiver stopped. Total packets: {count}");
136}
137
138#[derive(Debug, Clone, Copy)]
142pub struct RtpSendConfig {
143 pub payload_type: u8,
145 pub ssrc: u32,
148 pub initial_seq: u16,
150 pub initial_timestamp: u32,
153 pub samples_per_frame: u32,
156}
157
158pub async fn send_loop(
177 socket: Arc<UdpSocket>,
178 remote: SocketAddr,
179 config: RtpSendConfig,
180 mut payloads: mpsc::Receiver<Vec<u8>>,
181 cancel: CancellationToken,
182) {
183 let mut seq = config.initial_seq;
184 let mut ts = config.initial_timestamp;
185 let mut count: u64 = 0;
186 let mut packet = Vec::with_capacity(12 + 256);
187
188 let local = socket
189 .local_addr()
190 .map(|a| a.to_string())
191 .unwrap_or_else(|_| "<unknown>".into());
192 info!(
193 "RTP sender started {local} → {remote} (PT={}, SSRC=0x{:08X})",
194 config.payload_type, config.ssrc
195 );
196
197 loop {
198 select! {
199 _ = cancel.cancelled() => break,
200 maybe = payloads.recv() => {
201 let Some(payload) = maybe else { break };
202 packet.clear();
203 packet.push(0x80);
205 packet.push(config.payload_type & 0x7F);
209 packet.extend_from_slice(&seq.to_be_bytes());
210 packet.extend_from_slice(&ts.to_be_bytes());
211 packet.extend_from_slice(&config.ssrc.to_be_bytes());
212 packet.extend_from_slice(&payload);
213
214 if let Err(err) = socket.send_to(&packet, remote).await {
215 warn!("RTP send error: {err}");
216 break;
217 }
218 count += 1;
219 seq = seq.wrapping_add(1);
220 ts = ts.wrapping_add(config.samples_per_frame);
221
222 if count.is_multiple_of(100) {
223 debug!("sent {count} RTP packets");
224 }
225 }
226 }
227 }
228
229 info!("RTP sender stopped. Total packets: {count}");
230}
231
232#[cfg(test)]
233mod tests {
234 use super::*;
235
236 fn make_packet(version: u8, pt: u8, seq: u16, ts: u32, ssrc: u32) -> Vec<u8> {
237 let mut buf = vec![0u8; 12];
238 buf[0] = (version << 6) & 0xC0; buf[1] = pt & 0x7F; buf[2..4].copy_from_slice(&seq.to_be_bytes());
241 buf[4..8].copy_from_slice(&ts.to_be_bytes());
242 buf[8..12].copy_from_slice(&ssrc.to_be_bytes());
243 buf
244 }
245
246 #[test]
247 fn parse_minimum_header() {
248 let buf = make_packet(2, 0, 1234, 5678, 0xDEADBEEF);
249 let h = RtpHeader::parse(&buf).unwrap();
250 assert_eq!(h.version, 2);
251 assert_eq!(h.payload_type, 0);
252 assert_eq!(h.sequence, 1234);
253 assert_eq!(h.timestamp, 5678);
254 assert_eq!(h.ssrc, 0xDEADBEEF);
255 assert_eq!(h.csrc_count, 0);
256 assert_eq!(h.header_len(), 12);
257 }
258
259 #[test]
260 fn parse_rejects_short_buffer() {
261 let buf = vec![0u8; 11];
262 assert!(RtpHeader::parse(&buf).is_none());
263 }
264
265 #[test]
266 fn parse_rejects_wrong_version() {
267 let buf = make_packet(1, 0, 0, 0, 0);
268 assert!(RtpHeader::parse(&buf).is_none());
269 }
270
271 #[test]
272 fn parse_extracts_marker_bit() {
273 let mut buf = make_packet(2, 8, 0, 0, 0);
274 buf[1] |= 0x80; let h = RtpHeader::parse(&buf).unwrap();
276 assert!(h.marker);
277 assert_eq!(h.payload_type, 8); }
279
280 #[test]
281 fn header_len_accounts_for_csrcs() {
282 let mut buf = make_packet(2, 0, 0, 0, 0);
283 buf[0] |= 0x03;
285 buf.extend(std::iter::repeat_n(0u8, 12));
286 let h = RtpHeader::parse(&buf).unwrap();
287 assert_eq!(h.csrc_count, 3);
288 assert_eq!(h.header_len(), 24);
289 }
290
291 #[test]
292 fn payload_type_names() {
293 assert_eq!(payload_type_name(0), "PCMU");
294 assert_eq!(payload_type_name(8), "PCMA");
295 assert_eq!(payload_type_name(127), "unknown");
296 }
297
298 async fn loopback_pair() -> (UdpSocket, UdpSocket) {
302 let a = UdpSocket::bind("127.0.0.1:0").await.unwrap();
303 let b = UdpSocket::bind("127.0.0.1:0").await.unwrap();
304 (a, b)
305 }
306
307 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
308 async fn send_loop_packetizes_payloads_into_rtp() {
309 let (sender, receiver) = loopback_pair().await;
312 let remote = receiver.local_addr().unwrap();
313 let sender = Arc::new(sender);
314
315 let (tx, rx) = mpsc::channel::<Vec<u8>>(4);
316 let cancel = CancellationToken::new();
317
318 let task = tokio::spawn({
319 let sender = sender.clone();
320 let cancel = cancel.clone();
321 async move {
322 send_loop(
323 sender,
324 remote,
325 RtpSendConfig {
326 payload_type: 0, ssrc: 0xCAFEBABE,
328 initial_seq: 1000,
329 initial_timestamp: 5000,
330 samples_per_frame: 160,
331 },
332 rx,
333 cancel,
334 )
335 .await;
336 }
337 });
338
339 let payload: Vec<u8> = (0..160).map(|i| i as u8).collect();
343 tx.send(payload.clone()).await.unwrap();
344
345 let mut buf = [0u8; 2048];
346 let (n, from) = tokio::time::timeout(
347 std::time::Duration::from_millis(500),
348 receiver.recv_from(&mut buf),
349 )
350 .await
351 .expect("receiver got packet in time")
352 .expect("recv_from ok");
353
354 assert_eq!(from, sender.local_addr().unwrap());
355 assert_eq!(n, 12 + 160);
356
357 let header = RtpHeader::parse(&buf[..n]).expect("parses as RTP");
358 assert_eq!(header.version, 2);
359 assert_eq!(header.payload_type, 0);
360 assert_eq!(header.sequence, 1000);
361 assert_eq!(header.timestamp, 5000);
362 assert_eq!(header.ssrc, 0xCAFEBABE);
363 assert_eq!(header.csrc_count, 0);
364 assert!(!header.marker);
365 assert!(!header.padding);
366 assert!(!header.extension);
367
368 assert_eq!(&buf[12..n], &payload[..]);
369
370 cancel.cancel();
371 let _ = task.await;
372 }
373
374 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
375 async fn send_loop_advances_seq_and_timestamp_per_packet() {
376 let (sender, receiver) = loopback_pair().await;
380 let remote = receiver.local_addr().unwrap();
381 let sender = Arc::new(sender);
382
383 let (tx, rx) = mpsc::channel::<Vec<u8>>(8);
384 let cancel = CancellationToken::new();
385
386 let task = tokio::spawn({
387 let sender = sender.clone();
388 let cancel = cancel.clone();
389 async move {
390 send_loop(
391 sender,
392 remote,
393 RtpSendConfig {
394 payload_type: 8, ssrc: 0xDEADBEEF,
396 initial_seq: u16::MAX, initial_timestamp: 100,
398 samples_per_frame: 160,
399 },
400 rx,
401 cancel,
402 )
403 .await;
404 }
405 });
406
407 for i in 0..3u8 {
408 tx.send(vec![i; 4]).await.unwrap();
409 }
410
411 let mut headers = Vec::new();
412 let mut buf = [0u8; 2048];
413 for _ in 0..3 {
414 let (n, _) = tokio::time::timeout(
415 std::time::Duration::from_millis(500),
416 receiver.recv_from(&mut buf),
417 )
418 .await
419 .unwrap()
420 .unwrap();
421 headers.push(RtpHeader::parse(&buf[..n]).unwrap());
422 }
423
424 assert_eq!(headers[0].sequence, u16::MAX);
426 assert_eq!(headers[1].sequence, 0);
427 assert_eq!(headers[2].sequence, 1);
428
429 assert_eq!(headers[0].timestamp, 100);
431 assert_eq!(headers[1].timestamp, 260);
432 assert_eq!(headers[2].timestamp, 420);
433
434 for h in &headers {
436 assert_eq!(h.payload_type, 8);
437 assert_eq!(h.ssrc, 0xDEADBEEF);
438 }
439
440 cancel.cancel();
441 let _ = task.await;
442 }
443
444 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
445 async fn send_loop_exits_when_payload_channel_closes() {
446 let (sender, _receiver) = loopback_pair().await;
451 let remote = "127.0.0.1:9".parse().unwrap();
452 let sender = Arc::new(sender);
453
454 let (tx, rx) = mpsc::channel::<Vec<u8>>(1);
455 let cancel = CancellationToken::new();
456
457 let task = tokio::spawn({
458 let sender = sender.clone();
459 let cancel = cancel.clone();
460 async move {
461 send_loop(
462 sender,
463 remote,
464 RtpSendConfig {
465 payload_type: 0,
466 ssrc: 1,
467 initial_seq: 0,
468 initial_timestamp: 0,
469 samples_per_frame: 160,
470 },
471 rx,
472 cancel,
473 )
474 .await;
475 }
476 });
477
478 drop(tx);
479
480 tokio::time::timeout(std::time::Duration::from_millis(500), task)
481 .await
482 .expect("send_loop exited within timeout")
483 .expect("task did not panic");
484 }
485
486 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
487 async fn send_loop_exits_on_cancel() {
488 let (sender, _receiver) = loopback_pair().await;
492 let remote = "127.0.0.1:9".parse().unwrap();
493 let sender = Arc::new(sender);
494
495 let (_tx, rx) = mpsc::channel::<Vec<u8>>(1);
496 let cancel = CancellationToken::new();
497
498 let task = tokio::spawn({
499 let sender = sender.clone();
500 let cancel = cancel.clone();
501 async move {
502 send_loop(
503 sender,
504 remote,
505 RtpSendConfig {
506 payload_type: 0,
507 ssrc: 1,
508 initial_seq: 0,
509 initial_timestamp: 0,
510 samples_per_frame: 160,
511 },
512 rx,
513 cancel,
514 )
515 .await;
516 }
517 });
518
519 cancel.cancel();
520 tokio::time::timeout(std::time::Duration::from_millis(500), task)
521 .await
522 .expect("send_loop exited within timeout")
523 .expect("task did not panic");
524 }
525}