Skip to main content

openipc_core/
rtp.rs

1use std::collections::BTreeMap;
2
/// Default maximum number of out-of-order packets [`RtpReorderBuffer`] holds
/// before it forces a flush past the missing sequence number.
const DEFAULT_RTP_REORDER_WINDOW: usize = 15;
/// Default size guard in bytes (8 MiB) applied by [`RtpDepacketizer`] to a
/// reassembled fragment chain and to an accumulated access unit.
const DEFAULT_MAX_ACCESS_UNIT_SIZE: usize = 8 * 1024 * 1024;
5
/// Error returned while parsing or depacketizing RTP video.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RtpError {
    /// Packet is shorter than the 12-byte fixed RTP header, or shorter than
    /// the fixed header plus the CSRC list announced in its first byte.
    TooShort,
    /// RTP version was not 2; carries the version that was found.
    InvalidVersion(u8),
    /// The extension bit is set but the packet ends before the 4-byte
    /// extension header, or before the length that header declares.
    InvalidExtension,
    /// The padding bit is set but the trailing padding count is zero or larger
    /// than the bytes remaining after the header.
    InvalidPadding,
    /// Packet has no payload after header/extension/padding.
    EmptyPayload,
    /// Payload could not be interpreted as H.264 or H.265. Also used for the
    /// Opus payload type and for truncated or malformed fragmentation and
    /// aggregation payloads.
    UnsupportedPayload,
    /// Fragmented access unit exceeded the configured size guard.
    FragmentOverflow,
}
24
/// Encoded video codec carried by a depacketized frame.
///
/// The depacketizer keeps separate reassembly state and separate cached
/// parameter sets for each variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Codec {
    /// H.264/AVC video (decoder needs SPS and PPS).
    H264,
    /// H.265/HEVC video (decoder needs VPS, SPS and PPS).
    H265,
}
33
/// Dynamic RTP payload type used by OpenIPC for H.264.
pub const RTP_PAYLOAD_TYPE_H264: u8 = 96;
/// Dynamic RTP payload type used by OpenIPC for H.265.
pub const RTP_PAYLOAD_TYPE_H265: u8 = 97;
/// Dynamic RTP payload type used by OpenIPC/Majestic for Opus audio.
///
/// [`RtpDepacketizer::push`] rejects packets carrying this payload type with
/// [`RtpError::UnsupportedPayload`] because it only handles video.
pub const RTP_PAYLOAD_TYPE_OPUS: u8 = 98;
40
41/// Decoder configuration NAL units observed by the RTP depacketizer.
42///
43/// H.264 needs SPS and PPS before a decoder can be configured. H.265 needs
44/// VPS, SPS and PPS. PixelPilot starts its decoder as soon as these parameter
45/// sets have been observed, then feeds subsequent NAL units without requiring a
46/// fresh IDR for every startup path.
47#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
48pub struct CodecConfigState {
49    /// H.264 sequence parameter set has been seen.
50    pub h264_sps: bool,
51    /// H.264 picture parameter set has been seen.
52    pub h264_pps: bool,
53    /// H.265 video parameter set has been seen.
54    pub h265_vps: bool,
55    /// H.265 sequence parameter set has been seen.
56    pub h265_sps: bool,
57    /// H.265 picture parameter set has been seen.
58    pub h265_pps: bool,
59}
60
61impl CodecConfigState {
62    /// Return true when all parameter sets required for `codec` are cached.
63    pub const fn is_complete_for(self, codec: Codec) -> bool {
64        match codec {
65            Codec::H264 => self.h264_sps && self.h264_pps,
66            Codec::H265 => self.h265_vps && self.h265_sps && self.h265_pps,
67        }
68    }
69}
70
/// Cumulative RTP depacketizer diagnostics.
///
/// A snapshot is obtained from [`RtpDepacketizer::status`]. Counters are
/// advanced with saturating arithmetic, so they stop at `u64::MAX` instead of
/// wrapping.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct RtpDepacketizerStatus {
    /// RTP packets submitted to the depacketizer, including rejected ones.
    pub packets: u64,
    /// Annex-B video frames emitted.
    pub frames_emitted: u64,
    /// Video NAL units dropped because decoder config was not complete yet.
    pub config_wait_drops: u64,
    /// Keyframes emitted with cached decoder config prepended.
    pub keyframes_with_prepended_config: u64,
    /// Cached SPS/PPS/VPS parameter-set NAL units prepended to keyframes.
    pub parameter_sets_prepended: u64,
    /// Fragment chains dropped because an RTP sequence gap was observed.
    pub fragment_sequence_gaps: u64,
    /// Fragment chains that exceeded the configured size guard.
    pub fragment_overflows: u64,
    /// Packets rejected because they were not H.264/H.265 video.
    pub unsupported_payloads: u64,
    /// Packets rejected because the RTP header or payload was malformed.
    pub malformed_packets: u64,
    /// Most recent RTP payload type.
    pub last_payload_type: Option<u8>,
    /// Most recent RTP sequence number.
    pub last_sequence_number: Option<u16>,
    /// Most recent RTP timestamp.
    pub last_timestamp: Option<u32>,
    /// Most recent detected video codec.
    pub last_codec: Option<Codec>,
    /// Most recent H.264/H.265 NAL unit type.
    pub last_nal_type: Option<u8>,
    /// Current decoder configuration state; recomputed from the cached
    /// parameter sets each time a snapshot is taken.
    pub codec_config: CodecConfigState,
}
105
106/// Parsed RTP header metadata.
107#[derive(Debug, Clone, Copy, PartialEq, Eq)]
108pub struct RtpHeader {
109    /// RTP marker bit, usually set at an access-unit boundary.
110    pub marker: bool,
111    /// RTP payload type.
112    pub payload_type: u8,
113    /// RTP sequence number.
114    pub sequence_number: u16,
115    /// RTP timestamp.
116    pub timestamp: u32,
117    /// RTP synchronization source.
118    pub ssrc: u32,
119    /// Number of CSRC entries.
120    pub csrc_count: u8,
121    /// True if the packet has an RTP header extension.
122    pub has_extension: bool,
123    /// Header length including CSRC and extension bytes.
124    pub header_len: usize,
125    /// Payload length after header and padding removal.
126    pub payload_len: usize,
127}
128
129impl RtpHeader {
130    /// Parse an RTP header and validate extension/padding bounds.
131    pub fn parse(packet: &[u8]) -> Result<Self, RtpError> {
132        if packet.len() < 12 {
133            return Err(RtpError::TooShort);
134        }
135        let version = packet[0] >> 6;
136        if version != 2 {
137            return Err(RtpError::InvalidVersion(version));
138        }
139
140        let padding = packet[0] & 0x20 != 0;
141        let extension = packet[0] & 0x10 != 0;
142        let csrc_count = packet[0] & 0x0f;
143        let mut header_len = 12 + csrc_count as usize * 4;
144        if packet.len() < header_len {
145            return Err(RtpError::TooShort);
146        }
147
148        if extension {
149            if packet.len() < header_len + 4 {
150                return Err(RtpError::InvalidExtension);
151            }
152            let ext_words =
153                u16::from_be_bytes([packet[header_len + 2], packet[header_len + 3]]) as usize;
154            header_len += 4 + ext_words * 4;
155            if packet.len() < header_len {
156                return Err(RtpError::InvalidExtension);
157            }
158        }
159
160        let padding_len = if padding {
161            let len = *packet.last().ok_or(RtpError::InvalidPadding)? as usize;
162            if len == 0 || len > packet.len() - header_len {
163                return Err(RtpError::InvalidPadding);
164            }
165            len
166        } else {
167            0
168        };
169
170        let payload_len = packet.len() - header_len - padding_len;
171        if payload_len == 0 {
172            return Err(RtpError::EmptyPayload);
173        }
174
175        Ok(Self {
176            marker: packet[1] & 0x80 != 0,
177            payload_type: packet[1] & 0x7f,
178            sequence_number: u16::from_be_bytes([packet[2], packet[3]]),
179            timestamp: u32::from_be_bytes([packet[4], packet[5], packet[6], packet[7]]),
180            ssrc: u32::from_be_bytes([packet[8], packet[9], packet[10], packet[11]]),
181            csrc_count,
182            has_extension: extension,
183            header_len,
184            payload_len,
185        })
186    }
187
188    /// Borrow this packet's payload using the parsed header offsets.
189    pub fn payload<'a>(&self, packet: &'a [u8]) -> &'a [u8] {
190        &packet[self.header_len..self.header_len + self.payload_len]
191    }
192}
193
/// Cumulative status for [`RtpReorderBuffer`].
///
/// Counters are advanced with saturating arithmetic.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct RtpReorderStatus {
    /// Packets currently held while waiting for missing sequence numbers.
    /// [`RtpReorderBuffer::status`] refreshes this from the live queue length.
    pub buffered_packets: usize,
    /// Out-of-order packets accepted into the reorder window. A duplicate of
    /// an already-buffered sequence number is not counted again.
    pub reordered_packets: u64,
    /// Packets dropped because their sequence number was older than the window.
    pub late_packets: u64,
    /// Times the window flushed ahead after the missing packet did not arrive.
    pub forced_flushes: u64,
}
206
207/// Small RTP sequence reorder buffer.
208///
209/// PixelPilot keeps a short queue before its RTP parser so FU-A/FU fragments
210/// survive small USB/radio delivery inversions. This buffer does the same for
211/// the shared Rust receiver runtime while keeping the in-order path immediate.
212#[derive(Debug, Clone)]
213pub struct RtpReorderBuffer {
214    next_sequence: Option<u16>,
215    pending: BTreeMap<u16, Vec<u8>>,
216    max_depth: usize,
217    status: RtpReorderStatus,
218}
219
220impl Default for RtpReorderBuffer {
221    fn default() -> Self {
222        Self::new(DEFAULT_RTP_REORDER_WINDOW)
223    }
224}
225
226impl RtpReorderBuffer {
227    /// Create a reorder buffer with a maximum pending packet depth.
228    pub fn new(max_depth: usize) -> Self {
229        Self {
230            next_sequence: None,
231            pending: BTreeMap::new(),
232            max_depth: max_depth.max(1),
233            status: RtpReorderStatus::default(),
234        }
235    }
236
237    /// Push one RTP packet and return packets that are ready in sequence order.
238    pub fn push(&mut self, packet: &[u8]) -> Result<Vec<Vec<u8>>, RtpError> {
239        let header = RtpHeader::parse(packet)?;
240        let sequence = header.sequence_number;
241        let mut ready = Vec::new();
242
243        let Some(expected) = self.next_sequence else {
244            self.next_sequence = Some(sequence.wrapping_add(1));
245            ready.push(packet.to_vec());
246            return Ok(ready);
247        };
248
249        if sequence == expected {
250            self.next_sequence = Some(expected.wrapping_add(1));
251            ready.push(packet.to_vec());
252            self.drain_ready(&mut ready);
253            return Ok(ready);
254        }
255
256        if sequence_is_before(sequence, expected) {
257            self.status.late_packets = self.status.late_packets.saturating_add(1);
258            return Ok(ready);
259        }
260
261        if self.pending.insert(sequence, packet.to_vec()).is_none() {
262            self.status.reordered_packets = self.status.reordered_packets.saturating_add(1);
263        }
264        if self.pending.len() >= self.max_depth {
265            self.force_flush(expected, &mut ready);
266        }
267        self.status.buffered_packets = self.pending.len();
268        Ok(ready)
269    }
270
271    /// Return current reorder-buffer status.
272    pub fn status(&self) -> RtpReorderStatus {
273        RtpReorderStatus {
274            buffered_packets: self.pending.len(),
275            ..self.status
276        }
277    }
278
279    fn drain_ready(&mut self, ready: &mut Vec<Vec<u8>>) {
280        while let Some(expected) = self.next_sequence {
281            let Some(packet) = self.pending.remove(&expected) else {
282                break;
283            };
284            self.next_sequence = Some(expected.wrapping_add(1));
285            ready.push(packet);
286        }
287        self.status.buffered_packets = self.pending.len();
288    }
289
290    fn force_flush(&mut self, expected: u16, ready: &mut Vec<Vec<u8>>) {
291        let Some(sequence) = self
292            .pending
293            .keys()
294            .copied()
295            .min_by_key(|sequence| sequence.wrapping_sub(expected))
296        else {
297            return;
298        };
299        if let Some(packet) = self.pending.remove(&sequence) {
300            self.status.forced_flushes = self.status.forced_flushes.saturating_add(1);
301            self.next_sequence = Some(sequence.wrapping_add(1));
302            ready.push(packet);
303            self.drain_ready(ready);
304        }
305    }
306}
307
/// Report whether `sequence` lies strictly behind `expected` in wrapping
/// 16-bit sequence space, i.e. between 1 and 0x7fff steps back.
fn sequence_is_before(sequence: u16, expected: u16) -> bool {
    (1..0x8000).contains(&expected.wrapping_sub(sequence))
}
312
/// Encoded Annex-B H.264/H.265 access unit emitted by [`RtpDepacketizer`].
///
/// Returned from [`RtpDepacketizer::push`] when a packet completes a frame.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DepacketizedFrame {
    /// Annex-B byte stream data including start codes.
    pub data: Vec<u8>,
    /// RTP timestamp associated with the access unit.
    pub timestamp: u32,
    /// True when the frame contains an IDR/keyframe entry point.
    pub is_keyframe: bool,
    /// Video codec for this frame.
    pub codec: Codec,
    /// RTP payload type that produced this frame.
    pub payload_type: u8,
    /// RTP sequence number of the packet that completed this frame.
    pub sequence_number: u16,
    /// H.264/H.265 NAL unit type for the frame payload.
    pub nal_type: u8,
    /// Decoder parameter-set state at the time this frame was emitted.
    pub codec_config: CodecConfigState,
}
333
/// Reassembly state for one fragmented NAL unit (H.264 FU-A or H.265 FU).
#[derive(Debug, Default, Clone)]
struct FragmentState {
    /// Reconstructed NAL header followed by the fragment payloads received so far.
    data: Vec<u8>,
    /// RTP timestamp of the fragment that started the chain.
    timestamp: u32,
    /// Sequence number the next fragment must carry; `None` while no chain is active.
    next_sequence: Option<u16>,
    /// Set when a sequence gap was seen; the rest of the chain is then discarded.
    corrupted: bool,
}
341
/// Accumulation state for the access unit currently being assembled.
///
/// NOTE(review): several fields are only written by code outside this part of
/// the file; their descriptions below are inferred from names and should be
/// confirmed against `observe_access_unit_packet`.
#[derive(Debug, Default, Clone)]
struct AccessUnitState {
    /// Annex-B bytes of the NAL units appended to this access unit so far.
    data: Vec<u8>,
    /// RTP timestamp of the access unit; expected to match each appended NAL unit.
    timestamp: Option<u32>,
    /// Presumably the sequence number expected from the next packet of this
    /// access unit — TODO confirm.
    next_sequence: Option<u16>,
    /// When set, appended NAL units are discarded instead of accumulated.
    corrupted: bool,
    /// True once any appended NAL unit was flagged as a keyframe.
    is_keyframe: bool,
    /// Presumably true when the access unit already carries its own parameter
    /// sets — TODO confirm.
    has_decoder_config: bool,
    /// Presumably the NAL unit type reported for the emitted frame — TODO confirm.
    nal_type: u8,
}
352
/// Per-NAL-unit metadata handed to the access-unit assembly helpers.
#[derive(Debug, Clone, Copy)]
struct FrameMeta {
    /// RTP timestamp of the NAL unit (for fragments, that of the start fragment).
    timestamp: u32,
    /// True when the NAL unit is a keyframe slice.
    is_keyframe: bool,
    /// Codec the NAL unit belongs to; selects which access-unit state is used.
    codec: Codec,
    /// RTP payload type of the packet that delivered the NAL unit.
    payload_type: u8,
    /// RTP sequence number of the packet that completed the NAL unit.
    sequence_number: u16,
    /// H.264/H.265 NAL unit type.
    nal_type: u8,
}
362
/// Stateful RTP depacketizer for OpenIPC H.264/H.265 video.
///
/// The depacketizer buffers fragmented NAL units, drops incomplete fragments
/// across sequence gaps, and emits complete Annex-B access units.
#[derive(Debug, Clone)]
pub struct RtpDepacketizer {
    /// Reassembly state for the H.264 FU-A chain in progress.
    h264: FragmentState,
    /// Reassembly state for the H.265 FU chain in progress.
    h265: FragmentState,
    /// H.264 access unit being accumulated.
    h264_access_unit: AccessUnitState,
    /// H.265 access unit being accumulated.
    h265_access_unit: AccessUnitState,
    /// Most recently received H.264 SPS NAL unit (raw, without start code).
    h264_sps: Option<Vec<u8>>,
    /// Most recently received H.264 PPS NAL unit (raw, without start code).
    h264_pps: Option<Vec<u8>>,
    /// Most recently received H.265 VPS NAL unit (raw, without start code).
    h265_vps: Option<Vec<u8>>,
    /// Most recently received H.265 SPS NAL unit (raw, without start code).
    h265_sps: Option<Vec<u8>>,
    /// Most recently received H.265 PPS NAL unit (raw, without start code).
    h265_pps: Option<Vec<u8>>,
    /// Size guard in bytes for fragment chains and accumulated access units.
    max_fragment: usize,
    /// Cumulative diagnostics reported by [`RtpDepacketizer::status`].
    status: RtpDepacketizerStatus,
}
381
impl Default for RtpDepacketizer {
    /// Equivalent to [`RtpDepacketizer::new`].
    fn default() -> Self {
        Self::new()
    }
}
387
388impl RtpDepacketizer {
389    /// Create a depacketizer with the default fragment-size guard.
390    pub fn new() -> Self {
391        Self {
392            h264: FragmentState::default(),
393            h265: FragmentState::default(),
394            h264_access_unit: AccessUnitState::default(),
395            h265_access_unit: AccessUnitState::default(),
396            h264_sps: None,
397            h264_pps: None,
398            h265_vps: None,
399            h265_sps: None,
400            h265_pps: None,
401            max_fragment: DEFAULT_MAX_ACCESS_UNIT_SIZE,
402            status: RtpDepacketizerStatus::default(),
403        }
404    }
405
406    /// Return cumulative depacketizer status and codec configuration state.
407    pub fn status(&self) -> RtpDepacketizerStatus {
408        RtpDepacketizerStatus {
409            codec_config: self.codec_config(),
410            ..self.status
411        }
412    }
413
414    /// Return the current decoder parameter-set state.
415    pub fn codec_config(&self) -> CodecConfigState {
416        CodecConfigState {
417            h264_sps: self.h264_sps.is_some(),
418            h264_pps: self.h264_pps.is_some(),
419            h265_vps: self.h265_vps.is_some(),
420            h265_sps: self.h265_sps.is_some(),
421            h265_pps: self.h265_pps.is_some(),
422        }
423    }
424
    /// Push one RTP packet and return a complete frame when one is ready.
    ///
    /// Every call is counted in `status.packets`, including packets that are
    /// rejected. The payload type, sequence number and timestamp of each
    /// successfully parsed header are recorded in the status as well.
    ///
    /// # Errors
    ///
    /// * Any [`RtpHeader::parse`] error when the header is invalid.
    /// * [`RtpError::UnsupportedPayload`] for the Opus payload type, or when
    ///   no codec can be derived from the payload type or the payload itself.
    /// * Any error produced by the H.264/H.265 payload handlers.
    ///
    /// Each returned error is also passed to `record_error`.
    pub fn push(&mut self, packet: &[u8]) -> Result<Option<DepacketizedFrame>, RtpError> {
        self.status.packets = self.status.packets.saturating_add(1);
        let header = match RtpHeader::parse(packet) {
            Ok(header) => header,
            Err(err) => {
                self.record_error(err);
                return Err(err);
            }
        };
        self.status.last_payload_type = Some(header.payload_type);
        self.status.last_sequence_number = Some(header.sequence_number);
        self.status.last_timestamp = Some(header.timestamp);
        let payload = header.payload(packet);
        log::trace!(
            target: "openipc_core::rtp",
            "received RTP packet sequence={} timestamp={} pt={} marker={} bytes={}",
            header.sequence_number,
            header.timestamp,
            header.payload_type,
            header.marker,
            payload.len()
        );
        // Audio is rejected up front so it never reaches payload sniffing.
        if header.payload_type == RTP_PAYLOAD_TYPE_OPUS {
            self.record_error(RtpError::UnsupportedPayload);
            return Err(RtpError::UnsupportedPayload);
        }
        // Prefer the payload-type mapping; inspect the payload only when the
        // payload type does not identify a codec.
        let Some(codec) =
            codec_from_payload_type(header.payload_type).or_else(|| detect_codec(payload))
        else {
            self.record_error(RtpError::UnsupportedPayload);
            return Err(RtpError::UnsupportedPayload);
        };
        self.status.last_codec = Some(codec);
        self.observe_access_unit_packet(codec, header);
        let result = match codec {
            Codec::H264 => self.push_h264(payload, header),
            Codec::H265 => self.push_h265(payload, header),
        };
        match &result {
            Ok(Some(_)) => {
                self.status.frames_emitted = self.status.frames_emitted.saturating_add(1)
            }
            Err(err) => {
                log::debug!(
                    target: "openipc_core::rtp",
                    "RTP packet rejected sequence={}: {err:?}",
                    header.sequence_number
                );
                self.record_error(*err);
            }
            _ => {}
        }
        result
    }
480
481    fn push_h264(
482        &mut self,
483        payload: &[u8],
484        header: RtpHeader,
485    ) -> Result<Option<DepacketizedFrame>, RtpError> {
486        let nal_type = payload[0] & 0x1f;
487        self.status.last_nal_type = Some(nal_type);
488        match nal_type {
489            7 => {
490                self.h264_sps = Some(payload.to_vec());
491                Ok(None)
492            }
493            8 => {
494                self.h264_pps = Some(payload.to_vec());
495                Ok(None)
496            }
497            24 => self.h264_stap_a(payload, header),
498            28 => self.h264_fu_a(payload, header),
499            _ if self.has_decoder_config(Codec::H264) && is_h264_vcl_nal(nal_type) => self
500                .push_complete_nalu(
501                    payload,
502                    FrameMeta {
503                        timestamp: header.timestamp,
504                        is_keyframe: nal_type == 5,
505                        codec: Codec::H264,
506                        payload_type: header.payload_type,
507                        sequence_number: header.sequence_number,
508                        nal_type,
509                    },
510                    header.marker,
511                ),
512            _ if !is_h264_vcl_nal(nal_type) => Ok(None),
513            _ => {
514                self.status.config_wait_drops = self.status.config_wait_drops.saturating_add(1);
515                Ok(None)
516            }
517        }
518    }
519
520    fn push_h265(
521        &mut self,
522        payload: &[u8],
523        header: RtpHeader,
524    ) -> Result<Option<DepacketizedFrame>, RtpError> {
525        if payload.len() < 2 {
526            return Err(RtpError::UnsupportedPayload);
527        }
528        let nal_type = (payload[0] >> 1) & 0x3f;
529        self.status.last_nal_type = Some(nal_type);
530        match nal_type {
531            32 => {
532                self.h265_vps = Some(payload.to_vec());
533                Ok(None)
534            }
535            33 => {
536                self.h265_sps = Some(payload.to_vec());
537                Ok(None)
538            }
539            34 => {
540                self.h265_pps = Some(payload.to_vec());
541                Ok(None)
542            }
543            48 => self.h265_ap(payload, header),
544            49 => self.h265_fu(payload, header),
545            _ if self.has_decoder_config(Codec::H265) && is_h265_vcl_nal(nal_type) => self
546                .push_complete_nalu(
547                    payload,
548                    FrameMeta {
549                        timestamp: header.timestamp,
550                        is_keyframe: (16..=23).contains(&nal_type),
551                        codec: Codec::H265,
552                        payload_type: header.payload_type,
553                        sequence_number: header.sequence_number,
554                        nal_type,
555                    },
556                    header.marker,
557                ),
558            _ if !is_h265_vcl_nal(nal_type) => Ok(None),
559            _ => {
560                self.status.config_wait_drops = self.status.config_wait_drops.saturating_add(1);
561                Ok(None)
562            }
563        }
564    }
565
    /// Handle one H.264 FU-A fragment (NAL type 28).
    ///
    /// A start fragment resets the reassembly state and stores the
    /// reconstructed NAL header; later fragments are accepted only when their
    /// sequence number continues the chain. On the end fragment the
    /// reassembled NAL unit is forwarded to access-unit assembly, unless the
    /// chain was corrupted, the NAL is not a slice, or decoder config is still
    /// incomplete.
    ///
    /// # Errors
    ///
    /// * [`RtpError::UnsupportedPayload`] when the payload is shorter than the
    ///   FU indicator plus FU header.
    /// * [`RtpError::FragmentOverflow`] when the chain exceeds the size guard.
    fn h264_fu_a(
        &mut self,
        payload: &[u8],
        header: RtpHeader,
    ) -> Result<Option<DepacketizedFrame>, RtpError> {
        if payload.len() < 2 {
            return Err(RtpError::UnsupportedPayload);
        }
        let fu_indicator = payload[0];
        let fu_header = payload[1];
        // FU header layout: S(1) E(1) R(1) Type(5).
        let start = fu_header & 0x80 != 0;
        let end = fu_header & 0x40 != 0;
        let nal_type = fu_header & 0x1f;
        if start {
            self.h264.data.clear();
            self.h264.timestamp = header.timestamp;
            self.h264.next_sequence = Some(header.sequence_number.wrapping_add(1));
            self.h264.corrupted = false;
            // Rebuild the original NAL header: F and NRI bits come from the FU
            // indicator, the type from the FU header.
            self.h264.data.push((fu_indicator & 0xe0) | nal_type);
        } else if !self.accept_fragment_sequence(Codec::H264, header.sequence_number) {
            return Ok(None);
        }
        if !self.h264.corrupted {
            self.append_fragment(Codec::H264, &payload[2..])?;
        }
        if end {
            // Non-slice NAL units delivered as fragments are discarded.
            if !is_h264_vcl_nal(nal_type) {
                self.reset_fragment(Codec::H264);
                return Ok(None);
            }
            if self.h264.corrupted || !self.has_decoder_config(Codec::H264) {
                if !self.has_decoder_config(Codec::H264) {
                    self.status.config_wait_drops = self.status.config_wait_drops.saturating_add(1);
                }
                self.reset_fragment(Codec::H264);
                return Ok(None);
            }
            // Take the buffer before the reset so the bytes are moved, not copied.
            let data = std::mem::take(&mut self.h264.data);
            let meta = FrameMeta {
                timestamp: self.h264.timestamp,
                is_keyframe: nal_type == 5,
                codec: Codec::H264,
                payload_type: header.payload_type,
                sequence_number: header.sequence_number,
                nal_type,
            };
            self.reset_fragment(Codec::H264);
            self.push_complete_owned_nalu(data, meta, header.marker)
        } else {
            Ok(None)
        }
    }
618
619    fn h265_fu(
620        &mut self,
621        payload: &[u8],
622        header: RtpHeader,
623    ) -> Result<Option<DepacketizedFrame>, RtpError> {
624        if payload.len() < 3 {
625            return Err(RtpError::UnsupportedPayload);
626        }
627        let fu_header = payload[2];
628        let start = fu_header & 0x80 != 0;
629        let end = fu_header & 0x40 != 0;
630        let nal_type = fu_header & 0x3f;
631        if start {
632            self.h265.data.clear();
633            self.h265.timestamp = header.timestamp;
634            self.h265.next_sequence = Some(header.sequence_number.wrapping_add(1));
635            self.h265.corrupted = false;
636            self.h265.data.push((nal_type << 1) | (payload[0] & 0x01));
637            self.h265.data.push(payload[1]);
638        } else if !self.accept_fragment_sequence(Codec::H265, header.sequence_number) {
639            return Ok(None);
640        }
641        if !self.h265.corrupted {
642            self.append_fragment(Codec::H265, &payload[3..])?;
643        }
644        if end {
645            if !is_h265_vcl_nal(nal_type) {
646                self.reset_fragment(Codec::H265);
647                return Ok(None);
648            }
649            if self.h265.corrupted || !self.has_decoder_config(Codec::H265) {
650                if !self.has_decoder_config(Codec::H265) {
651                    self.status.config_wait_drops = self.status.config_wait_drops.saturating_add(1);
652                }
653                self.reset_fragment(Codec::H265);
654                return Ok(None);
655            }
656            let data = std::mem::take(&mut self.h265.data);
657            let meta = FrameMeta {
658                timestamp: self.h265.timestamp,
659                is_keyframe: (16..=23).contains(&nal_type),
660                codec: Codec::H265,
661                payload_type: header.payload_type,
662                sequence_number: header.sequence_number,
663                nal_type,
664            };
665            self.reset_fragment(Codec::H265);
666            self.push_complete_owned_nalu(data, meta, header.marker)
667        } else {
668            Ok(None)
669        }
670    }
671
672    fn accept_fragment_sequence(&mut self, codec: Codec, sequence_number: u16) -> bool {
673        let state = match codec {
674            Codec::H264 => &mut self.h264,
675            Codec::H265 => &mut self.h265,
676        };
677        let Some(expected) = state.next_sequence else {
678            return false;
679        };
680        state.next_sequence = Some(sequence_number.wrapping_add(1));
681        if sequence_number != expected {
682            state.data.clear();
683            state.corrupted = true;
684            self.status.fragment_sequence_gaps =
685                self.status.fragment_sequence_gaps.saturating_add(1);
686            return false;
687        }
688        true
689    }
690
691    fn reset_fragment(&mut self, codec: Codec) {
692        let state = match codec {
693            Codec::H264 => &mut self.h264,
694            Codec::H265 => &mut self.h265,
695        };
696        state.data.clear();
697        state.next_sequence = None;
698        state.corrupted = false;
699    }
700
    /// Handle one H.264 STAP-A aggregation packet (NAL type 24).
    ///
    /// Walks the length-prefixed NAL units after the one-byte STAP-A header,
    /// caches any SPS/PPS found, and converts every entry to Annex-B. The
    /// aggregate is forwarded to access-unit assembly only when it contains at
    /// least one slice and decoder config is complete.
    ///
    /// Parameter sets are cached as they are encountered, so they are kept
    /// even when a later entry in the same packet turns out to be malformed.
    ///
    /// # Errors
    ///
    /// [`RtpError::UnsupportedPayload`] when an entry has zero length, runs
    /// past the end of the payload, or leaves trailing bytes behind.
    fn h264_stap_a(
        &mut self,
        payload: &[u8],
        header: RtpHeader,
    ) -> Result<Option<DepacketizedFrame>, RtpError> {
        let mut out = Vec::new();
        // Skip the STAP-A NAL header byte.
        let mut offset = 1;
        let mut keyframe = false;
        let mut has_slice = false;
        let mut has_sps = false;
        let mut has_pps = false;
        let mut last_slice_type = 0;
        while offset + 2 <= payload.len() {
            // Each entry is a 16-bit big-endian size followed by the NAL unit.
            let len = u16::from_be_bytes([payload[offset], payload[offset + 1]]) as usize;
            offset += 2;
            if len == 0 || offset.saturating_add(len) > payload.len() {
                return Err(RtpError::UnsupportedPayload);
            }
            let nalu = &payload[offset..offset + len];
            let nal_type = nalu.first().map(|b| b & 0x1f).unwrap_or(0);
            self.status.last_nal_type = Some(nal_type);
            match nal_type {
                7 => {
                    has_sps = true;
                    self.h264_sps = Some(nalu.to_vec());
                }
                8 => {
                    has_pps = true;
                    self.h264_pps = Some(nalu.to_vec());
                }
                _ => {}
            }
            if is_h264_vcl_nal(nal_type) {
                has_slice = true;
                keyframe |= nal_type == 5;
                last_slice_type = nal_type;
            }
            append_annex_b(&mut out, nalu);
            offset += len;
        }
        // A single leftover byte cannot hold a size field: treat as malformed.
        if offset != payload.len() {
            return Err(RtpError::UnsupportedPayload);
        }
        if !has_slice || !self.has_decoder_config(Codec::H264) {
            if has_slice {
                self.status.config_wait_drops = self.status.config_wait_drops.saturating_add(1);
            }
            return Ok(None);
        }
        self.push_complete_owned_annex_b(
            out,
            FrameMeta {
                timestamp: header.timestamp,
                is_keyframe: keyframe,
                codec: Codec::H264,
                payload_type: header.payload_type,
                sequence_number: header.sequence_number,
                nal_type: last_slice_type,
            },
            header.marker,
            // The aggregate carries its own decoder config only when both
            // parameter sets were present in this packet.
            has_sps && has_pps,
        )
    }
764
    /// Handle one H.265 aggregation packet (NAL type 48).
    ///
    /// Walks the length-prefixed NAL units after the two-byte payload header,
    /// caches any VPS/SPS/PPS found, and converts every entry to Annex-B. The
    /// aggregate is forwarded to access-unit assembly only when it contains at
    /// least one slice and decoder config is complete.
    ///
    /// Parameter sets are cached as they are encountered, so they are kept
    /// even when a later entry in the same packet turns out to be malformed.
    ///
    /// # Errors
    ///
    /// [`RtpError::UnsupportedPayload`] when an entry has zero length, runs
    /// past the end of the payload, or leaves trailing bytes behind.
    fn h265_ap(
        &mut self,
        payload: &[u8],
        header: RtpHeader,
    ) -> Result<Option<DepacketizedFrame>, RtpError> {
        let mut out = Vec::new();
        // Skip the two-byte AP payload header.
        let mut offset = 2;
        let mut keyframe = false;
        let mut has_slice = false;
        let mut has_vps = false;
        let mut has_sps = false;
        let mut has_pps = false;
        let mut last_slice_type = 0;
        while offset + 2 <= payload.len() {
            // Each entry is a 16-bit big-endian size followed by the NAL unit.
            let len = u16::from_be_bytes([payload[offset], payload[offset + 1]]) as usize;
            offset += 2;
            if len == 0 || offset.saturating_add(len) > payload.len() {
                return Err(RtpError::UnsupportedPayload);
            }
            let nalu = &payload[offset..offset + len];
            let nal_type = nalu.first().map(|b| (b >> 1) & 0x3f).unwrap_or(0);
            self.status.last_nal_type = Some(nal_type);
            match nal_type {
                32 => {
                    has_vps = true;
                    self.h265_vps = Some(nalu.to_vec());
                }
                33 => {
                    has_sps = true;
                    self.h265_sps = Some(nalu.to_vec());
                }
                34 => {
                    has_pps = true;
                    self.h265_pps = Some(nalu.to_vec());
                }
                _ => {}
            }
            if is_h265_vcl_nal(nal_type) {
                has_slice = true;
                keyframe |= (16..=23).contains(&nal_type);
                last_slice_type = nal_type;
            }
            append_annex_b(&mut out, nalu);
            offset += len;
        }
        // A single leftover byte cannot hold a size field: treat as malformed.
        if offset != payload.len() {
            return Err(RtpError::UnsupportedPayload);
        }
        if !has_slice || !self.has_decoder_config(Codec::H265) {
            if has_slice {
                self.status.config_wait_drops = self.status.config_wait_drops.saturating_add(1);
            }
            return Ok(None);
        }
        self.push_complete_owned_annex_b(
            out,
            FrameMeta {
                timestamp: header.timestamp,
                is_keyframe: keyframe,
                codec: Codec::H265,
                payload_type: header.payload_type,
                sequence_number: header.sequence_number,
                nal_type: last_slice_type,
            },
            header.marker,
            // The aggregate carries its own decoder config only when all three
            // parameter sets were present in this packet.
            has_vps && has_sps && has_pps,
        )
    }
833
834    fn append_fragment(&mut self, codec: Codec, bytes: &[u8]) -> Result<(), RtpError> {
835        let state = match codec {
836            Codec::H264 => &mut self.h264,
837            Codec::H265 => &mut self.h265,
838        };
839        if state.data.len() + bytes.len() > self.max_fragment {
840            self.status.fragment_overflows = self.status.fragment_overflows.saturating_add(1);
841            return Err(RtpError::FragmentOverflow);
842        }
843        state.data.extend_from_slice(bytes);
844        Ok(())
845    }
846
847    fn push_complete_nalu(
848        &mut self,
849        nalu: &[u8],
850        meta: FrameMeta,
851        marker: bool,
852    ) -> Result<Option<DepacketizedFrame>, RtpError> {
853        let mut owned = Vec::with_capacity(nalu.len());
854        owned.extend_from_slice(nalu);
855        self.push_complete_owned_nalu(owned, meta, marker)
856    }
857
858    fn push_complete_owned_nalu(
859        &mut self,
860        nalu: Vec<u8>,
861        meta: FrameMeta,
862        marker: bool,
863    ) -> Result<Option<DepacketizedFrame>, RtpError> {
864        let mut data = Vec::with_capacity(nalu.len().saturating_add(4));
865        append_annex_b(&mut data, &nalu);
866        self.push_complete_owned_annex_b(data, meta, marker, false)
867    }
868
869    fn push_complete_owned_annex_b(
870        &mut self,
871        annex_b: Vec<u8>,
872        meta: FrameMeta,
873        marker: bool,
874        has_decoder_config: bool,
875    ) -> Result<Option<DepacketizedFrame>, RtpError> {
876        let max_fragment = self.max_fragment;
877        let state = match meta.codec {
878            Codec::H264 => &mut self.h264_access_unit,
879            Codec::H265 => &mut self.h265_access_unit,
880        };
881        debug_assert_eq!(state.timestamp, Some(meta.timestamp));
882        if state.corrupted {
883            if marker {
884                reset_access_unit_state(state);
885            }
886            return Ok(None);
887        }
888        if state.data.len().saturating_add(annex_b.len()) > max_fragment {
889            reset_access_unit_state(state);
890            self.status.fragment_overflows = self.status.fragment_overflows.saturating_add(1);
891            return Err(RtpError::FragmentOverflow);
892        }
893        state.data.extend_from_slice(&annex_b);
894        state.is_keyframe |= meta.is_keyframe;
895        state.has_decoder_config |= has_decoder_config;
896        state.nal_type = meta.nal_type;
897        if !marker {
898            return Ok(None);
899        }
900
901        let mut data = std::mem::take(&mut state.data);
902        let is_keyframe = state.is_keyframe;
903        let has_decoder_config = state.has_decoder_config;
904        let nal_type = state.nal_type;
905        reset_access_unit_state(state);
906        if is_keyframe && !has_decoder_config {
907            let mut prefixed = Vec::with_capacity(data.len() + self.cached_config_len(meta.codec));
908            self.prepend_cached_config(&mut prefixed, meta.codec);
909            prefixed.append(&mut data);
910            data = prefixed;
911        }
912        Ok(Some(DepacketizedFrame {
913            data,
914            timestamp: meta.timestamp,
915            is_keyframe,
916            codec: meta.codec,
917            payload_type: meta.payload_type,
918            sequence_number: meta.sequence_number,
919            nal_type,
920            codec_config: self.codec_config(),
921        }))
922    }
923
924    fn observe_access_unit_packet(&mut self, codec: Codec, header: RtpHeader) {
925        let state = match codec {
926            Codec::H264 => &mut self.h264_access_unit,
927            Codec::H265 => &mut self.h265_access_unit,
928        };
929        if state
930            .timestamp
931            .is_some_and(|timestamp| timestamp != header.timestamp)
932        {
933            if !state.data.is_empty() {
934                self.status.fragment_sequence_gaps =
935                    self.status.fragment_sequence_gaps.saturating_add(1);
936            }
937            reset_access_unit_state(state);
938        }
939        if state.timestamp.is_none() {
940            state.timestamp = Some(header.timestamp);
941        } else if state
942            .next_sequence
943            .is_some_and(|expected| expected != header.sequence_number)
944        {
945            if !state.corrupted {
946                self.status.fragment_sequence_gaps =
947                    self.status.fragment_sequence_gaps.saturating_add(1);
948            }
949            state.corrupted = true;
950            state.data.clear();
951        }
952        state.next_sequence = Some(header.sequence_number.wrapping_add(1));
953    }
954
955    fn cached_config_len(&self, codec: Codec) -> usize {
956        match codec {
957            Codec::H264 => {
958                self.h264_sps.as_ref().map_or(0, Vec::len)
959                    + self.h264_pps.as_ref().map_or(0, Vec::len)
960                    + 8
961            }
962            Codec::H265 => {
963                self.h265_vps.as_ref().map_or(0, Vec::len)
964                    + self.h265_sps.as_ref().map_or(0, Vec::len)
965                    + self.h265_pps.as_ref().map_or(0, Vec::len)
966                    + 12
967            }
968        }
969    }
970
971    fn prepend_cached_config(&mut self, data: &mut Vec<u8>, codec: Codec) {
972        let mut prepended = 0u64;
973        match codec {
974            Codec::H264 => {
975                if let Some(sps) = &self.h264_sps {
976                    append_annex_b(data, sps);
977                    prepended += 1;
978                }
979                if let Some(pps) = &self.h264_pps {
980                    append_annex_b(data, pps);
981                    prepended += 1;
982                }
983            }
984            Codec::H265 => {
985                if let Some(vps) = &self.h265_vps {
986                    append_annex_b(data, vps);
987                    prepended += 1;
988                }
989                if let Some(sps) = &self.h265_sps {
990                    append_annex_b(data, sps);
991                    prepended += 1;
992                }
993                if let Some(pps) = &self.h265_pps {
994                    append_annex_b(data, pps);
995                    prepended += 1;
996                }
997            }
998        }
999        if prepended > 0 {
1000            self.status.keyframes_with_prepended_config = self
1001                .status
1002                .keyframes_with_prepended_config
1003                .saturating_add(1);
1004            self.status.parameter_sets_prepended = self
1005                .status
1006                .parameter_sets_prepended
1007                .saturating_add(prepended);
1008        }
1009    }
1010
1011    fn has_decoder_config(&self, codec: Codec) -> bool {
1012        match codec {
1013            Codec::H264 => self.h264_sps.is_some() && self.h264_pps.is_some(),
1014            Codec::H265 => {
1015                self.h265_vps.is_some() && self.h265_sps.is_some() && self.h265_pps.is_some()
1016            }
1017        }
1018    }
1019
1020    fn record_error(&mut self, err: RtpError) {
1021        match err {
1022            RtpError::UnsupportedPayload => {
1023                self.status.unsupported_payloads =
1024                    self.status.unsupported_payloads.saturating_add(1);
1025            }
1026            RtpError::FragmentOverflow => {}
1027            _ => {
1028                self.status.malformed_packets = self.status.malformed_packets.saturating_add(1);
1029            }
1030        }
1031    }
1032}
1033
1034fn reset_access_unit_state(state: &mut AccessUnitState) {
1035    state.data.clear();
1036    state.timestamp = None;
1037    state.next_sequence = None;
1038    state.corrupted = false;
1039    state.is_keyframe = false;
1040    state.has_decoder_config = false;
1041    state.nal_type = 0;
1042}
1043
1044fn codec_from_payload_type(payload_type: u8) -> Option<Codec> {
1045    match payload_type {
1046        RTP_PAYLOAD_TYPE_H264 => Some(Codec::H264),
1047        RTP_PAYLOAD_TYPE_H265 => Some(Codec::H265),
1048        _ => None,
1049    }
1050}
1051
1052fn detect_codec(payload: &[u8]) -> Option<Codec> {
1053    if payload.is_empty() {
1054        return None;
1055    }
1056    if payload.len() >= 2 {
1057        let h265_nal_type = (payload[0] >> 1) & 0x3f;
1058        if h265_nal_type == 48 || h265_nal_type == 49 || (32..=40).contains(&h265_nal_type) {
1059            return Some(Codec::H265);
1060        }
1061    }
1062    let h264_nal_type = payload[0] & 0x1f;
1063    if h264_nal_type == 24 || h264_nal_type == 28 || (1..=12).contains(&h264_nal_type) {
1064        return Some(Codec::H264);
1065    }
1066    None
1067}
1068
/// Returns `true` when `nal_type` is in the range 1..=5, which this module
/// treats as H.264 coded-slice (VCL) NAL units.
fn is_h264_vcl_nal(nal_type: u8) -> bool {
    matches!(nal_type, 1..=5)
}
1072
/// Returns `true` when `nal_type` is in the range 0..=31, which this module
/// treats as H.265 coded-slice (VCL) NAL units.
fn is_h265_vcl_nal(nal_type: u8) -> bool {
    matches!(nal_type, 0..=31)
}
1076
/// Appends `nalu` to `out`, preceded by the 4-byte Annex B start code
/// `00 00 00 01`.
fn append_annex_b(out: &mut Vec<u8>, nalu: &[u8]) {
    const START_CODE: [u8; 4] = [0, 0, 0, 1];
    out.extend_from_slice(&START_CODE);
    out.extend_from_slice(nalu);
}
1081
#[cfg(test)]
mod tests {
    use super::*;

