Skip to main content

arcly_stream/protocol/
rtp.rs

1//! Shared RTP/RTCP parsing and NAL-codec (de)packetization — RFC 3550 framing
2//! plus H.264 (RFC 6184) and H.265 (RFC 7798) payload formats.
3//!
4//! Gated behind the internal `_rtp` marker, pulled in by both [`rtsp`] and
5//! [`webrtc`]. The two transports differ only in how RTP packets reach the
6//! process (TCP-interleaved / UDP for RTSP, DTLS-SRTP for WebRTC); once a packet
7//! is in hand, reassembling NAL units into an Annex-B access unit is identical,
8//! so it lives here once.
9//!
10//! [`rtsp`]: crate::protocol::rtsp
11//! [`webrtc`]: crate::protocol::webrtc
12//!
13//! # What it does
14//!
15//! - [`RtpHeader::parse`] decodes the fixed RTP header (RFC 3550 §5.1), honoring
16//!   the CSRC count and the extension-header flag to locate the payload.
17//! - [`H264Depacketizer`] / [`H265Depacketizer`] turn a sequence of RTP payloads
18//!   into complete access units in Annex-B form. Each handles the three NALU
19//!   packetization modes for its codec — single NAL, aggregation (STAP-A type 24
20//!   / AP type 48), and fragmentation (FU-A type 28 / FU type 49) — emitting an
21//!   access unit when the marker bit is set or the timestamp advances.
22//! - [`RtpPacketizer`] performs the reverse for egress (e.g. WebRTC WHEP),
23//!   selecting the H.264 or H.265 payload format.
24//!
25//! # What it does not do
26//!
27//! Jitter-buffer reordering and loss concealment are the caller's concern — the
28//! depacketizer assumes in-order delivery (true for TCP-interleaved RTSP; for
29//! UDP/SRTP a small reorder buffer should sit in front of it). It reports a
30//! [`DepacketizeError::OutOfOrder`] gap so a handler can request a keyframe
31//! (PLI/FIR) rather than emit a corrupt access unit.
32
33use bytes::Bytes;
34
/// The 4-byte Annex-B start code (`00 00 00 01`) written in front of every
/// NAL unit placed into a reassembled access unit.
const ANNEXB_START: [u8; 4] = [0x00, 0x00, 0x00, 0x01];
37
/// The decoded fields of an RTP fixed header (RFC 3550 §5.1), plus the offset
/// at which the packet's payload starts.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RtpHeader {
    /// 7-bit payload type; SDP binds it to a codec/format.
    pub payload_type: u8,
    /// The marker bit. For H.264 it is set on the final packet of an access
    /// unit.
    pub marker: bool,
    /// 16-bit packet sequence number; advances by one per packet and wraps.
    pub sequence: u16,
    /// 32-bit media timestamp, counted in the payload's clock (90 kHz for
    /// H.264 video).
    pub timestamp: u32,
    /// Synchronization source (SSRC) identifier.
    pub ssrc: u32,
    /// Index of the first payload byte, i.e. just past the CSRC list and the
    /// header extension when present.
    pub payload_offset: usize,
}
54
55impl RtpHeader {
56    /// Parse the fixed header from the front of `buf`, returning the header and
57    /// the payload offset. Returns `None` if `buf` is too short or the version
58    /// field is not 2.
59    pub fn parse(buf: &[u8]) -> Option<RtpHeader> {
60        use super::byteops::ByteReader;
61        let mut r = ByteReader::new(buf);
62        let b0 = r.u8()?;
63        if b0 >> 6 != 2 {
64            return None; // RTP version must be 2
65        }
66        let has_extension = b0 & 0x10 != 0;
67        let csrc_count = (b0 & 0x0F) as usize;
68        let b1 = r.u8()?;
69        let marker = b1 & 0x80 != 0;
70        let payload_type = b1 & 0x7F;
71        let sequence = r.u16_be()?;
72        let timestamp = r.u32_be()?;
73        let ssrc = r.u32_be()?;
74        r.skip(csrc_count * 4)?; // CSRC list
75
76        if has_extension {
77            // Extension header: 2-byte profile id, 2-byte length (in 32-bit words).
78            r.skip(2)?;
79            let ext_words = r.u16_be()? as usize;
80            r.skip(ext_words * 4)?;
81        }
82        Some(RtpHeader {
83            payload_type,
84            marker,
85            sequence,
86            timestamp,
87            ssrc,
88            payload_offset: r.position(),
89        })
90    }
91}
92
/// The ways depacketizing an RTP payload can fail.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum DepacketizeError {
    /// The payload ended before the format's required bytes were present.
    Truncated,
    /// A sequence-number discontinuity was detected mid-fragment; the partial
    /// access unit was dropped. The handler should request a keyframe.
    OutOfOrder,
    /// The payload used a NAL/aggregation type that is not handled; the
    /// offending value is carried along.
    Unsupported(u8),
}
105
/// Depacketizes RFC 3640 AAC RTP payloads into raw AAC access units.
///
/// The common RTSP/SDP profile for AAC (`mode=AAC-hbr`, `sizelength=13`,
/// `indexlength=3`) frames each payload as a 2-byte **AU-headers-length** (in
/// bits), followed by one AU-header per access unit (13-bit size + 3-bit
/// index), followed by the access units concatenated. One RTP packet may
/// carry several AAC frames; [`push`](Self::push) returns each as a separate
/// raw (ADTS-less) [`bytes::Bytes`].
///
/// The [`Default`] value uses the AAC-hbr field widths, so a defaulted
/// depacketizer is immediately usable rather than carrying zero-width fields
/// that `push` would reject.
#[derive(Debug, Clone, Copy)]
pub struct AacDepacketizer {
    /// Bits per AU-header `size` field (13 for AAC-hbr).
    size_length: u8,
    /// Bits per AU-header `index`/`index-delta` field (3 for AAC-hbr).
    index_length: u8,
}

