Skip to main content

wavekat_sip/rtp/
mod.rs

1//! RTP header parsing, a debug receive loop, a codec-agnostic send loop,
2//! and RFC 4733 DTMF (`telephone-event`) packet construction + transmission.
3//!
4//! This module intentionally stops at the wire: it parses RTP headers
5//! (RFC 3550), exposes a debug-friendly receive loop, packetizes
6//! caller-supplied payloads onto the wire via [`send_loop`], and ships
7//! DTMF bursts via [`dtmf::send_dtmf_burst`]. Decoding payloads
8//! (G.711, Opus, …), jitter buffering, audio device routing, and pacing
9//! are consumer-layer concerns and live outside this crate.
10
11pub mod dtmf;
12
13use std::net::SocketAddr;
14use std::sync::Arc;
15
16use tokio::net::UdpSocket;
17use tokio::select;
18use tokio::sync::mpsc;
19use tokio_util::sync::CancellationToken;
20use tracing::{debug, info, warn};
21
/// Parsed RTP packet header (RFC 3550 §5.1, 12 bytes minimum).
///
/// Only the fixed header fields are decoded. The CSRC identifiers
/// themselves, any header extension, and any padding are not parsed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RtpHeader {
    /// Version field (`V`); always 2 for a header returned by [`RtpHeader::parse`].
    pub version: u8,
    /// Padding flag (`P`): the packet ends with padding octets.
    pub padding: bool,
    /// Extension flag (`X`): a header extension follows the CSRC list.
    pub extension: bool,
    /// CSRC count (`CC`, 0–15): number of 32-bit CSRC identifiers after the fixed header.
    pub csrc_count: u8,
    /// Marker bit (`M`); its meaning is defined by the RTP profile.
    pub marker: bool,
    /// Payload type (`PT`, 7 bits).
    pub payload_type: u8,
    /// Sequence number.
    pub sequence: u16,
    /// RTP timestamp.
    pub timestamp: u32,
    /// Synchronization source identifier.
    pub ssrc: u32,
}

impl RtpHeader {
    /// Size in bytes of the fixed RTP header, before the CSRC list.
    const FIXED_LEN: usize = 12;

    /// Parse an RTP header from the start of `buf`.
    ///
    /// Returns `None` when the version field is not 2, or when `buf` is too
    /// short to hold both the 12-byte fixed header and the `csrc_count`
    /// CSRC identifiers that header announces (i.e. when `buf` is shorter
    /// than [`RtpHeader::header_len`]). The header extension and padding
    /// are not validated.
    pub fn parse(buf: &[u8]) -> Option<Self> {
        let fixed = buf.get(..Self::FIXED_LEN)?;

        let version = (fixed[0] >> 6) & 0x03;
        if version != 2 {
            return None;
        }

        let header = Self {
            version,
            padding: fixed[0] & 0x20 != 0,
            extension: fixed[0] & 0x10 != 0,
            csrc_count: fixed[0] & 0x0F,
            marker: fixed[1] & 0x80 != 0,
            payload_type: fixed[1] & 0x7F,
            sequence: u16::from_be_bytes([fixed[2], fixed[3]]),
            timestamp: u32::from_be_bytes([fixed[4], fixed[5], fixed[6], fixed[7]]),
            ssrc: u32::from_be_bytes([fixed[8], fixed[9], fixed[10], fixed[11]]),
        };

        // A CC field announcing more CSRCs than the buffer carries means the
        // datagram is truncated or not RTP at all. RFC 3550 §A.1 treats such
        // length inconsistencies as invalid, and accepting them would let
        // `header_len()` point past the end of the packet.
        if buf.len() < header.header_len() {
            return None;
        }
        Some(header)
    }