    /// Annex B start code expected in front of every emitted NAL unit.
    const START_CODE: [u8; 4] = [0, 0, 0, 1];

    /// H.264 SPS fixture.
    const H264_SPS: &[u8] = &[0x67, 0x64, 0x00, 0x1f];
    /// H.264 PPS fixture.
    const H264_PPS: &[u8] = &[0x68, 0xee];
    /// H.265 VPS fixture.
    const H265_VPS: &[u8] = &[0x40, 0x01, 0xaa];
    /// H.265 SPS fixture.
    const H265_SPS: &[u8] = &[0x42, 0x01, 0xbb];
    /// H.265 PPS fixture.
    const H265_PPS: &[u8] = &[0x44, 0x01, 0xcc];

    /// Builds the Annex B byte stream a frame made of `units` should contain.
    fn annex_b(units: &[&[u8]]) -> Vec<u8> {
        let mut stream = Vec::new();
        for unit in units {
            stream.extend_from_slice(&START_CODE);
            stream.extend_from_slice(unit);
        }
        stream
    }

    /// Builds an RTP packet with the H.264 payload type.
    fn rtp(payload: &[u8], marker: bool, seq: u16, timestamp: u32) -> Vec<u8> {
        rtp_with_payload_type(payload, RTP_PAYLOAD_TYPE_H264, marker, seq, timestamp)
    }