impl Default for AacDepacketizer {
    /// The standard AAC-hbr widths: `sizelength=13`, `indexlength=3`.
    fn default() -> Self {
        Self {
            size_length: 13,
            index_length: 3,
        }
    }
}
121
122impl AacDepacketizer {
123    /// A depacketizer for the standard AAC-hbr profile (`sizelength=13`,
124    /// `indexlength=3`).
125    pub fn new() -> Self {
126        Self {
127            size_length: 13,
128            index_length: 3,
129        }
130    }
131
132    /// A depacketizer with explicit AU-header field widths from the SDP `fmtp`.
133    pub fn with_lengths(size_length: u8, index_length: u8) -> Self {
134        Self {
135            size_length,
136            index_length,
137        }
138    }
139
140    /// Split one RTP AAC-hbr payload into its constituent access units.
141    pub fn push(&self, payload: &[u8]) -> Result<Vec<Bytes>, DepacketizeError> {
142        if payload.len() < 2 {
143            return Err(DepacketizeError::Truncated);
144        }
145        // Sizes wider than a 16-bit AU-header field are unsupported (and would
146        // otherwise over-shift below). `with_lengths` can supply arbitrary widths.
147        if self.size_length == 0 || self.size_length > 16 {
148            return Err(DepacketizeError::Unsupported(self.size_length));
149        }
150        let header_bits = u16::from_be_bytes([payload[0], payload[1]]) as usize;
151        let au_header_bits = self.size_length as usize + self.index_length as usize;
152        if au_header_bits == 0 {
153            return Err(DepacketizeError::Unsupported(0));
154        }
155        let header_bytes = header_bits.div_ceil(8);
156        let au_count = header_bits / au_header_bits;
157        let headers = payload
158            .get(2..2 + header_bytes)
159            .ok_or(DepacketizeError::Truncated)?;
160        let mut data_off = 2 + header_bytes;
161        let mut out = Vec::with_capacity(au_count);
162        for i in 0..au_count {
163            // Each AU-header is `au_header_bits` wide; for AAC-hbr that is 16
164            // bits, so the size is the top `size_length` bits of a 2-byte field.
165            let bit = i * au_header_bits;
166            let byte = bit / 8;
167            let hdr = headers
168                .get(byte..byte + 2)
169                .ok_or(DepacketizeError::Truncated)?;
170            let size = (u16::from_be_bytes([hdr[0], hdr[1]]) >> (16 - self.size_length)) as usize;
171            let end = data_off + size;
172            let au = payload
173                .get(data_off..end)
174                .ok_or(DepacketizeError::Truncated)?;
175            out.push(Bytes::copy_from_slice(au));
176            data_off = end;
177        }
178        Ok(out)
179    }
180}
181
182/// Packetizes H.264 Annex-B access units into RFC 6184 RTP packets — the inverse
183/// of [`H264Depacketizer`], used for WebRTC/WHEP egress.
184///
185/// Each NAL unit that fits the MTU is sent as a single-NAL packet; larger NALs
186/// are split into FU-A fragments. The RTP marker bit is set on the last packet
187/// of each access unit so the receiver knows the frame is complete.
188#[derive(Debug, Clone)]
189pub struct RtpPacketizer {
190    payload_type: u8,
191    ssrc: u32,
192    sequence: u16,
193    /// Maximum RTP payload size (excluding the 12-byte header).
194    max_payload: usize,
195    /// Which NAL-based RTP payload format to emit (H.264 RFC 6184 vs H.265
196    /// RFC 7798 — they differ in NAL-header width and FU framing).
197    codec: NalCodec,
198}
199
/// Which NAL-based codec an [`RtpPacketizer`] emits. Both are framed with
/// Annex-B start codes; what differs is the NAL-header width (one byte for
/// H.264, two for H.265) and the fragmentation-unit layout.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum NalCodec {
    /// H.264 / AVC, RFC 6184 payload format.
    H264,
    /// H.265 / HEVC, RFC 7798 payload format.
    H265,
}
208
209impl RtpPacketizer {
210    /// An H.264 packetizer for `payload_type`/`ssrc`. `mtu` is the maximum UDP
211    /// payload (1200 is the WebRTC-safe default); the 12-byte RTP header is
212    /// subtracted.
213    pub fn new(payload_type: u8, ssrc: u32, mtu: usize) -> Self {
214        Self::with_codec(payload_type, ssrc, mtu, NalCodec::H264)
215    }
216
217    /// An H.265 (HEVC) packetizer, emitting the RFC 7798 payload format
218    /// (2-byte NAL header, FU type 49). Counterpart to [`new`](Self::new).
219    pub fn new_h265(payload_type: u8, ssrc: u32, mtu: usize) -> Self {
220        Self::with_codec(payload_type, ssrc, mtu, NalCodec::H265)
221    }
222
223    fn with_codec(payload_type: u8, ssrc: u32, mtu: usize, codec: NalCodec) -> Self {
224        Self {
225            payload_type,
226            ssrc,
227            sequence: 0,
228            max_payload: mtu.saturating_sub(12).max(1),
229            codec,
230        }
231    }
232
233    /// Emit one RTP packet: recycle a buffer from `recycle` (reusing its heap
234    /// allocation), write the 12-byte header, let `fill` append the payload, and
235    /// push it onto `out`. Advances the sequence number.
236    fn emit(
237        &mut self,
238        recycle: &mut Vec<Vec<u8>>,
239        out: &mut Vec<Vec<u8>>,
240        marker: bool,
241        timestamp: u32,
242        fill: impl FnOnce(&mut Vec<u8>),
243    ) {
244        let mut buf = recycle.pop().unwrap_or_default();
245        buf.clear();
246        write_rtp_header(
247            &mut buf,
248            self.payload_type,
249            marker,
250            self.sequence,
251            timestamp,
252            self.ssrc,
253        );
254        self.sequence = self.sequence.wrapping_add(1);
255        fill(&mut buf);
256        out.push(buf);
257    }
258
259    /// Packetize one Annex-B access unit at `timestamp` (90 kHz) into RTP packets.
260    ///
261    /// Each NAL that fits the MTU is sent as a single-NAL packet; larger NALs are
262    /// fragmented (FU-A for H.264, FU for H.265). The marker bit is set on the
263    /// last packet of the access unit.
264    ///
265    /// Allocating variant; prefer [`packetize_into`](Self::packetize_into) on the
266    /// egress hot path to recycle packet buffers across frames.
267    pub fn packetize(&mut self, access_unit: &[u8], timestamp: u32) -> Vec<Vec<u8>> {
268        let mut out = Vec::new();
269        self.packetize_into(access_unit, timestamp, &mut out);
270        out
271    }
272
273    /// Packetize into a caller-owned buffer, recycling the `Vec<u8>` allocations
274    /// it already holds from a previous frame.
275    ///
276    /// The hot-path contract: pass the *same* `out` every frame. On entry the
277    /// previously-produced (already-sent) packet buffers are reclaimed as a free
278    /// pool and refilled in place, so steady-state egress performs no per-packet
279    /// heap allocation.
280    pub fn packetize_into(&mut self, access_unit: &[u8], timestamp: u32, out: &mut Vec<Vec<u8>>) {
281        // Reclaim last frame's buffers (now consumed by the caller) as a pool.
282        let mut recycle = std::mem::take(out);
283        // Both codecs are Annex-B start-code framed; the shared scanner yields the
284        // NAL units (without start codes) in order.
285        let nals: Vec<&[u8]> = crate::codec::nal::iter_nals(access_unit)
286            .filter(|n| !n.is_empty())
287            .collect();
288        for (i, nal) in nals.iter().enumerate() {
289            let last_nal = i + 1 == nals.len();
290            if nal.len() <= self.max_payload {
291                // Single NAL unit packet; marker on the final NAL of the AU.
292                self.emit(&mut recycle, out, last_nal, timestamp, |b| {
293                    b.extend_from_slice(nal)
294                });
295            } else {
296                match self.codec {
297                    NalCodec::H264 => {
298                        self.fragment_fua(nal, timestamp, last_nal, &mut recycle, out)
299                    }
300                    NalCodec::H265 => {
301                        self.fragment_fu_h265(nal, timestamp, last_nal, &mut recycle, out)
302                    }
303                }
304            }
305        }
306    }
307
308    /// Split one oversized NAL into FU-A fragments (RFC 6184 §5.8).
309    fn fragment_fua(
310        &mut self,
311        nal: &[u8],
312        timestamp: u32,
313        last_nal: bool,
314        recycle: &mut Vec<Vec<u8>>,
315        out: &mut Vec<Vec<u8>>,
316    ) {
317        let nal_header = nal[0];
318        let fu_indicator = (nal_header & 0xE0) | 28; // F|NRI from NAL, type 28
319        let nal_type = nal_header & 0x1F;
320        let body = &nal[1..];
321        // Each fragment carries a 2-byte FU header (indicator + FU header).
322        let chunk = self.max_payload.saturating_sub(2).max(1);
323        let n_chunks = body.len().div_ceil(chunk);
324        for (idx, part) in body.chunks(chunk).enumerate() {
325            let start = idx == 0;
326            let end = idx + 1 == n_chunks;
327            let mut fu_header = nal_type;
328            if start {
329                fu_header |= 0x80;
330            }
331            if end {
332                fu_header |= 0x40;
333            }
334            // Marker only on the very last fragment of the final NAL of the AU.
335            self.emit(recycle, out, last_nal && end, timestamp, |pkt| {
336                pkt.push(fu_indicator);
337                pkt.push(fu_header);
338                pkt.extend_from_slice(part);
339            });
340        }
341    }
342
343    /// Split one oversized H.265 NAL into FU fragments (RFC 7798 §4.4.3).
344    ///
345    /// The 2-byte NAL header becomes a PayloadHdr with its type field set to 49
346    /// (the F/LayerId/TID bits are preserved); each fragment then carries a
347    /// 1-byte FU header (`S | E | FuType`) where `FuType` is the original type.
348    fn fragment_fu_h265(
349        &mut self,
350        nal: &[u8],
351        timestamp: u32,
352        last_nal: bool,
353        recycle: &mut Vec<Vec<u8>>,
354        out: &mut Vec<Vec<u8>>,
355    ) {
356        // A well-formed H.265 NAL has a 2-byte header; anything shorter can't be
357        // fragmented meaningfully, so emit it as a single packet.
358        if nal.len() < 2 {
359            self.emit(recycle, out, last_nal, timestamp, |pkt| {
360                pkt.extend_from_slice(nal)
361            });
362            return;
363        }
364        let nal_type = (nal[0] >> 1) & 0x3F;
365        // PayloadHdr: keep F (bit 15) + LayerId + TID, overwrite the 6-bit type
366        // field with 49 (FU). Type occupies bits 9..14, i.e. (byte0 >> 1) & 0x3F.
367        let payload_hdr0 = (nal[0] & 0x81) | (49 << 1);
368        let payload_hdr1 = nal[1];
369        let body = &nal[2..];
370        // Each fragment carries the 2-byte PayloadHdr + 1-byte FU header.
371        let chunk = self.max_payload.saturating_sub(3).max(1);
372        let n_chunks = body.len().div_ceil(chunk);
373        for (idx, part) in body.chunks(chunk).enumerate() {
374            let start = idx == 0;
375            let end = idx + 1 == n_chunks;
376            let mut fu_header = nal_type;
377            if start {
378                fu_header |= 0x80;
379            }
380            if end {
381                fu_header |= 0x40;
382            }
383            self.emit(recycle, out, last_nal && end, timestamp, |pkt| {
384                pkt.push(payload_hdr0);
385                pkt.push(payload_hdr1);
386                pkt.push(fu_header);
387                pkt.extend_from_slice(part);
388            });
389        }
390    }
391}
392
/// Rebuilds Annex-B access units from RFC 6184 H.264 RTP payloads.
///
/// Hand each packet's payload (the bytes from [`RtpHeader::payload_offset`]
/// onward) to [`push`](Self::push) along with its marker bit, timestamp and
/// sequence number. Once an access unit is complete `push` returns
/// `Ok(Some(au))`; the unit's data is its NAL units back to back, each
/// preceded by a 4-byte Annex-B start code — the shape the codec parsers and
/// `annexb_to_avcc` expect.
#[derive(Debug, Default)]
pub struct H264Depacketizer {
    /// Annex-B framed bytes of the access unit being assembled.
    au: Vec<u8>,
    /// Reassembly buffer for the NAL currently arriving as FU-A fragments.
    fua: Vec<u8>,
    /// Set between an FU-A Start fragment and its End fragment.
    in_fragment: bool,
    /// NAL header byte reconstructed for the FU-A NAL in progress.
    fua_header: u8,
    /// Timestamp of the access unit being assembled, if one is pending.
    current_ts: Option<u32>,
    /// Sequence number of the last accepted packet; used to detect gaps while
    /// a fragmented NAL is being reassembled.
    last_seq: Option<u16>,
}
415
416impl H264Depacketizer {
417    /// A fresh depacketizer with no in-progress access unit.
418    pub fn new() -> Self {
419        Self::default()
420    }
421
422    /// Append one NAL unit (Annex-B framed) to the current access unit.
423    fn append_nal(&mut self, nal: &[u8]) {
424        self.au.extend_from_slice(&ANNEXB_START);
425        self.au.extend_from_slice(nal);
426    }
427
428    /// Whether the pending access unit holds an IDR (type 5) NAL — a keyframe.
429    fn pending_is_keyframe(&self) -> bool {
430        // Scan the assembled Annex-B for a NAL header with type 5.
431        let mut i = 0;
432        while i + 4 < self.au.len() {
433            if self.au[i..i + 4] == ANNEXB_START {
434                let nal_type = self.au[i + 4] & 0x1F;
435                if nal_type == 5 {
436                    return true;
437                }
438            }
439            i += 1;
440        }
441        false
442    }
443
444    /// Emit and reset the pending access unit, if any.
445    fn take_au(&mut self) -> Option<AccessUnit> {
446        if self.au.is_empty() {
447            return None;
448        }
449        let keyframe = self.pending_is_keyframe();
450        let timestamp = self.current_ts.unwrap_or(0);
451        let data = Bytes::from(std::mem::take(&mut self.au));
452        self.current_ts = None;
453        Some(AccessUnit {
454            data,
455            timestamp,
456            keyframe,
457        })
458    }
459
460    /// Push one RTP H.264 payload. Returns a completed [`AccessUnit`] when the
461    /// marker bit closes the frame (or the timestamp advances to a new one).
462    pub fn push(
463        &mut self,
464        payload: &[u8],
465        marker: bool,
466        timestamp: u32,
467        sequence: u16,
468    ) -> Result<Option<AccessUnit>, DepacketizeError> {
469        if payload.is_empty() {
470            return Err(DepacketizeError::Truncated);
471        }
472
473        // A timestamp change flushes the previous access unit before starting the
474        // new one (some encoders omit the marker bit).
475        let mut completed = None;
476        if let Some(ts) = self.current_ts {
477            if ts != timestamp && !self.in_fragment {
478                completed = self.take_au();
479            }
480        }
481        self.current_ts = Some(timestamp);
482
483        let nal_type = payload[0] & 0x1F;
484        match nal_type {
485            1..=23 => {
486                // Single NAL unit packet — the payload *is* the NAL.
487                self.append_nal(payload);
488            }
489            24 => {
490                // STAP-A: one byte type, then [u16 size][nal]… aggregates.
491                let mut i = 1;
492                while i + 2 <= payload.len() {
493                    let size = u16::from_be_bytes([payload[i], payload[i + 1]]) as usize;
494                    i += 2;
495                    if i + size > payload.len() {
496                        return Err(DepacketizeError::Truncated);
497                    }
498                    self.append_nal(&payload[i..i + size]);
499                    i += size;
500                }
501            }
502            28 => {
503                // FU-A: byte0 = FU indicator, byte1 = FU header (S|E|R|type).
504                if payload.len() < 2 {
505                    return Err(DepacketizeError::Truncated);
506                }
507                let fu_header = payload[1];
508                let start = fu_header & 0x80 != 0;
509                let end = fu_header & 0x40 != 0;
510                let frag_type = fu_header & 0x1F;
511
512                if start {
513                    // Reconstruct the original NAL header: F|NRI from the indicator,
514                    // type from the FU header.
515                    self.fua_header = (payload[0] & 0xE0) | frag_type;
516                    self.fua.clear();
517                    self.fua.push(self.fua_header);
518                    self.in_fragment = true;
519                } else if !self.in_fragment {
520                    // Mid/last fragment with no start — lost the head.
521                    return Err(DepacketizeError::OutOfOrder);
522                } else if self.seq_gap(sequence) {
523                    self.in_fragment = false;
524                    self.fua.clear();
525                    return Err(DepacketizeError::OutOfOrder);
526                }
527                self.fua.extend_from_slice(&payload[2..]);
528
529                if end && self.in_fragment {
530                    let nal = std::mem::take(&mut self.fua);
531                    self.append_nal(&nal);
532                    self.in_fragment = false;
533                }
534            }
535            other => return Err(DepacketizeError::Unsupported(other)),
536        }
537
538        self.last_seq = Some(sequence);
539
540        if completed.is_some() {
541            return Ok(completed);
542        }
543        if marker {
544            return Ok(self.take_au());
545        }
546        Ok(None)
547    }
548
549    /// Detect a one-step sequence-number gap relative to the previous packet.
550    fn seq_gap(&self, sequence: u16) -> bool {
551        match self.last_seq {
552            Some(prev) => sequence.wrapping_sub(prev) != 1,
553            None => false,
554        }
555    }
556}
557
/// Rebuilds Annex-B access units from RFC 7798 H.265 (HEVC) RTP payloads.
///
/// This is the HEVC sibling of [`H264Depacketizer`], for ingesting HEVC IP
/// cameras and encoders over RTSP/WebRTC. Three packetization modes are
/// handled: single NAL units, aggregation packets (AP, type 48), and
/// fragmentation units (FU, type 49). Output has the same shape as
/// [`H264Depacketizer`] produces and the codec parsers expect: NAL units, each
/// preceded by a 4-byte Annex-B start code.
///
/// DONL/DOND fields (present only when `sprop-max-don-diff > 0` is negotiated
/// in SDP) are not consumed; the common single-stream profile does not use
/// them.
#[derive(Debug, Default)]
pub struct H265Depacketizer {
    /// Annex-B framed bytes of the access unit being assembled.
    au: Vec<u8>,
    /// Reassembly buffer for the NAL currently arriving as FU fragments.
    fu: Vec<u8>,
    /// Set between an FU Start fragment and its End fragment.
    in_fragment: bool,
    /// Timestamp of the access unit being assembled, if one is pending.
    current_ts: Option<u32>,
    /// Sequence number of the last accepted packet; used to detect gaps while
    /// a fragmented NAL is being reassembled.
    last_seq: Option<u16>,
}
581
582impl H265Depacketizer {
583    /// A fresh depacketizer with no in-progress access unit.
584    pub fn new() -> Self {
585        Self::default()
586    }
587
588    /// Append one NAL unit (Annex-B framed) to the current access unit.
589    fn append_nal(&mut self, nal: &[u8]) {
590        self.au.extend_from_slice(&ANNEXB_START);
591        self.au.extend_from_slice(nal);
592    }
593
594    /// Whether the pending access unit holds an IRAP (BLA/IDR/CRA, types 16–23)
595    /// VCL NAL — i.e. a random-access point / keyframe.
596    fn pending_is_keyframe(&self) -> bool {
597        let mut i = 0;
598        while i + 4 < self.au.len() {
599            if self.au[i..i + 4] == ANNEXB_START {
600                let nal_type = (self.au[i + 4] >> 1) & 0x3F;
601                if (16..=23).contains(&nal_type) {
602                    return true;
603                }
604            }
605            i += 1;
606        }
607        false
608    }
609
610    /// Emit and reset the pending access unit, if any.
611    fn take_au(&mut self) -> Option<AccessUnit> {
612        if self.au.is_empty() {
613            return None;
614        }
615        let keyframe = self.pending_is_keyframe();
616        let timestamp = self.current_ts.unwrap_or(0);
617        let data = Bytes::from(std::mem::take(&mut self.au));
618        self.current_ts = None;
619        Some(AccessUnit {
620            data,
621            timestamp,
622            keyframe,
623        })
624    }
625
626    /// Detect a one-step sequence-number gap relative to the previous packet.
627    fn seq_gap(&self, sequence: u16) -> bool {
628        match self.last_seq {
629            Some(prev) => sequence.wrapping_sub(prev) != 1,
630            None => false,
631        }
632    }
633
634    /// Push one RTP H.265 payload. Returns a completed [`AccessUnit`] when the
635    /// marker bit closes the frame (or the timestamp advances to a new one).
636    pub fn push(
637        &mut self,
638        payload: &[u8],
639        marker: bool,
640        timestamp: u32,
641        sequence: u16,
642    ) -> Result<Option<AccessUnit>, DepacketizeError> {
643        // The H.265 NAL header is two bytes; a single byte cannot carry a type.
644        if payload.len() < 2 {
645            return Err(DepacketizeError::Truncated);
646        }
647
648        // A timestamp change flushes the previous access unit (some encoders omit
649        // the marker bit).
650        let mut completed = None;
651        if let Some(ts) = self.current_ts {
652            if ts != timestamp && !self.in_fragment {
653                completed = self.take_au();
654            }
655        }
656        self.current_ts = Some(timestamp);
657
658        let nal_type = (payload[0] >> 1) & 0x3F;
659        match nal_type {
660            // Single NAL unit packet — the payload *is* the NAL (header included).
661            0..=47 => self.append_nal(payload),
662            48 => {
663                // AP: 2-byte header, then [u16 size][nal]… aggregates.
664                let mut i = 2;
665                while i + 2 <= payload.len() {
666                    let size = u16::from_be_bytes([payload[i], payload[i + 1]]) as usize;
667                    i += 2;
668                    if i + size > payload.len() {
669                        return Err(DepacketizeError::Truncated);
670                    }
671                    self.append_nal(&payload[i..i + size]);
672                    i += size;
673                }
674            }
675            49 => {
676                // FU: 2-byte PayloadHdr, 1-byte FU header (S|E|FuType), then body.
677                if payload.len() < 3 {
678                    return Err(DepacketizeError::Truncated);
679                }
680                let fu_header = payload[2];
681                let start = fu_header & 0x80 != 0;
682                let end = fu_header & 0x40 != 0;
683                let fu_type = fu_header & 0x3F;
684
685                if start {
686                    // Reconstruct the original 2-byte NAL header: restore the type
687                    // field (bits 9..14) from FuType, keep F/LayerId/TID.
688                    let hdr0 = (payload[0] & 0x81) | (fu_type << 1);
689                    let hdr1 = payload[1];
690                    self.fu.clear();
691                    self.fu.push(hdr0);
692                    self.fu.push(hdr1);
693                    self.in_fragment = true;
694                } else if !self.in_fragment {
695                    return Err(DepacketizeError::OutOfOrder);
696                } else if self.seq_gap(sequence) {
697                    self.in_fragment = false;
698                    self.fu.clear();
699                    return Err(DepacketizeError::OutOfOrder);
700                }
701                self.fu.extend_from_slice(&payload[3..]);
702
703                if end && self.in_fragment {
704                    let nal = std::mem::take(&mut self.fu);
705                    self.append_nal(&nal);
706                    self.in_fragment = false;
707                }
708            }
709            other => return Err(DepacketizeError::Unsupported(other)),
710        }
711
712        self.last_seq = Some(sequence);
713
714        if completed.is_some() {
715            return Ok(completed);
716        }
717        if marker {
718            return Ok(self.take_au());
719        }
720        Ok(None)
721    }
722}
723
724/// A reassembled coded video frame from the RTP bus.
725///
726/// For the NAL codecs (H.264/H.265) `data` is the access unit in Annex-B form
727/// (each NAL prefixed with a 4-byte start code); for VP9/AV1 it is the raw coded
728/// frame / temporal unit. `keyframe` marks a decodable random-access point.
729#[derive(Debug, Clone, PartialEq, Eq)]
730pub struct AccessUnit {
731    /// The coded frame bytes (Annex-B NALs for H.26x; raw frame for VP9/AV1).
732    pub data: Bytes,
733    /// RTP media timestamp (90 kHz) of the frame.
734    pub timestamp: u32,
735    /// Whether this is a keyframe / random-access point.
736    pub keyframe: bool,
737}
738
/// Append a 12-byte RTP fixed header to `out`: version 2 with no padding, no
/// extension and no CSRC entries, then marker + payload type, sequence
/// number, timestamp and SSRC in network byte order. Every packetizer in this
/// module starts its packets with it.
fn write_rtp_header(out: &mut Vec<u8>, pt: u8, marker: bool, seq: u16, ts: u32, ssrc: u32) {
    // Octet 0 is V=2, P=0, X=0, CC=0; octet 1 is M in the top bit above the
    // 7-bit payload type.
    let marker_and_pt = (u8::from(marker) << 7) | (pt & 0x7F);
    out.extend_from_slice(&[0x80, marker_and_pt]);
    out.extend_from_slice(&seq.to_be_bytes());
    out.extend_from_slice(&ts.to_be_bytes());
    out.extend_from_slice(&ssrc.to_be_bytes());
}
748
749// ── Opus (RFC 7587) ──────────────────────────────────────────────────────────
750
/// Wraps Opus audio in RTP (RFC 7587). Every Opus packet becomes the payload
/// of exactly one RTP packet, unmodified — Opus is self-delimiting, so no
/// payload descriptor is needed. The media clock is 48 kHz, so callers pass
/// timestamps already expressed in 48 kHz units.
#[derive(Clone, Debug)]
pub struct OpusPacketizer {
    /// RTP payload type written into every packet header.
    payload_type: u8,
    /// Synchronization source written into every packet header.
    ssrc: u32,
    /// Sequence number of the next packet to emit (wraps).
    sequence: u16,
}
761
762impl OpusPacketizer {
763    /// An Opus packetizer for `payload_type`/`ssrc`.
764    pub fn new(payload_type: u8, ssrc: u32) -> Self {
765        Self {
766            payload_type,
767            ssrc,
768            sequence: 0,
769        }
770    }
771
772    /// Packetize one Opus frame at `timestamp` (48 kHz) into a single RTP packet,
773    /// appended to `out` (recycling a buffer it already holds). The RTP marker is
774    /// left clear — continuous audio is not a talkspurt boundary.
775    pub fn packetize_into(&mut self, opus: &[u8], timestamp: u32, out: &mut Vec<Vec<u8>>) {
776        let mut recycle = std::mem::take(out);
777        let mut pkt = recycle.pop().unwrap_or_default();
778        pkt.clear();
779        write_rtp_header(
780            &mut pkt,
781            self.payload_type,
782            false,
783            self.sequence,
784            timestamp,
785            self.ssrc,
786        );
787        self.sequence = self.sequence.wrapping_add(1);
788        pkt.extend_from_slice(opus);
789        out.push(pkt);
790    }
791}
792
793// ── VP9 (draft-ietf-payload-vp9) ─────────────────────────────────────────────
794
795/// Packetizes VP9 coded frames into RTP, using a flexible-mode-off payload
796/// descriptor for the common single-layer (non-scalable) case.
797///
798/// Each frame is carried verbatim after a VP9 payload descriptor (the bytes are
799/// not transformed — VP9 RTP carries the frame opaquely), split across the MTU
800/// with the B (begin) bit on the first packet and E (end) + RTP marker on the
801/// last. A 15-bit picture ID increments per frame. Spatial/temporal scalability
802/// and flexible mode are out of scope.
803#[derive(Debug, Clone)]
804pub struct Vp9Packetizer {
805    payload_type: u8,
806    ssrc: u32,
807    sequence: u16,
808    max_payload: usize,
809    picture_id: u16,
810}
811
812impl Vp9Packetizer {
813    /// A VP9 packetizer for `payload_type`/`ssrc`. `mtu` is the maximum UDP
814    /// payload; the 12-byte RTP header and a 3-byte descriptor are subtracted.
815    pub fn new(payload_type: u8, ssrc: u32, mtu: usize) -> Self {
816        Self {
817            payload_type,
818            ssrc,
819            sequence: 0,
820            // 12-byte RTP header + up to 3-byte descriptor (1 flags + 2 picture id).
821            max_payload: mtu.saturating_sub(12 + 3).max(1),
822            picture_id: 0,
823        }
824    }
825
826    /// Packetize one VP9 frame at `timestamp` (90 kHz). `keyframe` clears the P
827    /// (inter-predicted) bit so receivers can identify random-access points.
828    pub fn packetize(&mut self, frame: &[u8], timestamp: u32, keyframe: bool) -> Vec<Vec<u8>> {
829        let mut out = Vec::new();
830        self.packetize_into(frame, timestamp, keyframe, &mut out);
831        out
832    }
833
834    /// Recycling variant of [`packetize`](Self::packetize): pass the same `out`
835    /// every frame to reuse the packet-buffer allocations across frames.
836    pub fn packetize_into(
837        &mut self,
838        frame: &[u8],
839        timestamp: u32,
840        keyframe: bool,
841        out: &mut Vec<Vec<u8>>,
842    ) {
843        let pid = self.picture_id & 0x7FFF;
844        self.picture_id = self.picture_id.wrapping_add(1);
845
846        let mut recycle = std::mem::take(out);
847        let chunks: Vec<&[u8]> = if frame.is_empty() {
848            vec![&[]]
849        } else {
850            frame.chunks(self.max_payload).collect()
851        };
852        let n = chunks.len();
853        for (i, chunk) in chunks.into_iter().enumerate() {
854            let begin = i == 0;
855            let end = i + 1 == n;
856            let mut pkt = recycle.pop().unwrap_or_default();
857            pkt.clear();
858            write_rtp_header(
859                &mut pkt,
860                self.payload_type,
861                end,
862                self.sequence,
863                timestamp,
864                self.ssrc,
865            );
866            self.sequence = self.sequence.wrapping_add(1);
867
868            // Descriptor octet: I=1, P=!keyframe, L=0, F=0, B, E, V=0, Z=0.
869            let mut desc0 = 0x80; // I = 1 (picture ID present)
870            if !keyframe {
871                desc0 |= 0x40; // P (inter-predicted)
872            }
873            if begin {
874                desc0 |= 0x08; // B (start of frame)
875            }
876            if end {
877                desc0 |= 0x04; // E (end of frame)
878            }
879            pkt.push(desc0);
880            // 15-bit picture ID (M=1): 0x80|hi, lo.
881            pkt.push(0x80 | (pid >> 8) as u8);
882            pkt.push((pid & 0xFF) as u8);
883            pkt.extend_from_slice(chunk);
884            out.push(pkt);
885        }
886    }
887}
888
889/// Reassembles VP9 RTP payloads (draft-ietf-payload-vp9, non-flexible single
890/// layer) into coded frames. Counterpart to [`Vp9Packetizer`].
891#[derive(Debug, Default)]
892pub struct Vp9Depacketizer {
893    frame: Vec<u8>,
894    in_frame: bool,
895    keyframe: bool,
896    current_ts: Option<u32>,
897}
898
899impl Vp9Depacketizer {
900    /// A fresh depacketizer with no in-progress frame.
901    pub fn new() -> Self {
902        Self::default()
903    }
904
905    /// Push one VP9 RTP payload. Returns a completed frame when the E (end) bit
906    /// and RTP marker close it.
907    pub fn push(
908        &mut self,
909        payload: &[u8],
910        marker: bool,
911        timestamp: u32,
912    ) -> Result<Option<AccessUnit>, DepacketizeError> {
913        if payload.is_empty() {
914            return Err(DepacketizeError::Truncated);
915        }
916        let desc0 = payload[0];
917        let has_pid = desc0 & 0x80 != 0;
918        let has_layer = desc0 & 0x20 != 0;
919        let flexible = desc0 & 0x10 != 0;
920        let begin = desc0 & 0x08 != 0;
921        let end = desc0 & 0x04 != 0;
922        let predicted = desc0 & 0x40 != 0;
923
924        // Walk past the variable-length descriptor fields we recognize.
925        let mut off = 1;
926        if has_pid {
927            // M bit selects a 1- or 2-byte picture ID.
928            let m = payload.get(off).ok_or(DepacketizeError::Truncated)? & 0x80 != 0;
929            off += if m { 2 } else { 1 };
930        }
931        if has_layer {
932            off += 1; // TID/U/SID/D byte
933            if !flexible {
934                off += 1; // TL0PICIDX (non-flexible mode)
935            }
936        }
937        if off > payload.len() {
938            return Err(DepacketizeError::Truncated);
939        }
940
941        if begin {
942            self.frame.clear();
943            self.in_frame = true;
944            self.keyframe = !predicted;
945            self.current_ts = Some(timestamp);
946        } else if !self.in_frame {
947            return Err(DepacketizeError::OutOfOrder);
948        }
949        self.frame.extend_from_slice(&payload[off..]);
950
951        if end && marker && self.in_frame {
952            self.in_frame = false;
953            return Ok(Some(AccessUnit {
954                data: Bytes::from(std::mem::take(&mut self.frame)),
955                timestamp: self.current_ts.unwrap_or(timestamp),
956                keyframe: self.keyframe,
957            }));
958        }
959        Ok(None)
960    }
961}
962
963// ── AV1 (AOMedia "RTP Payload Format For AV1") ───────────────────────────────
964
965/// Encode `v` as unsigned LEB128 into `out`.
966#[cfg(feature = "codec-av1")]
fn leb128_encode(mut v: u64, out: &mut Vec<u8>) {
    // Emit seven bits at a time, least-significant group first; every byte
    // except the final one carries the continuation flag (0x80).
    while v >= 0x80 {
        out.push((v & 0x7F) as u8 | 0x80);
        v >>= 7;
    }
    // Final group: fewer than eight significant bits remain, flag clear.
    out.push(v as u8);
}
980
/// `obu_type` of a sequence header OBU. When the packetizer sees one it sets
/// the N (new coded video sequence) bit on the first packet.
#[cfg(feature = "codec-av1")]
const AV1_OBU_SEQUENCE_HEADER: u8 = 1;
/// `obu_type` of a temporal delimiter OBU, which the packetizer leaves out of
/// the RTP stream.
#[cfg(feature = "codec-av1")]
const AV1_OBU_TEMPORAL_DELIMITER: u8 = 2;
985
/// RTP packetizer for AV1 temporal units, modelled on the AOMedia payload
/// format.
///
/// Temporal-delimiter OBUs are dropped. Every other OBU is rewritten as a
/// length-prefixed *OBU element*: a LEB128 length, the OBU header with
/// `obu_has_size_field` cleared, the extension byte when present, and the OBU
/// payload. The elements are laid end to end and that byte stream is cut into
/// MTU-sized pieces, each behind a one-byte aggregation header (Z/Y
/// continuation bits, W=0, N when a sequence header was seen). The RTP marker
/// on the final packet signals the end of the temporal unit. No scalability
/// structure is emitted.
///
/// NOTE(review): the stream is cut at arbitrary byte offsets, so the part of
/// an element that continues in a later packet has no length prefix of its
/// own. Confirm against the AV1 RTP specification (W=0 requires a length in
/// front of every OBU element, fragments included) before relying on interop
/// with receivers other than `Av1Depacketizer`.
#[cfg(feature = "codec-av1")]
#[derive(Debug, Clone)]
pub struct Av1Packetizer {
    payload_type: u8,
    ssrc: u32,
    sequence: u16,
    max_payload: usize,
}