    /// Length in bytes of the fixed header plus the CSRC list
    /// (`12 + 4 * csrc_count`).
    ///
    /// A header extension, when [`RtpHeader::extension`] is set, follows
    /// these bytes and is not included.
    pub fn header_len(&self) -> usize {
        Self::FIXED_LEN + 4 * usize::from(self.csrc_count)
    }
}
76
/// Human-friendly encoding name for an RTP payload type.
///
/// Covers the static audio payload types assigned in RFC 3551 (Table 4).
/// Reserved, unassigned, video, and dynamic (96–127) payload types map to
/// `"unknown"`: a dynamic type's encoding is only known from the SDP
/// negotiation, which this module does not see.
fn payload_type_name(pt: u8) -> &'static str {
    match pt {
        0 => "PCMU",
        3 => "GSM",
        4 => "G723",
        // DVI4 at 8000, 16000, 11025 and 22050 Hz respectively.
        5 | 6 | 16 | 17 => "DVI4",
        7 => "LPC",
        8 => "PCMA",
        9 => "G722",
        // L16 stereo (10) and mono (11).
        10 | 11 => "L16",
        12 => "QCELP",
        13 => "CN",
        14 => "MPA",
        15 => "G728",
        18 => "G729",
        _ => "unknown",
    }
}
85
86/// Receive RTP packets on the given socket and trace their headers until
87/// cancelled. Useful for smoke-testing inbound media without wiring up an
88/// audio path.
89pub async fn receive_rtp(socket: UdpSocket, cancel: CancellationToken) {
90    let mut buf = [0u8; 2048];
91    let mut count = 0u64;
92
93    let local = socket
94        .local_addr()
95        .map(|a| a.to_string())
96        .unwrap_or_else(|_| "<unknown>".into());
97    info!("RTP receiver started on {local}");
98
99    loop {
100        select! {
101            result = socket.recv_from(&mut buf) => {
102                match result {
103                    Ok((len, from)) => {
104                        if let Some(header) = RtpHeader::parse(&buf[..len]) {
105                            count += 1;
106                            let payload_len = len.saturating_sub(header.header_len());
107                            debug!(
108                                "RTP #{} | PT={} ({}) | TS={} | SSRC=0x{:08X} | {} bytes from {}",
109                                header.sequence,
110                                header.payload_type,
111                                payload_type_name(header.payload_type),
112                                header.timestamp,
113                                header.ssrc,
114                                payload_len,
115                                from,
116                            );
117
118                            if count.is_multiple_of(100) {
119                                info!("Received {count} RTP packets so far");
120                            }
121                        } else {
122                            warn!("Non-RTP packet ({len} bytes) from {from}");
123                        }
124                    }
125                    Err(e) => {
126                        warn!("RTP recv error: {e}");
127                        break;
128                    }
129                }
130            }
131            _ = cancel.cancelled() => break,
132        }
133    }
134
135    info!("RTP receiver stopped. Total packets: {count}");
136}
137
/// Starting RTP parameters for one [`send_loop`] run.
///
/// A brand-new stream should get a freshly randomized `ssrc`,
/// `initial_seq` and `initial_timestamp` (RFC 3550 §5.1). Carry a previous
/// value over only when deliberately resuming an existing stream.
#[derive(Debug, Clone, Copy)]
pub struct RtpSendConfig {
    /// Static payload type stamped on every packet, e.g. 0 = PCMU, 8 = PCMA.
    pub payload_type: u8,
    /// Synchronization source identifier, one per call. Use fresh
    /// randomness so the remote can tell apart streams that reuse the same
    /// 5-tuple.
    pub ssrc: u32,
    /// Sequence number of the first packet; the spec lets it wrap at `u16`.
    pub initial_seq: u16,
    /// Timestamp of the first packet, counted in the codec's clock rate
    /// (8000 Hz for G.711), not the audio device's sample rate.
    pub initial_timestamp: u32,
    /// Timestamp increment per packet: 160 for 20 ms of G.711
    /// (8 kHz × 20 ms).
    pub samples_per_frame: u32,
}
157
158/// Forward pre-encoded payloads out as RTP packets.
159///
160/// Each payload received on `payloads` becomes one RTP packet sent to
161/// `remote`. Sequence and timestamp counters advance by 1 and
162/// `samples_per_frame` respectively per packet; SSRC and payload type
163/// stay fixed for the lifetime of the loop.
164///
165/// **Pacing is the caller's job.** This loop sends as soon as a payload
166/// arrives; if the consumer pushes 20 ms G.711 frames as fast as it can
167/// encode them, the remote will hear chipmunk audio. Push at the
168/// codec's packetization cadence (a `tokio::time::interval` ticking the
169/// encoder is the usual shape).
170///
171/// `socket` is `Arc`-shared so the receive side can keep using the same
172/// bound port — RFC 3550 expects RTP send and receive to share a 5-tuple.
173///
174/// The loop exits when `cancel` is triggered, when `payloads` closes,
175/// or when a send fails (typically a closed socket).
176pub async fn send_loop(
177    socket: Arc<UdpSocket>,
178    remote: SocketAddr,
179    config: RtpSendConfig,
180    mut payloads: mpsc::Receiver<Vec<u8>>,
181    cancel: CancellationToken,
182) {
183    let mut seq = config.initial_seq;
184    let mut ts = config.initial_timestamp;
185    let mut count: u64 = 0;
186    let mut packet = Vec::with_capacity(12 + 256);
187
188    let local = socket
189        .local_addr()
190        .map(|a| a.to_string())
191        .unwrap_or_else(|_| "<unknown>".into());
192    info!(
193        "RTP sender started {local} → {remote} (PT={}, SSRC=0x{:08X})",
194        config.payload_type, config.ssrc
195    );
196
197    loop {
198        select! {
199            _ = cancel.cancelled() => break,
200            maybe = payloads.recv() => {
201                let Some(payload) = maybe else { break };
202                packet.clear();
203                // V=2, no padding, no extension, CSRC count = 0.
204                packet.push(0x80);
205                // Marker bit stays 0 — first-packet marker semantics are
206                // codec-specific and the caller can encode them into the
207                // payload stream if/when needed.
208                packet.push(config.payload_type & 0x7F);
209                packet.extend_from_slice(&seq.to_be_bytes());
210                packet.extend_from_slice(&ts.to_be_bytes());
211                packet.extend_from_slice(&config.ssrc.to_be_bytes());
212                packet.extend_from_slice(&payload);
213
214                if let Err(err) = socket.send_to(&packet, remote).await {
215                    warn!("RTP send error: {err}");
216                    break;
217                }
218                count += 1;
219                seq = seq.wrapping_add(1);
220                ts = ts.wrapping_add(config.samples_per_frame);
221
222                if count.is_multiple_of(100) {
223                    debug!("sent {count} RTP packets");
224                }
225            }
226        }
227    }
228
229    info!("RTP sender stopped. Total packets: {count}");
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tokio::task::JoinHandle;

    /// Deadline for any single loopback receive or task shutdown.
    const DEADLINE: Duration = Duration::from_millis(500);

    /// Encode a bare 12-byte RTP header: no padding, extension, CSRCs or marker.
    fn make_packet(version: u8, pt: u8, seq: u16, ts: u32, ssrc: u32) -> Vec<u8> {
        let mut packet = Vec::with_capacity(12);
        packet.push((version << 6) & 0xC0);
        packet.push(pt & 0x7F);
        packet.extend_from_slice(&seq.to_be_bytes());
        packet.extend_from_slice(&ts.to_be_bytes());
        packet.extend_from_slice(&ssrc.to_be_bytes());
        packet
    }

    #[test]
    fn parse_minimum_header() {
        let header = RtpHeader::parse(&make_packet(2, 0, 1234, 5678, 0xDEADBEEF))
            .expect("valid v2 header");
        assert_eq!(header.version, 2);
        assert_eq!(header.payload_type, 0);
        assert_eq!(header.sequence, 1234);
        assert_eq!(header.timestamp, 5678);
        assert_eq!(header.ssrc, 0xDEADBEEF);
        assert_eq!(header.csrc_count, 0);
        assert_eq!(header.header_len(), 12);
    }

    #[test]
    fn parse_rejects_short_buffer() {
        assert!(RtpHeader::parse(&[0u8; 11]).is_none());
    }

    #[test]
    fn parse_rejects_wrong_version() {
        assert!(RtpHeader::parse(&make_packet(1, 0, 0, 0, 0)).is_none());
    }

    #[test]
    fn parse_extracts_marker_bit() {
        let mut packet = make_packet(2, 8, 0, 0, 0);
        packet[1] |= 0x80; // marker
        let header = RtpHeader::parse(&packet).expect("valid v2 header");
        assert!(header.marker);
        assert_eq!(header.payload_type, 8); // PCMA
    }

    #[test]
    fn header_len_accounts_for_csrcs() {
        // Declare CC = 3 and append the 3 × 4 bytes those CSRCs occupy.
        let mut packet = make_packet(2, 0, 0, 0, 0);
        packet[0] |= 0x03;
        packet.resize(packet.len() + 12, 0);
        let header = RtpHeader::parse(&packet).expect("valid v2 header");
        assert_eq!(header.csrc_count, 3);
        assert_eq!(header.header_len(), 24);
    }

    #[test]
    fn payload_type_names() {
        for (pt, name) in [(0, "PCMU"), (8, "PCMA"), (127, "unknown")] {
            assert_eq!(payload_type_name(pt), name);
        }
    }

    /// Bind two ephemeral loopback UDP sockets, returned as `(sender, receiver)`.
    /// Either side's `send_to` target is the other's `local_addr()`.
    async fn loopback_pair() -> (UdpSocket, UdpSocket) {
        let sender = UdpSocket::bind("127.0.0.1:0").await.unwrap();
        let receiver = UdpSocket::bind("127.0.0.1:0").await.unwrap();
        (sender, receiver)
    }

    /// Config for tests that only exercise shutdown, not packet contents.
    fn shutdown_config() -> RtpSendConfig {
        RtpSendConfig {
            payload_type: 0,
            ssrc: 1,
            initial_seq: 0,
            initial_timestamp: 0,
            samples_per_frame: 160,
        }
    }

    /// Run `send_loop` on its own task, sharing `socket` and `cancel` with the caller.
    fn spawn_send_loop(
        socket: &Arc<UdpSocket>,
        remote: SocketAddr,
        config: RtpSendConfig,
        payloads: mpsc::Receiver<Vec<u8>>,
        cancel: &CancellationToken,
    ) -> JoinHandle<()> {
        tokio::spawn(send_loop(
            Arc::clone(socket),
            remote,
            config,
            payloads,
            cancel.clone(),
        ))
    }

    /// Receive one datagram, failing the test if none arrives within `DEADLINE`.
    async fn recv_within_deadline(socket: &UdpSocket, buf: &mut [u8]) -> (usize, SocketAddr) {
        tokio::time::timeout(DEADLINE, socket.recv_from(buf))
            .await
            .expect("receiver got packet in time")
            .expect("recv_from ok")
    }

    /// Fail the test unless `task` finishes without panicking within `DEADLINE`.
    async fn assert_exits(task: JoinHandle<()>) {
        tokio::time::timeout(DEADLINE, task)
            .await
            .expect("send_loop exited within timeout")
            .expect("task did not panic");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn send_loop_packetizes_payloads_into_rtp() {
        // One payload in, one wire packet out: header shaped per
        // RtpSendConfig, payload bytes appended untouched.
        let (sender, receiver) = loopback_pair().await;
        let sender = Arc::new(sender);
        let remote = receiver.local_addr().unwrap();
        let cancel = CancellationToken::new();
        let (tx, rx) = mpsc::channel::<Vec<u8>>(4);

        let config = RtpSendConfig {
            payload_type: 0, // PCMU
            ssrc: 0xCAFEBABE,
            initial_seq: 1000,
            initial_timestamp: 5000,
            samples_per_frame: 160,
        };
        let task = spawn_send_loop(&sender, remote, config, rx, &cancel);

        // A G.711-sized frame, but the loop is codec-agnostic: all that
        // matters is that these bytes follow the 12-byte header verbatim.
        let payload: Vec<u8> = (0..=159u8).collect();
        tx.send(payload.clone()).await.unwrap();

        let mut buf = [0u8; 2048];
        let (n, from) = recv_within_deadline(&receiver, &mut buf).await;
        assert_eq!(from, sender.local_addr().unwrap());
        assert_eq!(n, 12 + payload.len());

        let header = RtpHeader::parse(&buf[..n]).expect("parses as RTP");
        assert_eq!(header.version, 2);
        assert_eq!(header.payload_type, 0);
        assert_eq!(header.sequence, 1000);
        assert_eq!(header.timestamp, 5000);
        assert_eq!(header.ssrc, 0xCAFEBABE);
        assert_eq!(header.csrc_count, 0);
        assert!(!header.marker);
        assert!(!header.padding);
        assert!(!header.extension);
        assert_eq!(&buf[12..n], payload.as_slice());

        cancel.cancel();
        let _ = task.await;
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn send_loop_advances_seq_and_timestamp_per_packet() {
        // Regression guard: a refactor that "simplifies" the counters
        // (dropping wrapping_add, or stepping ts by 1 rather than
        // samples_per_frame) breaks interop, and this test catches it.
        let (sender, receiver) = loopback_pair().await;
        let sender = Arc::new(sender);
        let remote = receiver.local_addr().unwrap();
        let cancel = CancellationToken::new();
        let (tx, rx) = mpsc::channel::<Vec<u8>>(8);

        let config = RtpSendConfig {
            payload_type: 8, // PCMA
            ssrc: 0xDEADBEEF,
            initial_seq: u16::MAX, // second packet must wrap to 0
            initial_timestamp: 100,
            samples_per_frame: 160,
        };
        let task = spawn_send_loop(&sender, remote, config, rx, &cancel);

        for fill in 0..3u8 {
            tx.send(vec![fill; 4]).await.unwrap();
        }

        let mut buf = [0u8; 2048];
        let mut headers = Vec::with_capacity(3);
        for _ in 0..3 {
            let (n, _) = recv_within_deadline(&receiver, &mut buf).await;
            headers.push(RtpHeader::parse(&buf[..n]).unwrap());
        }

        // Sequence numbers: MAX, then wrap to 0, then 1.
        let sequences: Vec<u16> = headers.iter().map(|h| h.sequence).collect();
        assert_eq!(sequences, [u16::MAX, 0, 1]);

        // Timestamps step by samples_per_frame.
        let timestamps: Vec<u32> = headers.iter().map(|h| h.timestamp).collect();
        assert_eq!(timestamps, [100, 260, 420]);

        // PT and SSRC stay pinned across packets.
        assert!(headers
            .iter()
            .all(|h| h.payload_type == 8 && h.ssrc == 0xDEADBEEF));

        cancel.cancel();
        let _ = task.await;
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn send_loop_exits_when_payload_channel_closes() {
        // Producer-driven shutdown: dropping the mpsc sender must let
        // send_loop return by itself. The consumer then need not fire
        // `cancel` on the happy path of "encoder finished, nothing more
        // to send".
        let (sender, _receiver) = loopback_pair().await;
        let sender = Arc::new(sender);
        let cancel = CancellationToken::new();
        let (tx, rx) = mpsc::channel::<Vec<u8>>(1);

        let task = spawn_send_loop(
            &sender,
            "127.0.0.1:9".parse().unwrap(),
            shutdown_config(),
            rx,
            &cancel,
        );
        drop(tx);

        assert_exits(task).await;
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn send_loop_exits_on_cancel() {
        // Cancel-driven shutdown for "this call is over". The producer may
        // outlive the call (an encoder shared across calls), so `_tx` is
        // deliberately kept alive and only `cancel` can stop the loop.
        let (sender, _receiver) = loopback_pair().await;
        let sender = Arc::new(sender);
        let cancel = CancellationToken::new();
        let (_tx, rx) = mpsc::channel::<Vec<u8>>(1);

        let task = spawn_send_loop(
            &sender,
            "127.0.0.1:9".parse().unwrap(),
            shutdown_config(),
            rx,
            &cancel,
        );

        cancel.cancel();
        assert_exits(task).await;
    }
}