    /// Builds a minimal 12-byte-header RTP packet followed by `payload`.
    fn rtp_with_payload_type(
        payload: &[u8],
        payload_type: u8,
        marker: bool,
        seq: u16,
        timestamp: u32,
    ) -> Vec<u8> {
        let marker_bit: u8 = if marker { 0x80 } else { 0x00 };
        let mut packet = Vec::with_capacity(12 + payload.len());
        // Version 2, no padding, no extension, no CSRC entries.
        packet.push(0x80);
        packet.push(marker_bit | (payload_type & 0x7f));
        packet.extend_from_slice(&seq.to_be_bytes());
        packet.extend_from_slice(&timestamp.to_be_bytes());
        // Fixed SSRC.
        packet.extend_from_slice(&0x1122_3344u32.to_be_bytes());
        packet.extend_from_slice(payload);
        packet
    }

    /// Builds an aggregation payload: `header` followed by each unit prefixed
    /// with its 16-bit big-endian length.
    fn aggregate(header: &[u8], units: &[&[u8]]) -> Vec<u8> {
        let mut payload = header.to_vec();
        for unit in units {
            payload.extend_from_slice(&(unit.len() as u16).to_be_bytes());
            payload.extend_from_slice(unit);
        }
        payload
    }

    /// H.264 STAP-A payload (NAL type 24) wrapping `units`.
    fn stap_a(units: &[&[u8]]) -> Vec<u8> {
        aggregate(&[24], units)
    }