#[cfg(feature = "codec-av1")]
impl Av1Packetizer {
    /// Create an AV1 packetizer for `payload_type`/`ssrc`.
    ///
    /// `mtu` is the maximum UDP payload. The per-packet budget is `mtu` minus
    /// the 12-byte RTP header and the 1-byte aggregation header, and never
    /// less than one byte.
    pub fn new(payload_type: u8, ssrc: u32, mtu: usize) -> Self {
        let overhead = 12 + 1;
        Self {
            payload_type,
            ssrc,
            sequence: 0,
            max_payload: mtu.saturating_sub(overhead).max(1),
        }
    }

    /// Packetize one AV1 temporal unit (low-overhead OBUs) at `timestamp` into
    /// freshly allocated packets.
    pub fn packetize(&mut self, temporal_unit: &[u8], timestamp: u32) -> Vec<Vec<u8>> {
        let mut packets = Vec::new();
        self.packetize_into(temporal_unit, timestamp, &mut packets);
        packets
    }

    /// Buffer-recycling form of [`packetize`](Self::packetize).
    ///
    /// The previous contents of `out` are consumed as spare buffers and `out`
    /// is refilled with the packets of this temporal unit. A unit that yields
    /// no OBU elements still produces one packet holding only the aggregation
    /// header.
    pub fn packetize_into(&mut self, temporal_unit: &[u8], timestamp: u32, out: &mut Vec<Vec<u8>>) {
        // Step 1: lay out every kept OBU as `leb128(len) | header | [ext] | payload`.
        let mut stream = Vec::with_capacity(temporal_unit.len());
        let mut new_sequence = false;
        for obu in crate::codec::obu::iter_obus(temporal_unit) {
            if obu.obu_type == AV1_OBU_TEMPORAL_DELIMITER {
                continue;
            }
            new_sequence |= obu.obu_type == AV1_OBU_SEQUENCE_HEADER;

            let header_len = if obu.has_extension { 2 } else { 1 };
            leb128_encode((header_len + obu.payload.len()) as u64, &mut stream);
            stream.push(obu.raw[0] & !0x02); // obu_has_size_field = 0
            if obu.has_extension {
                stream.push(obu.raw[1]);
            }
            stream.extend_from_slice(obu.payload);
        }

        // Step 2: cut the element stream into packets.
        let mut spare = std::mem::take(out);
        let mut remaining: &[u8] = &stream;
        let mut first = true;
        loop {
            let (chunk, tail) = remaining.split_at(remaining.len().min(self.max_payload));
            remaining = tail;
            let last = remaining.is_empty();

            let mut packet = spare.pop().unwrap_or_default();
            packet.clear();
            write_rtp_header(
                &mut packet,
                self.payload_type,
                last,
                self.sequence,
                timestamp,
                self.ssrc,
            );
            self.sequence = self.sequence.wrapping_add(1);

            // Aggregation header: Z | Y | W(2 bits, zero) | N | reserved.
            let z: u8 = if first { 0x00 } else { 0x80 }; // continues the previous packet
            let y: u8 = if last { 0x00 } else { 0x40 }; // continues in the next packet
            let n: u8 = if first && new_sequence { 0x08 } else { 0x00 };
            packet.push(z | y | n);
            packet.extend_from_slice(chunk);
            out.push(packet);

            if last {
                break;
            }
            first = false;
        }
    }
}
1088
/// Reassembles AV1 RTP payloads into temporal units. Counterpart to
/// [`Av1Packetizer`]: it concatenates each packet's OBU-element bytes (past the
/// aggregation header) and, on the RTP marker, parses the length-delimited
/// elements back into low-overhead OBUs (re-adding each `obu_has_size_field`).
///
/// Only the W=0 layout produced by [`Av1Packetizer`] is understood; the W
/// field of the aggregation header is not inspected.
#[cfg(feature = "codec-av1")]
#[derive(Debug, Default)]
pub struct Av1Depacketizer {
    /// Concatenated OBU-element bytes of the temporal unit being assembled.
    stream: Vec<u8>,
    /// Whether any packet of the open unit carried the N bit.
    new_cvs: bool,
    /// RTP timestamp of the open unit's first packet; `None` when no unit is open.
    current_ts: Option<u32>,
}

