Skip to main content

arcly_stream/protocol/
rtp.rs

//! Shared RTP header parsing and payload depacketization for H.264 and AAC
//! (RFC 3550, RFC 6184, RFC 3640).
//!
//! Gated behind the internal `_rtp` marker, pulled in by both [`rtsp`] and
//! [`webrtc`]. The two transports differ only in how RTP packets reach the
//! process (TCP-interleaved / UDP for RTSP, DTLS-SRTP for WebRTC); once a packet
//! is in hand, reassembling NAL units into an Annex-B access unit is identical,
//! so it lives here once.
//!
//! [`rtsp`]: crate::protocol::rtsp
//! [`webrtc`]: crate::protocol::webrtc
//!
//! # What it does
//!
//! - [`RtpHeader::parse`] decodes the fixed RTP header (RFC 3550 §5.1), honoring
//!   the CSRC count and the extension-header flag to locate the payload.
//! - [`H264Depacketizer`] turns a sequence of RTP payloads into complete H.264
//!   access units in Annex-B form, handling the three NALU packetization modes
//!   defined by RFC 6184: single NAL units, STAP-A aggregation (type 24), and
//!   FU-A fragmentation (type 28). An access unit is emitted when the RTP marker
//!   bit is set or the RTP timestamp advances.
//! - [`AacDepacketizer`] splits an RFC 3640 AAC payload (AU-headers section
//!   followed by the concatenated access units) into raw AAC frames.
//!
//! # What it does not do
//!
//! Jitter-buffer reordering and loss concealment are the caller's concern — the
//! depacketizer assumes in-order delivery (true for TCP-interleaved RTSP; for
//! UDP/SRTP a small reorder buffer should sit in front of it). It reports a
//! sequence gap as [`DepacketizeError::OutOfOrder`] so a handler can request a
//! keyframe (PLI/FIR) rather than emit a corrupt access unit.
29
30use bytes::Bytes;
31
/// Four-byte Annex-B start code written in front of each reassembled NAL unit.
const ANNEXB_START: [u8; 4] = [0x00, 0x00, 0x00, 0x01];
34
/// The fixed RTP header (RFC 3550 §5.1) plus the offset where the payload starts.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RtpHeader {
    /// 7-bit payload type; maps to a codec/format through the SDP.
    pub payload_type: u8,
    /// Marker bit. H.264 sets it on the final packet of an access unit.
    pub marker: bool,
    /// 16-bit sequence number; advances by one per packet and wraps.
    pub sequence: u16,
    /// 32-bit media timestamp in the payload clock (90 kHz for H.264 video).
    pub timestamp: u32,
    /// Synchronization source identifier.
    pub ssrc: u32,
    /// Index of the first payload byte, i.e. just past the CSRC list and any
    /// header extension.
    pub payload_offset: usize,
}

