Skip to main content

arcly_stream/protocol/
rtp.rs

1//! Shared RTP/RTCP parsing and H.264 depacketization (RFC 3550 + RFC 6184).
2//!
3//! Gated behind the internal `_rtp` marker, pulled in by both [`rtsp`] and
4//! [`webrtc`]. The two transports differ only in how RTP packets reach the
5//! process (TCP-interleaved / UDP for RTSP, DTLS-SRTP for WebRTC); once a packet
6//! is in hand, reassembling NAL units into an Annex-B access unit is identical,
7//! so it lives here once.
8//!
9//! [`rtsp`]: crate::protocol::rtsp
10//! [`webrtc`]: crate::protocol::webrtc
11//!
12//! # What it does
13//!
14//! - [`RtpHeader::parse`] decodes the fixed RTP header (RFC 3550 §5.1), honoring
15//!   the CSRC count and the extension-header flag to locate the payload.
16//! - [`H264Depacketizer`] turns a sequence of RTP payloads into complete H.264
17//!   access units in Annex-B form, handling the three NALU packetization modes
18//!   defined by RFC 6184: single NAL units, STAP-A aggregation (type 24), and
19//!   FU-A fragmentation (type 28). An access unit is emitted when the RTP marker
20//!   bit is set or the RTP timestamp advances.
21//!
22//! # What it does not do
23//!
//! Jitter-buffer reordering and loss concealment are the caller's concern: the
//! depacketizer assumes in-order delivery (true for TCP-interleaved RTSP; for
//! UDP/SRTP a small reorder buffer should sit in front of it). When it detects
//! loss inside a fragmented NAL unit — a sequence gap, or a continuation
//! fragment whose start never arrived — it returns
//! [`DepacketizeError::OutOfOrder`] so the handler can request a keyframe
//! (PLI/FIR) to resynchronize the decoder.
29
30use bytes::Bytes;
31
/// The 4-byte Annex-B start code (`00 00 00 01`) written in front of every
/// NAL unit the depacketizer reassembles.
const ANNEXB_START: [u8; 4] = [0x00, 0x00, 0x00, 0x01];
34
/// A parsed RTP fixed header (RFC 3550 §5.1).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RtpHeader {
    /// Payload type (7 bits) — identifies the codec/format binding from SDP.
    pub payload_type: u8,
    /// Marker bit. For H.264 it flags the last packet of an access unit.
    pub marker: bool,
    /// 16-bit sequence number, increments by one per packet (wraps).
    pub sequence: u16,
    /// 32-bit media timestamp in the payload's clock (90 kHz for H.264 video).
    pub timestamp: u32,
    /// Synchronization source identifier.
    pub ssrc: u32,
    /// Byte offset at which the payload begins (past CSRCs and any extension).
    ///
    /// When the padding bit is set, the packet ends with padding octets that
    /// are not part of the payload; [`RtpHeader::payload`] returns the payload
    /// with that padding removed.
    pub payload_offset: usize,
}

impl RtpHeader {
    /// Length of the fixed header, before any CSRC list or extension.
    const FIXED_LEN: usize = 12;
    /// Padding (P) flag in the first header octet.
    const PADDING_BIT: u8 = 0x20;
    /// Extension (X) flag in the first header octet.
    const EXTENSION_BIT: u8 = 0x10;