    /// H.265 aggregation payload (NAL type 48) wrapping `units`.
    fn h265_ap(units: &[&[u8]]) -> Vec<u8> {
        aggregate(&[0x60, 0x01], units)
    }

    /// Feeds the H.264 SPS and PPS so later slices are not held back.
    fn prime_h264(depay: &mut RtpDepacketizer) {
        for (seq, parameter_set) in [(1u16, H264_SPS), (2, H264_PPS)] {
            let packet = rtp(parameter_set, true, seq, 10);
            assert!(matches!(depay.push(&packet), Ok(None)));
        }
    }

    /// Feeds the H.265 VPS, SPS and PPS so later slices are not held back.
    fn prime_h265(depay: &mut RtpDepacketizer) {
        for (seq, parameter_set) in [(1u16, H265_VPS), (2, H265_SPS), (3, H265_PPS)] {
            let packet =
                rtp_with_payload_type(parameter_set, RTP_PAYLOAD_TYPE_H265, true, seq, 10);
            assert!(matches!(depay.push(&packet), Ok(None)));
        }
    }

    #[test]
    fn parses_rtp_header() {
        let packet = rtp(&[0x65, 1, 2], true, 7, 1234);
        let header = RtpHeader::parse(&packet).expect("well-formed RTP packet");

        assert!(header.marker);
        assert_eq!(header.payload_type, 96);
        assert_eq!(header.sequence_number, 7);
        assert_eq!(header.timestamp, 1234);
        assert_eq!(header.payload(&packet), &[0x65, 1, 2]);
    }