#[cfg(feature = "codec-av1")]
impl Av1Depacketizer {
    /// A fresh depacketizer with no in-progress temporal unit.
    pub fn new() -> Self {
        Self::default()
    }

    /// Push one AV1 RTP payload. Returns a completed temporal unit when the RTP
    /// marker closes it, `Ok(None)` while the unit is still open.
    ///
    /// A packet whose timestamp differs from the open unit's means the packet
    /// carrying the marker was lost; the stale partial unit is discarded and
    /// the new packet starts a fresh one.
    ///
    /// # Errors
    ///
    /// - [`DepacketizeError::Truncated`] when the payload is empty, or when the
    ///   accumulated element stream does not parse on the marker. All per-unit
    ///   state is cleared first, so the next unit starts clean.
    /// - [`DepacketizeError::OutOfOrder`] when the first packet seen for a unit
    ///   has the Z bit set, i.e. it continues a fragment that was never
    ///   received.
    pub fn push(
        &mut self,
        payload: &[u8],
        marker: bool,
        timestamp: u32,
    ) -> Result<Option<AccessUnit>, DepacketizeError> {
        let (&agg, elements) = payload
            .split_first()
            .ok_or(DepacketizeError::Truncated)?;

        // Timestamp moved on without a marker: drop the unfinished unit rather
        // than gluing two temporal units together.
        if matches!(self.current_ts, Some(open) if open != timestamp) {
            self.stream.clear();
            self.new_cvs = false;
            self.current_ts = None;
        }
        if self.current_ts.is_none() {
            if agg & 0x80 != 0 {
                // Z on the first packet we hold: the unit's beginning is missing.
                return Err(DepacketizeError::OutOfOrder);
            }
            self.current_ts = Some(timestamp);
        }
        if agg & 0x08 != 0 {
            self.new_cvs = true; // N: new coded video sequence
        }
        self.stream.extend_from_slice(elements);

        if !marker {
            return Ok(None);
        }

        // Marker: claim all per-unit state before the fallible parse so an
        // error cannot leak a stale timestamp or keyframe flag into the next unit.
        let stream = std::mem::take(&mut self.stream);
        let keyframe = std::mem::take(&mut self.new_cvs);
        let ts = self.current_ts.take().unwrap_or(timestamp);

        // Rebuild the temporal unit from length-delimited OBU elements.
        let mut tu = Vec::with_capacity(stream.len() + 8);
        let mut pos = 0;
        while pos < stream.len() {
            let len = leb128_decode(&stream, &mut pos).ok_or(DepacketizeError::Truncated)?;
            let end = pos.checked_add(len).ok_or(DepacketizeError::Truncated)?;
            let element = stream.get(pos..end).ok_or(DepacketizeError::Truncated)?;
            pos = end;
            // Element → low-overhead OBU: set obu_has_size_field, insert the size.
            let hdr0 = *element.first().ok_or(DepacketizeError::Truncated)?;
            let has_ext = (hdr0 >> 2) & 1 == 1;
            let header_len = 1 + has_ext as usize;
            let obu_payload = element
                .get(header_len..)
                .ok_or(DepacketizeError::Truncated)?;
            tu.push(hdr0 | 0x02);
            if has_ext {
                // In bounds: the `get(header_len..)` above proved len >= 2.
                tu.push(element[1]);
            }
            leb128_encode(obu_payload.len() as u64, &mut tu);
            tu.extend_from_slice(obu_payload);
        }

        Ok(Some(AccessUnit {
            data: Bytes::from(tu),
            timestamp: ts,
            keyframe,
        }))
    }
}
1165
1166/// Decode an unsigned LEB128 integer at `*pos` (advancing it), returning `None`
1167/// on truncation or overflow. Mirrors the codec-side decoder for RTP carriage.
1168#[cfg(feature = "codec-av1")]
fn leb128_decode(data: &[u8], pos: &mut usize) -> Option<usize> {
    // At most eight bytes are read (56 value bits); `*pos` is advanced past
    // every byte consumed, including on failure.
    let mut acc: u64 = 0;
    let mut shift: u32 = 0;
    while shift < 56 {
        let byte = *data.get(*pos)?;
        *pos += 1;
        acc |= u64::from(byte & 0x7F) << shift;
        if byte < 0x80 {
            // Continuation flag clear: this was the final group.
            return usize::try_from(acc).ok();
        }
        shift += 7;
    }
    // An eighth byte that still asks for more is rejected.
    None
}
1181
1182#[cfg(test)]
1183mod tests {
1184    use super::*;
1185
1186    /// Build a minimal 12-byte RTP packet with the given fields and payload.
1187    fn rtp(seq: u16, ts: u32, marker: bool, payload: &[u8]) -> Vec<u8> {
1188        let mut p = vec![0x80, if marker { 0x80 | 96 } else { 96 }];
1189        p.extend_from_slice(&seq.to_be_bytes());
1190        p.extend_from_slice(&ts.to_be_bytes());
1191        p.extend_from_slice(&[0, 0, 0, 1]); // ssrc
1192        p.extend_from_slice(payload);
1193        p
1194    }
1195
    /// A plain 12-byte header parses into its fields and the payload is found
    /// immediately after it.
    #[test]
    fn parses_fixed_header_and_payload_offset() {
        let pkt = rtp(7, 9000, true, &[0x65, 0xAA]);
        let h = RtpHeader::parse(&pkt).unwrap();
        assert_eq!(h.sequence, 7);
        assert_eq!(h.timestamp, 9000);
        assert!(h.marker);
        assert_eq!(h.payload_type, 96);
        // No CSRCs and no extension: the payload starts right after 12 bytes.
        assert_eq!(h.payload_offset, 12);
        assert_eq!(&pkt[h.payload_offset..], &[0x65, 0xAA]);
    }
1207
    /// Parsing yields `None` for a version other than 2 and for a buffer too
    /// short to hold the fixed header.
    #[test]
    fn rejects_wrong_version_and_short_buffers() {
        assert!(RtpHeader::parse(&[0x00; 12]).is_none()); // version 0
        assert!(RtpHeader::parse(&[0x80; 4]).is_none()); // too short
    }
1213
    /// Two CSRC entries (4 bytes each) push the payload offset from 12 to 20.
    #[test]
    fn honors_csrc_count_in_payload_offset() {
        let mut pkt = rtp(1, 0, false, &[0x41]);
        pkt[0] = 0x82; // version 2, CSRC count = 2
        // Rebuild the packet as: fixed header, CSRC list, then the payload byte.
        let mut with_csrc = pkt[..12].to_vec();
        with_csrc.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF, 0, 0, 0, 0]); // 2 CSRCs
        with_csrc.push(0x41);
        let h = RtpHeader::parse(&with_csrc).unwrap();
        assert_eq!(h.payload_offset, 20);
    }