    /// Parse the RTP header at the front of `buf`.
    ///
    /// The CSRC count and the header-extension flag are honored when computing
    /// [`payload_offset`](Self::payload_offset). When the padding bit is set,
    /// the padding count in the final octet is validated (RFC 3550 Appendix A.1).
    ///
    /// Returns `None` if any of these hold:
    /// - `buf` is shorter than the header it describes.
    /// - The version field is not 2.
    /// - The padding count is zero or does not fit after the header.
    pub fn parse(buf: &[u8]) -> Option<RtpHeader> {
        if buf.len() < Self::FIXED_LEN {
            return None;
        }
        let first = buf[0];
        if first >> 6 != 2 {
            return None;
        }
        let has_padding = first & Self::PADDING_BIT != 0;
        let has_extension = first & Self::EXTENSION_BIT != 0;
        let csrc_count = usize::from(first & 0x0F);
        let marker = buf[1] & 0x80 != 0;
        let payload_type = buf[1] & 0x7F;
        let sequence = u16::from_be_bytes([buf[2], buf[3]]);
        let timestamp = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
        let ssrc = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]);

        let mut offset = Self::FIXED_LEN + csrc_count * 4;
        if has_extension {
            // Extension header: 2-byte profile id, 2-byte length (in 32-bit words).
            let ext = buf.get(offset..offset + 4)?;
            offset += 4 + usize::from(u16::from_be_bytes([ext[2], ext[3]])) * 4;
        }

        // With P set, the final octet counts the padding octets *including
        // itself*, so zero is malformed. The padding must fit after the header;
        // a padding-only packet (empty payload) is still valid.
        let padding = Self::padding_len(buf);
        if has_padding && padding == 0 {
            return None;
        }
        if buf.len() < offset + padding {
            return None;
        }

        Some(RtpHeader {
            payload_type,
            marker,
            sequence,
            timestamp,
            ssrc,
            payload_offset: offset,
        })
    }

    /// The payload carried by `buf` — the packet this header was parsed from —
    /// with the header, CSRCs, extension, and any trailing padding removed.
    ///
    /// Returns an empty slice if `buf` is too short for this header's layout
    /// (for example, when it is not the packet the header came from).
    pub fn payload<'a>(&self, buf: &'a [u8]) -> &'a [u8] {
        let end = buf.len().saturating_sub(Self::padding_len(buf));
        buf.get(self.payload_offset..end).unwrap_or(&[])
    }

    /// Number of trailing padding octets `buf` declares: zero when the padding
    /// bit is clear, otherwise the count stored in the final octet.
    fn padding_len(buf: &[u8]) -> usize {
        match (buf.first(), buf.last()) {
            (Some(&first), Some(&count)) if first & Self::PADDING_BIT != 0 => usize::from(count),
            _ => 0,
        }
    }
}
94
/// Errors surfaced while depacketizing an RTP stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum DepacketizeError {
    /// The payload was empty, or shorter than its packetization format
    /// requires (an FU-A packet without its FU header, or a STAP-A aggregate
    /// whose size field overruns the packet).
    Truncated,
    /// An FU-A fragment arrived out of sequence: a continuation fragment with
    /// no preceding start fragment, or a sequence-number gap inside a
    /// fragmented NAL unit. The partially reassembled NAL unit is discarded;
    /// the handler should request a keyframe (PLI/FIR).
    OutOfOrder,
    /// The payload uses a NAL unit / packetization type this depacketizer does
    /// not handle (for example STAP-B, MTAP, or FU-B); carries the 5-bit type.
    Unsupported(u8),
}

impl std::fmt::Display for DepacketizeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DepacketizeError::Truncated => f.write_str("truncated H.264 RTP payload"),
            DepacketizeError::OutOfOrder => {
                f.write_str("H.264 FU-A fragment lost or out of order")
            }
            DepacketizeError::Unsupported(nal_type) => {
                write!(f, "unsupported H.264 NAL unit type {}", nal_type)
            }
        }
    }
}

