1pub mod dtmf;
13pub mod dtmf_recv;
14
15use std::net::SocketAddr;
16use std::sync::Arc;
17
18use tokio::net::UdpSocket;
19use tokio::select;
20use tokio::sync::mpsc;
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, info, warn};
23
/// Fields of the 12-byte fixed RTP header, as decoded by [`RtpHeader::parse`].
///
/// CSRC identifiers and any header extension that follow the fixed header are
/// not retained; only `csrc_count` and the `extension` flag record their presence.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RtpHeader {
    /// Protocol version (top two bits of byte 0). `parse` only ever yields 2.
    pub version: u8,
    /// Padding flag (bit 5 of byte 0).
    pub padding: bool,
    /// Header-extension flag (bit 4 of byte 0).
    pub extension: bool,
    /// Number of 32-bit CSRC entries following the fixed header (low 4 bits of byte 0).
    pub csrc_count: u8,
    /// Marker bit (top bit of byte 1).
    pub marker: bool,
    /// Payload type (low 7 bits of byte 1).
    pub payload_type: u8,
    /// Sequence number (bytes 2..4, big-endian).
    pub sequence: u16,
    /// Timestamp (bytes 4..8, big-endian).
    pub timestamp: u32,
    /// Synchronization source identifier (bytes 8..12, big-endian).
    pub ssrc: u32,
}

impl RtpHeader {
    /// Size in bytes of the fixed header that precedes any CSRC entries.
    const FIXED_LEN: usize = 12;

    /// Decodes the fixed RTP header at the start of `buf`.
    ///
    /// Returns `None` when:
    /// * `buf` holds fewer than 12 bytes,
    /// * the version field is not 2, or
    /// * `buf` is too short to contain the CSRC list announced by the CC field.
    ///
    /// The last check guarantees `header_len() <= buf.len()` for every header
    /// this function returns, so callers can slice the payload at
    /// `header_len()` without risking an out-of-bounds panic.
    pub fn parse(buf: &[u8]) -> Option<Self> {
        // A slice pattern performs the length check and the byte extraction in
        // one step, leaving no panicking index expressions.
        let &[b0, b1, s0, s1, t0, t1, t2, t3, c0, c1, c2, c3, ..] = buf else {
            return None;
        };

        let version = b0 >> 6;
        if version != 2 {
            return None;
        }

        let header = Self {
            version,
            padding: (b0 & 0x20) != 0,
            extension: (b0 & 0x10) != 0,
            csrc_count: b0 & 0x0F,
            marker: (b1 & 0x80) != 0,
            payload_type: b1 & 0x7F,
            sequence: u16::from_be_bytes([s0, s1]),
            timestamp: u32::from_be_bytes([t0, t1, t2, t3]),
            ssrc: u32::from_be_bytes([c0, c1, c2, c3]),
        };

        // A packet whose CC field promises more CSRC entries than the datagram
        // actually carries is malformed; reject it instead of handing back a
        // header whose length exceeds the buffer.
        (buf.len() >= header.header_len()).then_some(header)
    }

