Skip to main content

wavekat_sip/rtp/
mod.rs

1//! RTP header parsing, a debug receive loop, a codec-agnostic send loop,
2//! and RFC 4733 DTMF (`telephone-event`) in both directions.
3//!
4//! This module intentionally stops at the wire: it parses RTP headers
5//! (RFC 3550), exposes a debug-friendly receive loop, packetizes
6//! caller-supplied payloads onto the wire via [`send_loop`], ships
7//! DTMF bursts via [`dtmf::send_dtmf_burst`], and decodes inbound
8//! digit presses via [`dtmf_recv::DtmfReceiver`]. Decoding audio
9//! payloads (G.711, Opus, …), jitter buffering, audio device routing,
10//! and pacing are consumer-layer concerns and live outside this crate.
11
12pub mod dtmf;
13pub mod dtmf_recv;
14
15use std::net::SocketAddr;
16use std::sync::Arc;
17
18use tokio::net::UdpSocket;
19use tokio::select;
20use tokio::sync::mpsc;
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, info, warn};
23
/// Parsed RTP fixed header (RFC 3550 §5.1, 12 bytes minimum).
///
/// Only the fixed 12-byte part is decoded into fields. CSRC identifiers and
/// any header extension are not stored; [`RtpHeader::header_len`] tells the
/// caller how far to skip to get past the CSRC list.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RtpHeader {
    /// Protocol version (top two bits of byte 0). Always 2 for a header
    /// returned by [`RtpHeader::parse`].
    pub version: u8,
    /// Padding (P) bit — bit 5 of byte 0.
    pub padding: bool,
    /// Extension (X) bit — bit 4 of byte 0.
    pub extension: bool,
    /// CSRC count (CC) — low four bits of byte 0, so 0..=15.
    pub csrc_count: u8,
    /// Marker (M) bit — top bit of byte 1.
    pub marker: bool,
    /// Payload type — low seven bits of byte 1.
    pub payload_type: u8,
    /// Sequence number, big-endian in bytes 2..4.
    pub sequence: u16,
    /// Timestamp, big-endian in bytes 4..8.
    pub timestamp: u32,
    /// Synchronization source identifier, big-endian in bytes 8..12.
    pub ssrc: u32,
}

impl RtpHeader {
    /// Size in bytes of the fixed header that precedes the CSRC list.
    const FIXED_LEN: usize = 12;