    #[test]
    fn depacketizes_h264_single_nalu_as_annex_b() {
        let mut depay = RtpDepacketizer::new();
        prime_h264(&mut depay);

        let idr = rtp(&[0x65, 0xaa], true, 1, 42);
        let frame = depay.push(&idr).unwrap().expect("IDR should be emitted");

        assert_eq!(frame.codec, Codec::H264);
        assert!(frame.is_keyframe);
        assert_eq!(frame.data, annex_b(&[H264_SPS, H264_PPS, &[0x65, 0xaa]]));
    }

    #[test]
    fn combines_same_timestamp_h264_slices_until_marker() {
        let mut depay = RtpDepacketizer::new();
        prime_h264(&mut depay);

        let first_slice = rtp(&[0x41, 0x80, 0xaa], false, 3, 42);
        assert!(matches!(depay.push(&first_slice), Ok(None)));

        let last_slice = rtp(&[0x41, 0x40, 0xbb], true, 4, 42);
        let frame = depay
            .push(&last_slice)
            .unwrap()
            .expect("marker should close the access unit");

        assert_eq!(
            frame.data,
            annex_b(&[&[0x41, 0x80, 0xaa], &[0x41, 0x40, 0xbb]])
        );
        assert_eq!(frame.timestamp, 42);
        assert!(!frame.is_keyframe);
    }