impl RtpHeader {
    /// Decode the RTP header at the start of `buf`.
    ///
    /// The returned [`RtpHeader::payload_offset`] accounts for the CSRC list and,
    /// when the extension flag is set, for the header extension. The padding bit
    /// is not interpreted, so any trailing padding remains part of the bytes
    /// that follow `payload_offset`.
    ///
    /// Returns `None` when the version field is not 2 or when `buf` is too short
    /// to hold the fixed header, the CSRC list, or the declared extension.
    pub fn parse(buf: &[u8]) -> Option<RtpHeader> {
        // The first 12 bytes are always present in a valid packet.
        let fixed = buf.get(..12)?;
        let flags = fixed[0];
        let marker_and_pt = fixed[1];

        if flags >> 6 != 2 {
            return None;
        }

        // Low nibble of the first byte counts 32-bit CSRC entries.
        let mut payload_offset = 12 + usize::from(flags & 0x0F) * 4;

        if flags & 0x10 != 0 {
            // Extension header: 2-byte profile id, then a 2-byte length that
            // counts the 32-bit words following the extension header itself.
            let ext_header = buf.get(payload_offset..payload_offset + 4)?;
            let ext_words = usize::from(u16::from_be_bytes([ext_header[2], ext_header[3]]));
            payload_offset += 4 + ext_words * 4;
        }

        // An empty payload is acceptable; a header that overruns `buf` is not.
        if payload_offset > buf.len() {
            return None;
        }

        Some(RtpHeader {
            payload_type: marker_and_pt & 0x7F,
            marker: marker_and_pt & 0x80 != 0,
            sequence: u16::from_be_bytes([fixed[2], fixed[3]]),
            timestamp: u32::from_be_bytes([fixed[4], fixed[5], fixed[6], fixed[7]]),
            ssrc: u32::from_be_bytes([fixed[8], fixed[9], fixed[10], fixed[11]]),
            payload_offset,
        })
    }
}
94
/// Errors surfaced while depacketizing an RTP stream.
///
/// Implements [`std::error::Error`] so it can be propagated with `?` into
/// boxed or wrapped error types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum DepacketizeError {
    /// The packet was shorter than the format requires.
    Truncated,
    /// A sequence-number discontinuity was detected mid-fragment; the partial
    /// access unit was dropped. The handler should request a keyframe.
    OutOfOrder,
    /// An unsupported NAL/aggregation type was encountered, or a depacketizer
    /// was configured with a field width it cannot handle. The payload is the
    /// offending type or width.
    Unsupported(u8),
}

impl std::fmt::Display for DepacketizeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Truncated => f.write_str("RTP payload is shorter than its format requires"),
            Self::OutOfOrder => {
                f.write_str("RTP sequence discontinuity inside a fragmented NAL unit")
            }
            Self::Unsupported(value) => {
                write!(f, "unsupported RTP payload type or field width: {value}")
            }
        }
    }
}