1224
    /// A payload announcing two AU-headers is split into two access units of
    /// the sizes those headers declare.
    #[test]
    fn aac_hbr_splits_two_access_units() {
        // AU-headers-length = 32 bits → two 16-bit AU-headers.
        // AU sizes 3 and 2 (top 13 bits of each 2-byte header).
        let mut p = Vec::new();
        p.extend_from_slice(&32u16.to_be_bytes()); // header bits
        p.extend_from_slice(&((3u16) << 3).to_be_bytes()); // AU-header: size 3
        p.extend_from_slice(&((2u16) << 3).to_be_bytes()); // AU-header: size 2
        p.extend_from_slice(&[0xA1, 0xA2, 0xA3]); // AU 1
        p.extend_from_slice(&[0xB1, 0xB2]); // AU 2
        let aus = AacDepacketizer::new().push(&p).unwrap();
        assert_eq!(aus.len(), 2);
        assert_eq!(&aus[0][..], &[0xA1, 0xA2, 0xA3]);
        assert_eq!(&aus[1][..], &[0xB1, 0xB2]);
    }
1240
    /// A payload with a single AU-header yields exactly one access unit.
    #[test]
    fn aac_hbr_single_au() {
        let mut p = Vec::new();
        p.extend_from_slice(&16u16.to_be_bytes()); // one 16-bit AU-header
        p.extend_from_slice(&((4u16) << 3).to_be_bytes()); // size 4
        p.extend_from_slice(&[1, 2, 3, 4]);
        let aus = AacDepacketizer::new().push(&p).unwrap();
        assert_eq!(aus.len(), 1);
        assert_eq!(&aus[0][..], &[1, 2, 3, 4]);
    }
