Skip to main content

arcly_stream/protocol/
rtp.rs

1//! Shared RTP/RTCP parsing and H.264 depacketization (RFC 3550 + RFC 6184).
2//!
3//! Gated behind the internal `_rtp` marker, pulled in by both [`rtsp`] and
4//! [`webrtc`]. The two transports differ only in how RTP packets reach the
5//! process (TCP-interleaved / UDP for RTSP, DTLS-SRTP for WebRTC); once a packet
6//! is in hand, reassembling NAL units into an Annex-B access unit is identical,
7//! so it lives here once.
8//!
9//! [`rtsp`]: crate::protocol::rtsp
10//! [`webrtc`]: crate::protocol::webrtc
11//!
12//! # What it does
13//!
14//! - [`RtpHeader::parse`] decodes the fixed RTP header (RFC 3550 §5.1), honoring
15//!   the CSRC count and the extension-header flag to locate the payload.
16//! - [`H264Depacketizer`] turns a sequence of RTP payloads into complete H.264
17//!   access units in Annex-B form, handling the three NALU packetization modes
18//!   defined by RFC 6184: single NAL units, STAP-A aggregation (type 24), and
19//!   FU-A fragmentation (type 28). An access unit is emitted when the RTP marker
20//!   bit is set or the RTP timestamp advances.
21//!
22//! # What it does not do
23//!
24//! Jitter-buffer reordering and loss concealment are the caller's concern — the
25//! depacketizer assumes in-order delivery (true for TCP-interleaved RTSP; for
26//! UDP/SRTP a small reorder buffer should sit in front of it). It reports a
27//! [`DepacketizeError::OutOfOrder`] gap so a handler can request a keyframe
28//! (PLI/FIR) rather than emit a corrupt access unit.
29
30use bytes::Bytes;
31
32/// Annex-B start code prefixed to every reassembled NAL unit.
33const ANNEXB_START: [u8; 4] = [0, 0, 0, 1];
34
35/// A parsed RTP fixed header (RFC 3550 §5.1).
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub struct RtpHeader {
38    /// Payload type (7 bits) — identifies the codec/format binding from SDP.
39    pub payload_type: u8,
40    /// Marker bit. For H.264 it flags the last packet of an access unit.
41    pub marker: bool,
42    /// 16-bit sequence number, increments by one per packet (wraps).
43    pub sequence: u16,
44    /// 32-bit media timestamp in the payload's clock (90 kHz for H.264 video).
45    pub timestamp: u32,
46    /// Synchronization source identifier.
47    pub ssrc: u32,
48    /// Byte offset at which the payload begins (past CSRCs and any extension).
49    pub payload_offset: usize,
50}
51
52impl RtpHeader {
53    /// Parse the fixed header from the front of `buf`, returning the header and
54    /// the payload offset. Returns `None` if `buf` is too short or the version
55    /// field is not 2.
56    pub fn parse(buf: &[u8]) -> Option<RtpHeader> {
57        use super::byteops::ByteReader;
58        let mut r = ByteReader::new(buf);
59        let b0 = r.u8()?;
60        if b0 >> 6 != 2 {
61            return None; // RTP version must be 2
62        }
63        let has_extension = b0 & 0x10 != 0;
64        let csrc_count = (b0 & 0x0F) as usize;
65        let b1 = r.u8()?;
66        let marker = b1 & 0x80 != 0;
67        let payload_type = b1 & 0x7F;
68        let sequence = r.u16_be()?;
69        let timestamp = r.u32_be()?;
70        let ssrc = r.u32_be()?;
71        r.skip(csrc_count * 4)?; // CSRC list
72
73        if has_extension {
74            // Extension header: 2-byte profile id, 2-byte length (in 32-bit words).
75            r.skip(2)?;
76            let ext_words = r.u16_be()? as usize;
77            r.skip(ext_words * 4)?;
78        }
79        Some(RtpHeader {
80            payload_type,
81            marker,
82            sequence,
83            timestamp,
84            ssrc,
85            payload_offset: r.position(),
86        })
87    }
88}
89
90/// Errors surfaced while depacketizing an RTP stream.
91#[derive(Debug, Clone, Copy, PartialEq, Eq)]
92#[non_exhaustive]
93pub enum DepacketizeError {
94    /// The packet was shorter than the format requires.
95    Truncated,
96    /// A sequence-number discontinuity was detected mid-fragment; the partial
97    /// access unit was dropped. The handler should request a keyframe.
98    OutOfOrder,
99    /// An unsupported NAL/aggregation type was encountered.
100    Unsupported(u8),
101}
102
103/// Depacketizes RFC 3640 AAC-hbr RTP payloads into raw AAC access units.
104///
105/// The common RTSP/SDP profile for AAC (`mode=AAC-hbr`, `sizelength=13`,
106/// `indexlength=3`) frames each payload as a 2-byte **AU-headers-length** (in
107/// bits), followed by one 2-byte AU-header per access unit (13-bit size +
108/// 3-bit index), followed by the access units concatenated. One RTP packet may
109/// carry several AAC frames; [`push`](Self::push) returns each as a separate
110/// raw (ADTS-less) [`bytes::Bytes`].
111#[derive(Debug, Clone, Copy, Default)]
112pub struct AacDepacketizer {
113    /// Bits per AU-header `size` field (13 for AAC-hbr).
114    size_length: u8,
115    /// Bits per AU-header `index`/`index-delta` field (3 for AAC-hbr).
116    index_length: u8,
117}
118
119impl AacDepacketizer {
120    /// A depacketizer for the standard AAC-hbr profile (`sizelength=13`,
121    /// `indexlength=3`).
122    pub fn new() -> Self {
123        Self {
124            size_length: 13,
125            index_length: 3,
126        }
127    }
128
129    /// A depacketizer with explicit AU-header field widths from the SDP `fmtp`.
130    pub fn with_lengths(size_length: u8, index_length: u8) -> Self {
131        Self {
132            size_length,
133            index_length,
134        }
135    }
136
137    /// Split one RTP AAC-hbr payload into its constituent access units.
138    pub fn push(&self, payload: &[u8]) -> Result<Vec<Bytes>, DepacketizeError> {
139        if payload.len() < 2 {
140            return Err(DepacketizeError::Truncated);
141        }
142        // Sizes wider than a 16-bit AU-header field are unsupported (and would
143        // otherwise over-shift below). `with_lengths` can supply arbitrary widths.
144        if self.size_length == 0 || self.size_length > 16 {
145            return Err(DepacketizeError::Unsupported(self.size_length));
146        }
147        let header_bits = u16::from_be_bytes([payload[0], payload[1]]) as usize;
148        let au_header_bits = self.size_length as usize + self.index_length as usize;
149        if au_header_bits == 0 {
150            return Err(DepacketizeError::Unsupported(0));
151        }
152        let header_bytes = header_bits.div_ceil(8);
153        let au_count = header_bits / au_header_bits;
154        let headers = payload
155            .get(2..2 + header_bytes)
156            .ok_or(DepacketizeError::Truncated)?;
157        let mut data_off = 2 + header_bytes;
158        let mut out = Vec::with_capacity(au_count);
159        for i in 0..au_count {
160            // Each AU-header is `au_header_bits` wide; for AAC-hbr that is 16
161            // bits, so the size is the top `size_length` bits of a 2-byte field.
162            let bit = i * au_header_bits;
163            let byte = bit / 8;
164            let hdr = headers
165                .get(byte..byte + 2)
166                .ok_or(DepacketizeError::Truncated)?;
167            let size = (u16::from_be_bytes([hdr[0], hdr[1]]) >> (16 - self.size_length)) as usize;
168            let end = data_off + size;
169            let au = payload
170                .get(data_off..end)
171                .ok_or(DepacketizeError::Truncated)?;
172            out.push(Bytes::copy_from_slice(au));
173            data_off = end;
174        }
175        Ok(out)
176    }
177}
178
179/// Packetizes H.264 Annex-B access units into RFC 6184 RTP packets — the inverse
180/// of [`H264Depacketizer`], used for WebRTC/WHEP egress.
181///
182/// Each NAL unit that fits the MTU is sent as a single-NAL packet; larger NALs
183/// are split into FU-A fragments. The RTP marker bit is set on the last packet
184/// of each access unit so the receiver knows the frame is complete.
185#[derive(Debug, Clone)]
186pub struct RtpPacketizer {
187    payload_type: u8,
188    ssrc: u32,
189    sequence: u16,
190    /// Maximum RTP payload size (excluding the 12-byte header).
191    max_payload: usize,
192}
193
194impl RtpPacketizer {
195    /// A packetizer for `payload_type`/`ssrc`. `mtu` is the maximum UDP payload
196    /// (1200 is the WebRTC-safe default); the 12-byte RTP header is subtracted.
197    pub fn new(payload_type: u8, ssrc: u32, mtu: usize) -> Self {
198        Self {
199            payload_type,
200            ssrc,
201            sequence: 0,
202            max_payload: mtu.saturating_sub(12).max(1),
203        }
204    }
205
206    /// Build the 12-byte RTP header for the next packet and advance the sequence.
207    fn header(&mut self, marker: bool, timestamp: u32, out: &mut Vec<u8>) {
208        out.push(0x80); // V=2, no padding/extension/CSRC
209        out.push(if marker { 0x80 } else { 0 } | (self.payload_type & 0x7F));
210        out.extend_from_slice(&self.sequence.to_be_bytes());
211        out.extend_from_slice(&timestamp.to_be_bytes());
212        out.extend_from_slice(&self.ssrc.to_be_bytes());
213        self.sequence = self.sequence.wrapping_add(1);
214    }
215
216    /// Packetize one Annex-B access unit at `timestamp` (90 kHz) into RTP packets.
217    pub fn packetize(&mut self, access_unit: &[u8], timestamp: u32) -> Vec<Vec<u8>> {
218        // Collect NAL units (without start codes) in order.
219        let nals: Vec<&[u8]> = crate::codec::h264::iter_nals_annexb(access_unit)
220            .filter(|n| !n.is_empty())
221            .collect();
222        let mut packets = Vec::new();
223        for (i, nal) in nals.iter().enumerate() {
224            let last_nal = i + 1 == nals.len();
225            if nal.len() <= self.max_payload {
226                // Single NAL unit packet; marker on the final NAL of the AU.
227                let mut pkt = Vec::with_capacity(12 + nal.len());
228                self.header(last_nal, timestamp, &mut pkt);
229                pkt.extend_from_slice(nal);
230                packets.push(pkt);
231            } else {
232                self.fragment_fua(nal, timestamp, last_nal, &mut packets);
233            }
234        }
235        packets
236    }
237
238    /// Split one oversized NAL into FU-A fragments (RFC 6184 §5.8).
239    fn fragment_fua(&mut self, nal: &[u8], timestamp: u32, last_nal: bool, out: &mut Vec<Vec<u8>>) {
240        let nal_header = nal[0];
241        let fu_indicator = (nal_header & 0xE0) | 28; // F|NRI from NAL, type 28
242        let nal_type = nal_header & 0x1F;
243        let body = &nal[1..];
244        // Each fragment carries a 2-byte FU header (indicator + FU header).
245        let chunk = self.max_payload.saturating_sub(2).max(1);
246        let n_chunks = body.len().div_ceil(chunk);
247        for (idx, part) in body.chunks(chunk).enumerate() {
248            let start = idx == 0;
249            let end = idx + 1 == n_chunks;
250            let mut fu_header = nal_type;
251            if start {
252                fu_header |= 0x80;
253            }
254            if end {
255                fu_header |= 0x40;
256            }
257            let mut pkt = Vec::with_capacity(12 + 2 + part.len());
258            // Marker only on the very last fragment of the final NAL of the AU.
259            self.header(last_nal && end, timestamp, &mut pkt);
260            pkt.push(fu_indicator);
261            pkt.push(fu_header);
262            pkt.extend_from_slice(part);
263            out.push(pkt);
264        }
265    }
266}
267
268/// Reassembles RFC 6184 H.264 RTP payloads into Annex-B access units.
269///
270/// Feed each packet's payload (the bytes after [`RtpHeader::payload_offset`])
271/// with its marker bit and timestamp to [`push`](Self::push). When a complete
272/// access unit is ready the method returns `Ok(Some(au))`, where `au` is the
273/// concatenated NAL units each prefixed with a 4-byte Annex-B start code —
274/// exactly the shape the codec parsers and `annexb_to_avcc` expect.
275#[derive(Debug, Default)]
276pub struct H264Depacketizer {
277    /// Bytes accumulated for the current access unit (Annex-B framed).
278    au: Vec<u8>,
279    /// FU-A reassembly buffer for the NAL currently being defragmented.
280    fua: Vec<u8>,
281    /// `true` while an FU-A fragment is in progress (between Start and End bits).
282    in_fragment: bool,
283    /// Reconstructed NAL header byte for the in-progress FU-A NAL.
284    fua_header: u8,
285    /// Timestamp of the access unit currently being assembled.
286    current_ts: Option<u32>,
287    /// Last sequence number seen (for gap detection during fragmentation).
288    last_seq: Option<u16>,
289}
290
291impl H264Depacketizer {
292    /// A fresh depacketizer with no in-progress access unit.
293    pub fn new() -> Self {
294        Self::default()
295    }
296
297    /// Append one NAL unit (Annex-B framed) to the current access unit.
298    fn append_nal(&mut self, nal: &[u8]) {
299        self.au.extend_from_slice(&ANNEXB_START);
300        self.au.extend_from_slice(nal);
301    }
302
303    /// Whether the pending access unit holds an IDR (type 5) NAL — a keyframe.
304    fn pending_is_keyframe(&self) -> bool {
305        // Scan the assembled Annex-B for a NAL header with type 5.
306        let mut i = 0;
307        while i + 4 < self.au.len() {
308            if self.au[i..i + 4] == ANNEXB_START {
309                let nal_type = self.au[i + 4] & 0x1F;
310                if nal_type == 5 {
311                    return true;
312                }
313            }
314            i += 1;
315        }
316        false
317    }
318
319    /// Emit and reset the pending access unit, if any.
320    fn take_au(&mut self) -> Option<AccessUnit> {
321        if self.au.is_empty() {
322            return None;
323        }
324        let keyframe = self.pending_is_keyframe();
325        let timestamp = self.current_ts.unwrap_or(0);
326        let data = Bytes::from(std::mem::take(&mut self.au));
327        self.current_ts = None;
328        Some(AccessUnit {
329            data,
330            timestamp,
331            keyframe,
332        })
333    }
334
335    /// Push one RTP H.264 payload. Returns a completed [`AccessUnit`] when the
336    /// marker bit closes the frame (or the timestamp advances to a new one).
337    pub fn push(
338        &mut self,
339        payload: &[u8],
340        marker: bool,
341        timestamp: u32,
342        sequence: u16,
343    ) -> Result<Option<AccessUnit>, DepacketizeError> {
344        if payload.is_empty() {
345            return Err(DepacketizeError::Truncated);
346        }
347
348        // A timestamp change flushes the previous access unit before starting the
349        // new one (some encoders omit the marker bit).
350        let mut completed = None;
351        if let Some(ts) = self.current_ts {
352            if ts != timestamp && !self.in_fragment {
353                completed = self.take_au();
354            }
355        }
356        self.current_ts = Some(timestamp);
357
358        let nal_type = payload[0] & 0x1F;
359        match nal_type {
360            1..=23 => {
361                // Single NAL unit packet — the payload *is* the NAL.
362                self.append_nal(payload);
363            }
364            24 => {
365                // STAP-A: one byte type, then [u16 size][nal]… aggregates.
366                let mut i = 1;
367                while i + 2 <= payload.len() {
368                    let size = u16::from_be_bytes([payload[i], payload[i + 1]]) as usize;
369                    i += 2;
370                    if i + size > payload.len() {
371                        return Err(DepacketizeError::Truncated);
372                    }
373                    self.append_nal(&payload[i..i + size]);
374                    i += size;
375                }
376            }
377            28 => {
378                // FU-A: byte0 = FU indicator, byte1 = FU header (S|E|R|type).
379                if payload.len() < 2 {
380                    return Err(DepacketizeError::Truncated);
381                }
382                let fu_header = payload[1];
383                let start = fu_header & 0x80 != 0;
384                let end = fu_header & 0x40 != 0;
385                let frag_type = fu_header & 0x1F;
386
387                if start {
388                    // Reconstruct the original NAL header: F|NRI from the indicator,
389                    // type from the FU header.
390                    self.fua_header = (payload[0] & 0xE0) | frag_type;
391                    self.fua.clear();
392                    self.fua.push(self.fua_header);
393                    self.in_fragment = true;
394                } else if !self.in_fragment {
395                    // Mid/last fragment with no start — lost the head.
396                    return Err(DepacketizeError::OutOfOrder);
397                } else if self.seq_gap(sequence) {
398                    self.in_fragment = false;
399                    self.fua.clear();
400                    return Err(DepacketizeError::OutOfOrder);
401                }
402                self.fua.extend_from_slice(&payload[2..]);
403
404                if end && self.in_fragment {
405                    let nal = std::mem::take(&mut self.fua);
406                    self.append_nal(&nal);
407                    self.in_fragment = false;
408                }
409            }
410            other => return Err(DepacketizeError::Unsupported(other)),
411        }
412
413        self.last_seq = Some(sequence);
414
415        if completed.is_some() {
416            return Ok(completed);
417        }
418        if marker {
419            return Ok(self.take_au());
420        }
421        Ok(None)
422    }
423
424    /// Detect a one-step sequence-number gap relative to the previous packet.
425    fn seq_gap(&self, sequence: u16) -> bool {
426        match self.last_seq {
427            Some(prev) => sequence.wrapping_sub(prev) != 1,
428            None => false,
429        }
430    }
431}
432
433/// A reassembled H.264 access unit in Annex-B form.
434#[derive(Debug, Clone, PartialEq, Eq)]
435pub struct AccessUnit {
436    /// Concatenated NAL units, each prefixed with a 4-byte start code.
437    pub data: Bytes,
438    /// RTP media timestamp (90 kHz) of the access unit.
439    pub timestamp: u32,
440    /// Whether the access unit contains an IDR (keyframe) NAL.
441    pub keyframe: bool,
442}
443
444#[cfg(test)]
445mod tests {
446    use super::*;
447
448    /// Build a minimal 12-byte RTP packet with the given fields and payload.
449    fn rtp(seq: u16, ts: u32, marker: bool, payload: &[u8]) -> Vec<u8> {
450        let mut p = vec![0x80, if marker { 0x80 | 96 } else { 96 }];
451        p.extend_from_slice(&seq.to_be_bytes());
452        p.extend_from_slice(&ts.to_be_bytes());
453        p.extend_from_slice(&[0, 0, 0, 1]); // ssrc
454        p.extend_from_slice(payload);
455        p
456    }
457
458    #[test]
459    fn parses_fixed_header_and_payload_offset() {
460        let pkt = rtp(7, 9000, true, &[0x65, 0xAA]);
461        let h = RtpHeader::parse(&pkt).unwrap();
462        assert_eq!(h.sequence, 7);
463        assert_eq!(h.timestamp, 9000);
464        assert!(h.marker);
465        assert_eq!(h.payload_type, 96);
466        assert_eq!(h.payload_offset, 12);
467        assert_eq!(&pkt[h.payload_offset..], &[0x65, 0xAA]);
468    }
469
470    #[test]
471    fn rejects_wrong_version_and_short_buffers() {
472        assert!(RtpHeader::parse(&[0x00; 12]).is_none()); // version 0
473        assert!(RtpHeader::parse(&[0x80; 4]).is_none()); // too short
474    }
475
476    #[test]
477    fn honors_csrc_count_in_payload_offset() {
478        let mut pkt = rtp(1, 0, false, &[0x41]);
479        pkt[0] = 0x82; // version 2, CSRC count = 2
480        let mut with_csrc = pkt[..12].to_vec();
481        with_csrc.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF, 0, 0, 0, 0]); // 2 CSRCs
482        with_csrc.push(0x41);
483        let h = RtpHeader::parse(&with_csrc).unwrap();
484        assert_eq!(h.payload_offset, 20);
485    }
486
487    #[test]
488    fn aac_hbr_splits_two_access_units() {
489        // AU-headers-length = 32 bits → two 16-bit AU-headers.
490        // AU sizes 3 and 2 (top 13 bits of each 2-byte header).
491        let mut p = Vec::new();
492        p.extend_from_slice(&32u16.to_be_bytes()); // header bits
493        p.extend_from_slice(&((3u16) << 3).to_be_bytes()); // AU-header: size 3
494        p.extend_from_slice(&((2u16) << 3).to_be_bytes()); // AU-header: size 2
495        p.extend_from_slice(&[0xA1, 0xA2, 0xA3]); // AU 1
496        p.extend_from_slice(&[0xB1, 0xB2]); // AU 2
497        let aus = AacDepacketizer::new().push(&p).unwrap();
498        assert_eq!(aus.len(), 2);
499        assert_eq!(&aus[0][..], &[0xA1, 0xA2, 0xA3]);
500        assert_eq!(&aus[1][..], &[0xB1, 0xB2]);
501    }
502
503    #[test]
504    fn aac_hbr_single_au() {
505        let mut p = Vec::new();
506        p.extend_from_slice(&16u16.to_be_bytes()); // one 16-bit AU-header
507        p.extend_from_slice(&((4u16) << 3).to_be_bytes()); // size 4
508        p.extend_from_slice(&[1, 2, 3, 4]);
509        let aus = AacDepacketizer::new().push(&p).unwrap();
510        assert_eq!(aus.len(), 1);
511        assert_eq!(&aus[0][..], &[1, 2, 3, 4]);
512    }
513
514    #[test]
515    fn aac_truncated_payload_errors() {
516        assert_eq!(
517            AacDepacketizer::new().push(&[0x00]),
518            Err(DepacketizeError::Truncated)
519        );
520        // Declares one AU of size 8 but supplies only 2 data bytes.
521        let mut p = 16u16.to_be_bytes().to_vec();
522        p.extend_from_slice(&((8u16) << 3).to_be_bytes());
523        p.extend_from_slice(&[1, 2]);
524        assert_eq!(
525            AacDepacketizer::new().push(&p),
526            Err(DepacketizeError::Truncated)
527        );
528    }
529
530    #[test]
531    fn single_nal_packet_emits_annexb_on_marker() {
532        let mut d = H264Depacketizer::new();
533        // Type 1 (non-IDR slice), marker set → one access unit.
534        let out = d.push(&[0x41, 0x9A, 0xBC], true, 3000, 1).unwrap().unwrap();
535        assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x41, 0x9A, 0xBC]);
536        assert!(!out.keyframe);
537        assert_eq!(out.timestamp, 3000);
538    }
539
540    #[test]
541    fn idr_single_nal_is_flagged_keyframe() {
542        let mut d = H264Depacketizer::new();
543        let out = d.push(&[0x65, 0x01], true, 0, 1).unwrap().unwrap();
544        assert!(out.keyframe);
545    }
546
547    #[test]
548    fn packetizer_single_nal_round_trips_through_depacketizer() {
549        // A small AU (two NALs) → single-NAL packets → reassembled identically.
550        let au = [0, 0, 0, 1, 0x67, 0x42, 0x00, 0, 0, 0, 1, 0x65, 0x88, 0x99];
551        let mut pkt = RtpPacketizer::new(96, 0xABCD, 1200);
552        let packets = pkt.packetize(&au, 3000);
553        assert_eq!(packets.len(), 2, "one packet per NAL");
554
555        let mut depack = H264Depacketizer::new();
556        let mut out = None;
557        for p in &packets {
558            let h = RtpHeader::parse(p).unwrap();
559            if let Some(au) = depack
560                .push(&p[h.payload_offset..], h.marker, h.timestamp, h.sequence)
561                .unwrap()
562            {
563                out = Some(au);
564            }
565        }
566        let out = out.expect("AU completed on the marker packet");
567        assert_eq!(&out.data[..], &au);
568        assert!(out.keyframe);
569        assert_eq!(out.timestamp, 3000);
570    }
571
572    #[test]
573    fn packetizer_fragments_oversized_nal_and_round_trips() {
574        // One NAL larger than the MTU → FU-A fragments → reassembled identically.
575        let mut nal = vec![0, 0, 0, 1, 0x65]; // start code + IDR NAL header
576        nal.extend((0..600u16).map(|i| i as u8)); // long payload
577        let mut pkt = RtpPacketizer::new(96, 1, 100); // tiny MTU forces FU-A
578        let packets = pkt.packetize(&nal, 90);
579        assert!(packets.len() > 1, "oversized NAL is fragmented");
580        // Only the last packet carries the marker bit.
581        let markers: Vec<bool> = packets
582            .iter()
583            .map(|p| RtpHeader::parse(p).unwrap().marker)
584            .collect();
585        assert_eq!(markers.iter().filter(|m| **m).count(), 1);
586        assert!(markers.last().unwrap());
587
588        let mut depack = H264Depacketizer::new();
589        let mut out = None;
590        for p in &packets {
591            let h = RtpHeader::parse(p).unwrap();
592            if let Some(au) = depack
593                .push(&p[h.payload_offset..], h.marker, h.timestamp, h.sequence)
594                .unwrap()
595            {
596                out = Some(au);
597            }
598        }
599        assert_eq!(&out.unwrap().data[..], &nal[..]);
600    }
601
602    #[test]
603    fn stap_a_splits_aggregated_nals() {
604        // STAP-A (24): [24][size=2][AA BB][size=3][CC DD EE]
605        let payload = [24, 0, 2, 0xAA, 0xBB, 0, 3, 0xCC, 0xDD, 0xEE];
606        let mut d = H264Depacketizer::new();
607        let out = d.push(&payload, true, 0, 1).unwrap().unwrap();
608        assert_eq!(
609            &out.data[..],
610            &[0, 0, 0, 1, 0xAA, 0xBB, 0, 0, 0, 1, 0xCC, 0xDD, 0xEE]
611        );
612    }
613
614    #[test]
615    fn fu_a_reassembles_fragmented_nal() {
616        let mut d = H264Depacketizer::new();
617        // FU indicator 0x7C (F=0,NRI=3,type=28), FU header start 0x85 (S=1,type=5).
618        assert!(d
619            .push(&[0x7C, 0x85, 0x11, 0x22], false, 0, 1)
620            .unwrap()
621            .is_none());
622        // Middle fragment (S=0,E=0).
623        assert!(d.push(&[0x7C, 0x05, 0x33], false, 0, 2).unwrap().is_none());
624        // End fragment (E=1), marker closes the AU.
625        let out = d.push(&[0x7C, 0x45, 0x44], true, 0, 3).unwrap().unwrap();
626        // Reconstructed NAL header: NRI 0x60 | type 5 = 0x65, then payload bytes.
627        assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x65, 0x11, 0x22, 0x33, 0x44]);
628        assert!(out.keyframe);
629    }
630
631    #[test]
632    fn fu_a_sequence_gap_reports_out_of_order() {
633        let mut d = H264Depacketizer::new();
634        d.push(&[0x7C, 0x85, 0x11], false, 0, 1).unwrap();
635        // Jump from seq 1 to seq 5 mid-fragment.
636        assert_eq!(
637            d.push(&[0x7C, 0x05, 0x22], false, 0, 5),
638            Err(DepacketizeError::OutOfOrder)
639        );
640    }
641
642    #[test]
643    fn timestamp_change_flushes_previous_au_without_marker() {
644        let mut d = H264Depacketizer::new();
645        // First AU, no marker.
646        assert!(d.push(&[0x41, 0x01], false, 1000, 1).unwrap().is_none());
647        // New timestamp flushes the first AU.
648        let out = d.push(&[0x41, 0x02], false, 2000, 2).unwrap().unwrap();
649        assert_eq!(out.timestamp, 1000);
650        assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x41, 0x01]);
651    }
652}