impl std::error::Error for DepacketizeError {}
107
108/// Depacketizes RFC 3640 AAC-hbr RTP payloads into raw AAC access units.
109///
110/// The common RTSP/SDP profile for AAC (`mode=AAC-hbr`, `sizelength=13`,
111/// `indexlength=3`) frames each payload as a 2-byte **AU-headers-length** (in
112/// bits), followed by one 2-byte AU-header per access unit (13-bit size +
113/// 3-bit index), followed by the access units concatenated. One RTP packet may
114/// carry several AAC frames; [`push`](Self::push) returns each as a separate
115/// raw (ADTS-less) [`bytes::Bytes`].
116#[derive(Debug, Clone, Copy, Default)]
117pub struct AacDepacketizer {
118    /// Bits per AU-header `size` field (13 for AAC-hbr).
119    size_length: u8,
120    /// Bits per AU-header `index`/`index-delta` field (3 for AAC-hbr).
121    index_length: u8,
122}
123
124impl AacDepacketizer {
125    /// A depacketizer for the standard AAC-hbr profile (`sizelength=13`,
126    /// `indexlength=3`).
127    pub fn new() -> Self {
128        Self {
129            size_length: 13,
130            index_length: 3,
131        }
132    }
133
134    /// A depacketizer with explicit AU-header field widths from the SDP `fmtp`.
135    pub fn with_lengths(size_length: u8, index_length: u8) -> Self {
136        Self {
137            size_length,
138            index_length,
139        }
140    }
141
142    /// Split one RTP AAC-hbr payload into its constituent access units.
143    pub fn push(&self, payload: &[u8]) -> Result<Vec<Bytes>, DepacketizeError> {
144        if payload.len() < 2 {
145            return Err(DepacketizeError::Truncated);
146        }
147        // Sizes wider than a 16-bit AU-header field are unsupported (and would
148        // otherwise over-shift below). `with_lengths` can supply arbitrary widths.
149        if self.size_length == 0 || self.size_length > 16 {
150            return Err(DepacketizeError::Unsupported(self.size_length));
151        }
152        let header_bits = u16::from_be_bytes([payload[0], payload[1]]) as usize;
153        let au_header_bits = self.size_length as usize + self.index_length as usize;
154        if au_header_bits == 0 {
155            return Err(DepacketizeError::Unsupported(0));
156        }
157        let header_bytes = header_bits.div_ceil(8);
158        let au_count = header_bits / au_header_bits;
159        let headers = payload
160            .get(2..2 + header_bytes)
161            .ok_or(DepacketizeError::Truncated)?;
162        let mut data_off = 2 + header_bytes;
163        let mut out = Vec::with_capacity(au_count);
164        for i in 0..au_count {
165            // Each AU-header is `au_header_bits` wide; for AAC-hbr that is 16
166            // bits, so the size is the top `size_length` bits of a 2-byte field.
167            let bit = i * au_header_bits;
168            let byte = bit / 8;
169            let hdr = headers
170                .get(byte..byte + 2)
171                .ok_or(DepacketizeError::Truncated)?;
172            let size = (u16::from_be_bytes([hdr[0], hdr[1]]) >> (16 - self.size_length)) as usize;
173            let end = data_off + size;
174            let au = payload
175                .get(data_off..end)
176                .ok_or(DepacketizeError::Truncated)?;
177            out.push(Bytes::copy_from_slice(au));
178            data_off = end;
179        }
180        Ok(out)
181    }
182}
183
184/// Reassembles RFC 6184 H.264 RTP payloads into Annex-B access units.
185///
186/// Feed each packet's payload (the bytes after [`RtpHeader::payload_offset`])
187/// with its marker bit and timestamp to [`push`](Self::push). When a complete
188/// access unit is ready the method returns `Ok(Some(au))`, where `au` is the
189/// concatenated NAL units each prefixed with a 4-byte Annex-B start code —
190/// exactly the shape the codec parsers and `annexb_to_avcc` expect.
191#[derive(Debug, Default)]
192pub struct H264Depacketizer {
193    /// Bytes accumulated for the current access unit (Annex-B framed).
194    au: Vec<u8>,
195    /// FU-A reassembly buffer for the NAL currently being defragmented.
196    fua: Vec<u8>,
197    /// `true` while an FU-A fragment is in progress (between Start and End bits).
198    in_fragment: bool,
199    /// Reconstructed NAL header byte for the in-progress FU-A NAL.
200    fua_header: u8,
201    /// Timestamp of the access unit currently being assembled.
202    current_ts: Option<u32>,
203    /// Last sequence number seen (for gap detection during fragmentation).
204    last_seq: Option<u16>,
205}
206
207impl H264Depacketizer {
208    /// A fresh depacketizer with no in-progress access unit.
209    pub fn new() -> Self {
210        Self::default()
211    }
212
213    /// Append one NAL unit (Annex-B framed) to the current access unit.
214    fn append_nal(&mut self, nal: &[u8]) {
215        self.au.extend_from_slice(&ANNEXB_START);
216        self.au.extend_from_slice(nal);
217    }
218
219    /// Whether the pending access unit holds an IDR (type 5) NAL — a keyframe.
220    fn pending_is_keyframe(&self) -> bool {
221        // Scan the assembled Annex-B for a NAL header with type 5.
222        let mut i = 0;
223        while i + 4 < self.au.len() {
224            if self.au[i..i + 4] == ANNEXB_START {
225                let nal_type = self.au[i + 4] & 0x1F;
226                if nal_type == 5 {
227                    return true;
228                }
229            }
230            i += 1;
231        }
232        false
233    }
234
235    /// Emit and reset the pending access unit, if any.
236    fn take_au(&mut self) -> Option<AccessUnit> {
237        if self.au.is_empty() {
238            return None;
239        }
240        let keyframe = self.pending_is_keyframe();
241        let timestamp = self.current_ts.unwrap_or(0);
242        let data = Bytes::from(std::mem::take(&mut self.au));
243        self.current_ts = None;
244        Some(AccessUnit {
245            data,
246            timestamp,
247            keyframe,
248        })
249    }
250
251    /// Push one RTP H.264 payload. Returns a completed [`AccessUnit`] when the
252    /// marker bit closes the frame (or the timestamp advances to a new one).
253    pub fn push(
254        &mut self,
255        payload: &[u8],
256        marker: bool,
257        timestamp: u32,
258        sequence: u16,
259    ) -> Result<Option<AccessUnit>, DepacketizeError> {
260        if payload.is_empty() {
261            return Err(DepacketizeError::Truncated);
262        }
263
264        // A timestamp change flushes the previous access unit before starting the
265        // new one (some encoders omit the marker bit).
266        let mut completed = None;
267        if let Some(ts) = self.current_ts {
268            if ts != timestamp && !self.in_fragment {
269                completed = self.take_au();
270            }
271        }
272        self.current_ts = Some(timestamp);
273
274        let nal_type = payload[0] & 0x1F;
275        match nal_type {
276            1..=23 => {
277                // Single NAL unit packet — the payload *is* the NAL.
278                self.append_nal(payload);
279            }
280            24 => {
281                // STAP-A: one byte type, then [u16 size][nal]… aggregates.
282                let mut i = 1;
283                while i + 2 <= payload.len() {
284                    let size = u16::from_be_bytes([payload[i], payload[i + 1]]) as usize;
285                    i += 2;
286                    if i + size > payload.len() {
287                        return Err(DepacketizeError::Truncated);
288                    }
289                    self.append_nal(&payload[i..i + size]);
290                    i += size;
291                }
292            }
293            28 => {
294                // FU-A: byte0 = FU indicator, byte1 = FU header (S|E|R|type).
295                if payload.len() < 2 {
296                    return Err(DepacketizeError::Truncated);
297                }
298                let fu_header = payload[1];
299                let start = fu_header & 0x80 != 0;
300                let end = fu_header & 0x40 != 0;
301                let frag_type = fu_header & 0x1F;
302
303                if start {
304                    // Reconstruct the original NAL header: F|NRI from the indicator,
305                    // type from the FU header.
306                    self.fua_header = (payload[0] & 0xE0) | frag_type;
307                    self.fua.clear();
308                    self.fua.push(self.fua_header);
309                    self.in_fragment = true;
310                } else if !self.in_fragment {
311                    // Mid/last fragment with no start — lost the head.
312                    return Err(DepacketizeError::OutOfOrder);
313                } else if self.seq_gap(sequence) {
314                    self.in_fragment = false;
315                    self.fua.clear();
316                    return Err(DepacketizeError::OutOfOrder);
317                }
318                self.fua.extend_from_slice(&payload[2..]);
319
320                if end && self.in_fragment {
321                    let nal = std::mem::take(&mut self.fua);
322                    self.append_nal(&nal);
323                    self.in_fragment = false;
324                }
325            }
326            other => return Err(DepacketizeError::Unsupported(other)),
327        }
328
329        self.last_seq = Some(sequence);
330
331        if completed.is_some() {
332            return Ok(completed);
333        }
334        if marker {
335            return Ok(self.take_au());
336        }
337        Ok(None)
338    }
339
340    /// Detect a one-step sequence-number gap relative to the previous packet.
341    fn seq_gap(&self, sequence: u16) -> bool {
342        match self.last_seq {
343            Some(prev) => sequence.wrapping_sub(prev) != 1,
344            None => false,
345        }
346    }
347}
348
349/// A reassembled H.264 access unit in Annex-B form.
350#[derive(Debug, Clone, PartialEq, Eq)]
351pub struct AccessUnit {
352    /// Concatenated NAL units, each prefixed with a 4-byte start code.
353    pub data: Bytes,
354    /// RTP media timestamp (90 kHz) of the access unit.
355    pub timestamp: u32,
356    /// Whether the access unit contains an IDR (keyframe) NAL.
357    pub keyframe: bool,
358}
359
#[cfg(test)]
mod tests {
    use super::*;