    /// Length in bytes of the fixed header plus the CSRC list.
    ///
    /// A header extension (signalled by `extension`) is NOT included: its
    /// length lives in the packet bytes, which this struct does not keep.
    pub fn header_len(&self) -> usize {
        Self::FIXED_LEN + 4 * usize::from(self.csrc_count)
    }
}
78
/// Maps an RTP payload-type number to a short codec label for log output.
///
/// Only 0 ("PCMU") and 8 ("PCMA") are recognised; every other value yields
/// "unknown".
fn payload_type_name(pt: u8) -> &'static str {
    const KNOWN: [(u8, &str); 2] = [(0, "PCMU"), (8, "PCMA")];

    KNOWN
        .iter()
        .find(|&&(code, _)| code == pt)
        .map_or("unknown", |&(_, label)| label)
}
87
88pub async fn receive_rtp(socket: UdpSocket, cancel: CancellationToken) {
92 let mut buf = [0u8; 2048];
93 let mut count = 0u64;
94
95 let local = socket
96 .local_addr()
97 .map(|a| a.to_string())
98 .unwrap_or_else(|_| "<unknown>".into());
99 info!("RTP receiver started on {local}");
100
101 loop {
102 select! {
103 result = socket.recv_from(&mut buf) => {
104 match result {
105 Ok((len, from)) => {
106 if let Some(header) = RtpHeader::parse(&buf[..len]) {
107 count += 1;
108 let payload_len = len.saturating_sub(header.header_len());
109 debug!(
110 "RTP #{} | PT={} ({}) | TS={} | SSRC=0x{:08X} | {} bytes from {}",
111 header.sequence,
112 header.payload_type,
113 payload_type_name(header.payload_type),
114 header.timestamp,
115 header.ssrc,
116 payload_len,
117 from,
118 );
119
120 if count.is_multiple_of(100) {
121 info!("Received {count} RTP packets so far");
122 }
123 } else {
124 warn!("Non-RTP packet ({len} bytes) from {from}");
125 }
126 }
127 Err(e) => {
128 warn!("RTP recv error: {e}");
129 break;
130 }
131 }
132 }
133 _ = cancel.cancelled() => break,
134 }
135 }
136
137 info!("RTP receiver stopped. Total packets: {count}");
138}
139
/// Per-stream parameters consumed by `send_loop` when it builds RTP headers.
#[derive(Clone, Copy, Debug)]
pub struct RtpSendConfig {
    /// Payload type written into every packet; `send_loop` transmits only the low 7 bits.
    pub payload_type: u8,
    /// Synchronization source identifier written into every packet.
    pub ssrc: u32,
    /// Sequence number of the first packet; later packets increment it by one, wrapping.
    pub initial_seq: u16,
    /// Timestamp of the first packet.
    pub initial_timestamp: u32,
    /// Amount added (wrapping) to the timestamp after each packet is sent.
    pub samples_per_frame: u32,
}
159
160pub async fn send_loop(
179 socket: Arc<UdpSocket>,
180 remote: SocketAddr,
181 config: RtpSendConfig,
182 mut payloads: mpsc::Receiver<Vec<u8>>,
183 cancel: CancellationToken,
184) {
185 let mut seq = config.initial_seq;
186 let mut ts = config.initial_timestamp;
187 let mut count: u64 = 0;
188 let mut packet = Vec::with_capacity(12 + 256);
189
190 let local = socket
191 .local_addr()
192 .map(|a| a.to_string())
193 .unwrap_or_else(|_| "<unknown>".into());
194 info!(
195 "RTP sender started {local} → {remote} (PT={}, SSRC=0x{:08X})",
196 config.payload_type, config.ssrc
197 );
198
199 loop {
200 select! {
201 _ = cancel.cancelled() => break,
202 maybe = payloads.recv() => {
203 let Some(payload) = maybe else { break };
204 packet.clear();
205 packet.push(0x80);
207 packet.push(config.payload_type & 0x7F);
211 packet.extend_from_slice(&seq.to_be_bytes());
212 packet.extend_from_slice(&ts.to_be_bytes());
213 packet.extend_from_slice(&config.ssrc.to_be_bytes());
214 packet.extend_from_slice(&payload);
215
216 if let Err(err) = socket.send_to(&packet, remote).await {
217 warn!("RTP send error: {err}");
218 break;
219 }
220 count += 1;
221 seq = seq.wrapping_add(1);
222 ts = ts.wrapping_add(config.samples_per_frame);
223
224 if count.is_multiple_of(100) {
225 debug!("sent {count} RTP packets");
226 }
227 }
228 }
229 }
230
231 info!("RTP sender stopped. Total packets: {count}");
232}
233
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Upper bound for any single wait on the network or on task shutdown.
    const STEP_TIMEOUT: Duration = Duration::from_millis(500);

    /// Destination used by tests that never expect a packet to be sent.
    const UNUSED_REMOTE: &str = "127.0.0.1:9";

    /// Builds a bare 12-byte RTP header with the given fields and all flag bits clear.
    fn make_packet(version: u8, pt: u8, seq: u16, ts: u32, ssrc: u32) -> Vec<u8> {
        let mut packet = Vec::with_capacity(12);
        packet.push((version << 6) & 0xC0);
        packet.push(pt & 0x7F);
        packet.extend_from_slice(&seq.to_be_bytes());
        packet.extend_from_slice(&ts.to_be_bytes());
        packet.extend_from_slice(&ssrc.to_be_bytes());
        packet
    }

    #[test]
    fn parse_minimum_header() {
        let packet = make_packet(2, 0, 1234, 5678, 0xDEADBEEF);
        let header = RtpHeader::parse(&packet).unwrap();
        assert_eq!(
            (
                header.version,
                header.payload_type,
                header.sequence,
                header.timestamp,
                header.ssrc,
                header.csrc_count,
            ),
            (2, 0, 1234, 5678, 0xDEADBEEF, 0),
        );
        assert_eq!(header.header_len(), 12);
    }

    #[test]
    fn parse_rejects_short_buffer() {
        let too_short = [0u8; 11];
        assert!(RtpHeader::parse(&too_short).is_none());
    }

    #[test]
    fn parse_rejects_wrong_version() {
        let packet = make_packet(1, 0, 0, 0, 0);
        assert!(RtpHeader::parse(&packet).is_none());
    }

    #[test]
    fn parse_extracts_marker_bit() {
        let mut packet = make_packet(2, 8, 0, 0, 0);
        // Set the marker bit without disturbing the payload type.
        packet[1] |= 0x80;
        let header = RtpHeader::parse(&packet).unwrap();
        assert!(header.marker);
        assert_eq!(header.payload_type, 8);
    }

    #[test]
    fn header_len_accounts_for_csrcs() {
        let mut packet = make_packet(2, 0, 0, 0, 0);
        // CC = 3, followed by three zeroed 4-byte CSRC entries.
        packet[0] |= 0x03;
        packet.extend_from_slice(&[0u8; 12]);
        let header = RtpHeader::parse(&packet).unwrap();
        assert_eq!(header.csrc_count, 3);
        assert_eq!(header.header_len(), 24);
    }

    #[test]
    fn payload_type_names() {
        for (pt, expected) in [(0u8, "PCMU"), (8, "PCMA"), (127, "unknown")] {
            assert_eq!(payload_type_name(pt), expected);
        }
    }

    /// Binds two independent UDP sockets on ephemeral loopback ports.
    async fn loopback_pair() -> (UdpSocket, UdpSocket) {
        let first = UdpSocket::bind("127.0.0.1:0").await.unwrap();
        let second = UdpSocket::bind("127.0.0.1:0").await.unwrap();
        (first, second)
    }

    /// Runs `send_loop` on its own task and returns the join handle.
    fn spawn_sender(
        socket: Arc<UdpSocket>,
        remote: SocketAddr,
        config: RtpSendConfig,
        payloads: mpsc::Receiver<Vec<u8>>,
        cancel: CancellationToken,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(send_loop(socket, remote, config, payloads, cancel))
    }

    /// Configuration for tests that only care about loop termination.
    fn minimal_config() -> RtpSendConfig {
        RtpSendConfig {
            payload_type: 0,
            ssrc: 1,
            initial_seq: 0,
            initial_timestamp: 0,
            samples_per_frame: 160,
        }
    }

    /// Fails the test unless `task` finishes, without panicking, inside the timeout.
    async fn assert_exits_promptly(task: tokio::task::JoinHandle<()>) {
        tokio::time::timeout(STEP_TIMEOUT, task)
            .await
            .expect("send_loop exited within timeout")
            .expect("task did not panic");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn send_loop_packetizes_payloads_into_rtp() {
        let (sender, receiver) = loopback_pair().await;
        let remote = receiver.local_addr().unwrap();
        let sender = Arc::new(sender);

        let (tx, rx) = mpsc::channel::<Vec<u8>>(4);
        let cancel = CancellationToken::new();
        let config = RtpSendConfig {
            payload_type: 0,
            ssrc: 0xCAFEBABE,
            initial_seq: 1000,
            initial_timestamp: 5000,
            samples_per_frame: 160,
        };
        let task = spawn_sender(Arc::clone(&sender), remote, config, rx, cancel.clone());

        let payload: Vec<u8> = (0u8..160).collect();
        tx.send(payload.clone()).await.unwrap();

        let mut buf = [0u8; 2048];
        let (n, from) = tokio::time::timeout(STEP_TIMEOUT, receiver.recv_from(&mut buf))
            .await
            .expect("receiver got packet in time")
            .expect("recv_from ok");

        assert_eq!(from, sender.local_addr().unwrap());
        assert_eq!(n, 12 + 160);

        let header = RtpHeader::parse(&buf[..n]).expect("parses as RTP");
        assert_eq!(
            header,
            RtpHeader {
                version: 2,
                padding: false,
                extension: false,
                csrc_count: 0,
                marker: false,
                payload_type: 0,
                sequence: 1000,
                timestamp: 5000,
                ssrc: 0xCAFEBABE,
            }
        );

        assert_eq!(&buf[12..n], payload.as_slice());

        cancel.cancel();
        let _ = task.await;
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn send_loop_advances_seq_and_timestamp_per_packet() {
        let (sender, receiver) = loopback_pair().await;
        let remote = receiver.local_addr().unwrap();
        let sender = Arc::new(sender);

        let (tx, rx) = mpsc::channel::<Vec<u8>>(8);
        let cancel = CancellationToken::new();
        // Starting at u16::MAX exercises sequence-number wrap-around.
        let config = RtpSendConfig {
            payload_type: 8,
            ssrc: 0xDEADBEEF,
            initial_seq: u16::MAX,
            initial_timestamp: 100,
            samples_per_frame: 160,
        };
        let task = spawn_sender(Arc::clone(&sender), remote, config, rx, cancel.clone());

        for fill in 0..3u8 {
            tx.send(vec![fill; 4]).await.unwrap();
        }

        let mut buf = [0u8; 2048];
        let mut headers = Vec::with_capacity(3);
        while headers.len() < 3 {
            let (n, _) = tokio::time::timeout(STEP_TIMEOUT, receiver.recv_from(&mut buf))
                .await
                .unwrap()
                .unwrap();
            headers.push(RtpHeader::parse(&buf[..n]).unwrap());
        }

        let sequences: Vec<u16> = headers.iter().map(|h| h.sequence).collect();
        assert_eq!(sequences, [u16::MAX, 0, 1]);

        let timestamps: Vec<u32> = headers.iter().map(|h| h.timestamp).collect();
        assert_eq!(timestamps, [100, 260, 420]);

        assert!(headers
            .iter()
            .all(|h| h.payload_type == 8 && h.ssrc == 0xDEADBEEF));

        cancel.cancel();
        let _ = task.await;
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn send_loop_exits_when_payload_channel_closes() {
        let (sender, _receiver) = loopback_pair().await;
        let remote: SocketAddr = UNUSED_REMOTE.parse().unwrap();
        let sender = Arc::new(sender);

        let (tx, rx) = mpsc::channel::<Vec<u8>>(1);
        let cancel = CancellationToken::new();
        let task = spawn_sender(
            Arc::clone(&sender),
            remote,
            minimal_config(),
            rx,
            cancel.clone(),
        );

        // Closing the only sender must end the loop without any cancellation.
        drop(tx);

        assert_exits_promptly(task).await;
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn send_loop_exits_on_cancel() {
        let (sender, _receiver) = loopback_pair().await;
        let remote: SocketAddr = UNUSED_REMOTE.parse().unwrap();
        let sender = Arc::new(sender);

        // `_tx` stays alive so the channel remains open; only cancellation can end the loop.
        let (_tx, rx) = mpsc::channel::<Vec<u8>>(1);
        let cancel = CancellationToken::new();
        let task = spawn_sender(
            Arc::clone(&sender),
            remote,
            minimal_config(),
            rx,
            cancel.clone(),
        );

        cancel.cancel();

        assert_exits_promptly(task).await;
    }
}