1251
    /// Both a one-byte payload and a payload whose AU-header declares more
    /// data than is present are reported as `Truncated`.
    #[test]
    fn aac_truncated_payload_errors() {
        // A single byte is pushed as the whole payload.
        assert_eq!(
            AacDepacketizer::new().push(&[0x00]),
            Err(DepacketizeError::Truncated)
        );
        // Declares one AU of size 8 but supplies only 2 data bytes.
        let mut p = 16u16.to_be_bytes().to_vec();
        p.extend_from_slice(&((8u16) << 3).to_be_bytes());
        p.extend_from_slice(&[1, 2]);
        assert_eq!(
            AacDepacketizer::new().push(&p),
            Err(DepacketizeError::Truncated)
        );
    }
1267
    /// A single-NAL H.264 packet with the marker set comes out as one access
    /// unit: the NAL behind a 4-byte start code, not flagged as a keyframe.
    #[test]
    fn single_nal_packet_emits_annexb_on_marker() {
        let mut d = H264Depacketizer::new();
        // Type 1 (non-IDR slice), marker set → one access unit.
        let out = d.push(&[0x41, 0x9A, 0xBC], true, 3000, 1).unwrap().unwrap();
        assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x41, 0x9A, 0xBC]);
        assert!(!out.keyframe);
        assert_eq!(out.timestamp, 3000);
    }