    #[test]
    fn drops_access_unit_after_sequence_gap() {
        let mut depay = RtpDepacketizer::new();
        prime_h264(&mut depay);

        let before_gap = rtp(&[0x41, 0x80, 0xaa], false, 3, 42);
        assert!(matches!(depay.push(&before_gap), Ok(None)));

        // Sequence number 4 is missing.
        let after_gap = rtp(&[0x41, 0x40, 0xbb], true, 5, 42);
        assert!(matches!(depay.push(&after_gap), Ok(None)));
        assert_eq!(depay.status().fragment_sequence_gaps, 1);

        let next_unit = rtp(&[0x41, 0xcc], true, 6, 43);
        assert!(matches!(depay.push(&next_unit), Ok(Some(_))));
    }

    #[test]
    fn drops_h264_video_until_sps_and_pps_are_seen() {
        let mut depay = RtpDepacketizer::new();

        let idr = rtp(&[0x65, 0xaa], true, 1, 42);
        assert!(matches!(depay.push(&idr), Ok(None)));

        let status = depay.status();
        assert_eq!(status.config_wait_drops, 1);
        assert!(!status.codec_config.is_complete_for(Codec::H264));
        assert_eq!(status.last_nal_type, Some(5));
    }

    #[test]
    fn h264_payload_type_prevents_h265_false_positive() {
        let mut depay = RtpDepacketizer::new();
        prime_h264(&mut depay);

        let slice = rtp(&[0x41, 0xaa], true, 1, 42);
        let frame = depay.push(&slice).unwrap().expect("slice should be emitted");

        assert_eq!(frame.codec, Codec::H264);
        assert!(!frame.is_keyframe);
        assert_eq!(frame.data, annex_b(&[&[0x41, 0xaa]]));
    }

