Skip to main content

wavekat_sip/
rtp.rs

1//! RTP header parsing, a debug receive loop, and a codec-agnostic send loop.
2//!
3//! This module intentionally stops at the wire: it parses RTP headers
4//! (RFC 3550), exposes a debug-friendly receive loop, and packetizes
5//! caller-supplied payloads onto the wire via [`send_loop`]. Decoding
6//! payloads (G.711, Opus, …), jitter buffering, audio device routing,
7//! and pacing are consumer-layer concerns and live outside this crate.
8
9use std::net::SocketAddr;
10use std::sync::Arc;
11
12use tokio::net::UdpSocket;
13use tokio::select;
14use tokio::sync::mpsc;
15use tokio_util::sync::CancellationToken;
16use tracing::{debug, info, warn};
17
/// Parsed fixed RTP packet header (RFC 3550 §5.1, 12 bytes minimum).
///
/// Only the fixed 12-byte part is decoded. The CSRC identifiers and any
/// header extension that may follow it are not parsed; use
/// [`RtpHeader::header_len`] to skip past the CSRC list.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RtpHeader {
    /// Protocol version (top two bits of byte 0). Always 2 for a header
    /// returned by [`RtpHeader::parse`].
    pub version: u8,
    /// P bit (bit 5 of byte 0).
    pub padding: bool,
    /// X bit (bit 4 of byte 0).
    pub extension: bool,
    /// CC field: number of 32-bit CSRC identifiers following the fixed
    /// header (low four bits of byte 0, so 0..=15).
    pub csrc_count: u8,
    /// M bit (top bit of byte 1).
    pub marker: bool,
    /// Payload type (low seven bits of byte 1).
    pub payload_type: u8,
    /// Sequence number, big-endian on the wire (bytes 2..4).
    pub sequence: u16,
    /// RTP timestamp, big-endian on the wire (bytes 4..8).
    pub timestamp: u32,
    /// Synchronization source identifier, big-endian on the wire
    /// (bytes 8..12).
    pub ssrc: u32,
}

impl RtpHeader {
    /// Size in bytes of the fixed part of the header.
    const FIXED_LEN: usize = 12;