1277
    /// A single NAL with header 0x65 (type 5) marks the access unit as a
    /// keyframe.
    #[test]
    fn idr_single_nal_is_flagged_keyframe() {
        let mut d = H264Depacketizer::new();
        let out = d.push(&[0x65, 0x01], true, 0, 1).unwrap().unwrap();
        assert!(out.keyframe);
    }
1284
    /// Packetizing a two-NAL H.264 access unit and feeding the packets back
    /// through the depacketizer reproduces the original bytes, keyframe flag
    /// and timestamp.
    #[test]
    fn packetizer_single_nal_round_trips_through_depacketizer() {
        // A small AU (two NALs) → single-NAL packets → reassembled identically.
        let au = [0, 0, 0, 1, 0x67, 0x42, 0x00, 0, 0, 0, 1, 0x65, 0x88, 0x99];
        let mut pkt = RtpPacketizer::new(96, 0xABCD, 1200);
        let packets = pkt.packetize(&au, 3000);
        assert_eq!(packets.len(), 2, "one packet per NAL");

        let mut depack = H264Depacketizer::new();
        // Keep the last access unit the depacketizer hands back.
        let mut out = None;
        for p in &packets {
            let h = RtpHeader::parse(p).unwrap();
            if let Some(au) = depack
                .push(&p[h.payload_offset..], h.marker, h.timestamp, h.sequence)
                .unwrap()
            {
                out = Some(au);
            }
        }
        let out = out.expect("AU completed on the marker packet");
        assert_eq!(&out.data[..], &au);
        assert!(out.keyframe);
        assert_eq!(out.timestamp, 3000);
    }
1309
    /// `packetize_into` with a reused output vector produces the same packets
    /// as `packetize` across three consecutive frames.
    #[test]
    fn packetize_into_recycles_buffers_without_changing_output() {
        // The recycling hot-path API must produce byte-identical packets to the
        // allocating `packetize`, frame after frame, including correct sequence
        // numbers carried across the reused buffer.
        let au1 = [0, 0, 0, 1, 0x67, 0x42, 0x00, 0, 0, 0, 1, 0x65, 0x88, 0x99];
        let au2 = [0, 0, 0, 1, 0x65, 0x11, 0x22, 0x33];

        // Two identically configured packetizers: `a` allocates, `b` recycles.
        let mut a = RtpPacketizer::new(96, 0xABCD, 1200);
        let mut b = RtpPacketizer::new(96, 0xABCD, 1200);
        let mut reused: Vec<Vec<u8>> = Vec::new();

        for au in [&au1[..], &au2[..], &au1[..]] {
            let expected = a.packetize(au, 3000);
            // `reused` still holds the previous frame's packets here, so from
            // the second iteration on this call runs with buffers to recycle.
            b.packetize_into(au, 3000, &mut reused);
            assert_eq!(
                reused, expected,
                "recycled output matches allocating output"
            );
        }
    }
1332
    /// An H.264 NAL larger than the MTU is split into several packets with the
    /// marker only on the last, and reassembles to the original bytes.
    #[test]
    fn packetizer_fragments_oversized_nal_and_round_trips() {
        // One NAL larger than the MTU → FU-A fragments → reassembled identically.
        let mut nal = vec![0, 0, 0, 1, 0x65]; // start code + IDR NAL header
        nal.extend((0..600u16).map(|i| i as u8)); // long payload
        let mut pkt = RtpPacketizer::new(96, 1, 100); // tiny MTU forces FU-A
        let packets = pkt.packetize(&nal, 90);
        assert!(packets.len() > 1, "oversized NAL is fragmented");
        // Only the last packet carries the marker bit.
        let markers: Vec<bool> = packets
            .iter()
            .map(|p| RtpHeader::parse(p).unwrap().marker)
            .collect();
        assert_eq!(markers.iter().filter(|m| **m).count(), 1);
        assert!(markers.last().unwrap());

        let mut depack = H264Depacketizer::new();
        // Keep the last access unit the depacketizer hands back.
        let mut out = None;
        for p in &packets {
            let h = RtpHeader::parse(p).unwrap();
            if let Some(au) = depack
                .push(&p[h.payload_offset..], h.marker, h.timestamp, h.sequence)
                .unwrap()
            {
                out = Some(au);
            }
        }
        assert_eq!(&out.unwrap().data[..], &nal[..]);
    }
1362
    /// A STAP-A payload carrying two size-prefixed NALs is emitted as two
    /// start-code-prefixed NALs in one access unit.
    #[test]
    fn stap_a_splits_aggregated_nals() {
        // STAP-A (24): [24][size=2][AA BB][size=3][CC DD EE]
        let payload = [24, 0, 2, 0xAA, 0xBB, 0, 3, 0xCC, 0xDD, 0xEE];
        let mut d = H264Depacketizer::new();
        let out = d.push(&payload, true, 0, 1).unwrap().unwrap();
        assert_eq!(
            &out.data[..],
            &[0, 0, 0, 1, 0xAA, 0xBB, 0, 0, 0, 1, 0xCC, 0xDD, 0xEE]
        );
    }
1374
    /// Three consecutive FU-A fragments (start, middle, end) are joined into a
    /// single NAL whose header is rebuilt from the FU indicator and FU header.
    #[test]
    fn fu_a_reassembles_fragmented_nal() {
        let mut d = H264Depacketizer::new();
        // FU indicator 0x7C (F=0,NRI=3,type=28), FU header start 0x85 (S=1,type=5).
        assert!(d
            .push(&[0x7C, 0x85, 0x11, 0x22], false, 0, 1)
            .unwrap()
            .is_none());
        // Middle fragment (S=0,E=0).
        assert!(d.push(&[0x7C, 0x05, 0x33], false, 0, 2).unwrap().is_none());
        // End fragment (E=1), marker closes the AU.
        let out = d.push(&[0x7C, 0x45, 0x44], true, 0, 3).unwrap().unwrap();
        // Reconstructed NAL header: NRI 0x60 | type 5 = 0x65, then payload bytes.
        assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x65, 0x11, 0x22, 0x33, 0x44]);
        assert!(out.keyframe);
    }
1391
    /// A jump in sequence numbers between FU-A fragments is reported as
    /// `OutOfOrder`.
    #[test]
    fn fu_a_sequence_gap_reports_out_of_order() {
        let mut d = H264Depacketizer::new();
        // Start fragment at sequence 1 opens the fragmented NAL.
        d.push(&[0x7C, 0x85, 0x11], false, 0, 1).unwrap();
        // Jump from seq 1 to seq 5 mid-fragment.
        assert_eq!(
            d.push(&[0x7C, 0x05, 0x22], false, 0, 5),
            Err(DepacketizeError::OutOfOrder)
        );
    }
1402
    /// With no marker bit, a packet carrying a new timestamp causes the
    /// previously buffered access unit to be emitted with its own timestamp.
    #[test]
    fn timestamp_change_flushes_previous_au_without_marker() {
        let mut d = H264Depacketizer::new();
        // First AU, no marker.
        assert!(d.push(&[0x41, 0x01], false, 1000, 1).unwrap().is_none());
        // New timestamp flushes the first AU.
        let out = d.push(&[0x41, 0x02], false, 2000, 2).unwrap().unwrap();
        assert_eq!(out.timestamp, 1000);
        assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x41, 0x01]);
    }
1413
1414    // ── H.265 (RFC 7798) ────────────────────────────────────────────────────
1415    // H.265 NAL headers are two bytes; type = (byte0 >> 1) & 0x3F. Examples used
1416    // below: VPS=32 (0x40,0x01), IDR_W_RADL=19 (0x26,0x01).
1417
    /// Packetizing a two-NAL H.265 access unit and feeding the packets back
    /// through the depacketizer reproduces the original bytes, keyframe flag
    /// and timestamp.
    #[test]
    fn h265_single_nal_round_trips_through_depacketizer() {
        // VPS (non-VCL) + IDR (VCL keyframe), each a single-NAL packet.
        let au = [
            0, 0, 0, 1, 0x40, 0x01, 0xAA, // VPS (type 32)
            0, 0, 0, 1, 0x26, 0x01, 0x88, 0x99, // IDR (type 19)
        ];
        let mut pkt = RtpPacketizer::new_h265(96, 0xABCD, 1200);
        let packets = pkt.packetize(&au, 3000);
        assert_eq!(packets.len(), 2, "one packet per NAL");

        let mut depack = H265Depacketizer::new();
        // Keep the last access unit the depacketizer hands back.
        let mut out = None;
        for p in &packets {
            let h = RtpHeader::parse(p).unwrap();
            if let Some(au) = depack
                .push(&p[h.payload_offset..], h.marker, h.timestamp, h.sequence)
                .unwrap()
            {
                out = Some(au);
            }
        }
        let out = out.expect("AU completed on the marker packet");
        assert_eq!(&out.data[..], &au);
        assert!(out.keyframe, "IRAP type 19 is a keyframe");
        assert_eq!(out.timestamp, 3000);
    }