    /// Assemble an RTP packet with a bare 12-byte header (version 2, payload
    /// type 96, SSRC 1) followed by `payload`.
    fn rtp(seq: u16, ts: u32, marker: bool, payload: &[u8]) -> Vec<u8> {
        let marker_and_pt: u8 = if marker { 0x80 | 96 } else { 96 };
        let mut packet = Vec::with_capacity(12 + payload.len());
        packet.extend_from_slice(&[0x80, marker_and_pt]);
        packet.extend_from_slice(&seq.to_be_bytes());
        packet.extend_from_slice(&ts.to_be_bytes());
        packet.extend_from_slice(&1u32.to_be_bytes()); // ssrc
        packet.extend_from_slice(payload);
        packet
    }

    /// Build an AAC-hbr payload: AU-headers-length, one 16-bit AU-header per
    /// declared size (size in the top 13 bits), then the raw data bytes.
    fn aac_hbr(declared_sizes: &[u16], data: &[u8]) -> Vec<u8> {
        let header_bits = 16 * declared_sizes.len() as u16;
        let mut payload = header_bits.to_be_bytes().to_vec();
        for size in declared_sizes {
            payload.extend_from_slice(&(size << 3).to_be_bytes());
        }
        payload.extend_from_slice(data);
        payload
    }