    /// Parse an RTP header from the start of `buf`.
    ///
    /// Returns `None` if:
    /// - `buf` is shorter than the 12-byte fixed header,
    /// - the version field is not 2, or
    /// - `buf` is shorter than the fixed header plus the CSRC list that the
    ///   CC field advertises (a truncated packet).
    ///
    /// On success, `buf.len() >= header.header_len()` is guaranteed, so
    /// slicing `&buf[header.header_len()..]` cannot panic.
    pub fn parse(buf: &[u8]) -> Option<Self> {
        if buf.len() < Self::FIXED_LEN {
            return None;
        }

        let (first, second) = (buf[0], buf[1]);

        let version = first >> 6;
        if version != 2 {
            return None;
        }

        let header = Self {
            version,
            padding: (first & 0x20) != 0,
            extension: (first & 0x10) != 0,
            csrc_count: first & 0x0F,
            marker: (second & 0x80) != 0,
            payload_type: second & 0x7F,
            sequence: u16::from_be_bytes([buf[2], buf[3]]),
            timestamp: u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]),
            ssrc: u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]),
        };

        // A packet that advertises more CSRC entries than it carries is
        // truncated. Rejecting it here keeps `header_len()` within the
        // buffer for every header this function hands out.
        if buf.len() < header.header_len() {
            return None;
        }

        Some(header)
    }

    /// Total header length in bytes (12 + 4 * CSRC count).
    ///
    /// This does not include a header extension: when `extension` is set,
    /// the extension block follows the CSRC list and its length is only
    /// known from the packet bytes, which this struct does not retain.
    pub fn header_len(&self) -> usize {
        Self::FIXED_LEN + 4 * usize::from(self.csrc_count)
    }
}
78
/// Map a static RTP payload type number to a short display name.
///
/// Only PCMU (0) and PCMA (8) are recognised; every other value yields
/// `"unknown"`.
fn payload_type_name(pt: u8) -> &'static str {
    const KNOWN: [(u8, &str); 2] = [(0, "PCMU"), (8, "PCMA")];

    KNOWN
        .iter()
        .find(|&&(code, _)| code == pt)
        .map_or("unknown", |&(_, name)| name)
}
87
88/// Receive RTP packets on the given socket and trace their headers until
89/// cancelled. Useful for smoke-testing inbound media without wiring up an
90/// audio path.
91pub async fn receive_rtp(socket: UdpSocket, cancel: CancellationToken) {
92    let mut buf = [0u8; 2048];
93    let mut count = 0u64;
94
95    let local = socket
96        .local_addr()
97        .map(|a| a.to_string())
98        .unwrap_or_else(|_| "<unknown>".into());
99    info!("RTP receiver started on {local}");
100
101    loop {
102        select! {
103            result = socket.recv_from(&mut buf) => {
104                match result {
105                    Ok((len, from)) => {
106                        if let Some(header) = RtpHeader::parse(&buf[..len]) {
107                            count += 1;
108                            let payload_len = len.saturating_sub(header.header_len());
109                            debug!(
110                                "RTP #{} | PT={} ({}) | TS={} | SSRC=0x{:08X} | {} bytes from {}",
111                                header.sequence,
112                                header.payload_type,
113                                payload_type_name(header.payload_type),
114                                header.timestamp,
115                                header.ssrc,
116                                payload_len,
117                                from,
118                            );
119
120                            if count.is_multiple_of(100) {
121                                info!("Received {count} RTP packets so far");
122                            }
123                        } else {
124                            warn!("Non-RTP packet ({len} bytes) from {from}");
125                        }
126                    }
127                    Err(e) => {
128                        warn!("RTP recv error: {e}");
129                        break;
130                    }
131                }
132            }
133            _ = cancel.cancelled() => break,
134        }
135    }
136
137    info!("RTP receiver stopped. Total packets: {count}");
138}
139
/// Starting RTP state and fixed per-stream parameters for a `send_loop`.
///
/// Hold it across calls only if you intend to resume a stream — for a
/// fresh call, pick a new `ssrc` and fresh `initial_seq` /
/// `initial_timestamp` (random per RFC 3550 §5.1).
#[derive(Debug, Clone, Copy)]
pub struct RtpSendConfig {
    /// Static RTP payload type — 0 for PCMU, 8 for PCMA, etc. Only the
    /// low 7 bits are put on the wire; `send_loop` masks with `0x7F`.
    pub payload_type: u8,
    /// Synchronization source. One per call; pick fresh randomness so
    /// the remote can distinguish streams that re-use the same 5-tuple.
    pub ssrc: u32,
    /// Sequence number of the first packet. `send_loop` increments it by
    /// one per packet sent, wrapping at `u16::MAX`.
    pub initial_seq: u16,
    /// Timestamp of the first packet. The clock domain is the *codec's*
    /// sample rate — 8000 Hz for G.711 — not the audio device rate.
    pub initial_timestamp: u32,
    /// Amount to advance the RTP timestamp per packet (wrapping). For
    /// 20 ms G.711 frames that's 160 (= 8 kHz × 20 ms).
    pub samples_per_frame: u32,
}
159
160/// Forward pre-encoded payloads out as RTP packets.
161///
162/// Each payload received on `payloads` becomes one RTP packet sent to
163/// `remote`. Sequence and timestamp counters advance by 1 and
164/// `samples_per_frame` respectively per packet; SSRC and payload type
165/// stay fixed for the lifetime of the loop.
166///
167/// **Pacing is the caller's job.** This loop sends as soon as a payload
168/// arrives; if the consumer pushes 20 ms G.711 frames as fast as it can
169/// encode them, the remote will hear chipmunk audio. Push at the
170/// codec's packetization cadence (a `tokio::time::interval` ticking the
171/// encoder is the usual shape).
172///
173/// `socket` is `Arc`-shared so the receive side can keep using the same
174/// bound port — RFC 3550 expects RTP send and receive to share a 5-tuple.
175///
176/// The loop exits when `cancel` is triggered, when `payloads` closes,
177/// or when a send fails (typically a closed socket).
178pub async fn send_loop(
179    socket: Arc<UdpSocket>,
180    remote: SocketAddr,
181    config: RtpSendConfig,
182    mut payloads: mpsc::Receiver<Vec<u8>>,
183    cancel: CancellationToken,
184) {
185    let mut seq = config.initial_seq;
186    let mut ts = config.initial_timestamp;
187    let mut count: u64 = 0;
188    let mut packet = Vec::with_capacity(12 + 256);
189
190    let local = socket
191        .local_addr()
192        .map(|a| a.to_string())
193        .unwrap_or_else(|_| "<unknown>".into());
194    info!(
195        "RTP sender started {local} → {remote} (PT={}, SSRC=0x{:08X})",
196        config.payload_type, config.ssrc
197    );
198
199    loop {
200        select! {
201            _ = cancel.cancelled() => break,
202            maybe = payloads.recv() => {
203                let Some(payload) = maybe else { break };
204                packet.clear();
205                // V=2, no padding, no extension, CSRC count = 0.
206                packet.push(0x80);
207                // Marker bit stays 0 — first-packet marker semantics are
208                // codec-specific and the caller can encode them into the
209                // payload stream if/when needed.
210                packet.push(config.payload_type & 0x7F);
211                packet.extend_from_slice(&seq.to_be_bytes());
212                packet.extend_from_slice(&ts.to_be_bytes());
213                packet.extend_from_slice(&config.ssrc.to_be_bytes());
214                packet.extend_from_slice(&payload);
215
216                if let Err(err) = socket.send_to(&packet, remote).await {
217                    warn!("RTP send error: {err}");
218                    break;
219                }
220                count += 1;
221                seq = seq.wrapping_add(1);
222                ts = ts.wrapping_add(config.samples_per_frame);
223
224                if count.is_multiple_of(100) {
225                    debug!("sent {count} RTP packets");
226                }
227            }
228        }
229    }
230
231    info!("RTP sender stopped. Total packets: {count}");
232}
233
234#[cfg(test)]
235mod tests {
236    use super::*;
237
238    fn make_packet(version: u8, pt: u8, seq: u16, ts: u32, ssrc: u32) -> Vec<u8> {
239        let mut buf = vec![0u8; 12];
240        buf[0] = (version << 6) & 0xC0; // version, no padding/ext/csrc
241        buf[1] = pt & 0x7F; // no marker, payload type
242        buf[2..4].copy_from_slice(&seq.to_be_bytes());
243        buf[4..8].copy_from_slice(&ts.to_be_bytes());
244        buf[8..12].copy_from_slice(&ssrc.to_be_bytes());
245        buf
246    }
247
248    #[test]
249    fn parse_minimum_header() {
250        let buf = make_packet(2, 0, 1234, 5678, 0xDEADBEEF);
251        let h = RtpHeader::parse(&buf).unwrap();
252        assert_eq!(h.version, 2);
253        assert_eq!(h.payload_type, 0);
254        assert_eq!(h.sequence, 1234);
255        assert_eq!(h.timestamp, 5678);
256        assert_eq!(h.ssrc, 0xDEADBEEF);
257        assert_eq!(h.csrc_count, 0);
258        assert_eq!(h.header_len(), 12);
259    }
260
261    #[test]
262    fn parse_rejects_short_buffer() {
263        let buf = vec![0u8; 11];
264        assert!(RtpHeader::parse(&buf).is_none());
265    }
266
267    #[test]
268    fn parse_rejects_wrong_version() {
269        let buf = make_packet(1, 0, 0, 0, 0);
270        assert!(RtpHeader::parse(&buf).is_none());
271    }
272
273    #[test]
274    fn parse_extracts_marker_bit() {
275        let mut buf = make_packet(2, 8, 0, 0, 0);
276        buf[1] |= 0x80; // set marker
277        let h = RtpHeader::parse(&buf).unwrap();
278        assert!(h.marker);
279        assert_eq!(h.payload_type, 8); // PCMA
280    }
281
282    #[test]
283    fn header_len_accounts_for_csrcs() {
284        let mut buf = make_packet(2, 0, 0, 0, 0);
285        // Set CSRC count to 3, then extend buffer so length check passes.
286        buf[0] |= 0x03;
287        buf.extend(std::iter::repeat_n(0u8, 12));
288        let h = RtpHeader::parse(&buf).unwrap();
289        assert_eq!(h.csrc_count, 3);
290        assert_eq!(h.header_len(), 24);
291    }
292
293    #[test]
294    fn payload_type_names() {
295        assert_eq!(payload_type_name(0), "PCMU");
296        assert_eq!(payload_type_name(8), "PCMA");
297        assert_eq!(payload_type_name(127), "unknown");
298    }
299
300    /// Bind a pair of UDP sockets on loopback. Returns (sender, receiver)
301    /// with each one's `connect`/`send_to` target derivable from the
302    /// other's `local_addr()`.
303    async fn loopback_pair() -> (UdpSocket, UdpSocket) {
304        let a = UdpSocket::bind("127.0.0.1:0").await.unwrap();
305        let b = UdpSocket::bind("127.0.0.1:0").await.unwrap();
306        (a, b)
307    }
308
309    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
310    async fn send_loop_packetizes_payloads_into_rtp() {
311        // One payload in, one wire packet out — header shaped per
312        // RtpSendConfig, payload appended verbatim.
313        let (sender, receiver) = loopback_pair().await;
314        let remote = receiver.local_addr().unwrap();
315        let sender = Arc::new(sender);
316
317        let (tx, rx) = mpsc::channel::<Vec<u8>>(4);
318        let cancel = CancellationToken::new();
319
320        let task = tokio::spawn({
321            let sender = sender.clone();
322            let cancel = cancel.clone();
323            async move {
324                send_loop(
325                    sender,
326                    remote,
327                    RtpSendConfig {
328                        payload_type: 0, // PCMU
329                        ssrc: 0xCAFEBABE,
330                        initial_seq: 1000,
331                        initial_timestamp: 5000,
332                        samples_per_frame: 160,
333                    },
334                    rx,
335                    cancel,
336                )
337                .await;
338            }
339        });
340
341        // The payload is the 160-byte G.711 shape, but the loop is
342        // codec-agnostic — what matters is that those bytes appear
343        // verbatim after the 12-byte header.
344        let payload: Vec<u8> = (0..160).map(|i| i as u8).collect();
345        tx.send(payload.clone()).await.unwrap();
346
347        let mut buf = [0u8; 2048];
348        let (n, from) = tokio::time::timeout(
349            std::time::Duration::from_millis(500),
350            receiver.recv_from(&mut buf),
351        )
352        .await
353        .expect("receiver got packet in time")
354        .expect("recv_from ok");
355
356        assert_eq!(from, sender.local_addr().unwrap());
357        assert_eq!(n, 12 + 160);
358
359        let header = RtpHeader::parse(&buf[..n]).expect("parses as RTP");
360        assert_eq!(header.version, 2);
361        assert_eq!(header.payload_type, 0);
362        assert_eq!(header.sequence, 1000);
363        assert_eq!(header.timestamp, 5000);
364        assert_eq!(header.ssrc, 0xCAFEBABE);
365        assert_eq!(header.csrc_count, 0);
366        assert!(!header.marker);
367        assert!(!header.padding);
368        assert!(!header.extension);
369
370        assert_eq!(&buf[12..n], &payload[..]);
371
372        cancel.cancel();
373        let _ = task.await;
374    }
375
376    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
377    async fn send_loop_advances_seq_and_timestamp_per_packet() {
378        // Regression guard: a future refactor that "simplifies" the
379        // counters (e.g. forgets wrapping_add, or advances ts by 1
380        // instead of samples_per_frame) breaks interop without this test.
381        let (sender, receiver) = loopback_pair().await;
382        let remote = receiver.local_addr().unwrap();
383        let sender = Arc::new(sender);
384
385        let (tx, rx) = mpsc::channel::<Vec<u8>>(8);
386        let cancel = CancellationToken::new();
387
388        let task = tokio::spawn({
389            let sender = sender.clone();
390            let cancel = cancel.clone();
391            async move {
392                send_loop(
393                    sender,
394                    remote,
395                    RtpSendConfig {
396                        payload_type: 8, // PCMA
397                        ssrc: 0xDEADBEEF,
398                        initial_seq: u16::MAX, // exercise wrap on second packet
399                        initial_timestamp: 100,
400                        samples_per_frame: 160,
401                    },
402                    rx,
403                    cancel,
404                )
405                .await;
406            }
407        });
408
409        for i in 0..3u8 {
410            tx.send(vec![i; 4]).await.unwrap();
411        }
412
413        let mut headers = Vec::new();
414        let mut buf = [0u8; 2048];
415        for _ in 0..3 {
416            let (n, _) = tokio::time::timeout(
417                std::time::Duration::from_millis(500),
418                receiver.recv_from(&mut buf),
419            )
420            .await
421            .unwrap()
422            .unwrap();
423            headers.push(RtpHeader::parse(&buf[..n]).unwrap());
424        }
425
426        // seq: MAX, 0 (wrap), 1
427        assert_eq!(headers[0].sequence, u16::MAX);
428        assert_eq!(headers[1].sequence, 0);
429        assert_eq!(headers[2].sequence, 1);
430
431        // ts advances by samples_per_frame each step
432        assert_eq!(headers[0].timestamp, 100);
433        assert_eq!(headers[1].timestamp, 260);
434        assert_eq!(headers[2].timestamp, 420);
435
436        // PT pinned across packets
437        for h in &headers {
438            assert_eq!(h.payload_type, 8);
439            assert_eq!(h.ssrc, 0xDEADBEEF);
440        }
441
442        cancel.cancel();
443        let _ = task.await;
444    }
445
446    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
447    async fn send_loop_exits_when_payload_channel_closes() {
448        // Producer-driven shutdown: dropping the sender half of the
449        // mpsc must let send_loop return on its own, so the consumer
450        // doesn't have to remember to fire `cancel` for the happy path
451        // of "encoder finished, nothing more to send."
452        let (sender, _receiver) = loopback_pair().await;
453        let remote = "127.0.0.1:9".parse().unwrap();
454        let sender = Arc::new(sender);
455
456        let (tx, rx) = mpsc::channel::<Vec<u8>>(1);
457        let cancel = CancellationToken::new();
458
459        let task = tokio::spawn({
460            let sender = sender.clone();
461            let cancel = cancel.clone();
462            async move {
463                send_loop(
464                    sender,
465                    remote,
466                    RtpSendConfig {
467                        payload_type: 0,
468                        ssrc: 1,
469                        initial_seq: 0,
470                        initial_timestamp: 0,
471                        samples_per_frame: 160,
472                    },
473                    rx,
474                    cancel,
475                )
476                .await;
477            }
478        });
479
480        drop(tx);
481
482        tokio::time::timeout(std::time::Duration::from_millis(500), task)
483            .await
484            .expect("send_loop exited within timeout")
485            .expect("task did not panic");
486    }
487
488    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
489    async fn send_loop_exits_on_cancel() {
490        // Cancel-driven shutdown for the "we're done with this call"
491        // path — the producer may still be alive (the encoder task is
492        // shared across calls) but this dialog's loop must stop.
493        let (sender, _receiver) = loopback_pair().await;
494        let remote = "127.0.0.1:9".parse().unwrap();
495        let sender = Arc::new(sender);
496
497        let (_tx, rx) = mpsc::channel::<Vec<u8>>(1);
498        let cancel = CancellationToken::new();
499
500        let task = tokio::spawn({
501            let sender = sender.clone();
502            let cancel = cancel.clone();
503            async move {
504                send_loop(
505                    sender,
506                    remote,
507                    RtpSendConfig {
508                        payload_type: 0,
509                        ssrc: 1,
510                        initial_seq: 0,
511                        initial_timestamp: 0,
512                        samples_per_frame: 160,
513                    },
514                    rx,
515                    cancel,
516                )
517                .await;
518            }
519        });
520
521        cancel.cancel();
522        tokio::time::timeout(std::time::Duration::from_millis(500), task)
523            .await
524            .expect("send_loop exited within timeout")
525            .expect("task did not panic");
526    }
527}