impl std::error::Error for DepacketizeError {}
107
108/// Reassembles RFC 6184 H.264 RTP payloads into Annex-B access units.
109///
110/// Feed each packet's payload (the bytes after [`RtpHeader::payload_offset`])
111/// with its marker bit and timestamp to [`push`](Self::push). When a complete
112/// access unit is ready the method returns `Ok(Some(au))`, where `au` is the
113/// concatenated NAL units each prefixed with a 4-byte Annex-B start code —
114/// exactly the shape the codec parsers and `annexb_to_avcc` expect.
115#[derive(Debug, Default)]
116pub struct H264Depacketizer {
117    /// Bytes accumulated for the current access unit (Annex-B framed).
118    au: Vec<u8>,
119    /// FU-A reassembly buffer for the NAL currently being defragmented.
120    fua: Vec<u8>,
121    /// `true` while an FU-A fragment is in progress (between Start and End bits).
122    in_fragment: bool,
123    /// Reconstructed NAL header byte for the in-progress FU-A NAL.
124    fua_header: u8,
125    /// Timestamp of the access unit currently being assembled.
126    current_ts: Option<u32>,
127    /// Last sequence number seen (for gap detection during fragmentation).
128    last_seq: Option<u16>,
129}
130
131impl H264Depacketizer {
132    /// A fresh depacketizer with no in-progress access unit.
133    pub fn new() -> Self {
134        Self::default()
135    }
136
137    /// Append one NAL unit (Annex-B framed) to the current access unit.
138    fn append_nal(&mut self, nal: &[u8]) {
139        self.au.extend_from_slice(&ANNEXB_START);
140        self.au.extend_from_slice(nal);
141    }
142
143    /// Whether the pending access unit holds an IDR (type 5) NAL — a keyframe.
144    fn pending_is_keyframe(&self) -> bool {
145        // Scan the assembled Annex-B for a NAL header with type 5.
146        let mut i = 0;
147        while i + 4 < self.au.len() {
148            if self.au[i..i + 4] == ANNEXB_START {
149                let nal_type = self.au[i + 4] & 0x1F;
150                if nal_type == 5 {
151                    return true;
152                }
153            }
154            i += 1;
155        }
156        false
157    }
158
159    /// Emit and reset the pending access unit, if any.
160    fn take_au(&mut self) -> Option<AccessUnit> {
161        if self.au.is_empty() {
162            return None;
163        }
164        let keyframe = self.pending_is_keyframe();
165        let timestamp = self.current_ts.unwrap_or(0);
166        let data = Bytes::from(std::mem::take(&mut self.au));
167        self.current_ts = None;
168        Some(AccessUnit {
169            data,
170            timestamp,
171            keyframe,
172        })
173    }
174
175    /// Push one RTP H.264 payload. Returns a completed [`AccessUnit`] when the
176    /// marker bit closes the frame (or the timestamp advances to a new one).
177    pub fn push(
178        &mut self,
179        payload: &[u8],
180        marker: bool,
181        timestamp: u32,
182        sequence: u16,
183    ) -> Result<Option<AccessUnit>, DepacketizeError> {
184        if payload.is_empty() {
185            return Err(DepacketizeError::Truncated);
186        }
187
188        // A timestamp change flushes the previous access unit before starting the
189        // new one (some encoders omit the marker bit).
190        let mut completed = None;
191        if let Some(ts) = self.current_ts {
192            if ts != timestamp && !self.in_fragment {
193                completed = self.take_au();
194            }
195        }
196        self.current_ts = Some(timestamp);
197
198        let nal_type = payload[0] & 0x1F;
199        match nal_type {
200            1..=23 => {
201                // Single NAL unit packet — the payload *is* the NAL.
202                self.append_nal(payload);
203            }
204            24 => {
205                // STAP-A: one byte type, then [u16 size][nal]… aggregates.
206                let mut i = 1;
207                while i + 2 <= payload.len() {
208                    let size = u16::from_be_bytes([payload[i], payload[i + 1]]) as usize;
209                    i += 2;
210                    if i + size > payload.len() {
211                        return Err(DepacketizeError::Truncated);
212                    }
213                    self.append_nal(&payload[i..i + size]);
214                    i += size;
215                }
216            }
217            28 => {
218                // FU-A: byte0 = FU indicator, byte1 = FU header (S|E|R|type).
219                if payload.len() < 2 {
220                    return Err(DepacketizeError::Truncated);
221                }
222                let fu_header = payload[1];
223                let start = fu_header & 0x80 != 0;
224                let end = fu_header & 0x40 != 0;
225                let frag_type = fu_header & 0x1F;
226
227                if start {
228                    // Reconstruct the original NAL header: F|NRI from the indicator,
229                    // type from the FU header.
230                    self.fua_header = (payload[0] & 0xE0) | frag_type;
231                    self.fua.clear();
232                    self.fua.push(self.fua_header);
233                    self.in_fragment = true;
234                } else if !self.in_fragment {
235                    // Mid/last fragment with no start — lost the head.
236                    return Err(DepacketizeError::OutOfOrder);
237                } else if self.seq_gap(sequence) {
238                    self.in_fragment = false;
239                    self.fua.clear();
240                    return Err(DepacketizeError::OutOfOrder);
241                }
242                self.fua.extend_from_slice(&payload[2..]);
243
244                if end && self.in_fragment {
245                    let nal = std::mem::take(&mut self.fua);
246                    self.append_nal(&nal);
247                    self.in_fragment = false;
248                }
249            }
250            other => return Err(DepacketizeError::Unsupported(other)),
251        }
252
253        self.last_seq = Some(sequence);
254
255        if completed.is_some() {
256            return Ok(completed);
257        }
258        if marker {
259            return Ok(self.take_au());
260        }
261        Ok(None)
262    }
263
264    /// Detect a one-step sequence-number gap relative to the previous packet.
265    fn seq_gap(&self, sequence: u16) -> bool {
266        match self.last_seq {
267            Some(prev) => sequence.wrapping_sub(prev) != 1,
268            None => false,
269        }
270    }
271}
272
273/// A reassembled H.264 access unit in Annex-B form.
274#[derive(Debug, Clone, PartialEq, Eq)]
275pub struct AccessUnit {
276    /// Concatenated NAL units, each prefixed with a 4-byte start code.
277    pub data: Bytes,
278    /// RTP media timestamp (90 kHz) of the access unit.
279    pub timestamp: u32,
280    /// Whether the access unit contains an IDR (keyframe) NAL.
281    pub keyframe: bool,
282}
283
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a minimal 12-byte RTP packet with the given fields and payload.
    fn rtp(seq: u16, ts: u32, marker: bool, payload: &[u8]) -> Vec<u8> {
        let mut p = vec![0x80, if marker { 0x80 | 96 } else { 96 }];
        p.extend_from_slice(&seq.to_be_bytes());
        p.extend_from_slice(&ts.to_be_bytes());
        p.extend_from_slice(&[0, 0, 0, 1]); // ssrc
        p.extend_from_slice(payload);
        p
    }

    #[test]
    fn parses_fixed_header_and_payload_offset() {
        let pkt = rtp(7, 9000, true, &[0x65, 0xAA]);
        let h = RtpHeader::parse(&pkt).unwrap();
        assert_eq!(h.sequence, 7);
        assert_eq!(h.timestamp, 9000);
        assert!(h.marker);
        assert_eq!(h.payload_type, 96);
        assert_eq!(h.payload_offset, 12);
        assert_eq!(&pkt[h.payload_offset..], &[0x65, 0xAA]);
    }

    #[test]
    fn rejects_wrong_version_and_short_buffers() {
        assert!(RtpHeader::parse(&[0x00; 12]).is_none()); // version 0
        assert!(RtpHeader::parse(&[0x80; 4]).is_none()); // too short
    }

    #[test]
    fn honors_csrc_count_in_payload_offset() {
        let mut pkt = rtp(1, 0, false, &[0x41]);
        pkt[0] = 0x82; // version 2, CSRC count = 2
        let mut with_csrc = pkt[..12].to_vec();
        with_csrc.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF, 0, 0, 0, 0]); // 2 CSRCs
        with_csrc.push(0x41);
        let h = RtpHeader::parse(&with_csrc).unwrap();
        assert_eq!(h.payload_offset, 20);
    }

    #[test]
    fn honors_extension_header_in_payload_offset() {
        let mut pkt = rtp(5, 0, false, &[]);
        pkt[0] = 0x90; // version 2, extension bit set
        // Profile 0xBEDE, length 1 word, then that one word of extension data.
        pkt.extend_from_slice(&[0xBE, 0xDE, 0x00, 0x01, 1, 2, 3, 4]);
        pkt.push(0x41);
        let h = RtpHeader::parse(&pkt).unwrap();
        assert_eq!(h.payload_offset, 20);
        assert_eq!(&pkt[h.payload_offset..], &[0x41]);
    }

    #[test]
    fn rejects_truncated_extension_header() {
        let mut pkt = rtp(5, 0, false, &[]);
        pkt[0] = 0x90; // extension bit set, but no extension header follows
        assert!(RtpHeader::parse(&pkt).is_none());
        // Extension header claims 4 words of data, none of which are present.
        pkt.extend_from_slice(&[0xBE, 0xDE, 0x00, 0x04]);
        assert!(RtpHeader::parse(&pkt).is_none());
    }

    #[test]
    fn single_nal_packet_emits_annexb_on_marker() {
        let mut d = H264Depacketizer::new();
        // Type 1 (non-IDR slice), marker set → one access unit.
        let out = d.push(&[0x41, 0x9A, 0xBC], true, 3000, 1).unwrap().unwrap();
        assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x41, 0x9A, 0xBC]);
        assert!(!out.keyframe);
        assert_eq!(out.timestamp, 3000);
    }

    #[test]
    fn idr_single_nal_is_flagged_keyframe() {
        let mut d = H264Depacketizer::new();
        let out = d.push(&[0x65, 0x01], true, 0, 1).unwrap().unwrap();
        assert!(out.keyframe);
    }

    #[test]
    fn empty_payload_is_truncated() {
        let mut d = H264Depacketizer::new();
        assert_eq!(d.push(&[], true, 0, 1), Err(DepacketizeError::Truncated));
    }

    #[test]
    fn unsupported_nal_type_is_reported() {
        let mut d = H264Depacketizer::new();
        // Type 25 is STAP-B, which this depacketizer does not handle.
        assert_eq!(
            d.push(&[25, 0, 1, 0x41], true, 0, 1),
            Err(DepacketizeError::Unsupported(25))
        );
    }

    #[test]
    fn stap_a_splits_aggregated_nals() {
        // STAP-A (24): [24][size=2][AA BB][size=3][CC DD EE]
        let payload = [24, 0, 2, 0xAA, 0xBB, 0, 3, 0xCC, 0xDD, 0xEE];
        let mut d = H264Depacketizer::new();
        let out = d.push(&payload, true, 0, 1).unwrap().unwrap();
        assert_eq!(
            &out.data[..],
            &[0, 0, 0, 1, 0xAA, 0xBB, 0, 0, 0, 1, 0xCC, 0xDD, 0xEE]
        );
    }

    #[test]
    fn stap_a_with_idr_is_flagged_keyframe() {
        // SPS (7), PPS (8), IDR slice (5) aggregated in one STAP-A.
        let payload = [24, 0, 2, 0x67, 0x42, 0, 2, 0x68, 0xCE, 0, 2, 0x65, 0x88];
        let mut d = H264Depacketizer::new();
        let out = d.push(&payload, true, 0, 1).unwrap().unwrap();
        assert!(out.keyframe);
        assert_eq!(
            &out.data[..],
            &[0, 0, 0, 1, 0x67, 0x42, 0, 0, 0, 1, 0x68, 0xCE, 0, 0, 0, 1, 0x65, 0x88]
        );
    }

    #[test]
    fn stap_a_overrunning_size_is_truncated() {
        let mut d = H264Depacketizer::new();
        // Size field says 5 bytes, only 1 follows.
        assert_eq!(
            d.push(&[24, 0, 5, 0x41], true, 0, 1),
            Err(DepacketizeError::Truncated)
        );
    }

    #[test]
    fn fu_a_reassembles_fragmented_nal() {
        let mut d = H264Depacketizer::new();
        // FU indicator 0x7C (F=0,NRI=3,type=28), FU header start 0x85 (S=1,type=5).
        assert!(d
            .push(&[0x7C, 0x85, 0x11, 0x22], false, 0, 1)
            .unwrap()
            .is_none());
        // Middle fragment (S=0,E=0).
        assert!(d.push(&[0x7C, 0x05, 0x33], false, 0, 2).unwrap().is_none());
        // End fragment (E=1), marker closes the AU.
        let out = d.push(&[0x7C, 0x45, 0x44], true, 0, 3).unwrap().unwrap();
        // Reconstructed NAL header: NRI 0x60 | type 5 = 0x65, then payload bytes.
        assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x65, 0x11, 0x22, 0x33, 0x44]);
        assert!(out.keyframe);
    }

    #[test]
    fn fu_a_sequence_number_wrap_is_not_a_gap() {
        let mut d = H264Depacketizer::new();
        assert!(d
            .push(&[0x7C, 0x85, 0x11], false, 0, u16::MAX)
            .unwrap()
            .is_none());
        // Sequence wraps from 65535 to 0 — contiguous, not a gap.
        let out = d.push(&[0x7C, 0x45, 0x22], true, 0, 0).unwrap().unwrap();
        assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x65, 0x11, 0x22]);
    }

    #[test]
    fn fu_a_sequence_gap_reports_out_of_order() {
        let mut d = H264Depacketizer::new();
        d.push(&[0x7C, 0x85, 0x11], false, 0, 1).unwrap();
        // Jump from seq 1 to seq 5 mid-fragment.
        assert_eq!(
            d.push(&[0x7C, 0x05, 0x22], false, 0, 5),
            Err(DepacketizeError::OutOfOrder)
        );
    }

    #[test]
    fn fu_a_continuation_without_start_is_out_of_order() {
        let mut d = H264Depacketizer::new();
        // End fragment (E=1) arriving with no preceding start fragment.
        assert_eq!(
            d.push(&[0x7C, 0x45, 0x11], true, 0, 1),
            Err(DepacketizeError::OutOfOrder)
        );
    }

    #[test]
    fn timestamp_change_flushes_previous_au_without_marker() {
        let mut d = H264Depacketizer::new();
        // First AU, no marker.
        assert!(d.push(&[0x41, 0x01], false, 1000, 1).unwrap().is_none());
        // New timestamp flushes the first AU.
        let out = d.push(&[0x41, 0x02], false, 2000, 2).unwrap().unwrap();
        assert_eq!(out.timestamp, 1000);
        assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x41, 0x01]);
    }
}