Skip to main content

wavekat_sip/
rtp.rs

1//! RTP header parsing and a basic receive loop.
2//!
3//! This module intentionally stops at the wire: it parses RTP headers
4//! (RFC 3550) and exposes a debug-friendly receive loop. Decoding payloads
5//! (G.711, Opus, …), jitter buffering, and audio device routing are
6//! consumer-layer concerns and live outside this crate.
7
8use tokio::net::UdpSocket;
9use tokio::select;
10use tokio_util::sync::CancellationToken;
11use tracing::{debug, info, warn};
12
/// Parsed RTP packet header (RFC 3550, 12 bytes minimum).
///
/// Only the fixed header fields are retained; the CSRC identifiers and any
/// header extension are not decoded or stored.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RtpHeader {
    /// Protocol version from the top two bits of the first byte.
    /// [`RtpHeader::parse`] only ever produces `2`.
    pub version: u8,
    /// Padding (P) bit.
    pub padding: bool,
    /// Extension (X) bit.
    pub extension: bool,
    /// Number of 32-bit CSRC identifiers following the fixed header (0–15).
    pub csrc_count: u8,
    /// Marker (M) bit.
    pub marker: bool,
    /// 7-bit payload type.
    pub payload_type: u8,
    /// 16-bit sequence number (big-endian on the wire).
    pub sequence: u16,
    /// 32-bit timestamp (big-endian on the wire).
    pub timestamp: u32,
    /// 32-bit synchronization source identifier (big-endian on the wire).
    pub ssrc: u32,
}

impl RtpHeader {
    /// Size in bytes of the fixed part of the header, before any CSRC entries.
    const FIXED_LEN: usize = 12;