    #[test]
    fn h264_non_vcl_nal_is_not_emitted_as_video_frame() {
        let mut depay = RtpDepacketizer::new();
        prime_h264(&mut depay);

        let non_vcl = rtp(&[0x06, 0x05, 0xff], true, 3, 42);
        assert!(matches!(depay.push(&non_vcl), Ok(None)));
    }

    #[test]
    fn opus_payload_type_is_not_sniffed_as_video() {
        let mut depay = RtpDepacketizer::new();
        prime_h264(&mut depay);

        let audio = rtp_with_payload_type(&[0x65, 0xaa], RTP_PAYLOAD_TYPE_OPUS, true, 1, 42);
        let err = depay.push(&audio).unwrap_err();
        assert_eq!(err, RtpError::UnsupportedPayload);
    }

    #[test]
    fn depacketizes_h265_single_nalu_by_payload_type() {
        let mut depay = RtpDepacketizer::new();
        prime_h265(&mut depay);

        let slice =
            rtp_with_payload_type(&[0x02, 0x01, 0xaa], RTP_PAYLOAD_TYPE_H265, true, 1, 42);
        let frame = depay.push(&slice).unwrap().expect("slice should be emitted");

        assert_eq!(frame.codec, Codec::H265);
        assert!(!frame.is_keyframe);
        assert_eq!(frame.data, annex_b(&[&[0x02, 0x01, 0xaa]]));
    }

    #[test]
    fn h265_non_vcl_nal_is_not_emitted_as_video_frame() {
        let mut depay = RtpDepacketizer::new();
        prime_h265(&mut depay);

        let non_vcl =
            rtp_with_payload_type(&[0x4e, 0x01, 0xff], RTP_PAYLOAD_TYPE_H265, true, 4, 42);
        assert!(matches!(depay.push(&non_vcl), Ok(None)));
    }