    /// Parse an RTP header from the start of `buf`.
    ///
    /// Returns `None` if:
    /// - `buf` is shorter than the 12-byte fixed header,
    /// - the version field is not 2, or
    /// - `buf` is too short to hold the CSRC list announced by the CC
    ///   field (i.e. shorter than [`RtpHeader::header_len`]).
    ///
    /// The last check guarantees that, for any header this function
    /// returns, `header_len() <= buf.len()`, so callers can slice the
    /// payload off without a further bounds check.
    pub fn parse(buf: &[u8]) -> Option<Self> {
        if buf.len() < Self::FIXED_LEN {
            return None;
        }

        let version = buf[0] >> 6;
        if version != 2 {
            return None;
        }

        let header = Self {
            version,
            padding: buf[0] & 0x20 != 0,
            extension: buf[0] & 0x10 != 0,
            csrc_count: buf[0] & 0x0F,
            marker: buf[1] & 0x80 != 0,
            payload_type: buf[1] & 0x7F,
            sequence: u16::from_be_bytes([buf[2], buf[3]]),
            timestamp: u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]),
            ssrc: u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]),
        };

        // A packet that claims more CSRC entries than it actually carries
        // is truncated or not RTP at all; reject it rather than hand out a
        // header whose `header_len()` points past the end of the buffer.
        if buf.len() < header.header_len() {
            return None;
        }

        Some(header)
    }

    /// Total header length in bytes (12 + 4 * CSRC count).
    ///
    /// This does not include a header extension: when the `extension`
    /// flag is set, the extension's length lives in the packet bytes,
    /// which this type does not retain.
    pub fn header_len(&self) -> usize {
        Self::FIXED_LEN + 4 * usize::from(self.csrc_count)
    }
}
72
/// Map a static RTP payload type number to a short codec label for log
/// output. Numbers without an entry in the table yield `"unknown"`.
fn payload_type_name(pt: u8) -> &'static str {
    // (payload type, label) pairs this module knows how to name.
    const KNOWN: [(u8, &'static str); 2] = [(0, "PCMU"), (8, "PCMA")];

    KNOWN
        .iter()
        .find(|entry| entry.0 == pt)
        .map_or("unknown", |&(_, label)| label)
}
81
82/// Receive RTP packets on the given socket and trace their headers until
83/// cancelled. Useful for smoke-testing inbound media without wiring up an
84/// audio path.
85pub async fn receive_rtp(socket: UdpSocket, cancel: CancellationToken) {
86    let mut buf = [0u8; 2048];
87    let mut count = 0u64;
88
89    let local = socket
90        .local_addr()
91        .map(|a| a.to_string())
92        .unwrap_or_else(|_| "<unknown>".into());
93    info!("RTP receiver started on {local}");
94
95    loop {
96        select! {
97            result = socket.recv_from(&mut buf) => {
98                match result {
99                    Ok((len, from)) => {
100                        if let Some(header) = RtpHeader::parse(&buf[..len]) {
101                            count += 1;
102                            let payload_len = len.saturating_sub(header.header_len());
103                            debug!(
104                                "RTP #{} | PT={} ({}) | TS={} | SSRC=0x{:08X} | {} bytes from {}",
105                                header.sequence,
106                                header.payload_type,
107                                payload_type_name(header.payload_type),
108                                header.timestamp,
109                                header.ssrc,
110                                payload_len,
111                                from,
112                            );
113
114                            if count.is_multiple_of(100) {
115                                info!("Received {count} RTP packets so far");
116                            }
117                        } else {
118                            warn!("Non-RTP packet ({len} bytes) from {from}");
119                        }
120                    }
121                    Err(e) => {
122                        warn!("RTP recv error: {e}");
123                        break;
124                    }
125                }
126            }
127            _ = cancel.cancelled() => break,
128        }
129    }
130
131    info!("RTP receiver stopped. Total packets: {count}");
132}
133
/// Initial RTP state when starting a `send_loop`. Hold it across calls
/// only if you intend to resume a stream — for a fresh call, pick a new
/// `ssrc` and fresh `initial_seq` / `initial_timestamp` (random per
/// RFC 3550 §5.1).
///
/// `send_loop` copies these values once at start-up; `payload_type` and
/// `ssrc` are written unchanged into every packet, while the sequence
/// number and timestamp advance from their initial values.
#[derive(Debug, Clone, Copy)]
pub struct RtpSendConfig {
    /// Static RTP payload type — 0 for PCMU, 8 for PCMA, etc. Only the
    /// low 7 bits are sent: `send_loop` masks the value with `0x7F`.
    pub payload_type: u8,
    /// Synchronization source. One per call; pick fresh randomness so
    /// the remote can distinguish streams that re-use the same 5-tuple.
    pub ssrc: u32,
    /// Initial RTP sequence number. Incremented by 1 per packet sent and
    /// wraps at u16 by spec.
    pub initial_seq: u16,
    /// Initial RTP timestamp. The clock domain is the *codec's* sample
    /// rate — 8000 Hz for G.711 — not the audio device rate.
    pub initial_timestamp: u32,
    /// Amount to advance the RTP timestamp per packet (wrapping at u32).
    /// For 20 ms G.711 frames that's 160 (= 8 kHz × 20 ms).
    pub samples_per_frame: u32,
}
153
154/// Forward pre-encoded payloads out as RTP packets.
155///
156/// Each payload received on `payloads` becomes one RTP packet sent to
157/// `remote`. Sequence and timestamp counters advance by 1 and
158/// `samples_per_frame` respectively per packet; SSRC and payload type
159/// stay fixed for the lifetime of the loop.
160///
161/// **Pacing is the caller's job.** This loop sends as soon as a payload
162/// arrives; if the consumer pushes 20 ms G.711 frames as fast as it can
163/// encode them, the remote will hear chipmunk audio. Push at the
164/// codec's packetization cadence (a `tokio::time::interval` ticking the
165/// encoder is the usual shape).
166///
167/// `socket` is `Arc`-shared so the receive side can keep using the same
168/// bound port — RFC 3550 expects RTP send and receive to share a 5-tuple.
169///
170/// The loop exits when `cancel` is triggered, when `payloads` closes,
171/// or when a send fails (typically a closed socket).
172pub async fn send_loop(
173    socket: Arc<UdpSocket>,
174    remote: SocketAddr,
175    config: RtpSendConfig,
176    mut payloads: mpsc::Receiver<Vec<u8>>,
177    cancel: CancellationToken,
178) {
179    let mut seq = config.initial_seq;
180    let mut ts = config.initial_timestamp;
181    let mut count: u64 = 0;
182    let mut packet = Vec::with_capacity(12 + 256);
183
184    let local = socket
185        .local_addr()
186        .map(|a| a.to_string())
187        .unwrap_or_else(|_| "<unknown>".into());
188    info!(
189        "RTP sender started {local} → {remote} (PT={}, SSRC=0x{:08X})",
190        config.payload_type, config.ssrc
191    );
192
193    loop {
194        select! {
195            _ = cancel.cancelled() => break,
196            maybe = payloads.recv() => {
197                let Some(payload) = maybe else { break };
198                packet.clear();
199                // V=2, no padding, no extension, CSRC count = 0.
200                packet.push(0x80);
201                // Marker bit stays 0 — first-packet marker semantics are
202                // codec-specific and the caller can encode them into the
203                // payload stream if/when needed.
204                packet.push(config.payload_type & 0x7F);
205                packet.extend_from_slice(&seq.to_be_bytes());
206                packet.extend_from_slice(&ts.to_be_bytes());
207                packet.extend_from_slice(&config.ssrc.to_be_bytes());
208                packet.extend_from_slice(&payload);
209
210                if let Err(err) = socket.send_to(&packet, remote).await {
211                    warn!("RTP send error: {err}");
212                    break;
213                }
214                count += 1;
215                seq = seq.wrapping_add(1);
216                ts = ts.wrapping_add(config.samples_per_frame);
217
218                if count.is_multiple_of(100) {
219                    debug!("sent {count} RTP packets");
220                }
221            }
222        }
223    }
224
225    info!("RTP sender stopped. Total packets: {count}");
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Upper bound for any single wait in the async tests below.
    const WAIT: Duration = Duration::from_millis(500);

    /// Build a bare 12-byte RTP header with the given fields; padding,
    /// extension, CSRC count and marker are all left at zero.
    fn make_packet(version: u8, pt: u8, seq: u16, ts: u32, ssrc: u32) -> Vec<u8> {
        let mut bytes = Vec::with_capacity(12);
        bytes.push((version << 6) & 0xC0); // version, no padding/ext/csrc
        bytes.push(pt & 0x7F); // no marker, payload type
        bytes.extend_from_slice(&seq.to_be_bytes());
        bytes.extend_from_slice(&ts.to_be_bytes());
        bytes.extend_from_slice(&ssrc.to_be_bytes());
        bytes
    }

    #[test]
    fn parse_minimum_header() {
        let h = RtpHeader::parse(&make_packet(2, 0, 1234, 5678, 0xDEADBEEF)).unwrap();
        assert_eq!(
            (h.version, h.payload_type, h.sequence, h.timestamp, h.ssrc, h.csrc_count),
            (2, 0, 1234, 5678, 0xDEADBEEF, 0)
        );
        assert_eq!(h.header_len(), 12);
    }

    #[test]
    fn parse_rejects_short_buffer() {
        assert!(RtpHeader::parse(&[0u8; 11]).is_none());
    }

    #[test]
    fn parse_rejects_wrong_version() {
        assert_eq!(RtpHeader::parse(&make_packet(1, 0, 0, 0, 0)), None);
    }

    #[test]
    fn parse_extracts_marker_bit() {
        let mut bytes = make_packet(2, 8, 0, 0, 0);
        bytes[1] |= 0x80; // set marker
        let parsed = RtpHeader::parse(&bytes).unwrap();
        assert!(parsed.marker);
        assert_eq!(parsed.payload_type, 8); // PCMA
    }

    #[test]
    fn header_len_accounts_for_csrcs() {
        let mut bytes = make_packet(2, 0, 0, 0, 0);
        // Announce three CSRCs and append the 12 bytes they occupy.
        bytes[0] |= 0x03;
        bytes.resize(bytes.len() + 12, 0);
        let parsed = RtpHeader::parse(&bytes).unwrap();
        assert_eq!(parsed.csrc_count, 3);
        assert_eq!(parsed.header_len(), 24);
    }

    #[test]
    fn payload_type_names() {
        for (pt, expected) in [(0u8, "PCMU"), (8, "PCMA"), (127, "unknown")] {
            assert_eq!(payload_type_name(pt), expected);
        }
    }

    /// Bind two UDP sockets on loopback with OS-assigned ports. Returns
    /// (sender, receiver); each side's address is available through
    /// `local_addr()`.
    async fn loopback_pair() -> (UdpSocket, UdpSocket) {
        let first = UdpSocket::bind("127.0.0.1:0").await.unwrap();
        let second = UdpSocket::bind("127.0.0.1:0").await.unwrap();
        (first, second)
    }

    /// Run `send_loop` on its own task, handing it clones of the shared
    /// socket and cancellation token so the test keeps its own handles.
    fn spawn_send_loop(
        socket: &Arc<UdpSocket>,
        remote: SocketAddr,
        config: RtpSendConfig,
        rx: mpsc::Receiver<Vec<u8>>,
        cancel: &CancellationToken,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(send_loop(
            Arc::clone(socket),
            remote,
            config,
            rx,
            cancel.clone(),
        ))
    }

    /// Config for the shutdown tests, where no packet content is inspected.
    fn idle_config() -> RtpSendConfig {
        RtpSendConfig {
            payload_type: 0,
            ssrc: 1,
            initial_seq: 0,
            initial_timestamp: 0,
            samples_per_frame: 160,
        }
    }

    /// Fail the test unless `task` finishes, without panicking, within `WAIT`.
    async fn assert_exits_promptly(task: tokio::task::JoinHandle<()>) {
        tokio::time::timeout(WAIT, task)
            .await
            .expect("send_loop exited within timeout")
            .expect("task did not panic");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn send_loop_packetizes_payloads_into_rtp() {
        // One payload in, one wire packet out — header shaped per
        // RtpSendConfig, payload appended verbatim.
        let (sender, receiver) = loopback_pair().await;
        let sender = Arc::new(sender);
        let remote = receiver.local_addr().unwrap();

        let (tx, rx) = mpsc::channel::<Vec<u8>>(4);
        let cancel = CancellationToken::new();
        let config = RtpSendConfig {
            payload_type: 0, // PCMU
            ssrc: 0xCAFEBABE,
            initial_seq: 1000,
            initial_timestamp: 5000,
            samples_per_frame: 160,
        };
        let task = spawn_send_loop(&sender, remote, config, rx, &cancel);

        // 160 bytes is the G.711 20 ms shape, but the loop is
        // codec-agnostic — the bytes just have to show up unchanged
        // after the 12-byte header.
        let payload: Vec<u8> = (0u8..160).collect();
        tx.send(payload.clone()).await.unwrap();

        let mut wire = [0u8; 2048];
        let (n, from) = tokio::time::timeout(WAIT, receiver.recv_from(&mut wire))
            .await
            .expect("receiver got packet in time")
            .expect("recv_from ok");

        assert_eq!(from, sender.local_addr().unwrap());
        assert_eq!(n, 12 + 160);

        let expected = RtpHeader {
            version: 2,
            padding: false,
            extension: false,
            csrc_count: 0,
            marker: false,
            payload_type: 0,
            sequence: 1000,
            timestamp: 5000,
            ssrc: 0xCAFEBABE,
        };
        let header = RtpHeader::parse(&wire[..n]).expect("parses as RTP");
        assert_eq!(header, expected);

        assert_eq!(&wire[12..n], &payload[..]);

        cancel.cancel();
        let _ = task.await;
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn send_loop_advances_seq_and_timestamp_per_packet() {
        // Regression guard: a refactor that drops wrapping_add, or bumps
        // the timestamp by 1 instead of samples_per_frame, would break
        // interop; this test catches both.
        let (sender, receiver) = loopback_pair().await;
        let sender = Arc::new(sender);
        let remote = receiver.local_addr().unwrap();

        let (tx, rx) = mpsc::channel::<Vec<u8>>(8);
        let cancel = CancellationToken::new();
        let config = RtpSendConfig {
            payload_type: 8, // PCMA
            ssrc: 0xDEADBEEF,
            initial_seq: u16::MAX, // exercise wrap on second packet
            initial_timestamp: 100,
            samples_per_frame: 160,
        };
        let task = spawn_send_loop(&sender, remote, config, rx, &cancel);

        for fill in 0..3u8 {
            tx.send(vec![fill; 4]).await.unwrap();
        }

        let mut wire = [0u8; 2048];
        let mut headers = Vec::with_capacity(3);
        while headers.len() < 3 {
            let (n, _) = tokio::time::timeout(WAIT, receiver.recv_from(&mut wire))
                .await
                .unwrap()
                .unwrap();
            headers.push(RtpHeader::parse(&wire[..n]).unwrap());
        }

        // seq: MAX, 0 (wrap), 1
        let sequences: Vec<u16> = headers.iter().map(|h| h.sequence).collect();
        assert_eq!(sequences, [u16::MAX, 0, 1]);

        // ts advances by samples_per_frame each step
        let timestamps: Vec<u32> = headers.iter().map(|h| h.timestamp).collect();
        assert_eq!(timestamps, [100, 260, 420]);

        // PT and SSRC pinned across packets
        assert!(headers
            .iter()
            .all(|h| h.payload_type == 8 && h.ssrc == 0xDEADBEEF));

        cancel.cancel();
        let _ = task.await;
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn send_loop_exits_when_payload_channel_closes() {
        // Producer-driven shutdown: once the mpsc sender half is gone,
        // send_loop must return by itself, so the "encoder finished,
        // nothing more to send" path needs no explicit `cancel`.
        let (sender, _receiver) = loopback_pair().await;
        let sender = Arc::new(sender);
        let remote: SocketAddr = "127.0.0.1:9".parse().unwrap();

        let (tx, rx) = mpsc::channel::<Vec<u8>>(1);
        let cancel = CancellationToken::new();
        let task = spawn_send_loop(&sender, remote, idle_config(), rx, &cancel);

        drop(tx);

        assert_exits_promptly(task).await;
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn send_loop_exits_on_cancel() {
        // Cancel-driven shutdown for the "we're done with this call"
        // path — the producer half stays alive here (`_tx` is held for
        // the whole test), yet the loop must still stop.
        let (sender, _receiver) = loopback_pair().await;
        let sender = Arc::new(sender);
        let remote: SocketAddr = "127.0.0.1:9".parse().unwrap();

        let (_tx, rx) = mpsc::channel::<Vec<u8>>(1);
        let cancel = CancellationToken::new();
        let task = spawn_send_loop(&sender, remote, idle_config(), rx, &cancel);

        cancel.cancel();

        assert_exits_promptly(task).await;
    }
}
517            .await
518            .expect("send_loop exited within timeout")
519            .expect("task did not panic");
520    }
521}