    /// Parse an RTP header from a buffer.
    ///
    /// Returns `None` if:
    /// - the buffer is shorter than the 12-byte fixed header,
    /// - the version field is not 2, or
    /// - the buffer is too short to hold the CSRC list announced by the
    ///   CSRC count field (i.e. shorter than [`RtpHeader::header_len`]).
    pub fn parse(buf: &[u8]) -> Option<Self> {
        if buf.len() < Self::FIXED_LEN {
            return None;
        }

        let version = (buf[0] >> 6) & 0x03;
        if version != 2 {
            return None;
        }

        // A header that claims more CSRC entries than the datagram carries is
        // truncated or not RTP at all; rejecting it here guarantees that
        // `header_len()` never exceeds the length of the parsed buffer.
        let csrc_count = buf[0] & 0x0F;
        if buf.len() < Self::FIXED_LEN + 4 * usize::from(csrc_count) {
            return None;
        }

        Some(Self {
            version,
            padding: buf[0] & 0x20 != 0,
            extension: buf[0] & 0x10 != 0,
            csrc_count,
            marker: buf[1] & 0x80 != 0,
            payload_type: buf[1] & 0x7F,
            sequence: u16::from_be_bytes([buf[2], buf[3]]),
            timestamp: u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]),
            ssrc: u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]),
        })
    }

    /// Total header length in bytes (12 + 4 * CSRC count).
    ///
    /// The length of a header extension, if the extension bit is set, is not
    /// included: it is not part of the fields stored in this struct.
    pub fn header_len(&self) -> usize {
        Self::FIXED_LEN + 4 * usize::from(self.csrc_count)
    }
}
67
/// Map an RTP payload type number to a short display name.
///
/// Only payload types 0 (`"PCMU"`) and 8 (`"PCMA"`) are named; every other
/// value yields `"unknown"`.
fn payload_type_name(pt: u8) -> &'static str {
    // Lookup table of the payload types this module knows by name.
    const NAMED: [(u8, &str); 2] = [(0, "PCMU"), (8, "PCMA")];

    NAMED
        .iter()
        .find(|entry| entry.0 == pt)
        .map_or("unknown", |entry| entry.1)
}
76
77/// Receive RTP packets on the given socket and trace their headers until
78/// cancelled. Useful for smoke-testing inbound media without wiring up an
79/// audio path.
80pub async fn receive_rtp(socket: UdpSocket, cancel: CancellationToken) {
81    let mut buf = [0u8; 2048];
82    let mut count = 0u64;
83
84    let local = socket
85        .local_addr()
86        .map(|a| a.to_string())
87        .unwrap_or_else(|_| "<unknown>".into());
88    info!("RTP receiver started on {local}");
89
90    loop {
91        select! {
92            result = socket.recv_from(&mut buf) => {
93                match result {
94                    Ok((len, from)) => {
95                        if let Some(header) = RtpHeader::parse(&buf[..len]) {
96                            count += 1;
97                            let payload_len = len.saturating_sub(header.header_len());
98                            debug!(
99                                "RTP #{} | PT={} ({}) | TS={} | SSRC=0x{:08X} | {} bytes from {}",
100                                header.sequence,
101                                header.payload_type,
102                                payload_type_name(header.payload_type),
103                                header.timestamp,
104                                header.ssrc,
105                                payload_len,
106                                from,
107                            );
108
109                            if count.is_multiple_of(100) {
110                                info!("Received {count} RTP packets so far");
111                            }
112                        } else {
113                            warn!("Non-RTP packet ({len} bytes) from {from}");
114                        }
115                    }
116                    Err(e) => {
117                        warn!("RTP recv error: {e}");
118                        break;
119                    }
120                }
121            }
122            _ = cancel.cancelled() => break,
123        }
124    }
125
126    info!("RTP receiver stopped. Total packets: {count}");
127}
128
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a bare 12-byte RTP header with the given version, payload type,
    /// sequence number, timestamp and SSRC. Padding, extension, CSRC count
    /// and marker are all left cleared.
    fn make_packet(version: u8, pt: u8, seq: u16, ts: u32, ssrc: u32) -> Vec<u8> {
        let mut packet = Vec::with_capacity(12);
        packet.push((version << 6) & 0xC0); // version in the top two bits
        packet.push(pt & 0x7F); // payload type, marker bit clear
        packet.extend_from_slice(&seq.to_be_bytes());
        packet.extend_from_slice(&ts.to_be_bytes());
        packet.extend_from_slice(&ssrc.to_be_bytes());
        packet
    }

    #[test]
    fn parse_minimum_header() {
        let packet = make_packet(2, 0, 1234, 5678, 0xDEADBEEF);
        let header = RtpHeader::parse(&packet).unwrap();
        assert_eq!(header.version, 2);
        assert_eq!(header.payload_type, 0);
        assert_eq!(header.sequence, 1234);
        assert_eq!(header.timestamp, 5678);
        assert_eq!(header.ssrc, 0xDEADBEEF);
        assert_eq!(header.csrc_count, 0);
        assert_eq!(header.header_len(), 12);
    }

    #[test]
    fn parse_rejects_short_buffer() {
        // One byte short of the fixed header.
        let too_short = vec![0u8; 11];
        assert!(RtpHeader::parse(&too_short).is_none());
    }

    #[test]
    fn parse_rejects_wrong_version() {
        let version_one = make_packet(1, 0, 0, 0, 0);
        assert!(RtpHeader::parse(&version_one).is_none());
    }

    #[test]
    fn parse_extracts_marker_bit() {
        let mut packet = make_packet(2, 8, 0, 0, 0);
        packet[1] |= 0x80; // raise the marker bit
        let header = RtpHeader::parse(&packet).unwrap();
        assert!(header.marker);
        assert_eq!(header.payload_type, 8); // PCMA
    }

    #[test]
    fn header_len_accounts_for_csrcs() {
        let mut packet = make_packet(2, 0, 0, 0, 0);
        // Announce three CSRC entries and append 12 zero bytes to stand in
        // for them (3 entries * 4 bytes each).
        packet[0] |= 0x03;
        packet.resize(packet.len() + 12, 0);
        let header = RtpHeader::parse(&packet).unwrap();
        assert_eq!(header.csrc_count, 3);
        assert_eq!(header.header_len(), 24);
    }

    #[test]
    fn payload_type_names() {
        assert_eq!(payload_type_name(0), "PCMU");
        assert_eq!(payload_type_name(8), "PCMA");
        assert_eq!(payload_type_name(127), "unknown");
    }
}