    #[test]
    fn h264_stap_a_caches_parameter_sets_for_later_keyframe() {
        let mut depay = RtpDepacketizer::new();

        let config = rtp(&stap_a(&[H264_SPS, H264_PPS]), true, 1, 10);
        let aggregate_result = depay.push(&config).unwrap();
        assert!(aggregate_result.is_none());

        let idr = rtp(&[0x65, 0xaa], true, 2, 20);
        let frame = depay.push(&idr).unwrap().expect("IDR should be emitted");

        assert!(frame.is_keyframe);
        assert_eq!(frame.data, annex_b(&[H264_SPS, H264_PPS, &[0x65, 0xaa]]));
    }

    #[test]
    fn h264_stap_a_prepends_cached_parameter_sets_for_idr_without_inband_config() {
        let mut depay = RtpDepacketizer::new();

        let config = rtp(&stap_a(&[H264_SPS, H264_PPS]), true, 1, 10);
        depay.push(&config).unwrap();

        let idr = rtp(&stap_a(&[&[0x65, 0xaa, 0xbb]]), true, 2, 20);
        let frame = depay.push(&idr).unwrap().expect("IDR should be emitted");

        assert!(frame.is_keyframe);
        assert_eq!(
            frame.data,
            annex_b(&[H264_SPS, H264_PPS, &[0x65, 0xaa, 0xbb]])
        );

        let status = depay.status();
        assert_eq!(status.keyframes_with_prepended_config, 1);
        assert_eq!(status.parameter_sets_prepended, 2);
    }

    #[test]
    fn h264_stap_a_does_not_duplicate_inband_parameter_sets() {
        let mut depay = RtpDepacketizer::new();

        let keyframe = rtp(&stap_a(&[H264_SPS, H264_PPS, &[0x65, 0xaa]]), true, 1, 20);
        let frame = depay
            .push(&keyframe)
            .unwrap()
            .expect("self-contained keyframe should be emitted");

        assert_eq!(frame.data, annex_b(&[H264_SPS, H264_PPS, &[0x65, 0xaa]]));

        let status = depay.status();
        assert_eq!(status.keyframes_with_prepended_config, 0);
        assert_eq!(status.parameter_sets_prepended, 0);
    }

    #[test]
    fn h264_stap_a_waits_for_the_access_unit_marker() {
        let mut depay = RtpDepacketizer::new();

        let opening = rtp(&stap_a(&[H264_SPS, H264_PPS, &[0x61, 0xaa]]), false, 1, 20);
        assert!(matches!(depay.push(&opening), Ok(None)));

        let closing = rtp(&[0x61, 0xbb], true, 2, 20);
        let frame = depay
            .push(&closing)
            .unwrap()
            .expect("marker should close the access unit");

        assert_eq!(
            frame.data,
            annex_b(&[H264_SPS, H264_PPS, &[0x61, 0xaa], &[0x61, 0xbb]])
        );
    }

    #[test]
    fn malformed_stap_a_length_is_rejected() {
        let mut depay = RtpDepacketizer::new();

        // Declares an 8-byte unit but only two bytes follow.
        let malformed = [24, 0, 8, 0x67, 0x64];
        let packet = rtp(&malformed, true, 1, 20);

        assert_eq!(depay.push(&packet), Err(RtpError::UnsupportedPayload));
        assert_eq!(depay.status().unsupported_payloads, 1);
    }

    #[test]
    fn h265_ap_prepends_cached_parameter_sets_for_keyframe_without_inband_config() {
        let mut depay = RtpDepacketizer::new();
        prime_h265(&mut depay);

        let keyframe = rtp_with_payload_type(
            &h265_ap(&[&[0x26, 0x01, 0xaa]]),
            RTP_PAYLOAD_TYPE_H265,
            true,
            4,
            20,
        );
        let frame = depay
            .push(&keyframe)
            .unwrap()
            .expect("keyframe should be emitted");

        assert!(frame.is_keyframe);
        assert_eq!(
            frame.data,
            annex_b(&[H265_VPS, H265_SPS, H265_PPS, &[0x26, 0x01, 0xaa]])
        );

        let status = depay.status();
        assert_eq!(status.keyframes_with_prepended_config, 1);
        assert_eq!(status.parameter_sets_prepended, 3);
    }

    #[test]
    fn depacketizes_h264_fu_a() {
        let mut depay = RtpDepacketizer::new();
        prime_h264(&mut depay);

        let start = rtp(&[0x7c, 0x85, 1, 2], false, 1, 99);
        assert!(matches!(depay.push(&start), Ok(None)));

        let end = rtp(&[0x7c, 0x45, 3, 4], true, 2, 99);
        let frame = depay
            .push(&end)
            .unwrap()
            .expect("end fragment should complete the frame");

        assert_eq!(
            frame.data,
            annex_b(&[H264_SPS, H264_PPS, &[0x65, 1, 2, 3, 4]])
        );
    }

    #[test]
    fn drops_h264_fu_a_after_sequence_gap() {
        let mut depay = RtpDepacketizer::new();
        prime_h264(&mut depay);

        // First unit loses sequence number 11 and must be discarded.
        let broken_start = rtp(&[0x7c, 0x85, 1, 2], false, 10, 99);
        assert!(matches!(depay.push(&broken_start), Ok(None)));
        let broken_end = rtp(&[0x7c, 0x45, 3, 4], true, 12, 99);
        assert!(matches!(depay.push(&broken_end), Ok(None)));

        // The following unit arrives intact.
        let intact_start = rtp(&[0x7c, 0x85, 5, 6], false, 13, 100);
        assert!(matches!(depay.push(&intact_start), Ok(None)));
        let intact_end = rtp(&[0x7c, 0x45, 7, 8], true, 14, 100);
        let frame = depay
            .push(&intact_end)
            .unwrap()
            .expect("intact unit should be emitted");

        assert!(frame.data.ends_with(&[0, 0, 0, 1, 0x65, 5, 6, 7, 8]));
    }

    #[test]
    fn drops_fragment_end_without_start() {
        let mut depay = RtpDepacketizer::new();
        prime_h264(&mut depay);

        let orphan_end = rtp(&[0x7c, 0x45, 1, 2], true, 10, 99);
        assert!(matches!(depay.push(&orphan_end), Ok(None)));
    }

    #[test]
    fn status_tracks_h264_decoder_config() {
        let mut depay = RtpDepacketizer::new();

        let sps_packet = rtp(H264_SPS, true, 1, 10);
        depay.push(&sps_packet).unwrap();
        let after_sps = depay.status();
        assert!(after_sps.codec_config.h264_sps);
        assert!(!after_sps.codec_config.h264_pps);
        assert!(!after_sps.codec_config.is_complete_for(Codec::H264));

        let pps_packet = rtp(H264_PPS, true, 2, 10);
        depay.push(&pps_packet).unwrap();
        let after_pps = depay.status();
        assert!(after_pps.codec_config.is_complete_for(Codec::H264));
    }

    #[test]
    fn reorder_buffer_restores_short_out_of_order_burst() {
        let mut reorder = RtpReorderBuffer::default();
        let seq_10 = rtp(&[0x61, 1], true, 10, 90);
        let seq_11 = rtp(&[0x61, 2], true, 11, 90);
        let seq_12 = rtp(&[0x61, 3], true, 12, 90);

        // In-order packet passes straight through.
        assert_eq!(reorder.push(&seq_10).unwrap(), vec![seq_10.clone()]);

        // Packet 12 arrives early and is held back.
        assert!(reorder.push(&seq_12).unwrap().is_empty());
        assert_eq!(reorder.status().buffered_packets, 1);
        assert_eq!(reorder.status().reordered_packets, 1);

        // Packet 11 fills the hole and releases both.
        let released = reorder.push(&seq_11).unwrap();
        assert_eq!(released, vec![seq_11, seq_12]);
        assert_eq!(reorder.status().buffered_packets, 0);
    }
}