1445
    /// An H.265 NAL larger than the MTU is split into type-49 packets with the
    /// marker only on the last, and reassembles to the original bytes.
    #[test]
    fn h265_fragments_oversized_nal_and_round_trips() {
        // One IDR NAL larger than the MTU → FU fragments → reassembled identically.
        let mut nal = vec![0, 0, 0, 1, 0x26, 0x01]; // start code + 2-byte IDR header
        nal.extend((0..600u16).map(|i| i as u8));
        let mut pkt = RtpPacketizer::new_h265(96, 1, 100); // tiny MTU forces FU
        let packets = pkt.packetize(&nal, 90);
        assert!(packets.len() > 1, "oversized NAL is fragmented");
        // Exactly one marker, on the last fragment.
        let markers: Vec<bool> = packets
            .iter()
            .map(|p| RtpHeader::parse(p).unwrap().marker)
            .collect();
        assert_eq!(markers.iter().filter(|m| **m).count(), 1);
        assert!(markers.last().unwrap());
        // Each FU packet carries a type-49 PayloadHdr.
        for p in &packets {
            let h = RtpHeader::parse(p).unwrap();
            // NAL/payload type sits in bits 1..=6 of the first payload byte.
            let pt = (p[h.payload_offset] >> 1) & 0x3F;
            assert_eq!(pt, 49, "FU payload type");
        }

        let mut depack = H265Depacketizer::new();
        // Keep the last access unit the depacketizer hands back.
        let mut out = None;
        for p in &packets {
            let h = RtpHeader::parse(p).unwrap();
            if let Some(au) = depack
                .push(&p[h.payload_offset..], h.marker, h.timestamp, h.sequence)
                .unwrap()
            {
                out = Some(au);
            }
        }
        assert_eq!(&out.unwrap().data[..], &nal[..]);
    }
1481
    /// An H.265 aggregation packet carrying two size-prefixed NALs is emitted
    /// as two start-code-prefixed NALs in one access unit.
    #[test]
    fn h265_ap_splits_aggregated_nals() {
        // AP (type 48): [0x60,0x01][size=2][AA BB][size=3][CC DD EE]
        let payload = [0x60, 0x01, 0, 2, 0xAA, 0xBB, 0, 3, 0xCC, 0xDD, 0xEE];
        let mut d = H265Depacketizer::new();
        let out = d.push(&payload, true, 0, 1).unwrap().unwrap();
        assert_eq!(
            &out.data[..],
            &[0, 0, 0, 1, 0xAA, 0xBB, 0, 0, 0, 1, 0xCC, 0xDD, 0xEE]
        );
    }
1493
    /// A one-byte payload is `Truncated`; payload type 50 is `Unsupported(50)`.
    #[test]
    fn h265_rejects_truncated_and_unsupported() {
        let mut d = H265Depacketizer::new();
        // One byte cannot hold a 2-byte NAL header.
        assert_eq!(
            d.push(&[0x26], true, 0, 1),
            Err(DepacketizeError::Truncated)
        );
        // PACI (type 50) is not supported.
        assert_eq!(
            d.push(&[50 << 1, 0x01, 0x00], true, 0, 2),
            Err(DepacketizeError::Unsupported(50))
        );
    }
1508
1509    // ── VP9 ───────────────────────────────────────────────────────────────────
1510
1511    fn vp9_depacketize(packets: &[Vec<u8>]) -> Option<AccessUnit> {
1512        let mut d = Vp9Depacketizer::new();
1513        let mut out = None;
1514        for p in packets {
1515            let h = RtpHeader::parse(p).unwrap();
1516            if let Some(f) = d
1517                .push(&p[h.payload_offset..], h.marker, h.timestamp)
1518                .unwrap()
1519            {
1520                out = Some(f);
1521            }
1522        }
1523        out
1524    }
1525
    /// A 500-byte VP9 keyframe packetized with a 100-byte MTU spans several
    /// packets, has the marker only on the last, and reassembles unchanged.
    #[test]
    fn vp9_fragmented_frame_round_trips() {
        let frame: Vec<u8> = (0..500u16).map(|i| i as u8).collect();
        let mut pkt = Vp9Packetizer::new(98, 0x1234, 100); // small MTU → fragments
        let packets = pkt.packetize(&frame, 9000, true);
        assert!(packets.len() > 1, "frame fragmented");

        // Exactly one marker, on the last packet.
        let markers: Vec<bool> = packets
            .iter()
            .map(|p| RtpHeader::parse(p).unwrap().marker)
            .collect();
        assert_eq!(markers.iter().filter(|m| **m).count(), 1);
        assert!(markers.last().unwrap());

        let out = vp9_depacketize(&packets).expect("frame completed");
        assert_eq!(&out.data[..], &frame[..]);
        assert!(out.keyframe, "keyframe → P bit clear");
        assert_eq!(out.timestamp, 9000);
    }
1546
    /// A frame packetized with `keyframe = false` fits one packet and comes
    /// back with the keyframe flag clear.
    #[test]
    fn vp9_inter_frame_is_not_a_keyframe() {
        let mut pkt = Vp9Packetizer::new(98, 1, 1200);
        let packets = pkt.packetize(&[1, 2, 3], 0, false);
        assert_eq!(packets.len(), 1);
        let out = vp9_depacketize(&packets).expect("frame");
        assert_eq!(&out.data[..], &[1, 2, 3]);
        assert!(!out.keyframe, "P bit set → inter frame");
    }
1556
1557    // ── AV1 ───────────────────────────────────────────────────────────────────
1558
1559    #[cfg(feature = "codec-av1")]
1560    fn av1_depacketize(packets: &[Vec<u8>]) -> Option<AccessUnit> {
1561        let mut d = Av1Depacketizer::new();
1562        let mut out = None;
1563        for p in packets {
1564            let h = RtpHeader::parse(p).unwrap();
1565            if let Some(f) = d
1566                .push(&p[h.payload_offset..], h.marker, h.timestamp)
1567                .unwrap()
1568            {
1569                out = Some(f);
1570            }
1571        }
1572        out
1573    }
1574
    /// A temporal unit of delimiter + sequence header + frame round-trips
    /// through packetizer and depacketizer with the delimiter removed and the
    /// remaining OBUs byte-identical.
    #[cfg(feature = "codec-av1")]
    #[test]
    fn av1_temporal_unit_round_trips_without_delimiter() {
        // Low-overhead OBUs: temporal delimiter + sequence header + frame.
        // Each is `header, size, payload…`; the header bytes have the size flag set.
        let td = [0x12u8, 0x00];
        let seq = [0x0Au8, 0x02, 0xAA, 0xBB];
        let frame = [0x32u8, 0x03, 0x11, 0x22, 0x33];
        let mut tu = Vec::new();
        tu.extend_from_slice(&td);
        tu.extend_from_slice(&seq);
        tu.extend_from_slice(&frame);

        let mut pkt = Av1Packetizer::new(99, 7, 1200);
        let packets = pkt.packetize(&tu, 1000);
        let out = av1_depacketize(&packets).expect("TU completed");

        // The temporal delimiter is dropped; seq + frame survive, low-overhead.
        let mut expected = Vec::new();
        expected.extend_from_slice(&seq);
        expected.extend_from_slice(&frame);
        assert_eq!(&out.data[..], &expected[..]);
        assert!(out.keyframe, "sequence header → new coded video sequence");
        assert_eq!(out.timestamp, 1000);
    }
1599
/// A temporal unit larger than the MTU must be split across several packets
/// whose Z/Y continuation bits chain correctly, and must reassemble into the
/// original frame OBU (without the temporal delimiter).
#[cfg(feature = "codec-av1")]
#[test]
fn av1_large_temporal_unit_fragments_and_round_trips() {
    // A frame OBU with a 300-byte payload (size field leb128(300) = AC 02).
    let mut frame_obu = vec![0x32u8, 0xAC, 0x02];
    frame_obu.extend((0..300u16).map(|n| n as u8));
    // Temporal delimiter followed by the frame OBU.
    let temporal_unit = [&[0x12u8, 0x00][..], &frame_obu[..]].concat();

    let mut packetizer = Av1Packetizer::new(99, 1, 64); // tiny MTU forces fragmentation
    let rtp_packets = packetizer.packetize(&temporal_unit, 0);
    assert!(rtp_packets.len() > 1, "large TU fragmented");

    // Z set on every packet but the first; Y on every packet but the last.
    let final_index = rtp_packets.len() - 1;
    for (index, packet) in rtp_packets.iter().enumerate() {
        let header = RtpHeader::parse(packet).unwrap();
        // The first payload byte holds the Z (0x80) and Y (0x40) bits.
        let first_payload_byte = packet[header.payload_offset];
        let z_set = first_payload_byte & 0x80 != 0;
        let y_set = first_payload_byte & 0x40 != 0;
        assert_eq!(z_set, index != 0, "Z continuation bit");
        assert_eq!(y_set, index != final_index, "Y continuation bit");
    }

    let reassembled = av1_depacketize(&rtp_packets).expect("TU completed");
    assert_eq!(
        &reassembled.data[..],
        &frame_obu[..],
        "frame OBU reconstructed"
    );
}
1626}