    #[test]
    fn parses_fixed_header_and_payload_offset() {
        let packet = rtp(7, 9000, true, &[0x65, 0xAA]);
        let header = RtpHeader::parse(&packet).unwrap();
        assert_eq!(header.sequence, 7);
        assert_eq!(header.timestamp, 9000);
        assert!(header.marker);
        assert_eq!(header.payload_type, 96);
        assert_eq!(header.payload_offset, 12);
        assert_eq!(&packet[header.payload_offset..], &[0x65, 0xAA]);
    }

    #[test]
    fn rejects_wrong_version_and_short_buffers() {
        // Version 0.
        assert!(RtpHeader::parse(&[0x00; 12]).is_none());
        // Shorter than the fixed header.
        assert!(RtpHeader::parse(&[0x80; 4]).is_none());
    }

    #[test]
    fn honors_csrc_count_in_payload_offset() {
        // Header only, then patch in CSRC count = 2 and append the CSRCs.
        let mut packet = rtp(1, 0, false, &[]);
        packet[0] = 0x82;
        packet.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF, 0, 0, 0, 0]);
        packet.push(0x41);
        let header = RtpHeader::parse(&packet).unwrap();
        assert_eq!(header.payload_offset, 20);
    }

    #[test]
    fn aac_hbr_splits_two_access_units() {
        // Two 16-bit AU-headers declaring sizes 3 and 2.
        let payload = aac_hbr(&[3, 2], &[0xA1, 0xA2, 0xA3, 0xB1, 0xB2]);
        let units = AacDepacketizer::new().push(&payload).unwrap();
        assert_eq!(units.len(), 2);
        assert_eq!(&units[0][..], &[0xA1, 0xA2, 0xA3]);
        assert_eq!(&units[1][..], &[0xB1, 0xB2]);
    }

    #[test]
    fn aac_hbr_single_au() {
        let payload = aac_hbr(&[4], &[1, 2, 3, 4]);
        let units = AacDepacketizer::new().push(&payload).unwrap();
        assert_eq!(units.len(), 1);
        assert_eq!(&units[0][..], &[1, 2, 3, 4]);
    }

    #[test]
    fn aac_truncated_payload_errors() {
        let depacketizer = AacDepacketizer::new();
        assert_eq!(
            depacketizer.push(&[0x00]),
            Err(DepacketizeError::Truncated)
        );
        // One AU declared as 8 bytes, only 2 bytes of data supplied.
        let short = aac_hbr(&[8], &[1, 2]);
        assert_eq!(depacketizer.push(&short), Err(DepacketizeError::Truncated));
    }

    #[test]
    fn single_nal_packet_emits_annexb_on_marker() {
        let mut depacketizer = H264Depacketizer::new();
        // Non-IDR slice (type 1) with the marker set closes the unit at once.
        let unit = depacketizer
            .push(&[0x41, 0x9A, 0xBC], true, 3000, 1)
            .unwrap()
            .unwrap();
        assert_eq!(&unit.data[..], &[0, 0, 0, 1, 0x41, 0x9A, 0xBC]);
        assert!(!unit.keyframe);
        assert_eq!(unit.timestamp, 3000);
    }

    #[test]
    fn idr_single_nal_is_flagged_keyframe() {
        let unit = H264Depacketizer::new()
            .push(&[0x65, 0x01], true, 0, 1)
            .unwrap()
            .unwrap();
        assert!(unit.keyframe);
    }

    #[test]
    fn stap_a_splits_aggregated_nals() {
        // [24][size=2][AA BB][size=3][CC DD EE]
        let aggregate = [24, 0, 2, 0xAA, 0xBB, 0, 3, 0xCC, 0xDD, 0xEE];
        let unit = H264Depacketizer::new()
            .push(&aggregate, true, 0, 1)
            .unwrap()
            .unwrap();
        let expected = [0, 0, 0, 1, 0xAA, 0xBB, 0, 0, 0, 1, 0xCC, 0xDD, 0xEE];
        assert_eq!(&unit.data[..], &expected);
    }

    #[test]
    fn fu_a_reassembles_fragmented_nal() {
        let mut depacketizer = H264Depacketizer::new();
        // Indicator 0x7C = F 0, NRI 3, type 28. FU header 0x85 = start, type 5.
        assert_eq!(
            depacketizer.push(&[0x7C, 0x85, 0x11, 0x22], false, 0, 1),
            Ok(None)
        );
        // Middle fragment: neither start nor end bit.
        assert_eq!(depacketizer.push(&[0x7C, 0x05, 0x33], false, 0, 2), Ok(None));
        // End fragment; the marker closes the access unit.
        let unit = depacketizer
            .push(&[0x7C, 0x45, 0x44], true, 0, 3)
            .unwrap()
            .unwrap();
        // Header rebuilt as NRI bits 0x60 | type 5 = 0x65, then the fragments.
        assert_eq!(&unit.data[..], &[0, 0, 0, 1, 0x65, 0x11, 0x22, 0x33, 0x44]);
        assert!(unit.keyframe);
    }

    #[test]
    fn fu_a_sequence_gap_reports_out_of_order() {
        let mut depacketizer = H264Depacketizer::new();
        depacketizer.push(&[0x7C, 0x85, 0x11], false, 0, 1).unwrap();
        // Sequence jumps 1 -> 5 while the fragment is still open.
        let result = depacketizer.push(&[0x7C, 0x05, 0x22], false, 0, 5);
        assert_eq!(result, Err(DepacketizeError::OutOfOrder));
    }

    #[test]
    fn timestamp_change_flushes_previous_au_without_marker() {
        let mut depacketizer = H264Depacketizer::new();
        // First unit is never closed by a marker.
        assert_eq!(depacketizer.push(&[0x41, 0x01], false, 1000, 1), Ok(None));
        // A packet with a new timestamp pushes the first unit out.
        let unit = depacketizer
            .push(&[0x41, 0x02], false, 2000, 2)
            .unwrap()
            .unwrap();
        assert_eq!(unit.timestamp, 1000);
        assert_eq!(&unit.data[..], &[0, 0, 0, 1, 0x41, 0x01]);
    }
}
513}